Real-Time Analytics in Fabric: Architecture Patterns and Implementation

In this post I walk through five architecture patterns for real-time analytics in Microsoft Fabric, from simple event ingestion to automated alerting, followed by notes on Real-Time Dashboards and KQL performance tuning.

Real-Time Intelligence Components

┌─────────────┐     ┌──────────────┐     ┌─────────────┐
│   Sources   │────►│  Eventstream │────►│ KQL Database│
│ Event Hubs  │     │  Transform   │     │  Storage    │
│ Kafka       │     │  Route       │     │  Query      │
│ IoT Hub     │     │  Enrich      │     │  Analyze    │
└─────────────┘     └──────────────┘     └─────────────┘
                                                │
                    ┌──────────────────────────┴─────────┐
                    ▼                                    ▼
              ┌──────────┐                        ┌──────────┐
              │ Lakehouse│                        │Real-Time │
              │ Archive  │                        │Dashboard │
              └──────────┘                        └──────────┘

Pattern 1: Simple Event Processing

For straightforward event ingestion and analysis:

# Create eventstream via Fabric REST API
from azure.identity import DefaultAzureCredential
import requests

credential = DefaultAzureCredential()
token = credential.get_token("https://api.fabric.microsoft.com/.default").token
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

workspace_id = "your-workspace-id"
base_url = f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}"

# Create eventstream item
eventstream_payload = {
    "displayName": "iot_telemetry",
    "type": "Eventstream",
    "description": "IoT telemetry ingestion from Event Hub"
}

response = requests.post(f"{base_url}/items", headers=headers, json=eventstream_payload, timeout=30)
response.raise_for_status()  # fail fast on authentication or validation errors
eventstream = response.json()
print(f"Created Eventstream: {eventstream.get('id')}")

# Note: Eventstream source (Event Hub) and destination (KQL Database) configuration
# is done through the Fabric portal visual designer:
# 1. Open the eventstream in Fabric
# 2. Add Azure Event Hub as source (configure namespace, event hub, consumer group)
# 3. Add KQL Database as destination (select database and table)
# 4. Configure field mapping in the visual editor
# 5. Publish the eventstream
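
The create call above reads the item straight from the response body. As far as I know, the Fabric create-item endpoint can also answer with 202 Accepted and hand back a long-running operation to poll instead of the finished item. The sketch below shows one way to tell the two cases apart. `interpret_create_response` is a hypothetical helper of mine, not part of any SDK, and the header names (`Location`, `x-ms-operation-id`, `Retry-After`) are my understanding of the API contract, so verify them against the current REST reference.

```python
from typing import Mapping, Optional


def interpret_create_response(
    status_code: int,
    headers: Mapping[str, str],
    body: Optional[dict],
) -> dict:
    """Classify a create-item response as created, pending, or failed.

    Hypothetical helper: the header names are assumptions about the
    Fabric long-running-operation contract.
    """
    if status_code == 201:
        # Item was created synchronously; the body carries its id.
        return {"state": "created", "item_id": (body or {}).get("id")}
    if status_code == 202:
        # Creation continues in the background; the caller should poll.
        return {
            "state": "pending",
            "poll_url": headers.get("Location"),
            "operation_id": headers.get("x-ms-operation-id"),
            "retry_after_s": int(headers.get("Retry-After", "5")),
        }
    raise RuntimeError(f"Create item failed with HTTP {status_code}: {body}")
```

With `requests`, this would be called as `interpret_create_response(response.status_code, response.headers, response.json() if response.content else None)`.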

Query the data in KQL:

// Real-time device metrics
raw_telemetry
| where timestamp > ago(1h)
| summarize
    avg_temp = avg(temperature),
    max_temp = max(temperature),
    min_temp = min(temperature),
    reading_count = count()
    by device_id, bin(timestamp, 5m)
| order by timestamp desc

// Detect anomalies
raw_telemetry
| where timestamp > ago(24h)
| summarize avg_temp = avg(temperature), stdev_temp = stdev(temperature) by device_id
| join kind=inner (
    raw_telemetry
    | where timestamp > ago(1h)
) on device_id
| where abs(temperature - avg_temp) > 3 * stdev_temp
| project timestamp, device_id, temperature, avg_temp, stdev_temp
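
To make the three-sigma rule in the anomaly query concrete, here is a minimal pure-Python sketch of the same comparison for a single device. The function name and sample readings are made up for illustration. It uses the sample standard deviation, which is what KQL's `stdev()` computes. Unlike the query, it returns nothing when the baseline has no variation.

```python
from statistics import mean, stdev


def three_sigma_outliers(baseline, recent, k=3.0):
    """Return recent readings more than k standard deviations from the baseline mean."""
    if len(baseline) < 2:
        return []  # sample standard deviation is undefined for fewer than 2 points
    mu = mean(baseline)
    sigma = stdev(baseline)
    if sigma == 0:
        return []  # sketch-only choice: skip devices with a perfectly flat baseline
    return [x for x in recent if abs(x - mu) > k * sigma]


# Illustrative readings for one device (not real data)
baseline = [20.0, 21.0, 19.0, 20.5, 19.5, 20.0, 21.0, 19.0]
recent = [20.2, 35.0, 19.8]
print(three_sigma_outliers(baseline, recent))  # → [35.0]
```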

Pattern 2: Stream Processing with Transformations

For scenarios requiring real-time transformations, use Spark Structured Streaming:

# Stream processing with PySpark in Fabric notebooks
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

spark = SparkSession.builder.getOrCreate()

# Define schema for POS events
pos_schema = StructType() \
    .add("store_id", StringType()) \
    .add("quantity", DoubleType()) \
    .add("unit_price", DoubleType()) \
    .add("event_time", TimestampType())

# Read from Kafka
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka-cluster:9092") \
    .option("subscribe", "point-of-sale") \
    .load()

# Parse and transform
parsed_df = kafka_df \
    .select(F.from_json(F.col("value").cast("string"), pos_schema).alias("data")) \
    .select("data.*") \
    .withColumn("total_amount", F.col("quantity") * F.col("unit_price")) \
    .withColumn("tax_amount", F.col("total_amount") * 0.1) \
    .withColumn("processed_time", F.current_timestamp())

# Add region lookup (broadcast join with reference data)
store_regions = spark.read.table("store_regions")
enriched_df = parsed_df.join(F.broadcast(store_regions), "store_id", "left")

# Write all transactions to Delta table
all_sales_query = enriched_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "Files/checkpoints/all_sales") \
    .toTable("all_transactions")

# Write high-value alerts to separate table
high_value_query = enriched_df \
    .filter(F.col("total_amount") > 1000) \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "Files/checkpoints/high_value") \
    .toTable("high_value_alerts")
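
The per-event logic in this pattern (derive the amounts, look up the region, flag high-value sales) is easy to unit-test without a Spark session if you mirror it as a plain function. The sketch below does that. The `Sale` class, the `enrich` function, and the `region` field are my own illustrative names; I am assuming `store_regions` maps a store to a region.

```python
from dataclasses import dataclass
from typing import Dict, Optional

TAX_RATE = 0.1                  # same flat rate as the streaming job
HIGH_VALUE_THRESHOLD = 1000.0   # same cutoff as the high-value filter


@dataclass
class Sale:
    store_id: str
    quantity: float
    unit_price: float


def enrich(sale: Sale, store_regions: Dict[str, str]) -> dict:
    """Mirror the streaming transformations for a single event."""
    total = sale.quantity * sale.unit_price
    region: Optional[str] = store_regions.get(sale.store_id)  # None mirrors a left join miss
    return {
        "store_id": sale.store_id,
        "total_amount": total,
        "tax_amount": total * TAX_RATE,
        "region": region,
        "high_value": total > HIGH_VALUE_THRESHOLD,
    }


row = enrich(Sale("s1", 3, 400.0), {"s1": "NSW"})
print(row["total_amount"], row["region"], row["high_value"])
```

Keeping the constants in one place also makes it harder for the test and the streaming job to drift apart.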

Pattern 3: Windowed Aggregations

For time-based aggregations using Spark Structured Streaming:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, IntegerType, TimestampType

spark = SparkSession.builder.getOrCreate()

# Read clickstream data
clickstream_df = spark.readStream \
    .format("delta") \
    .table("raw_clickstream")

# Tumbling window aggregation (1-minute windows)
page_view_counts = clickstream_df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        F.window("event_time", "1 minute"),
        "page_url",
        "user_segment"
    ) \
    .agg(
        F.count("*").alias("view_count"),
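        # Exact distinct counts are not supported in streaming aggregations, so use the approximate version
        F.approx_count_distinct("user_id").alias("unique_users"),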
        F.avg("time_on_page").alias("avg_time_on_page")
    ) \
    .select(
        F.col("window.start").alias("window_start"),
        F.col("window.end").alias("window_end"),
        "page_url", "user_segment",
        "view_count", "unique_users", "avg_time_on_page"
    )

# Sliding window for trends (5-minute window, 1-minute slide)
rolling_metrics = clickstream_df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        F.window("event_time", "5 minutes", "1 minute"),
        "page_url"
    ) \
    .agg(
        F.count("*").alias("rolling_views"),
        # Approximate distinct count (exact distinct is not supported on streams)
        F.approx_count_distinct("user_id").alias("rolling_unique_users")
    )

# Write aggregations to Delta tables
page_views_query = page_view_counts.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "Files/checkpoints/page_views") \
    .toTable("page_view_aggregates")
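
The difference between the tumbling and sliding windows above is easier to see by listing which windows a single event lands in. The sketch below does this in plain Python, assuming windows are aligned to the Unix epoch with no start offset, which I believe is Spark's default. `windows_for` is an illustrative helper, not a Spark API.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def windows_for(event_time, size, slide=None):
    """Return the [start, end) windows that contain event_time.

    With slide omitted the windows are tumbling (slide == size).
    """
    slide = slide or size
    # Latest window start that is <= event_time, aligned to the epoch
    last_start = event_time - ((event_time - EPOCH) % slide)
    windows = []
    start = last_start
    # Walk backwards while the window still covers the event
    while start > event_time - size:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


t = datetime(2024, 1, 1, 12, 3, 30)
print(len(windows_for(t, timedelta(minutes=1))))                        # → 1
print(len(windows_for(t, timedelta(minutes=5), timedelta(minutes=1))))  # → 5
```

Each event therefore contributes to five rows in the sliding aggregation, which is worth remembering when sizing state and output tables.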

Pattern 4: Real-Time Joins

Joining streaming data with reference data using Spark:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Read streaming order events
order_events = spark.readStream \
    .format("delta") \
    .table("raw_order_events")

# Load reference data (static or periodically refreshed)
# Use broadcast for small dimension tables
customer_info = spark.read.table("customers")
product_info = spark.read.table("products")

# Stream-static join with broadcast for small tables
enriched_orders = order_events \
    .join(
        F.broadcast(customer_info),
        order_events.customer_id == customer_info.customer_id,
        "left"
    ) \
    .join(
        F.broadcast(product_info),
        order_events.product_id == product_info.product_id,
        "left"
    ) \
    .select(
        order_events.order_id,
        order_events.customer_id,
        customer_info.customer_name,
        customer_info.segment,
        order_events.product_id,
        product_info.product_name,
        product_info.category,
        order_events.quantity,
        order_events.amount,
        order_events.order_time
    )

# Write enriched stream
query = enriched_orders.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "Files/checkpoints/enriched_orders") \
    .toTable("enriched_orders")

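# For frequently changing reference data, use foreachBatch to reload it per micro-batch
def process_with_fresh_reference(batch_df, batch_id):
    # Reload reference data each batch
    customers = spark.read.table("customers")
    products = spark.read.table("products")

    enriched = batch_df \
        .join(F.broadcast(customers), "customer_id", "left") \
        .join(F.broadcast(products), "product_id", "left")

    enriched.write.mode("append").saveAsTable("enriched_orders")

# Attach the function to the stream. Use this instead of the direct write above,
# with its own checkpoint location (example path).
fresh_query = order_events.writeStream \
    .foreachBatch(process_with_fresh_reference) \
    .option("checkpointLocation", "Files/checkpoints/enriched_orders_fresh") \
    .start()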
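
One caveat with `foreachBatch`: Spark documents it as at-least-once by default, so a batch can be replayed after a failure and appended twice. The `batch_id` argument is there to let you deduplicate. The sketch below shows the idea with an in-memory stand-in for the target table. `IdempotentSink` is a hypothetical class for illustration only, and in a real job the last committed batch id would have to be stored durably alongside the data.

```python
class IdempotentSink:
    """In-memory stand-in for a table that ignores replayed batches."""

    def __init__(self):
        self.rows = []
        self.last_batch_id = -1  # highest batch id already written

    def write(self, batch_id, rows):
        """Append rows unless this batch was already written. Returns True if written."""
        if batch_id <= self.last_batch_id:
            return False  # replay of an earlier batch: skip it
        self.rows.extend(rows)
        self.last_batch_id = batch_id
        return True


sink = IdempotentSink()
sink.write(0, ["order-1", "order-2"])
sink.write(1, ["order-3"])
sink.write(1, ["order-3"])  # replay after a simulated failure
print(len(sink.rows))  # → 3
```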

Pattern 5: Real-Time Alerting

Setting up automated alerts:

// Create a function for anomaly detection
.create-or-alter function DetectAnomalies() {
    raw_telemetry
    | where timestamp > ago(5m)
    | summarize
        current_temp = avg(temperature),
        reading_count = count()
        by device_id
    | join kind=inner (
        raw_telemetry
        | where timestamp between (ago(24h) .. ago(5m))
        | summarize baseline_temp = avg(temperature), stdev_temp = stdev(temperature)
            by device_id
    ) on device_id
    | where abs(current_temp - baseline_temp) > 3 * stdev_temp
    | project
        device_id,
        current_temp,
        baseline_temp,
        deviation = abs(current_temp - baseline_temp) / stdev_temp,
        alert_time = now()
}

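// Create continuous export for alerts
// Note: continuous export writes to an external table, so AlertHistory must be defined as one
.create-or-alter continuous-export AlertExport
over (raw_telemetry)
to table AlertHistory
with (intervalBetweenRuns = 1m)
<| DetectAnomalies()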

Python integration for alerts using Azure Logic Apps:

# Alerts in Fabric Real-Time Intelligence use Reflex (Data Activator)
# For custom alerting, use Logic Apps or Azure Functions

from azure.identity import DefaultAzureCredential
import requests
import json

# Option 1: Send alerts to Logic App
def send_alert_to_logic_app(alert_data: dict, logic_app_url: str):
    """Send alert to Azure Logic App for processing."""
    response = requests.post(
        logic_app_url,
        headers={"Content-Type": "application/json"},
        json=alert_data,
        timeout=10
    )
    # Logic App HTTP triggers commonly reply 202 Accepted, so treat any non-error status as success
    return response.ok

# Option 2: Send to Teams webhook
def send_teams_alert(device_id: str, current_temp: float, teams_webhook_url: str):
    """Send alert to Microsoft Teams channel."""
    message = {
        "@type": "MessageCard",
        "summary": "Temperature Anomaly",
        "sections": [{
            "activityTitle": "Temperature Anomaly Detected",
            "facts": [
                {"name": "Device", "value": device_id},
                {"name": "Temperature", "value": f"{current_temp:.1f}C"}
            ],
            "markdown": True
        }]
    }
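    response = requests.post(teams_webhook_url, json=message, timeout=10)
    response.raise_for_status()  # surface webhook failures instead of silently dropping the alert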

# Option 3: Use Fabric Reflex (Data Activator)
# In Fabric portal:
# 1. Create a Reflex item
# 2. Connect to your KQL database or eventstream
# 3. Define trigger conditions (e.g., deviation > 5)
# 4. Configure actions (email, Teams, Power Automate flow)

# Wiring it together: run DetectAnomalies() against the KQL database on a schedule
# and pass each returned row to one of the senders above.
credential = DefaultAzureCredential()  # used to authenticate the KQL query client
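
Because the detection function looks at the last five minutes and is evaluated every minute, the same device can show up in several consecutive runs. A small cooldown in front of the senders avoids flooding a channel. This is a minimal sketch, assuming alerts are keyed by `device_id`. `AlertCooldown` is an illustrative class, and its in-memory state would be lost on restart.

```python
from datetime import datetime, timedelta


class AlertCooldown:
    """Suppress repeat alerts for the same device within a cooldown period."""

    def __init__(self, cooldown: timedelta):
        self._cooldown = cooldown
        self._last_sent = {}  # device_id -> time of the last alert sent

    def should_send(self, device_id: str, now: datetime) -> bool:
        last = self._last_sent.get(device_id)
        if last is not None and now - last < self._cooldown:
            return False  # still inside the cooldown window
        self._last_sent[device_id] = now
        return True


gate = AlertCooldown(timedelta(minutes=15))
start = datetime(2024, 1, 1, 12, 0)
print(gate.should_send("dev-1", start))                          # → True
print(gate.should_send("dev-1", start + timedelta(minutes=1)))   # → False
print(gate.should_send("dev-1", start + timedelta(minutes=20)))  # → True
```

The gate would wrap a sender, for example by calling `send_teams_alert` only when `should_send` returns `True`.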

Real-Time Dashboards

Create live dashboards using Real-Time Dashboards in Fabric:

# Real-Time Dashboards are created in the Fabric portal
# They connect directly to KQL databases for live data

# Step 1: Create Real-Time Dashboard via REST API
from azure.identity import DefaultAzureCredential
import requests

credential = DefaultAzureCredential()
token = credential.get_token("https://api.fabric.microsoft.com/.default").token
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}

workspace_id = "your-workspace-id"
base_url = f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}"

dashboard_payload = {
    "displayName": "Live Sales Dashboard",
    "type": "KQLDashboard",  # Fabric item type for Real-Time Dashboards
    "description": "Real-time sales metrics dashboard"
}

response = requests.post(f"{base_url}/items", headers=headers, json=dashboard_payload, timeout=30)
response.raise_for_status()  # fail fast on authentication or validation errors
dashboard = response.json()

# Step 2: Configure tiles in the Fabric portal with KQL queries:
# Example KQL for dashboard tiles:

# The queries filter on event_time, the event timestamp column written to all_transactions in Pattern 2
kql_queries = {
    "total_sales_card": """
        all_transactions
        | where event_time > ago(1h)
        | summarize total_sales = sum(total_amount)
    """,
    "sales_by_region_chart": """
        all_transactions
        | where event_time > ago(1h)
        | summarize total_sales = sum(total_amount), tx_count = count()
            by bin(event_time, 1m), region
        | render timechart
    """,
    "transaction_count": """
        all_transactions
        | where event_time > ago(1h)
        | count
    """
}

# Note: Real-Time Dashboards auto-refresh every 30 seconds by default
# Configure in portal: Dashboard settings > Auto refresh interval

# Alternative: Use Power BI with DirectQuery to KQL Database
# for more customization options

Performance Optimization

// Enable streaming ingestion for low-latency writes
.alter table raw_telemetry policy streamingingestion enable

// Set appropriate retention
.alter-merge table raw_telemetry policy retention softdelete = 30d

// Create materialized view for common aggregations
.create materialized-view HourlyMetrics on table raw_telemetry {
    raw_telemetry
    | summarize
        avg_temp = avg(temperature),
        max_temp = max(temperature),
        min_temp = min(temperature),
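        reading_count = count()
        by device_id, bin(timestamp, 1h)
}

// Use materialized view in queries (faster)
// Weight each hourly average by its reading count so sparse hours do not skew the daily value
HourlyMetrics
| where timestamp > ago(7d)
| summarize daily_avg = sum(avg_temp * reading_count) / sum(reading_count) by device_id, bin(timestamp, 1d)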
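
When hourly averages from the view are rolled up into daily values, weighting matters. Averaging the averages treats an hour with one reading the same as an hour with a hundred. The worked example below, using made-up numbers, compares that with a weighted average based on the per-hour row count stored in the view.

```python
def weighted_daily_avg(hourly):
    """hourly: list of (avg_temp, reading_count) pairs for one device and day."""
    total_readings = sum(count for _, count in hourly)
    if total_readings == 0:
        return None  # no readings that day
    return sum(avg * count for avg, count in hourly) / total_readings


# Illustrative data: one busy hour at 20 degrees, one sparse hour at 30 degrees
hourly = [(20.0, 100), (30.0, 1)]

naive = sum(avg for avg, _ in hourly) / len(hourly)
weighted = weighted_daily_avg(hourly)

print(naive)               # → 25.0
print(round(weighted, 2))  # → 20.1
```

The weighted result matches what you would get by averaging all 101 raw readings, which is usually what a "daily average" is meant to be.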

Real-time analytics in Fabric provides a powerful, integrated solution for streaming data. Start with simple patterns and evolve to more complex scenarios as your needs grow.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.