Back to Blog
5 min read

Eventstreams in Fabric: Streaming Data Ingestion

Eventstreams enable real-time data ingestion into Fabric. Today we’ll explore how to set up and manage streaming data flows using Eventstreams.

What are Eventstreams?

# Eventstreams provide:
eventstream_features = {
    "sources": [
        "Azure Event Hubs",
        "Azure IoT Hub",
        "Custom App (endpoint)",
        "Sample data (testing)"
    ],
    "transformations": [
        "Filter",
        "Aggregate",
        "Group by",
        "Union"
    ],
    "destinations": [
        "KQL Database",
        "Lakehouse",
        "Custom App"
    ]
}

# Use cases:
use_cases = [
    "IoT sensor data",
    "Application logs",
    "Clickstream data",
    "Financial transactions",
    "Social media feeds"
]

Creating an Eventstream

1. In your workspace, click "+ New" > "Eventstream"
2. Name your eventstream (e.g., "sensor_data_stream")
3. Click "Create"
4. You enter the Eventstream editor

Configuring Sources

Event Hubs Source

# Event Hubs configuration
event_hubs_config = {
    "connection_string": "Endpoint=sb://namespace.servicebus.windows.net/...",
    "event_hub_name": "telemetry-hub",
    "consumer_group": "$Default",
    "data_format": "JSON",
    "compression": "None"  # or Gzip
}

# Create Event Hub source:
# 1. In Eventstream, click "+ Add source"
# 2. Select "Event Hub"
# 3. Create new connection or use existing
# 4. Configure settings
# 5. Click "Add"

Custom App Source

# Custom App provides an endpoint for direct streaming
# After adding Custom App source:

endpoint_info = {
    "endpoint_url": "https://eventstream-endpoint.fabric.microsoft.com/...",
    "event_hub_name": "auto-generated",
    "sas_key": "provided in connection string",
    "sample_code": "See below"
}

# Python code to send events
import asyncio
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData
import json

async def send_events():
    connection_string = "Endpoint=sb://..."  # From Custom App
    producer = EventHubProducerClient.from_connection_string(connection_string)

    async with producer:
        batch = await producer.create_batch()

        event_data = {
            "device_id": "sensor001",
            "temperature": 25.5,
            "humidity": 60.0,
            "timestamp": "2023-07-21T10:30:00Z"
        }

        batch.add(EventData(json.dumps(event_data)))
        await producer.send_batch(batch)

asyncio.run(send_events())

Sample Data Source (for Testing)

# Built-in sample data for testing
sample_data_options = {
    "bicycles": "Rental bicycle events",
    "stock": "Stock market quotes",
    "refinery": "Industrial sensor data"
}

# 1. Click "+ Add source"
# 2. Select "Sample data"
# 3. Choose a sample dataset
# 4. Configure refresh rate

Transformations

Filter Transformation

# Add filter in Eventstream:
# 1. Click "+ Add transformation" > "Filter"
# 2. Connect to source
# 3. Define filter condition

# Example: Filter for high temperature
# Field: temperature
# Operator: Greater than
# Value: 30

Aggregate Transformation

# Rolling aggregations:
# 1. Add "Aggregate" transformation
# 2. Configure:

aggregate_config = {
    "group_by": ["device_id"],
    "window_type": "Tumbling",  # or Sliding, Session
    "window_duration": "5 minutes",
    "aggregations": [
        {"field": "temperature", "function": "Avg", "alias": "avg_temp"},
        {"field": "temperature", "function": "Max", "alias": "max_temp"},
        {"field": "*", "function": "Count", "alias": "reading_count"}
    ]
}

Group By Transformation

# Grouping without aggregation:
# Useful for partitioning stream

group_by_config = {
    "group_by_fields": ["device_type", "location"],
    "output": "Grouped events for downstream processing"
}

Configuring Destinations

KQL Database Destination

# Stream to KQL for real-time analytics
kql_destination = {
    "database": "telemetry_db",
    "table": "sensor_readings",
    "ingestion_mapping": "json_mapping",
    "format": "JSON"
}

# Steps:
# 1. Add destination > KQL Database
# 2. Select workspace and database
# 3. Create or select table
# 4. Configure column mapping
# 5. Connect transformation output to destination

Lakehouse Destination

# Stream to Lakehouse for batch processing
lakehouse_destination = {
    "lakehouse": "analytics_lakehouse",
    "table": "streaming_sensor_data",
    "format": "Delta",
    "write_mode": "Append"
}

# Good for:
# - Historical analysis
# - Combining with batch data
# - Power BI reporting

Complete Eventstream Example

# End-to-end streaming pipeline:

pipeline = {
    "source": {
        "type": "Event Hub",
        "name": "iot_events",
        "format": "JSON"
    },
    "transformations": [
        {
            "type": "Filter",
            "condition": "event_type == 'sensor_reading'"
        },
        {
            "type": "Aggregate",
            "window": "5 minutes tumbling",
            "group_by": "device_id",
            "aggregations": ["avg(value)", "max(value)", "count()"]
        }
    ],
    "destinations": [
        {
            "type": "KQL Database",
            "purpose": "Real-time dashboards"
        },
        {
            "type": "Lakehouse",
            "purpose": "Historical analysis"
        }
    ]
}

Monitoring Eventstreams

# Monitor stream health in Eventstream editor:
monitoring_metrics = {
    "events_in": "Events received from source",
    "events_out": "Events sent to destination",
    "latency": "Processing delay",
    "errors": "Failed events",
    "backlog": "Events pending processing"
}

# Set up alerts:
# 1. View metrics in Eventstream
# 2. Configure thresholds
# 3. Set up notifications via Power Automate

Error Handling

# Handle streaming errors:

error_handling = {
    "dead_letter": {
        "description": "Failed events stored separately",
        "configuration": "Enable in destination settings"
    },
    "retry_policy": {
        "max_retries": 3,
        "retry_interval": "30 seconds"
    },
    "schema_errors": {
        "handling": "Skip or fail",
        "logging": "Errors logged to monitor"
    }
}

# Best practices:
# 1. Always configure dead-letter queue
# 2. Monitor error rates
# 3. Handle schema evolution gracefully
# 4. Test with sample data first

Best Practices

best_practices = {
    "schema": {
        "tip": "Define schema explicitly",
        "reason": "Prevents runtime errors"
    },
    "partitioning": {
        "tip": "Partition by high-cardinality key",
        "reason": "Better parallel processing"
    },
    "windowing": {
        "tip": "Choose appropriate window size",
        "reason": "Balance latency vs accuracy"
    },
    "checkpointing": {
        "tip": "Enable checkpointing",
        "reason": "Enables recovery from failures"
    },
    "testing": {
        "tip": "Test with sample data first",
        "reason": "Validate transformations before production"
    }
}

Tomorrow we’ll explore Real-Time Dashboards for visualizing streaming data.

Resources

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.