Back to Blog
6 min read

KQL Querysets in Microsoft Fabric

KQL Querysets are the analysis layer of Real-Time Intelligence in Fabric. Today I’m exploring how to write effective KQL for real-time analytics.

What are KQL Querysets?

KQL Querysets are saved, shareable collections of Kusto Query Language queries that:

  • Connect to Eventhouses
  • Can be scheduled
  • Support parameters
  • Integrate with dashboards
  • Are version controlled

KQL Fundamentals

Basic Query Structure

// Pipeline shape: Table | operator1 | operator2 | ...
SensorData
| where EventTime > ago(1h)                 // only the last hour of data
| where DeviceId startswith "sensor-"       // device-name prefix match
| summarize avg(Value) by DeviceId          // per-device average (auto-named avg_Value)
| sort by avg_Value desc                    // sort is an alias of order by
| limit 10                                  // limit is an alias of take

Filtering Data

// Time-based filtering
// between is inclusive on both ends; datetime() literals are interpreted as UTC.
SensorData
| where EventTime between (datetime(2024-06-01) .. datetime(2024-06-02))

// String filtering
// has matches whole indexed terms (faster than contains); in() is a
// case-sensitive set-membership test.
SensorData
| where DeviceId has "production"
| where SensorType in ("temperature", "humidity")
// NOTE(review): dot access requires Location to be a dynamic column — confirm schema.
| where Location.building == "HQ"

// Numeric filtering
// !between excludes the range; both bounds of the range are inclusive.
SensorData
| where Value > 0 and Value < 100
| where Value !between (40 .. 60)  // Outside normal range

Aggregations

// Basic aggregations
// Unnamed aggregations get auto-generated column names:
// count_, avg_Value, min_Value, max_Value, stdev_Value, percentile_Value_95.
SensorData
| where EventTime > ago(24h)
| summarize
    count(),
    avg(Value),
    min(Value),
    max(Value),
    stdev(Value),
    percentile(Value, 95)
    by DeviceId

// Time-based aggregations
// bin() buckets timestamps into fixed 1-hour windows; render timechart
// draws one line per DeviceId over time.
SensorData
| where EventTime > ago(7d)
| summarize avg(Value) by bin(EventTime, 1h), DeviceId
| render timechart

Joins

// Inner join
// NOTE: a let statement and the query that uses it must run as ONE request —
// in the KQL editor a blank line separates requests, so keep them together
// (with the blank line, Devices would be an unresolved name).
let Devices = datatable(DeviceId: string, Location: string)
[
    "D001", "Floor 1",
    "D002", "Floor 2"
];
SensorData
| join kind=inner Devices on DeviceId   // keeps only rows with a matching DeviceId
| summarize avg(Value) by Location

// Left outer join
// Unmatched left rows get empty values for right-side columns; KQL's
// coalesce() skips empty strings as well as nulls, so those rows end up
// as "Unknown". (Devices must be defined via let in the same request.)
SensorData
| join kind=leftouter Devices on DeviceId
| extend LocationName = coalesce(Location, "Unknown")

// Time-based join (within time window)
// Equi-join on DeviceId first, then keep only event/alert pairs whose
// timestamps are within 60 seconds of each other.
SensorData
| join kind=inner (
    AlertsTable
    | where AlertTime > ago(1h)   // pre-filter the right side to shrink the join
) on DeviceId
| where abs(datetime_diff('second', EventTime, AlertTime)) < 60

Time Series Analysis

Creating Time Series

// Make series from data
// Produces one row per DeviceId holding aligned dynamic arrays of
// timestamps and hourly averages; default=0 fills empty buckets with zero.
SensorData
| where EventTime > ago(7d)
| make-series Value = avg(Value) default=0
    on EventTime
    step 1h
    by DeviceId

// With multiple metrics
// Each named aggregation becomes its own array column, all sharing the
// same time axis, so they can be compared element-wise downstream.
SensorData
| where EventTime > ago(7d)
| make-series
    AvgValue = avg(Value),
    MaxValue = max(Value),
    Count = count()
    on EventTime step 1h
    by DeviceId

Time Series Functions

// Fill missing values
// default=real(null) leaves empty buckets as nulls so series_fill_linear
// can interpolate them — a default of 0 would be treated as real data.
SensorData
| make-series Value = avg(Value) default=real(null) on EventTime step 5m
| extend Value = series_fill_linear(Value)

