
# Synapse Data Explorer: Real-Time Analytics at Scale

Synapse Data Explorer brings Azure Data Explorer capabilities into Synapse Analytics, enabling real-time analytics on streaming and time-series data. It uses Kusto Query Language (KQL) for powerful data exploration.

## Creating a Data Explorer Pool

```bash
# Create a Data Explorer pool via the Azure CLI
az synapse kusto pool create \
    --name "analytics-pool" \
    --workspace-name "my-synapse-workspace" \
    --resource-group "analytics-rg" \
    --location "eastus" \
    --sku name="Compute optimized" size="Small"
```
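
Provisioning typically takes a few minutes. Once the pool is running, a trivial query from the Synapse Studio KQL editor confirms it accepts connections; a minimal sanity check:

```kql
// Sanity check: confirms the pool accepts queries
print Cluster = "analytics-pool", CurrentTime = now()
```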

## Data Ingestion

Set up streaming ingestion:

````kql
// Create a database
.create database SensorAnalytics

// Create a table for IoT sensor data
.create table SensorReadings (
    Timestamp: datetime,
    SensorId: string,
    DeviceType: string,
    Temperature: real,
    Humidity: real,
    Pressure: real,
    Location: dynamic
)

// Create a JSON ingestion mapping
.create table SensorReadings ingestion json mapping 'SensorMapping' ```
[
    {"column": "Timestamp", "path": "$.timestamp", "datatype": "datetime"},
    {"column": "SensorId", "path": "$.sensorId", "datatype": "string"},
    {"column": "DeviceType", "path": "$.deviceType", "datatype": "string"},
    {"column": "Temperature", "path": "$.readings.temperature", "datatype": "real"},
    {"column": "Humidity", "path": "$.readings.humidity", "datatype": "real"},
    {"column": "Pressure", "path": "$.readings.pressure", "datatype": "real"},
    {"column": "Location", "path": "$.location", "datatype": "dynamic"}
]
```

// Enable streaming ingestion at the database level
.alter database SensorAnalytics policy streamingingestion enable

// Tune the batching policy for low-latency queued ingestion
// (Event Hub data connections are configured separately in Synapse Studio)
.alter table SensorReadings policy ingestionbatching ```
{
    "MaximumBatchingTimeSpan": "00:00:10",
    "MaximumNumberOfItems": 500,
    "MaximumRawDataSizeMB": 1
}
```
````
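
Before wiring up a data connection, it's worth validating the mapping end to end with a one-off inline ingestion. This is a sketch with a hypothetical sample record shaped to match the mapping paths:

```kql
// One-off test ingestion using the JSON mapping
.ingest inline into table SensorReadings with (format = "json", ingestionMappingReference = "SensorMapping") <|
{"timestamp":"2024-01-01T00:00:00Z","sensorId":"SENSOR-001","deviceType":"TemperatureSensor","readings":{"temperature":22.5,"humidity":45.0,"pressure":1013.2},"location":{"lat":-33.87,"lon":151.21,"building":"HQ"}}
```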

## Kusto Query Language Basics

Essential KQL queries:

```kql
// Basic filtering and aggregation
SensorReadings
| where Timestamp > ago(1h)
| where DeviceType == "TemperatureSensor"
| summarize
    AvgTemp = avg(Temperature),
    MaxTemp = max(Temperature),
    MinTemp = min(Temperature),
    Readings = count()
    by SensorId
| order by AvgTemp desc

// Time-series analysis with binning
SensorReadings
| where Timestamp > ago(24h)
| summarize AvgTemperature = avg(Temperature) by bin(Timestamp, 15m), SensorId
| render timechart

// Parse JSON from dynamic column
SensorReadings
| where Timestamp > ago(1h)
| extend
    Latitude = toreal(Location.lat),
    Longitude = toreal(Location.lon),
    Building = tostring(Location.building)
| summarize ReadingCount = count() by Building
| render piechart

// Join with reference data
let DeviceInfo = datatable(SensorId: string, Zone: string, Threshold: real)
[
    "SENSOR-001", "Zone-A", 30.0,
    "SENSOR-002", "Zone-A", 28.0,
    "SENSOR-003", "Zone-B", 32.0
];
SensorReadings
| where Timestamp > ago(1h)
| join kind=inner DeviceInfo on SensorId
| where Temperature > Threshold
| project Timestamp, SensorId, Zone, Temperature, Threshold, Deviation = Temperature - Threshold
```
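
Another staple worth knowing: arg_max() returns the latest row per group, which is the usual way to get each sensor's most recent reading:

```kql
// Latest reading per sensor
SensorReadings
| summarize arg_max(Timestamp, Temperature, Humidity, Pressure) by SensorId
```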

## Anomaly Detection

KQL ships with built-in time-series and machine learning functions:

```kql
// Time-series anomaly detection
SensorReadings
| where Timestamp > ago(7d)
| where SensorId == "SENSOR-001"
| make-series
    AvgTemp = avg(Temperature)
    on Timestamp step 1h
| extend (anomalies, score, baseline) = series_decompose_anomalies(AvgTemp, 2.5)
| mv-expand Timestamp to typeof(datetime), AvgTemp to typeof(real), anomalies to typeof(int)
| where anomalies != 0
| project Timestamp, AvgTemp, AnomalyType = iff(anomalies > 0, "High", "Low")

// Seasonal decomposition
SensorReadings
| where Timestamp > ago(30d)
| make-series AvgTemp = avg(Temperature) on Timestamp step 1h
| extend (baseline, seasonal, trend, residual) = series_decompose(AvgTemp)
| render timechart

// Forecast future values
SensorReadings
| where Timestamp > ago(30d)
| where SensorId == "SENSOR-001"
| make-series AvgTemp = avg(Temperature) on Timestamp from ago(30d) to now() + 24h step 1h
| extend forecast = series_decompose_forecast(AvgTemp, 24)  // fill the last 24 (future) points
| render timechart

// Pattern detection in logs
AppLogs
| where Timestamp > ago(1h)
| where Level == "Error"
| summarize Count = count() by bin(Timestamp, 5m)
| make-series ErrorCount = sum(Count) on Timestamp step 5m
| extend (anomalies, score, baseline) = series_decompose_anomalies(ErrorCount, 1.5, -1, 'linefit')
| mv-expand Timestamp, ErrorCount, anomalies, score, baseline
| where toint(anomalies) != 0
```
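
When full decomposition is overkill, series_outliers() scores each point in a series without modeling seasonality; a minimal sketch on the same schema:

```kql
// Quick outlier scoring without seasonal decomposition
SensorReadings
| where Timestamp > ago(24h)
| make-series AvgTemp = avg(Temperature) on Timestamp step 15m by SensorId
| extend OutlierScore = series_outliers(AvgTemp)
```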

## User-Defined Functions

Create reusable functions:

```kql
// Create function for temperature alerts
.create-or-alter function GetTemperatureAlerts(
    sensorId: string,
    threshold: real,
    lookback: timespan
) {
    SensorReadings
    | where Timestamp > ago(lookback)
    | where SensorId == sensorId
    | where Temperature > threshold
    | project Timestamp, SensorId, Temperature, Deviation = Temperature - threshold
    | order by Timestamp desc
}

// Use the function
GetTemperatureAlerts("SENSOR-001", 30.0, 24h)

// Rolling statistics: KQL has no SQL-style window frames,
// so compute a moving average with make-series + series_fir
.create-or-alter function RollingStats(
    sensorId: string,
    lookback: timespan,
    binSize: timespan
) {
    SensorReadings
    | where SensorId == sensorId
    | make-series AvgTemp = avg(Temperature) on Timestamp from ago(lookback) to now() step binSize
    // Trailing moving average over the last 4 bins (uniform FIR filter)
    | extend RollingAvg = series_fir(AvgTemp, repeat(1, 4), true, false)
}

// Materialized view for common aggregations
.create materialized-view HourlySensorStats on table SensorReadings
{
    SensorReadings
    | summarize
        AvgTemperature = avg(Temperature),
        MaxTemperature = max(Temperature),
        MinTemperature = min(Temperature),
        AvgHumidity = avg(Humidity),
        ReadingCount = count()
        by SensorId, bin(Timestamp, 1h)
}
```
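
Querying the view by name returns fully up-to-date results (the materialized part plus the not-yet-materialized tail); the materialized_view() function reads only the materialized part, trading a little freshness for speed:

```kql
// Always up to date: materialized part plus the recent tail
HourlySensorStats
| where Timestamp > ago(7d)

// Materialized part only: faster, may lag by the refresh interval
materialized_view("HourlySensorStats")
| where Timestamp > ago(7d)
| top 10 by AvgTemperature desc
```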

## Integration with Spark

Query Data Explorer from Spark:

```python
# PySpark code to query Data Explorer from a Synapse notebook
# (the `spark` session is predefined in Synapse Spark pools)
from notebookutils import mssparkutils

# Configure the Kusto connection; the URI format is
# https://<pool-name>.<workspace-name>.kusto.azuresynapse.net
kusto_options = {
    "kustoCluster": "https://analytics-pool.my-synapse-workspace.kusto.azuresynapse.net",
    "kustoDatabase": "SensorAnalytics",
    "kustoQuery": """
        SensorReadings
        | where Timestamp > ago(7d)
        | summarize AvgTemp = avg(Temperature) by SensorId, bin(Timestamp, 1h)
    """,
    # Token retrieved from Key Vault (vault and secret names are examples)
    "accessToken": mssparkutils.credentials.getSecret("my-key-vault", "kusto-token")
}

# Read from Data Explorer
df = spark.read \
    .format("com.microsoft.kusto.spark.synapse.datasource") \
    .options(**kusto_options) \
    .load()

# Process with Spark ML
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# Continue with analysis...
df.show()
```

## Real-Time Dashboard

Queries tuned for Power BI dashboards:

```kql
// Query optimized for Power BI DirectQuery
let timeRange = ago(24h);
SensorReadings
| where Timestamp > timeRange
| summarize
    AvgTemperature = round(avg(Temperature), 2),
    MaxTemperature = round(max(Temperature), 2),
    MinTemperature = round(min(Temperature), 2),
    Readings = count()
    by SensorId, DeviceType, Hour = bin(Timestamp, 1h)
| order by Hour desc, SensorId asc

// KPI query for current status; avgif() compares the last 5 minutes
// against the preceding 10 (prev() is not allowed inside summarize)
SensorReadings
| where Timestamp > ago(15m)
| summarize
    CurrentTemp = round(avgif(Temperature, Timestamp > ago(5m)), 1),
    PrevTemp = round(avgif(Temperature, Timestamp <= ago(5m)), 1),
    AlertCount = countif(Temperature > 35)
    by SensorId
| extend TempTrend = round(CurrentTemp - PrevTemp, 2)
```
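
Dashboards re-issue the same queries on every refresh, so the query results cache is an easy win; a sketch, assuming a 5-minute staleness tolerance is acceptable:

```kql
// Serve repeat queries from cache if results are under 5 minutes old
set query_results_cache_max_age = time(5m);
SensorReadings
| where Timestamp > ago(24h)
| summarize AvgTemperature = avg(Temperature) by SensorId, bin(Timestamp, 1h)
```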

## Summary

Synapse Data Explorer provides:

- Real-time analytics on streaming data
- Powerful KQL for data exploration
- Built-in anomaly detection
- Time-series analysis capabilities
- Integration with the Synapse ecosystem

Perfect for IoT, logs, and telemetry analytics.


Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.