5 min read
KQL Databases: Advanced Patterns and Techniques
KQL (Kusto Query Language) databases are optimized for time-series data and analytics. Today we’ll explore advanced KQL patterns and database management techniques.
KQL Database Architecture
// KQL databases are optimized for:
// - Fast ingestion (millions of events/second)
// - Time-based queries
// - Aggregations and analytics
// - Full-text search
// Key concepts:
// - Tables: Store data
// - Functions: Reusable query logic
// - Materialized views: Pre-computed aggregations
// - Policies: Control data lifecycle
Advanced Query Patterns
Complex Aggregations
// Multi-level aggregation
events
| where timestamp > ago(7d)
| summarize
event_count = count(),
unique_users = dcount(user_id),
total_value = sum(value)
by event_type, bin(timestamp, 1h)
| summarize
hourly_avg_events = avg(event_count),
peak_hour_events = max(event_count),
total_unique_users = sum(unique_users)
by event_type
// Percentile calculations
telemetry
| where timestamp > ago(24h)
| summarize
p50 = percentile(response_time, 50),
p90 = percentile(response_time, 90),
p95 = percentile(response_time, 95),
p99 = percentile(response_time, 99)
by service_name
| order by p99 desc
// Histogram
telemetry
| where timestamp > ago(1h)
| summarize count() by bin(response_time, 10)
| render columnchart
Time Series Functions
// Time series creation and analysis
telemetry
| where timestamp > ago(7d)
| make-series avg_temp = avg(temperature) on timestamp step 1h by device_id
| extend (anomalies, score, baseline) = series_decompose_anomalies(avg_temp, 2.5)
// Forecasting
telemetry
| where timestamp > ago(30d)
| make-series value = avg(temperature) on timestamp step 1h
| extend forecast = series_decompose_forecast(value, 24)
// Seasonal decomposition
telemetry
| where timestamp > ago(30d)
| make-series readings = avg(temperature) on timestamp step 1h
| extend (baseline, seasonal, trend, residual) = series_decompose(readings)
| project timestamp, readings, baseline, seasonal, trend
Pattern Matching
// Sequence detection
events
| where timestamp > ago(24h)
| order by user_id, timestamp asc
| serialize
| extend prev_event = prev(event_type, 1)
| where prev_event == "view_product" and event_type == "add_to_cart"
| summarize conversion_count = count() by bin(timestamp, 1h)
// Sessionization
events
| where timestamp > ago(24h)
| order by user_id, timestamp asc
| serialize
| extend time_since_last = timestamp - prev(timestamp, 1, datetime(null))
| extend new_session = iff(time_since_last > 30m or isnull(time_since_last), 1, 0)
| extend session_id = row_cumsum(new_session)
| summarize
session_start = min(timestamp),
session_end = max(timestamp),
event_count = count()
by user_id, session_id
Joins and Correlations
// Temporal join (events within time window)
let requests = http_requests | where timestamp > ago(1h);
let errors = error_logs | where timestamp > ago(1h);
requests
| join kind=leftouter (
errors
| where level == "Error"
) on $left.request_id == $right.request_id
| summarize
total_requests = count(),
failed_requests = countif(isnotnull(error_message))
by bin(timestamp, 5m)
// Fuzzy join by time window
telemetry
| where timestamp > ago(1h)
| join kind=inner (
alerts
| where timestamp > ago(1h)
| extend alert_window_start = timestamp - 5m
| extend alert_window_end = timestamp + 5m
) on device_id
| where telemetry.timestamp between (alert_window_start .. alert_window_end)
Stored Functions
// Create a stored function
.create-or-alter function with (folder = "analytics", docstring = "Get device metrics for time range")
GetDeviceMetrics(start_time: datetime, end_time: datetime, device: string = "") {
telemetry
| where timestamp between (start_time .. end_time)
| where device == "" or device_id == device
| summarize
avg_temp = avg(temperature),
max_temp = max(temperature),
min_temp = min(temperature),
readings = count()
by device_id, bin(timestamp, 1h)
}
// Use the function
GetDeviceMetrics(ago(24h), now(), "device001")
// Parameterized function with defaults
.create-or-alter function GetRecentAlerts(hours: int = 24) {
alerts
| where timestamp > ago(hours * 1h)
| summarize count() by severity, alert_type
| order by count_ desc
}
Materialized Views
// Create materialized view for pre-aggregated data
.create materialized-view with (backfill = true) hourly_device_stats on table telemetry {
telemetry
| summarize
avg_temperature = avg(temperature),
max_temperature = max(temperature),
min_temperature = min(temperature),
reading_count = count()
by device_id, bin(timestamp, 1h)
}
// Query the materialized view (fast!)
hourly_device_stats
| where timestamp > ago(7d)
| summarize daily_avg = avg(avg_temperature) by device_id, bin(timestamp, 1d)
// Disable/enable materialized view
.alter materialized-view hourly_device_stats disable
.alter materialized-view hourly_device_stats enable
Data Management
Retention Policies
// Set retention policy
.alter table telemetry policy retention
’{ “SoftDeletePeriod”: “90.00:00:00”, “Recoverability”: “Enabled” }‘
// View current policies
.show table telemetry policy retention
Caching Policies
// Configure hot cache
.alter table telemetry policy caching
’{ “DataHotSpan”: “7.00:00:00”, “IndexHotSpan”: “14.00:00:00” }‘
Partitioning
// Create partition policy for better performance
.alter table telemetry policy partitioning
’{ “PartitionKeys”: [ { “ColumnName”: “device_id”, “Kind”: “Hash”, “Properties”: { “Function”: “XxHash64”, “MaxPartitionCount”: 128 } } ] }‘
Performance Optimization
// Performance tips:
// 1. Always filter by time first
// Good:
telemetry | where timestamp > ago(1h) | where device_id == "d001"
// Less optimal:
telemetry | where device_id == "d001" | where timestamp > ago(1h)
// 2. Project only needed columns
telemetry
| where timestamp > ago(1h)
| project device_id, temperature, timestamp // Only needed columns
// 3. Use summarize instead of distinct for counting
// Good:
telemetry | summarize device_count = dcount(device_id)
// Less optimal:
telemetry | distinct device_id | count
// 4. Leverage materialized views for repeated queries
// 5. Use partition elimination
telemetry
| where device_id == "specific_device" // Partition key filter
| where timestamp > ago(1h)
Monitoring Database Health
// Check table statistics
.show table telemetry details
// Ingestion metrics
.show table telemetry ingestion statistics
// Query statistics
.show queries
| where StartedOn > ago(1h)
| summarize count() by bin(StartedOn, 5m), State
| render timechart
// Extent (shard) information
.show table telemetry extents
| summarize count(), sum(OriginalSize), sum(CompressedSize) by bin(MinCreatedOn, 1d)
Tomorrow we’ll explore Eventstreams for streaming data ingestion.