// Smoothing with moving average
// series_fir(series, filter, normalize, center): repeat(1, 5) with
// normalize=true and center=true yields a centered 5-point moving average.
SensorData
| make-series Value = avg(Value) on EventTime step 1m by DeviceId
| extend SmoothedValue = series_fir(Value, repeat(1, 5), true, true)

// Decomposition (baseline, seasonal, trend, residual)
// series_decompose returns FOUR series, in this order. Unpacking only three
// names silently shifts every output (seasonal would receive the baseline).
SensorData
| make-series Value = avg(Value) on EventTime step 1h by DeviceId
| extend (baseline, seasonal, trend, residual) = series_decompose(Value)

Anomaly Detection

// Spike detection
// series_decompose_anomalies returns three aligned series: flags
// (+1 spike, -1 dip, 0 normal), anomaly score, and expected baseline.
// 1.5 is the score threshold (the documented default).
SensorData
| where EventTime > ago(24h)
| make-series Value = avg(Value) on EventTime step 5m by DeviceId
| extend (anomalies, score, baseline) = series_decompose_anomalies(Value, 1.5)
// mv-expand unpacks the aligned arrays back into one row per time bucket.
| mv-expand EventTime to typeof(datetime),
            Value to typeof(real),
            anomalies to typeof(int),
            score to typeof(real),
            baseline to typeof(real)
| where anomalies != 0
| project EventTime, DeviceId, Value, AnomalyScore = score, Expected = baseline

// Changepoint detection
// series_decompose_anomalies returns three series (flags, score, baseline);
// capture all three — a single-name assignment does not unpack them.
// trend='linefit' fits a linear trend so level shifts surface as anomalies.
SensorData
| where DeviceId == "D001"
| make-series Value = avg(Value) on EventTime step 1h
| extend (changepoints, score, baseline) = series_decompose_anomalies(Value, 3, -1, 'linefit')

Forecasting

// Simple forecast
// series_decompose_forecast(series, points) predicts the last `points`
// elements of the series; a long training window (30d) helps it learn
// the seasonal pattern.
// NOTE(review): to forecast into the future, the make-series range should
// extend 24h past now() so those buckets exist — confirm against docs.
SensorData
| where DeviceId == "D001"
| where EventTime > ago(30d)
| make-series Value = avg(Value) on EventTime step 1h
| extend Forecast = series_decompose_forecast(Value, 24)  // 24 hours ahead

// Expand the forecast into rows for charting
// NOTE: series_decompose_forecast returns a SINGLE forecast series — it does
// not produce confidence intervals, so it cannot be unpacked into
// (Forecast, LowerBound, UpperBound) as is sometimes shown.
SensorData
| where DeviceId == "D001"
| make-series Value = avg(Value) on EventTime step 1h
| extend Forecast = series_decompose_forecast(Value, 24)
| mv-expand EventTime to typeof(datetime), Value to typeof(real), Forecast to typeof(real)

Advanced Patterns

Sessionization

// Group events into sessions (30 min gap = new session)
// order by serializes the rows, which is what allows the window
// functions prev() and row_cumsum() below.
SensorData
| where DeviceId == "D001"
| order by EventTime asc
// Mark a session break wherever the gap to the previous event exceeds 30
// minutes; the first row's prev() is null, so its comparison is not true
// and it gets 0 (i.e. it starts session 0).
| extend SessionBreak = iif(
    datetime_diff('minute', EventTime, prev(EventTime)) > 30,
    1, 0
)
// Running sum of the break flags numbers the sessions 0, 1, 2, ...
| extend SessionId = row_cumsum(SessionBreak)
| summarize
    SessionStart = min(EventTime),
    SessionEnd = max(EventTime),
    EventCount = count(),
    AvgValue = avg(Value)
    by SessionId

Funnel Analysis

// User journey funnel
// Each step is the distinct set of users who performed that action; chaining
// inner joins intersects the sets, so each count only includes users who
// completed every prior step.
// NOTE: the lets and the print must run as ONE request — a blank line
// between them would split the request in the KQL editor.
let Step1 = Events | where Action == "ViewProduct" | distinct UserId;
let Step2 = Events | where Action == "AddToCart" | distinct UserId;
let Step3 = Events | where Action == "Checkout" | distinct UserId;
let Step4 = Events | where Action == "Purchase" | distinct UserId;
print
    ViewProduct = toscalar(Step1 | count),
    AddToCart = toscalar(Step1 | join kind=inner Step2 on UserId | count),
    Checkout = toscalar(Step1 | join kind=inner Step2 on UserId | join kind=inner Step3 on UserId | count),
    Purchase = toscalar(Step1 | join kind=inner Step2 on UserId | join kind=inner Step3 on UserId | join kind=inner Step4 on UserId | count)

