Real-Time Sync Patterns in Microsoft Fabric

Real-time data synchronization is critical for modern analytics. Microsoft Fabric offers multiple patterns for different levels of “real-time”, from near real-time mirroring to true streaming analytics. Let’s explore the options.

The Real-Time Spectrum

Latency Requirement          Pattern                    Fabric Feature
─────────────────────────────────────────────────────────────────────────
  Seconds          ──►   Event Streaming      ──►   Eventstream + KQL DB
                                                    Real-Time Intelligence

  Minutes          ──►   CDC/Mirroring        ──►   Database Mirroring
                                                    Data Factory CDC

  Hours            ──►   Batch ETL            ──►   Data Factory Pipelines
                                                    Dataflows Gen2

Pattern 1: True Streaming with Eventstream

For sub-second latency:

# Eventstream configuration for IoT data
# Use Fabric REST APIs for Eventstream management
import requests
from azure.identity import DefaultAzureCredential

class StreamingPipeline:
    def __init__(self, workspace_id: str):
        self.workspace_id = workspace_id
        self.credential = DefaultAzureCredential()
        self.base_url = "https://api.fabric.microsoft.com/v1"

    def _get_headers(self):
        token = self.credential.get_token("https://api.fabric.microsoft.com/.default")
        return {"Authorization": f"Bearer {token.token}", "Content-Type": "application/json"}

    def create_iot_stream(self):
        """Create streaming pipeline for IoT data via REST API."""

        # Create Eventstream via Fabric REST API
        eventstream_payload = {
            "displayName": "IoT-Telemetry-Stream",
            "description": "Streaming pipeline for IoT telemetry"
        }

        response = requests.post(
            f"{self.base_url}/workspaces/{self.workspace_id}/eventstreams",
            headers=self._get_headers(),
            json=eventstream_payload
        )
        response.raise_for_status()  # fail fast on auth or permission errors

        eventstream_id = response.json().get("id")

        # Sources and destinations can be configured in the Fabric portal,
        # or programmatically by supplying an eventstream definition payload
        # through the item definition APIs. Complex topologies are often
        # easier to manage in the portal.

        return {
            "eventstream_id": eventstream_id,
            "status": "created",
            "next_steps": [
                "Configure Event Hubs source in portal",
                "Add KQL Database destination",
                "Add Lakehouse destination"
            ]
        }

# The data flow:
# Event Hubs → Eventstream → KQL DB (real-time queries)
#                          → Lakehouse (historical analysis)
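
Calling it needs only the workspace GUID (a placeholder below); the returned next_steps list doubles as a checklist for finishing the wiring in the portal:

pipeline = StreamingPipeline(workspace_id="<workspace-guid>")  # placeholder GUID
result = pipeline.create_iot_stream()

print(result["status"], result["eventstream_id"])
for step in result["next_steps"]:
    print(" -", step)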

Querying Real-Time Data

// KQL query for real-time dashboard
Telemetry
| where timestamp > ago(5m)
| summarize
    avg_temp = avg(temperature),
    max_temp = max(temperature),
    reading_count = count()
    by deviceId, bin(timestamp, 1m)
| order by timestamp desc
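
The same query also runs programmatically. Here is a minimal sketch using the azure-kusto-data Python client; the cluster URI and database name are placeholders you’d copy from the KQL database’s details page in Fabric:

# pip install azure-kusto-data
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder

# Placeholders -- copy the real query URI and database name from Fabric
cluster_uri = "https://<your-cluster>.kusto.fabric.microsoft.com"
database = "IoTAnalytics"

kcsb = KustoConnectionStringBuilder.with_az_cli_authentication(cluster_uri)
client = KustoClient(kcsb)

query = """
Telemetry
| where timestamp > ago(5m)
| summarize avg_temp = avg(temperature) by deviceId, bin(timestamp, 1m)
"""

response = client.execute(database, query)
for row in response.primary_results[0]:
    print(row["deviceId"], row["avg_temp"])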

Pattern 2: CDC with Data Factory

For minute-level latency with relational sources:

# Data Factory CDC pipeline
class CDCPipeline:
    def __init__(self, workspace_id: str):
        self.workspace_id = workspace_id

    def create_cdc_pipeline(
        self,
        source_connection: str,
        source_tables: list[str],
        target_lakehouse: str
    ):
        """Create CDC-based near real-time sync."""

        # Intended to run inside a ForEach over pipeline().parameters.Tables
        # (hence the @item() references), after a Lookup activity named
        # 'GetCurrentLSN' has fetched the current maximum LSN. Both are
        # omitted here for brevity.
        pipeline = {
            "name": "CDC-Sales-Pipeline",
            "properties": {
                "activities": [
                    {
                        "name": "CDC_Capture",
                        "type": "Copy",
                        "inputs": [{
                            "referenceName": "SqlCDCSource",
                            "type": "DatasetReference",
                            "parameters": {
                                "tableName": {"type": "Expression", "value": "@item()"}
                            }
                        }],
                        "outputs": [{
                            "referenceName": "LakehouseDelta",
                            "type": "DatasetReference",
                            "parameters": {
                                "tableName": {"type": "Expression", "value": "@item()"}
                            }
                        }],
                        "typeProperties": {
                            "source": {
                                "type": "SqlSource",
                                "sqlReaderQuery": {
                                    "type": "Expression",
                                    "value": "@concat('SELECT * FROM cdc.fn_cdc_get_all_changes_dbo_', item(), '(@{pipeline().parameters.LastLSN}, @{activity(''GetCurrentLSN'').output.firstRow.CurrentLSN}, ''all'')')"
                                }
                            },
                            "sink": {
                                "type": "DeltaSink",
                                "writeBehavior": "merge",
                                "mergeSchema": True
                            },
                            "enableStaging": False
                        }
                    }
                ],
                "parameters": {
                    "LastLSN": {"type": "string"},
                    "Tables": {"type": "array", "defaultValue": source_tables}
                }
            }
        }

        return pipeline
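
One detail the pipeline glosses over: the LastLSN parameter has to come from somewhere. A common approach is a small control table that holds the high-water mark per pipeline. Here is a minimal sketch with pyodbc, assuming a hypothetical etl.SyncWatermark table:

# Assumes a control table you create yourself, e.g.:
#   CREATE TABLE etl.SyncWatermark (
#       pipeline_name sysname PRIMARY KEY,
#       last_lsn      binary(10),
#       updated_at    datetime2);
import pyodbc

def read_watermark(conn: pyodbc.Connection, pipeline_name: str) -> bytes | None:
    """Fetch the LSN stored by the previous successful run, if any."""
    row = conn.execute(
        "SELECT last_lsn FROM etl.SyncWatermark WHERE pipeline_name = ?",
        pipeline_name,
    ).fetchone()
    return row.last_lsn if row else None

def save_watermark(conn: pyodbc.Connection, pipeline_name: str, lsn: bytes) -> None:
    """Upsert the new high-water mark after a successful run."""
    conn.execute(
        "MERGE etl.SyncWatermark AS t "
        "USING (SELECT ? AS pipeline_name, ? AS last_lsn) AS s "
        "ON t.pipeline_name = s.pipeline_name "
        "WHEN MATCHED THEN UPDATE SET last_lsn = s.last_lsn, "
        "    updated_at = SYSUTCDATETIME() "
        "WHEN NOT MATCHED THEN INSERT (pipeline_name, last_lsn, updated_at) "
        "    VALUES (s.pipeline_name, s.last_lsn, SYSUTCDATETIME());",
        pipeline_name, lsn,
    )
    conn.commit()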

Setting Up CDC in SQL Server

-- Enable CDC on database
EXEC sys.sp_cdc_enable_db;

-- Enable CDC on tables
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'Orders',
    @role_name = NULL,
    @supports_net_changes = 1;

EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'Customers',
    @role_name = NULL,
    @supports_net_changes = 1;

-- Verify CDC is enabled
SELECT name, is_cdc_enabled
FROM sys.databases
WHERE name = DB_NAME();

SELECT *
FROM sys.tables
WHERE is_tracked_by_cdc = 1;
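
Before pointing the pipeline at these tables, confirm changes are actually being captured. A quick sanity check from Python with pyodbc and SQL Server’s built-in CDC functions (the connection string is a placeholder):

import pyodbc

# Placeholder connection string -- substitute your server and database
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 18 for SQL Server};"
    "SERVER=<server>;DATABASE=<database>;Trusted_Connection=yes;"
)

cursor = conn.cursor()
# Read every change captured for dbo.Orders between the min and max LSN
cursor.execute("""
    DECLARE @from_lsn binary(10) = sys.fn_cdc_get_min_lsn('dbo_Orders');
    DECLARE @to_lsn   binary(10) = sys.fn_cdc_get_max_lsn();
    SELECT __$operation, __$start_lsn
    FROM cdc.fn_cdc_get_all_changes_dbo_Orders(@from_lsn, @to_lsn, 'all');
""")
for operation, start_lsn in cursor.fetchmany(10):
    # __$operation: 1 = delete, 2 = insert, 3/4 = update (before/after image)
    print(operation, start_lsn.hex())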

Pattern 3: Hybrid Approach

Combine streaming and batch for different data types:

class HybridSyncArchitecture:
    """
    Architecture:
    - Streaming: Event-driven data (clicks, IoT, transactions)
    - Mirroring: Slowly changing dimensions (customers, products)
    - Batch: Large historical loads, complex transformations
    """

    def __init__(self, workspace_id: str):
        self.workspace_id = workspace_id

    def configure_streaming_tier(self):
        """Configure real-time streaming for hot data."""
        return {
            "name": "Streaming_Tier",
            "sources": ["Event Hubs", "Kafka"],
            "processing": "Eventstream",
            "destinations": ["KQL Database", "Lakehouse (append)"],
            "latency": "seconds",
            "use_cases": ["Real-time dashboards", "Alerting", "Live metrics"]
        }

    def configure_mirror_tier(self):
        """Configure mirroring for warm data."""
        return {
            "name": "Mirror_Tier",
            "sources": ["Azure SQL", "Cosmos DB"],
            "processing": "Database Mirroring",
            "destinations": ["Mirrored Database (OneLake)"],
            "latency": "minutes",
            "use_cases": ["Operational reporting", "Customer 360", "Inventory"]
        }

    def configure_batch_tier(self):
        """Configure batch for cold data."""
        return {
            "name": "Batch_Tier",
            "sources": ["Data Lake", "External APIs", "Files"],
            "processing": "Data Factory Pipelines",
            "destinations": ["Lakehouse"],
            "latency": "hours",
            "use_cases": ["Historical analysis", "ML training", "Compliance"]
        }

    def get_recommended_tier(self, data_characteristics: dict) -> str:
        """Recommend sync tier based on data characteristics."""

        latency_needed = data_characteristics.get("latency_seconds", 3600)
        volume_per_second = data_characteristics.get("events_per_second", 0)
        change_frequency = data_characteristics.get("change_frequency", "low")

        if latency_needed < 60 or volume_per_second > 100:
            return "Streaming_Tier"
        elif change_frequency in ["high", "medium"] and latency_needed < 600:
            return "Mirror_Tier"
        else:
            return "Batch_Tier"

Pattern 4: Fan-Out Architecture

Distribute data to multiple destinations:

class FanOutSync:
    """
    Single source, multiple destinations with different latency requirements.
    """

    def create_fan_out_pipeline(self):
        """
        Source: Order Events from Event Hubs

        Destinations:
        1. KQL DB - Real-time fraud detection (seconds)
        2. Lakehouse - Analytics (minutes)
        3. Cosmos DB - Order tracking API (seconds)
        4. Power BI - Dashboard refresh (minutes)
        """

        pipeline_config = {
            "source": {
                "type": "EventHubs",
                "topic": "orders"
            },
            "eventstream": {
                "name": "Orders-FanOut",
                "transformations": [
                    {"name": "parse", "type": "ParseJson"},
                    {"name": "enrich", "type": "Lookup", "reference": "Customers"}
                ]
            },
            "destinations": [
                {
                    "name": "fraud_detection",
                    "type": "KQLDatabase",
                    "table": "OrdersRealtime",
                    "purpose": "Real-time fraud rules"
                },
                {
                    "name": "analytics",
                    "type": "Lakehouse",
                    "table": "orders",
                    "purpose": "Historical analysis",
                    "batch_interval": "1 minute"
                },
                {
                    "name": "api_store",
                    "type": "CosmosDB",
                    "container": "orders",
                    "purpose": "Order tracking API"
                },
                {
                    "name": "reporting",
                    "type": "DirectLake",
                    "semantic_model": "Sales",
                    "purpose": "Executive dashboard"
                }
            ]
        }

        return pipeline_config
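
Since each destination serves a different consumer, a quick dump of the plan helps sanity-check latencies before deploying:

config = FanOutSync().create_fan_out_pipeline()

for dest in config["destinations"]:
    interval = dest.get("batch_interval", "continuous")
    print(f"{dest['name']:16} -> {dest['type']:12} ({interval}): {dest['purpose']}")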

Monitoring Real-Time Pipelines

class RealtimeMonitor:
    def __init__(self):
        self.metrics = {
            "throughput": [],
            "latency": [],
            "errors": []
        }

    def monitor_eventstream(self, eventstream_id: str):
        """Monitor Eventstream health."""

        # Get metrics from Fabric API
        metrics = self.get_eventstream_metrics(eventstream_id)

        alerts = []

        # Check throughput
        if metrics["events_per_second"] < metrics["expected_eps"] * 0.5:
            alerts.append({
                "severity": "warning",
                "message": f"Throughput below 50% of expected",
                "current": metrics["events_per_second"],
                "expected": metrics["expected_eps"]
            })

        # Check latency
        if metrics["processing_latency_ms"] > 5000:
            alerts.append({
                "severity": "critical",
                "message": f"Processing latency > 5 seconds",
                "current_ms": metrics["processing_latency_ms"]
            })

        # Check error rate
        error_rate = metrics["errors"] / max(metrics["events_processed"], 1)
        if error_rate > 0.01:
            alerts.append({
                "severity": "critical",
                "message": f"Error rate > 1%",
                "rate": error_rate
            })

        return alerts

    def get_eventstream_metrics(self, eventstream_id: str) -> dict:
        # Placeholder for actual API call
        return {
            "events_per_second": 1000,
            "expected_eps": 1200,
            "processing_latency_ms": 250,
            "events_processed": 100000,
            "errors": 5
        }
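
Run on a schedule (a notebook job, or a Data Activator rule watching the same metrics), this turns degradations into alerts. A quick local check against the placeholder metrics:

monitor = RealtimeMonitor()
alerts = monitor.monitor_eventstream("<eventstream-id>")  # placeholder id

if not alerts:
    # The placeholder metrics sit inside every threshold, so nothing fires
    print("Eventstream healthy")
for alert in alerts:
    print(f"[{alert['severity'].upper()}] {alert['message']}")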

Choosing the Right Pattern

Requirement            Pattern            Fabric Feature
─────────────────────────────────────────────────────────────
< 1 second latency     Streaming          Eventstream + KQL DB
1-5 minutes            CDC/Mirroring      Database Mirroring
Change tracking        CDC                Data Factory CDC
Schema evolution       Mirroring          Built-in handling
High volume events     Streaming          Eventstream
Complex transforms     Batch/Streaming    Dataflows/Spark

Conclusion

Real-time sync in Fabric isn’t one-size-fits-all. Choose streaming for true real-time needs, mirroring for operational data, and batch for historical loads.

The key is understanding your latency requirements and matching them to the appropriate pattern. Often, a hybrid approach serves organizations best.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.