
Real-Time Analytics in Fabric: Architecture Patterns and Implementation

Real-time analytics is becoming essential for modern data platforms. Microsoft Fabric’s Real-Time Intelligence brings streaming ingestion, storage, querying, dashboards, and alerting together in a single workload. Let’s explore the main architecture patterns and the implementation details behind each one.

Real-Time Intelligence Components

┌─────────────┐     ┌──────────────┐     ┌─────────────┐
│   Sources   │────►│  Eventstream │────►│ KQL Database│
│ Event Hubs  │     │  Transform   │     │  Storage    │
│ Kafka       │     │  Route       │     │  Query      │
│ IoT Hub     │     │  Enrich      │     │  Analyze    │
└─────────────┘     └──────────────┘     └──────┬──────┘
                                                │
                              ┌─────────────────┴─────────────────┐
                              ▼                                   ▼
                        ┌──────────┐                       ┌──────────┐
                        │ Lakehouse│                       │Real-Time │
                        │ Archive  │                       │Dashboard │
                        └──────────┘                       └──────────┘

Pattern 1: Simple Event Processing

For straightforward event ingestion and analysis:

# Create eventstream via Fabric REST API
from azure.identity import DefaultAzureCredential
import requests

credential = DefaultAzureCredential()
token = credential.get_token("https://api.fabric.microsoft.com/.default").token
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

workspace_id = "your-workspace-id"
base_url = f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}"

# Create eventstream item
eventstream_payload = {
    "displayName": "iot_telemetry",
    "type": "Eventstream",
    "description": "IoT telemetry ingestion from Event Hub"
}

response = requests.post(f"{base_url}/items", headers=headers, json=eventstream_payload)
eventstream = response.json()
print(f"Created Eventstream: {eventstream.get('id')}")

# Note: Eventstream source (Event Hub) and destination (KQL Database) configuration
# is done through the Fabric portal visual designer:
# 1. Open the eventstream in Fabric
# 2. Add Azure Event Hub as source (configure namespace, event hub, consumer group)
# 3. Add KQL Database as destination (select database and table)
# 4. Configure field mapping in the visual editor
# 5. Publish the eventstream
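
Once the eventstream is published, it helps to push a few test events into the Event Hub so there is live data to map and query. Here is a minimal sketch using the azure-eventhub SDK; the connection string and hub name are placeholders, and the payload simply mirrors the telemetry fields queried below:

# Hypothetical test producer for the Event Hub feeding the eventstream
import json, time, random
from azure.eventhub import EventHubProducerClient, EventData

producer = EventHubProducerClient.from_connection_string(
    conn_str="<event-hub-connection-string>",   # placeholder connection string
    eventhub_name="iot-telemetry"               # placeholder event hub name
)

with producer:
    batch = producer.create_batch()
    for i in range(10):
        batch.add(EventData(json.dumps({
            "device_id": f"device-{i % 3}",
            "temperature": round(random.uniform(20.0, 30.0), 2),
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        })))
    producer.send_batch(batch)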

Query the data in KQL:

// Real-time device metrics
raw_telemetry
| where timestamp > ago(1h)
| summarize
    avg_temp = avg(temperature),
    max_temp = max(temperature),
    min_temp = min(temperature),
    reading_count = count()
    by device_id, bin(timestamp, 5m)
| order by timestamp desc

// Detect anomalies
raw_telemetry
| where timestamp > ago(24h)
| summarize avg_temp = avg(temperature), stdev_temp = stdev(temperature) by device_id
| join kind=inner (
    raw_telemetry
    | where timestamp > ago(1h)
) on device_id
| where abs(temperature - avg_temp) > 3 * stdev_temp
| project timestamp, device_id, temperature, avg_temp, stdev_temp

Pattern 2: Stream Processing with Transformations

For scenarios requiring real-time transformations, use Spark Structured Streaming:

# Stream processing with PySpark in Fabric notebooks
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

spark = SparkSession.builder.getOrCreate()

# Define schema for POS events
pos_schema = StructType() \
    .add("store_id", StringType()) \
    .add("quantity", DoubleType()) \
    .add("unit_price", DoubleType()) \
    .add("event_time", TimestampType())

# Read from Kafka
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-cluster:9092") \
    .option("subscribe", "point-of-sale") \
    .load()

# Parse and transform
parsed_df = kafka_df \
    .select(F.from_json(F.col("value").cast("string"), pos_schema).alias("data")) \
    .select("data.*") \
    .withColumn("total_amount", F.col("quantity") * F.col("unit_price")) \
    .withColumn("tax_amount", F.col("total_amount") * 0.1) \
    .withColumn("processed_time", F.current_timestamp())

# Add region lookup (broadcast join with reference data)
store_regions = spark.read.table("store_regions")
enriched_df = parsed_df.join(F.broadcast(store_regions), "store_id", "left")

# Write all transactions to Delta table
all_sales_query = enriched_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "Files/checkpoints/all_sales") \
    .toTable("all_transactions")

# Write high-value alerts to separate table
high_value_query = enriched_df \
    .filter(F.col("total_amount") > 1000) \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "Files/checkpoints/high_value") \
    .toTable("high_value_alerts")
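
Both writers run as independent streaming queries until they are stopped. A small monitoring sketch using standard Structured Streaming handles (nothing Fabric-specific is assumed here):

# Inspect status and latest progress of the active streams in this session
for q in spark.streams.active:
    print(q.name, "-", q.status["message"])
    if q.lastProgress:
        print("  input rows/sec:", q.lastProgress.get("inputRowsPerSecond"))

# Optionally block the notebook until any stream stops or fails
# spark.streams.awaitAnyTermination()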

Pattern 3: Windowed Aggregations

For time-based aggregations using Spark Structured Streaming:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, IntegerType, TimestampType

spark = SparkSession.builder.getOrCreate()

# Read clickstream data
clickstream_df = spark.readStream \
    .format("delta") \
    .table("raw_clickstream")

# Tumbling window aggregation (1-minute windows)
page_view_counts = clickstream_df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        F.window("event_time", "1 minute"),
        "page_url",
        "user_segment"
    ) \
    .agg(
        F.count("*").alias("view_count"),
        # Exact distinct counts aren't supported in streaming aggregations; use the approximate version
        F.approx_count_distinct("user_id").alias("unique_users"),
        F.avg("time_on_page").alias("avg_time_on_page")
    ) \
    .select(
        F.col("window.start").alias("window_start"),
        F.col("window.end").alias("window_end"),
        "page_url", "user_segment",
        "view_count", "unique_users", "avg_time_on_page"
    )

# Sliding window for trends (5-minute window, 1-minute slide)
rolling_metrics = clickstream_df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        F.window("event_time", "5 minutes", "1 minute"),
        "page_url"
    ) \
    .agg(
        F.count("*").alias("rolling_views"),
        F.approx_count_distinct("user_id").alias("rolling_unique_users")
    )

# Write aggregations to Delta tables
page_views_query = page_view_counts.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "Files/checkpoints/page_views") \
    .toTable("page_view_aggregates")
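
The sliding-window rolling_metrics stream above is defined but never started. A matching writer, assuming a hypothetical checkpoint path and target table name:

# Start the sliding-window aggregation with its own checkpoint and table
rolling_query = rolling_metrics.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "Files/checkpoints/rolling_metrics") \
    .toTable("rolling_page_metrics")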

Pattern 4: Real-Time Joins

Joining streaming data with reference data using Spark:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Read streaming order events
order_events = spark.readStream \
    .format("delta") \
    .table("raw_order_events")

# Load reference data (static or periodically refreshed)
# Use broadcast for small dimension tables
customer_info = spark.read.table("customers")
product_info = spark.read.table("products")

# Stream-static join with broadcast for small tables
enriched_orders = order_events \
    .join(
        F.broadcast(customer_info),
        order_events.customer_id == customer_info.customer_id,
        "left"
    ) \
    .join(
        F.broadcast(product_info),
        order_events.product_id == product_info.product_id,
        "left"
    ) \
    .select(
        order_events.order_id,
        order_events.customer_id,
        customer_info.customer_name,
        customer_info.segment,
        order_events.product_id,
        product_info.product_name,
        product_info.category,
        order_events.quantity,
        order_events.amount,
        order_events.order_time
    )

# Write enriched stream
query = enriched_orders.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "Files/checkpoints/enriched_orders") \
    .toTable("enriched_orders")

# For frequently changing reference data, use foreachBatch to refresh
def process_with_fresh_reference(batch_df, batch_id):
    # Reload reference data each batch
    customers = spark.read.table("customers")
    products = spark.read.table("products")

    enriched = batch_df \
        .join(F.broadcast(customers), "customer_id", "left") \
        .join(F.broadcast(products), "product_id", "left")

    enriched.write.mode("append").saveAsTable("enriched_orders")
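
The handler above still needs to be attached to the stream. A minimal sketch with a hypothetical checkpoint path (stop the earlier enriched_orders writer first if both target the same table):

# Attach the micro-batch handler so reference tables are re-read on every batch
fresh_ref_query = order_events.writeStream \
    .foreachBatch(process_with_fresh_reference) \
    .option("checkpointLocation", "Files/checkpoints/enriched_orders_fresh") \
    .start()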

Pattern 5: Real-Time Alerting

Setting up automated alerts:

// Create a function for anomaly detection
.create-or-alter function DetectAnomalies() {
    raw_telemetry
    | where timestamp > ago(5m)
    | summarize
        current_temp = avg(temperature),
        reading_count = count()
        by device_id
    | join kind=inner (
        raw_telemetry
        | where timestamp between (ago(24h) .. ago(5m))
        | summarize baseline_temp = avg(temperature), stdev_temp = stdev(temperature)
            by device_id
    ) on device_id
    | where abs(current_temp - baseline_temp) > 3 * stdev_temp
    | project
        device_id,
        current_temp,
        baseline_temp,
        deviation = abs(current_temp - baseline_temp) / stdev_temp,
        alert_time = now()
}

// Create continuous export for alerts
// Note: the export target must be an external table (e.g., over OneLake or blob storage)
.create-or-alter continuous-export AlertExport
over (raw_telemetry)
to table AlertHistory
with (intervalBetweenRuns = 1m)
<| DetectAnomalies()

Python integration for alerting (via Logic Apps, Teams webhooks, or Reflex):

# Alerts in Fabric Real-Time Intelligence use Reflex (Data Activator)
# For custom alerting, use Logic Apps or Azure Functions

from azure.identity import DefaultAzureCredential
import requests
import json

# Option 1: Send alerts to Logic App
def send_alert_to_logic_app(alert_data: dict, logic_app_url: str):
    """Send alert to Azure Logic App for processing."""
    response = requests.post(
        logic_app_url,
        headers={"Content-Type": "application/json"},
        json=alert_data
    )
    return response.ok  # Logic Apps request triggers commonly return 202 Accepted

# Option 2: Send to Teams webhook
def send_teams_alert(device_id: str, current_temp: float, teams_webhook_url: str):
    """Send alert to Microsoft Teams channel."""
    message = {
        "@type": "MessageCard",
        "summary": "Temperature Anomaly",
        "sections": [{
            "activityTitle": "Temperature Anomaly Detected",
            "facts": [
                {"name": "Device", "value": device_id},
                {"name": "Temperature", "value": f"{current_temp:.1f}C"}
            ],
            "markdown": True
        }]
    }
    requests.post(teams_webhook_url, json=message)

# Option 3: Use Fabric Reflex (Data Activator)
# In Fabric portal:
# 1. Create a Reflex item
# 2. Connect to your KQL database or eventstream
# 3. Define trigger conditions (e.g., deviation > 5)
# 4. Configure actions (email, Teams, Power Automate flow)

# Query KQL for anomalies and trigger alerts
credential = DefaultAzureCredential()
# Execute KQL query and process results for alerting
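
A minimal polling sketch that ties these pieces together, assuming the azure-kusto-data package plus a hypothetical Eventhouse query URI and database name; it runs the DetectAnomalies() function defined earlier and forwards any rows to the Teams webhook helper above:

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder

KUSTO_URI = "https://<your-eventhouse>.kusto.fabric.microsoft.com"  # placeholder query URI
DATABASE = "telemetry_db"                                           # placeholder database name

kcsb = KustoConnectionStringBuilder.with_azure_token_credential(KUSTO_URI, credential)
kusto_client = KustoClient(kcsb)

def poll_and_alert(teams_webhook_url: str):
    """Run the anomaly-detection function and forward any hits to Teams."""
    response = kusto_client.execute(DATABASE, "DetectAnomalies()")
    for row in response.primary_results[0]:
        send_teams_alert(row["device_id"], row["current_temp"], teams_webhook_url)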

Real-Time Dashboards

Create live dashboards using Real-Time Dashboards in Fabric:

# Real-Time Dashboards are created in the Fabric portal
# They connect directly to KQL databases for live data

# Step 1: Create Real-Time Dashboard via REST API
from azure.identity import DefaultAzureCredential
import requests

credential = DefaultAzureCredential()
token = credential.get_token("https://api.fabric.microsoft.com/.default").token
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

workspace_id = "your-workspace-id"
base_url = f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}"

dashboard_payload = {
    "displayName": "Live Sales Dashboard",
    "type": "RTDashboard",
    "description": "Real-time sales metrics dashboard"
}

response = requests.post(f"{base_url}/items", headers=headers, json=dashboard_payload)
dashboard = response.json()

# Step 2: Configure tiles in the Fabric portal with KQL queries:
# Example KQL for dashboard tiles:

kql_queries = {
    "total_sales_card": """
        all_transactions
        | where timestamp > ago(1h)
        | summarize total_sales = sum(total_amount)
    """,
    "sales_by_region_chart": """
        all_transactions
        | where timestamp > ago(1h)
        | summarize total_sales = sum(total_amount), tx_count = count()
            by bin(timestamp, 1m), region
        | render timechart
    """,
    "transaction_count": """
        all_transactions
        | where timestamp > ago(1h)
        | count
    """
}
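
As a sanity check, these tile queries can be run against the KQL database before pasting them into the dashboard. A sketch assuming the azure-kusto-data package plus a hypothetical Eventhouse URI and database name:

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder

kusto_uri = "https://<your-eventhouse>.kusto.fabric.microsoft.com"  # placeholder query URI
kcsb = KustoConnectionStringBuilder.with_azure_token_credential(kusto_uri, credential)
kusto_client = KustoClient(kcsb)

for tile_name, query in kql_queries.items():
    rows = list(kusto_client.execute("sales_db", query).primary_results[0])  # placeholder database name
    print(tile_name, "->", len(rows), "rows")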

# Note: Real-Time Dashboards support auto refresh (for example, a 30-second interval)
# Enable and configure it in the portal: Dashboard settings > Auto refresh

# Alternative: Use Power BI with DirectQuery to KQL Database
# for more customization options

Performance Optimization

A few KQL management commands keep the Eventhouse responsive as data volumes grow:

// Optimize table for time-series queries
.alter table raw_telemetry policy streamingingestion enable

// Set appropriate retention
.alter table raw_telemetry policy retention softdelete = 30d

// Create materialized view for common aggregations
.create materialized-view HourlyMetrics on table raw_telemetry {
    raw_telemetry
    | summarize
        avg_temp = avg(temperature),
        max_temp = max(temperature),
        min_temp = min(temperature),
        reading_count = count()
        by device_id, bin(timestamp, 1h)
}

// Use materialized view in queries (faster)
HourlyMetrics
| where timestamp > ago(7d)
| summarize daily_avg = avg(avg_temp) by device_id, bin(timestamp, 1d)

Real-time analytics in Fabric provides a powerful, integrated solution for streaming data. Start with simple patterns and evolve to more complex scenarios as your needs grow.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.