Correlation Analysis

// Correlate two metrics
// NOTE(review): make-series normally takes an aggregation (e.g.
// Value = avg(Value)) — verify this shorthand compiles.
// NOTE(review): each make-series result row carries its whole series as
// dynamic arrays, so this join matches on the EventTime ARRAY; it only
// lines up if both series cover the identical range and step — confirm.
let Metric1 = SensorData | where SensorType == "temperature" | make-series Value on EventTime step 1h;
let Metric2 = SensorData | where SensorType == "humidity" | make-series Value on EventTime step 1h;

Metric1
| join Metric2 on EventTime
// Right-side duplicate columns are auto-renamed (Value -> Value1);
// series_pearson_correlation returns a scalar correlation coefficient.
| extend Correlation = series_pearson_correlation(Value, Value1)

Parameters and Functions

Parameterized Queries

// Define parameters
// Defaults apply when the caller supplies no value; "*" is a wildcard
// sentinel handled explicitly in the filter below (has would never
// match a literal "*").
declare query_parameters(
    StartTime: datetime = ago(24h),
    EndTime: datetime = now(),
    DeviceFilter: string = "*"
);

SensorData
| where EventTime between (StartTime .. EndTime)
| where DeviceId has DeviceFilter or DeviceFilter == "*"
| summarize avg(Value) by DeviceId, bin(EventTime, 1h)

User-Defined Functions

// Create reusable function
// NOTE: never name a parameter the same as a column — with a parameter named
// DeviceId, the predicate `DeviceId == DeviceId` is a self-comparison that is
// always true, so the filter silently matched EVERY device. Renaming the
// parameter fixes it; calls are positional, so existing callers still work.
.create-or-alter function
    with (docstring = "Get device statistics", folder = "Analytics")
    GetDeviceStats(TargetDeviceId: string, TimeRange: timespan) {
        SensorData
        | where DeviceId == TargetDeviceId
        | where EventTime > ago(TimeRange)
        | summarize
            AvgValue = avg(Value),
            MinValue = min(Value),
            MaxValue = max(Value),
            StdDev = stdev(Value),
            Count = count()
}

// Use function
GetDeviceStats("D001", 7d)

Query Optimization

Best Practices

// Good: Filter early, reduce data
// Pushing where before summarize lets the engine scan far fewer rows.
SensorData
| where EventTime > ago(1h)  // Filter first
| where DeviceId == "D001"   // Then narrow
| summarize avg(Value)

// Bad: Aggregate then filter
// (intentionally bad example: aggregates every device before discarding most)
SensorData
| summarize avg(Value) by DeviceId, bin(EventTime, 1h)
| where DeviceId == "D001"  // Too late!

// Good: Use has/contains instead of regex when possible
// has matches whole terms and can use the term index.
SensorData
| where DeviceId has "production"  // Faster

// Avoid if possible
// Regex cannot use the term index, so every value is scanned.
SensorData
| where DeviceId matches regex "prod.*"  // Slower

Explain Query

// Understand query execution
// NOTE(review): `.show query plan` does not appear among the documented
// Kusto management commands — verify; `.show queries` (resource usage of
// completed queries) may be what was intended here.
.show query plan
SensorData
| where EventTime > ago(1h)
| summarize avg(Value) by DeviceId

Visualization

// Time chart
// render is a display hint for the client; it must be the last operator.
SensorData
| where EventTime > ago(24h)
| summarize avg(Value) by bin(EventTime, 15m), DeviceId
| render timechart

// Bar chart
SensorData
| where EventTime > ago(24h)
| summarize count() by DeviceId
| render barchart

// Pie chart
SensorData
| where EventTime > ago(24h)
| summarize count() by SensorType
| render piechart

// Scatter plot
// First projected column is the x-axis, the rest are y-series.
SensorData
| where EventTime > ago(1h)
| project Value, EventTime
| render scatterchart

What’s Next

Tomorrow I’ll cover real-time dashboards in Fabric.

Resources

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.