
Synapse Real-Time Analytics in Microsoft Fabric: Streaming at Scale

Synapse Real-Time Analytics in Microsoft Fabric provides capabilities for ingesting, analyzing, and visualizing streaming and time-series data using Kusto Query Language (KQL). Today, I will explore how to build real-time analytics solutions in Fabric.

Real-Time Analytics Overview

The Real-Time Analytics workload includes:

  • KQL Database: Data store optimized for time-series and log data
  • KQL Queryset: Saved queries and analytics
  • Eventstream: Stream processing pipelines
  • Real-Time Dashboards: Live visualizations

┌─────────────────────────────────────────────────────┐
│             Synapse Real-Time Analytics             │
├─────────────────────────────────────────────────────┤
│  ┌───────────┐    ┌───────────┐    ┌───────────┐    │
│  │Eventstream│───▶│KQL        │───▶│Real-Time  │    │
│  │(Ingest)   │    │Database   │    │Dashboard  │    │
│  └───────────┘    └───────────┘    └───────────┘    │
│        │                │                           │
│        │          ┌─────┴─────┐                     │
│        │          │           │                     │
│  ┌─────▼─────┐  ┌─▼───────┐ ┌─▼───────┐            │
│  │Event Hubs │  │KQL      │ │Lakehouse│            │
│  │Kafka      │  │Queryset │ │(Export) │            │
│  │IoT Hub    │  └─────────┘ └─────────┘            │
│  └───────────┘                                      │
└─────────────────────────────────────────────────────┘

Creating a KQL Database

// KQL databases are created through the Fabric UI
// Once created, you can create tables and ingest data

// Create a table for IoT telemetry
.create table device_telemetry (
    device_id: string,
    timestamp: datetime,
    temperature: real,
    humidity: real,
    pressure: real,
    battery_level: int,
    location: dynamic
)

// Create a table for web events
.create table web_events (
    event_id: string,
    user_id: string,
    session_id: string,
    event_type: string,
    page_url: string,
    timestamp: datetime,
    properties: dynamic
)
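
To sanity-check the schema before wiring up streaming, you can push a test row with inline ingestion (the values here are made up):

// Ingest a single test row (inline ingestion defaults to CSV)
.ingest inline into table device_telemetry <|
device-001,2024-01-15T08:30:00Z,22.5,48.1,1013.2,87,"{""lat"": -33.87, ""lon"": 151.21}"

// Confirm the row landed
device_telemetry
| take 10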

Data Ingestion

Streaming Ingestion from Event Hubs

// Create ingestion mapping
.create table device_telemetry ingestion json mapping 'telemetry_mapping'
'['
'   {"column": "device_id", "Properties": {"Path": "$.deviceId"}},'
'   {"column": "timestamp", "Properties": {"Path": "$.timestamp"}},'
'   {"column": "temperature", "Properties": {"Path": "$.readings.temperature"}},'
'   {"column": "humidity", "Properties": {"Path": "$.readings.humidity"}},'
'   {"column": "pressure", "Properties": {"Path": "$.readings.pressure"}},'
'   {"column": "battery_level", "Properties": {"Path": "$.batteryLevel"}},'
'   {"column": "location", "Properties": {"Path": "$.location"}}'
']'

// Enable streaming ingestion
.alter table device_telemetry policy streamingingestion enable
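
Both can be verified before any data flows:

// List the JSON ingestion mappings on the table
.show table device_telemetry ingestion json mappings

// Check the streaming ingestion policy
.show table device_telemetry policy streamingingestion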

Eventstream Configuration

# Eventstream connects sources to destinations such as a KQL database.
# It is configured in the Fabric UI; the dict below only illustrates the
# pieces you wire together, it is not an actual API payload:

eventstream_config = {
    "name": "IoT Telemetry Stream",
    "source": {
        "type": "EventHub",
        "connection": "Endpoint=sb://myhub.servicebus.windows.net/;...",
        "consumerGroup": "$Default"
    },
    "transformations": [
        {
            "type": "Filter",
            "condition": "temperature IS NOT NULL"
        },
        {
            "type": "AddTimestamp",
            "field": "ingestion_time"
        }
    ],
    "destination": {
        "type": "KQLDatabase",
        "database": "TelemetryDB",
        "table": "device_telemetry"
    }
}
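
Once the Eventstream is running, a quick KQL check confirms that events are flowing into the destination table:

// Confirm rows are arriving and how fresh they are
device_telemetry
| summarize rows = count(), latest = max(timestamp)
| extend ingestion_lag = now() - latest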

KQL Queries

Basic Queries

// Get recent telemetry
device_telemetry
| where timestamp > ago(1h)
| take 100

// Filter by device
device_telemetry
| where device_id == "device-001"
| where timestamp > ago(24h)
| order by timestamp desc

// Count events by type
web_events
| where timestamp > ago(1d)
| summarize event_count = count() by event_type
| order by event_count desc

Time Series Analysis

// Aggregate by time buckets
device_telemetry
| where timestamp > ago(24h)
| summarize
    avg_temp = avg(temperature),
    max_temp = max(temperature),
    min_temp = min(temperature),
    reading_count = count()
    by bin(timestamp, 1h)
| order by timestamp asc

// Moving average (series_fir operates on a series, so build one first;
// normalize = true turns the 5-tap filter into a simple moving average)
device_telemetry
| where timestamp > ago(7d)
| where device_id == "device-001"
| make-series avg_temp = avg(temperature) on timestamp step 1h
| extend moving_avg = series_fir(avg_temp, dynamic([1,1,1,1,1]), true, true)
| render timechart

// Detect anomalies (flags: +1 = spike, -1 = dip)
device_telemetry
| where timestamp > ago(24h)
| where device_id == "device-001"
| make-series temp_series = avg(temperature) on timestamp step 5m
| extend (anomaly_flag, anomaly_score, baseline) = series_decompose_anomalies(temp_series, 1.5)
| mv-expand timestamp to typeof(datetime), temp_series to typeof(real),
            anomaly_flag to typeof(int), anomaly_score to typeof(real)
| where anomaly_flag != 0
| project timestamp, temperature = temp_series, anomaly_flag, anomaly_score

Aggregations and Statistics

// Percentiles
device_telemetry
| where timestamp > ago(1d)
| summarize
    p50_temp = percentile(temperature, 50),
    p90_temp = percentile(temperature, 90),
    p99_temp = percentile(temperature, 99)
    by device_id

// Histogram
device_telemetry
| where timestamp > ago(1d)
| summarize count() by bin(temperature, 5)
| render columnchart

// Correlation (Pearson, computed over two series)
device_telemetry
| where timestamp > ago(1d)
| where device_id == "device-001"
| summarize temps = make_list(temperature), hums = make_list(humidity)
| project correlation = series_pearson_correlation(temps, hums)

Sessionization

// Sessionize web events (30-minute gap)
web_events
| where timestamp > ago(7d)
| order by user_id asc, timestamp asc  // Kusto sorts descending by default, so be explicit
| extend prev_timestamp = prev(timestamp, 1)
| extend prev_user = prev(user_id, 1)
| extend is_new_session = iif(
    user_id != prev_user or timestamp - prev_timestamp > 30m,
    1, 0)
| extend session_number = row_cumsum(is_new_session)
| summarize
    session_start = min(timestamp),
    session_end = max(timestamp),
    event_count = count(),
    pages_viewed = dcount(page_url)
    by user_id, session_number
| extend session_duration = session_end - session_start

Funnel Analysis

// Conversion funnel
let step1 = web_events | where event_type == "page_view" | where page_url contains "/products" | distinct user_id;
let step2 = web_events | where event_type == "add_to_cart" | distinct user_id;
let step3 = web_events | where event_type == "checkout_started" | distinct user_id;
let step4 = web_events | where event_type == "purchase_completed" | distinct user_id;
print
    step1_count = toscalar(step1 | count),
    step2_count = toscalar(step2 | where user_id in (step1) | count),
    step3_count = toscalar(step3 | where user_id in (step2) | count),
    step4_count = toscalar(step4 | where user_id in (step3) | count)
| extend
    step1_to_step2 = round(100.0 * step2_count / step1_count, 2),
    step2_to_step3 = round(100.0 * step3_count / step2_count, 2),
    step3_to_step4 = round(100.0 * step4_count / step3_count, 2),
    overall_conversion = round(100.0 * step4_count / step1_count, 2)
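
One caveat: the funnel above only checks set membership per step, not ordering, so a user who purchased before ever viewing a product still counts. A minimal order-aware check for the first two steps (extending the same pattern covers the rest):

// Require add_to_cart to occur after the first product page view
let first_views = web_events
    | where event_type == "page_view" and page_url contains "/products"
    | summarize first_view = min(timestamp) by user_id;
let first_carts = web_events
    | where event_type == "add_to_cart"
    | summarize first_cart = min(timestamp) by user_id;
first_views
| join kind=inner first_carts on user_id
| where first_cart > first_view
| count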

Real-Time Dashboards

// Dashboard tile: Current active devices
device_telemetry
| where timestamp > ago(5m)
| summarize last_seen = max(timestamp) by device_id
| count
| render card with (title="Active Devices")

// Dashboard tile: Temperature heatmap
device_telemetry
| where timestamp > ago(1h)
| summarize avg_temp = avg(temperature) by device_id, bin(timestamp, 5m)
| render timechart with (title="Temperature by Device")

// Dashboard tile: Alert on high temperature
device_telemetry
| where timestamp > ago(5m)
| where temperature > 80
| project device_id, timestamp, temperature
| order by temperature desc
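
Dashboard tiles re-run their queries on every refresh, so tiles that scan raw telemetry can get expensive. One mitigation is a materialized view that the engine maintains incrementally (a sketch; the view name is mine):

// Latest reading per device, kept up to date by the engine
.create materialized-view DeviceLatest on table device_telemetry
{
    device_telemetry
    | summarize arg_max(timestamp, *) by device_id
}

// The active-devices tile can then read the view instead of the raw table
DeviceLatest
| where timestamp > ago(5m)
| count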

Integration with Lakehouse

// Export to Lakehouse for long-term storage.
// The target must be an external table defined over Lakehouse/OneLake
// storage beforehand; 'telemetry_archive' is assumed to be one here.
.export to table telemetry_archive <|
device_telemetry
| where timestamp between (ago(30d) .. ago(7d))

// Create shortcut to historical data
// (Done through Fabric UI - creates a shortcut in KQL Database to Lakehouse data)
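
Once a shortcut exists, it surfaces in the KQL database as an external table, so the archived data can be queried in place (shortcut name assumed):

// Query Lakehouse data through the shortcut without re-ingesting it
external_table('telemetry_archive')
| where timestamp < ago(30d)
| summarize readings = count() by bin(timestamp, 1d)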

Real-Time Analytics in Fabric provides powerful streaming capabilities for time-sensitive workloads. Tomorrow, I will cover Power BI integration in Fabric.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.