
Fabric Real-Time Analytics: Stream Processing at Scale

Real-Time Analytics in Fabric brings streaming data ingestion and querying with sub-second latency. Today we’ll explore how to use this feature for real-time data workloads.

What is Real-Time Analytics?

# Real-Time Analytics components
rta_components = {
    "kql_database": "Time-series optimized database",
    "eventstream": "Streaming data ingestion",
    "kql_queryset": "Saved KQL queries",
    "real_time_dashboard": "Live visualizations"
}

# Use cases
use_cases = [
    "IoT telemetry analysis",
    "Application log analytics",
    "Clickstream analysis",
    "Financial market data",
    "Security event monitoring",
    "Real-time operational dashboards"
]

Creating a KQL Database

1. In your workspace, click "+ New" > "KQL Database"
2. Name your database (e.g., "telemetry_db")
3. Click "Create"

KQL Basics

Querying Data

// Basic query structure
// database.table | operators

// Select all records from last hour
telemetry
| where ingestion_time() > ago(1h)

// Filter and project columns
telemetry
| where device_type == "sensor"
| where temperature > 30
| project device_id, temperature, timestamp

// Aggregations
telemetry
| where timestamp > ago(24h)
| summarize
    avg_temp = avg(temperature),
    max_temp = max(temperature),
    min_temp = min(temperature),
    reading_count = count()
    by device_id
| order by avg_temp desc

Time-Based Analysis

// Time-series aggregation with bin()
telemetry
| where timestamp > ago(24h)
| summarize avg_temperature = avg(temperature) by bin(timestamp, 5m)
| render timechart

// Rolling windows
telemetry
| where timestamp > ago(1h)
| summarize
    avg_temp = avg(temperature),
    max_temp = max(temperature)
    by bin(timestamp, 1m), device_id
| order by timestamp desc

// Moving average (series_fir operates on arrays, so build the series with make-series)
telemetry
| where timestamp > ago(24h)
| make-series avg_temp = avg(temperature) default = 0.0 on timestamp step 5m
| extend moving_avg = series_fir(avg_temp, repeat(1, 12), true, true)

Joins and Lookups

// Join with dimension table
telemetry
| where timestamp > ago(1h)
| join kind=inner (
    device_metadata
    | project device_id, device_name, location
) on device_id
| project timestamp, device_name, location, temperature

// Lookup for enrichment
telemetry
| where timestamp > ago(1h)
| lookup kind=leftouter device_metadata on device_id
| project timestamp, device_name, temperature

Anomaly Detection

// Built-in anomaly detection (series functions expect an evenly spaced series)
telemetry
| where timestamp > ago(7d)
| make-series readings = avg(temperature) default = 0.0 on timestamp step 1h by device_id
| extend anomalies = series_decompose_anomalies(readings, 1.5)
| mv-expand timestamp to typeof(datetime), readings to typeof(real), anomalies to typeof(real)
| where anomalies == 1
| project device_id, timestamp, anomaly_value = readings

// Statistical anomaly detection
telemetry
| where timestamp > ago(24h)
| summarize
    avg_temp = avg(temperature),
    stdev_temp = stdev(temperature)
    by device_id
| extend
    upper_bound = avg_temp + (3 * stdev_temp),
    lower_bound = avg_temp - (3 * stdev_temp)
| join kind=inner (
    telemetry
    | where timestamp > ago(1h)
) on device_id
| where temperature > upper_bound or temperature < lower_bound
| project device_id, temperature, avg_temp, timestamp
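The same three-sigma rule is easy to sanity-check offline in Python. A minimal pandas sketch; the DataFrame is assumed to hold the telemetry columns used above (for example, pulled out through the Spark connector shown later):

import pandas as pd

# Assumes df has columns: device_id, temperature, timestamp.
def flag_three_sigma(df: pd.DataFrame) -> pd.DataFrame:
    # Per-device mean and standard deviation, mirroring the summarize step.
    stats = df.groupby("device_id")["temperature"].agg(["mean", "std"])
    stats["upper_bound"] = stats["mean"] + 3 * stats["std"]
    stats["lower_bound"] = stats["mean"] - 3 * stats["std"]
    joined = df.merge(stats.reset_index(), on="device_id")
    out_of_bounds = (joined["temperature"] > joined["upper_bound"]) | (
        joined["temperature"] < joined["lower_bound"]
    )
    return joined.loc[out_of_bounds, ["device_id", "temperature", "mean", "timestamp"]]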

Creating Tables

// Create a table
.create table telemetry (
    device_id: string,
    device_type: string,
    temperature: real,
    humidity: real,
    timestamp: datetime
)

// Create table mapping for JSON ingestion
.create table telemetry ingestion json mapping 'telemetry_mapping'
'['
'  {"column": "device_id", "path": "$.deviceId", "datatype": "string"},'
'  {"column": "device_type", "path": "$.deviceType", "datatype": "string"},'
'  {"column": "temperature", "path": "$.temperature", "datatype": "real"},'
'  {"column": "humidity", "path": "$.humidity", "datatype": "real"},'
'  {"column": "timestamp", "path": "$.timestamp", "datatype": "datetime"}'
']'

// Set retention policy
.alter-merge table telemetry policy retention
'{'
'  "SoftDeletePeriod": "30.00:00:00",'
'  "Recoverability": "Enabled"'
'}'

Ingesting Data

Manual Ingestion

// Inline ingestion (for testing; rows after <| are parsed as CSV)
.ingest inline into table telemetry <|
device001,sensor,25.5,60.0,2023-07-19T10:00:00Z
device002,sensor,26.0,58.5,2023-07-19T10:00:00Z

// Ingest from storage
.ingest into table telemetry (
    h'https://storage.blob.core.windows.net/container/data.json'
) with (
    format = 'json',
    ingestionMappingReference = 'telemetry_mapping'
)
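You can also ingest from Python without control commands, using the azure-kusto-ingest package against the database's ingestion URI. A minimal sketch, assuming Azure CLI sign-in and a placeholder ingestion URI (copy the real one from the database details page):

import pandas as pd
from azure.kusto.data import KustoConnectionStringBuilder
from azure.kusto.data.data_format import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties

# Placeholder: the ingestion URI shown on the KQL database details page.
ingest_uri = "https://<ingestion-uri>.kusto.fabric.microsoft.com"
kcsb = KustoConnectionStringBuilder.with_az_cli_authentication(ingest_uri)
client = QueuedIngestClient(kcsb)

props = IngestionProperties(
    database="telemetry_db",
    table="telemetry",
    data_format=DataFormat.CSV,
)

df = pd.DataFrame([{
    "device_id": "device001",
    "device_type": "sensor",
    "temperature": 25.5,
    "humidity": 60.0,
    "timestamp": "2023-07-19T10:00:00Z",
}])
client.ingest_from_dataframe(df, ingestion_properties=props)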

Eventstream Ingestion

# Eventstream configuration
eventstream_sources = {
    "event_hubs": "Azure Event Hubs",
    "kafka": "Apache Kafka",
    "custom_app": "Custom application endpoint",
    "sample_data": "Built-in sample data"
}

# Create Eventstream in Fabric:
# 1. New > Eventstream
# 2. Add source (e.g., Event Hub)
# 3. Add destination (KQL Database)
# 4. Configure mapping
# 5. Start the stream
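For the custom application source, Eventstream exposes an Event Hubs-compatible endpoint whose connection details you copy from the source's details pane. A minimal producer sketch with the azure-eventhub package (connection string and hub name are placeholders):

import json
from azure.eventhub import EventHubProducerClient, EventData

# Placeholders: copy both values from the Eventstream custom endpoint details.
CONNECTION_STR = "<event-hubs-compatible-connection-string>"
EVENTHUB_NAME = "<event-hub-name>"

producer = EventHubProducerClient.from_connection_string(
    CONNECTION_STR, eventhub_name=EVENTHUB_NAME
)

event = {
    "deviceId": "device001",      # field names match telemetry_mapping
    "deviceType": "sensor",
    "temperature": 25.5,
    "humidity": 60.0,
    "timestamp": "2023-07-19T10:00:00Z",
}

with producer:
    batch = producer.create_batch()
    batch.add(EventData(json.dumps(event)))
    producer.send_batch(batch)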

Real-Time Dashboard

// Create queries for dashboard tiles

// Tile 1: Current reading count
telemetry
| where timestamp > ago(5m)
| summarize count()

// Tile 2: Temperature trend
telemetry
| where timestamp > ago(1h)
| summarize avg_temp = avg(temperature) by bin(timestamp, 1m)
| render timechart

// Tile 3: Device status
telemetry
| where timestamp > ago(5m)
| summarize last_reading = max(timestamp) by device_id
| extend status = iff(last_reading > ago(2m), "Online", "Offline")
| summarize count() by status

// Tile 4: Top devices by reading count
telemetry
| where timestamp > ago(1h)
| summarize readings = count() by device_id
| top 10 by readings
| render barchart

Integration with Other Fabric Items

# Query the KQL database from a Spark notebook using the Kusto Spark connector.
# "kustoCluster" is the Query URI shown on the KQL database details page.

# In PySpark:
df = spark.read \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("kustoCluster", "https://kqldb.fabric.microsoft.com") \
    .option("kustoDatabase", "telemetry_db") \
    .option("kustoQuery", "telemetry | where timestamp > ago(1h)") \
    .load()

# Write Spark data back to the KQL database
df.write \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .option("kustoCluster", "https://kqldb.fabric.microsoft.com") \
    .option("kustoDatabase", "telemetry_db") \
    .option("kustoTable", "processed_telemetry") \
    .mode("Append") \
    .save()
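Outside Spark, the same database answers queries from any Python environment through the azure-kusto-data package. A short sketch, assuming Azure CLI sign-in and a placeholder Query URI:

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.helpers import dataframe_from_result_table

# Placeholder: the Query URI shown on the KQL database details page.
query_uri = "https://<query-uri>.kusto.fabric.microsoft.com"
kcsb = KustoConnectionStringBuilder.with_az_cli_authentication(query_uri)
client = KustoClient(kcsb)

results = client.execute("telemetry_db", "telemetry | where timestamp > ago(1h)")
df = dataframe_from_result_table(results.primary_results[0])
print(df.head())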

Best Practices

best_practices = {
    "schema_design": [
        "Use appropriate data types",
        "Consider ingestion timestamp",
        "Plan for query patterns"
    ],
    "ingestion": [
        "Batch small events",
        "Use streaming for real-time",
        "Configure appropriate retention"
    ],
    "querying": [
        "Always filter by time first",
        "Use summarize for aggregations",
        "Leverage materialized views for common queries"
    ],
    "monitoring": [
        "Track ingestion latency",
        "Monitor query performance",
        "Set up alerts for anomalies"
    ]
}

Tomorrow we’ll dive deeper into KQL databases and advanced patterns.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.