
Real-Time Data Streaming with Microsoft Fabric Eventstreams

Real-time data processing enables immediate insights and rapid response to business events. Microsoft Fabric Eventstreams provides a managed streaming platform that integrates seamlessly with the broader Fabric ecosystem.

Why Real-Time Matters

Batch processing introduces latency between data generation and insight availability. For fraud detection, operational monitoring, and customer engagement, this delay can mean missed opportunities or increased risk.

Setting Up Eventstreams

Create a streaming pipeline that processes events as they arrive:

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient  # async client to match the async send method below
from azure.identity.aio import DefaultAzureCredential
import json
from datetime import datetime, timezone

class EventstreamProducer:
    def __init__(self, eventhub_name: str, namespace: str):
        self.producer = EventHubProducerClient(
            fully_qualified_namespace=f"{namespace}.servicebus.windows.net",
            eventhub_name=eventhub_name,
            credential=DefaultAzureCredential()
        )

    async def send_events(self, events: list[dict]):
        """Send a batch of events to Eventstream."""
        async with self.producer:
            event_batch = await self.producer.create_batch()

            for event in events:
                event_data = EventData(json.dumps({
                    **event,
                    "timestamp": datetime.utcnow().isoformat(),
                    "source": "order-service"
                }))

                # Add properties for routing
                event_data.properties = {
                    "event_type": event.get("type", "unknown"),
                    "priority": event.get("priority", "normal")
                }

                try:
                    event_batch.add(event_data)
                except ValueError:
                    # Batch full, send and create new batch
                    await self.producer.send_batch(event_batch)
                    event_batch = await self.producer.create_batch()
                    event_batch.add(event_data)

            await self.producer.send_batch(event_batch)

# Usage (top-level await works in a Fabric notebook; otherwise wrap in asyncio.run)
producer = EventstreamProducer("orders-stream", "fabric-namespace")
await producer.send_events([
    {"type": "order_placed", "order_id": "12345", "amount": 99.99},
    {"type": "order_shipped", "order_id": "12344", "carrier": "FedEx"}
])
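On the consuming side, the same stream can be read back through its Event Hubs-compatible endpoint. Here is a minimal sketch using the async consumer client; the namespace, stream name, and $Default consumer group are placeholders, and it assumes the Eventstream exposes a custom endpoint:

from azure.eventhub.aio import EventHubConsumerClient
from azure.identity.aio import DefaultAzureCredential

async def consume_events():
    # Connect to the stream's Event Hubs-compatible endpoint (names are placeholders)
    consumer = EventHubConsumerClient(
        fully_qualified_namespace="fabric-namespace.servicebus.windows.net",
        eventhub_name="orders-stream",
        consumer_group="$Default",
        credential=DefaultAzureCredential()
    )

    async def on_event(partition_context, event):
        # Event bodies are the JSON payloads produced above
        print(f"Partition {partition_context.partition_id}: {event.body_as_str()}")

    async with consumer:
        # Only read events that arrive after the client starts
        await consumer.receive(on_event=on_event, starting_position="@latest")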

Processing with KQL

Fabric’s Real-Time Analytics uses Kusto Query Language for stream processing:

// Create a materialized view for real-time aggregations
.create materialized-view OrderMetrics on table OrderEvents
{
    OrderEvents
    | summarize
        TotalOrders = count(),
        TotalRevenue = sum(Amount),
        AvgOrderValue = avg(Amount)
    by bin(Timestamp, 1h), Region
}

// Detect anomalies in real-time
OrderEvents
| where Timestamp > ago(5m)
| summarize OrderCount = count() by bin(Timestamp, 1m)
| extend ExpectedCount = 100  // baseline
| extend Anomaly = OrderCount > ExpectedCount * 2
                   or OrderCount < ExpectedCount * 0.5
| where Anomaly == true

// Join streams for enrichment
OrderEvents
| join kind=inner (CustomerDimension) on CustomerId
| project
    OrderId,
    CustomerName,
    CustomerSegment,
    Amount,
    Timestamp
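These queries aren't limited to the KQL editor. As a rough sketch, the same database can also be queried from Python with the azure-kusto-data package; the query URI, the orders-db database name, and the token-credential setup are assumptions for illustration:

from azure.identity import DefaultAzureCredential
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder

# Placeholder query URI and database name for the Eventhouse / KQL database
cluster_uri = "https://your-eventhouse.kusto.fabric.microsoft.com"
kcsb = KustoConnectionStringBuilder.with_azure_token_credential(
    cluster_uri, DefaultAzureCredential()
)
client = KustoClient(kcsb)

# Pull the latest hourly aggregates from the materialized view defined above
query = """
OrderMetrics
| where Timestamp > ago(1h)
| top 10 by TotalRevenue desc
"""

response = client.execute("orders-db", query)
for row in response.primary_results[0]:
    print(row["Region"], row["TotalRevenue"])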

Connecting to Lakehouses

Route processed events to Delta tables for persistence:

# Fabric notebook for stream-to-lakehouse ingestion
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

schema = StructType() \
    .add("order_id", StringType()) \
    .add("amount", DoubleType()) \
    .add("timestamp", TimestampType())

# Read from the Eventstream's Kafka-compatible custom endpoint
# (kafka.bootstrap.servers is required; SASL_SSL auth options omitted for brevity)
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "fabric-namespace.servicebus.windows.net:9093") \
    .option("subscribe", "orders-stream") \
    .load()

# Parse and write to lakehouse
parsed_df = stream_df \
    .select(from_json(col("value").cast("string"), schema).alias("data")) \
    .select("data.*")

parsed_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/orders") \
    .toTable("lakehouse.orders")
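Once the stream is running, the sink behaves like any other lakehouse table. A quick sanity check from the same notebook, as a batch read of the table written above:

# Batch read of the streaming sink to confirm rows are landing
recent_orders = spark.read.table("lakehouse.orders") \
    .orderBy(col("timestamp").desc()) \
    .limit(10)

recent_orders.show(truncate=False)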

Real-time streaming with Fabric Eventstreams enables organizations to act on data immediately while maintaining the analytical capabilities of the broader lakehouse platform.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.