IoT Hub Message Routing and Custom Endpoints

Message routing in Azure IoT Hub sends device-to-cloud messages to different endpoints based on message properties, the message body, or device twin data. This makes it a core building block for event-driven IoT architectures.

Understanding Message Sources

IoT Hub can route messages from multiple sources:

  • Device Telemetry Messages: Regular D2C messages
  • Device Twin Change Events: When twin properties change
  • Device Lifecycle Events: Device created or deleted
  • Device Connection State Events: Device connected or disconnected
  • Digital Twin Change Events: For IoT Plug and Play devices
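
When routes are created with the Azure CLI, each source corresponds to a value of the `--source` flag. The lookup below is a small reference sketch, not part of any SDK: the names `ROUTE_SOURCES` and `source_flag` are illustrative. Connection state events (connect and disconnect) appear as their own entry because IoT Hub exposes them as a separate source from lifecycle events.

```python
# Illustrative lookup: message source -> value passed to
# `az iot hub route create --source`.
ROUTE_SOURCES = {
    "Device Telemetry Messages": "devicemessages",
    "Device Twin Change Events": "twinchangeevents",
    "Device Lifecycle Events": "devicelifecycleevents",
    "Device Connection State Events": "deviceconnectionstateevents",
    "Digital Twin Change Events": "digitaltwinchangeevents",
}


def source_flag(source_name):
    """Return the CLI arguments that select the given message source."""
    return ["--source", ROUTE_SOURCES[source_name]]
```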

Creating Custom Endpoints

# Create Event Hub endpoint
az iot hub routing-endpoint create \
    --hub-name myIoTHub \
    --resource-group myResourceGroup \
    --endpoint-name telemetryEventHub \
    --endpoint-type eventhub \
    --endpoint-resource-group myResourceGroup \
    --endpoint-subscription-id $SUBSCRIPTION_ID \
    --connection-string "$EVENTHUB_CONNECTION_STRING"

# Create Service Bus Queue endpoint
az iot hub routing-endpoint create \
    --hub-name myIoTHub \
    --resource-group myResourceGroup \
    --endpoint-name alertsQueue \
    --endpoint-type servicebusqueue \
    --endpoint-resource-group myResourceGroup \
    --endpoint-subscription-id $SUBSCRIPTION_ID \
    --connection-string "$SERVICEBUS_CONNECTION_STRING"

# Create Blob Storage endpoint
# (batch frequency is in seconds, chunk size is in MB)
az iot hub routing-endpoint create \
    --hub-name myIoTHub \
    --resource-group myResourceGroup \
    --endpoint-name archiveStorage \
    --endpoint-type azurestoragecontainer \
    --endpoint-resource-group myResourceGroup \
    --endpoint-subscription-id $SUBSCRIPTION_ID \
    --connection-string "$STORAGE_CONNECTION_STRING" \
    --container-name iot-archive \
    --encoding json \
    --batch-frequency 60 \
    --chunk-size 100 \
    --file-name-format "{iothub}/{partition}/{YYYY}/{MM}/{DD}/{HH}/{mm}"
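
To make the file name format concrete, here is a minimal sketch of how its tokens could expand into a blob path for one batch window. The function `expand_blob_prefix` is hypothetical and only mimics the substitution locally. The service decides the real blob name, including how the partition segment is padded and any file extension.

```python
from datetime import datetime, timezone

FILE_NAME_FORMAT = "{iothub}/{partition}/{YYYY}/{MM}/{DD}/{HH}/{mm}"


def expand_blob_prefix(fmt, hub_name, partition, when):
    """Fill in the file name format tokens for a single batch window."""
    tokens = {
        "{iothub}": hub_name,
        "{partition}": str(partition),
        "{YYYY}": f"{when.year:04d}",
        "{MM}": f"{when.month:02d}",   # month (upper case)
        "{DD}": f"{when.day:02d}",
        "{HH}": f"{when.hour:02d}",
        "{mm}": f"{when.minute:02d}",  # minute (lower case)
    }
    for token, value in tokens.items():
        fmt = fmt.replace(token, value)
    return fmt


window = datetime(2022, 7, 24, 10, 30, tzinfo=timezone.utc)
print(expand_blob_prefix(FILE_NAME_FORMAT, "myIoTHub", 0, window))
# prints myIoTHub/0/2022/07/24/10/30
```

Leading with the hub name and partition keeps each partition's blobs grouped together, while the date segments make time-range listing straightforward.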

Routing Queries

Basic Routing Rules

# Route all messages to storage
az iot hub route create \
    --hub-name myIoTHub \
    --resource-group myResourceGroup \
    --route-name allToStorage \
    --source devicemessages \
    --endpoint-name archiveStorage \
    --condition "true" \
    --enabled true

# Route high-temperature alerts
az iot hub route create \
    --hub-name myIoTHub \
    --resource-group myResourceGroup \
    --route-name temperatureAlerts \
    --source devicemessages \
    --endpoint-name alertsQueue \
    --condition '$body.temperature > 30' \
    --enabled true

# Route specific device types
az iot hub route create \
    --hub-name myIoTHub \
    --resource-group myResourceGroup \
    --route-name sensorData \
    --source devicemessages \
    --endpoint-name telemetryEventHub \
    --condition "\$twin.tags.deviceType = 'sensor'" \
    --enabled true

Advanced Query Syntax

-- Message body conditions
$body.temperature > 30 AND $body.humidity < 40

-- Application properties (referenced by name, without the $ prefix)
messageType = 'alert'

-- System properties
$connectionDeviceId = 'device001'

-- Twin tags
$twin.tags.location = 'building1'

-- Twin desired properties
$twin.properties.desired.alertThreshold < $body.temperature

-- Combined conditions
$body.temperature > $twin.properties.desired.maxTemp
    AND $twin.tags.critical = true
    AND $contentType = 'application/json'
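
To reason about a body condition before deploying it, you can approximate it in plain Python. The sketch below mirrors `$body.temperature > 30 AND $body.humidity < 40`. The function `matches_hot_and_dry` is a hypothetical local stand-in, not IoT Hub's evaluator. It assumes that `$body` is only evaluated for JSON content with a UTF encoding and that a missing field makes the condition fail.

```python
import json

UTF_ENCODINGS = ("utf-8", "utf-16", "utf-32")


def _is_number(value):
    # bool is a subclass of int in Python, so exclude it explicitly
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def matches_hot_and_dry(body_bytes, content_type, content_encoding):
    """Local approximation of: $body.temperature > 30 AND $body.humidity < 40"""
    encoding = (content_encoding or "").lower()
    if content_type != "application/json" or encoding not in UTF_ENCODINGS:
        return False  # body is treated as opaque, so the condition cannot match
    try:
        body = json.loads(body_bytes.decode(encoding))
    except ValueError:
        return False  # undecodable or malformed JSON
    if not isinstance(body, dict):
        return False
    temperature = body.get("temperature")
    humidity = body.get("humidity")
    if not (_is_number(temperature) and _is_number(humidity)):
        return False  # a missing or non-numeric field fails the condition
    return temperature > 30 and humidity < 40
```

A check like this is handy in device-side unit tests, for example to confirm that a payload intended as an alert would satisfy the route condition.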

Sending Messages with Routing Properties

import asyncio
import json
import os

from azure.iot.device import Message
from azure.iot.device.aio import IoTHubDeviceClient  # async client, needed for await

async def send_routable_message(client, telemetry, message_type):
    """Send a message with properties that routing queries can match"""

    # Create message
    message = Message(json.dumps(telemetry))

    # $body queries are only evaluated for JSON content with a UTF encoding
    message.content_type = "application/json"
    message.content_encoding = "utf-8"

    # Set custom (application) properties for routing
    message.custom_properties = {
        "messageType": message_type,
        "priority": "high" if telemetry.get("temperature", 0) > 30 else "normal",
        "deviceLocation": "building1-floor2"
    }

    await client.send_message(message)
    print(f"Sent {message_type} message")

async def main():
    # Assumes the device connection string is exported in this environment variable
    client = IoTHubDeviceClient.create_from_connection_string(
        os.environ["IOTHUB_DEVICE_CONNECTION_STRING"]
    )
    await client.connect()

    try:
        # Telemetry message
        telemetry = {
            "temperature": 35.5,
            "humidity": 45.0,
            "timestamp": "2022-07-24T10:30:00Z"
        }
        await send_routable_message(client, telemetry, "telemetry")

        # Alert message
        alert = {
            "alertType": "highTemperature",
            "value": 35.5,
            "threshold": 30.0,
            "timestamp": "2022-07-24T10:30:00Z"
        }
        await send_routable_message(client, alert, "alert")
    finally:
        await client.shutdown()

asyncio.run(main())

Processing Routed Messages

Event Hub Consumer

from azure.eventhub.aio import EventHubConsumerClient
import json

async def process_telemetry_events():
    """Process telemetry routed to Event Hub"""

    async def on_event(partition_context, event):
        body = json.loads(event.body_as_str())

        # Get device ID from system properties
        device_id = event.system_properties.get(b'iothub-connection-device-id', b'').decode()

        print(f"Device: {device_id}")
        print(f"Temperature: {body.get('temperature')}")
        print(f"Custom Props: {dict(event.properties or {})}")

        # Process telemetry (save_to_database is a placeholder for your own persistence code)
        await save_to_database(device_id, body)

        await partition_context.update_checkpoint(event)

    # If the connection string has no EntityPath, also pass eventhub_name="<your event hub>"
    client = EventHubConsumerClient.from_connection_string(
        conn_str=EVENTHUB_CONNECTION_STRING,
        consumer_group="$Default"
    )

    async with client:
        await client.receive(on_event=on_event, starting_position="-1")
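
The consumer above reads the device ID with a bytes key because the Event Hubs SDK exposes property dictionaries with `bytes` keys and often `bytes` values. A small helper can normalize them once per event. The name `decode_properties` is hypothetical, and the sketch assumes UTF-8 encoded property names and values.

```python
def decode_properties(props):
    """Return a copy of a property dict with bytes keys and values decoded to str."""
    def _decode(value):
        # Leave non-bytes values (numbers, timestamps, str) untouched
        return value.decode("utf-8") if isinstance(value, bytes) else value

    return {_decode(key): _decode(value) for key, value in (props or {}).items()}
```

Inside `on_event`, this would let you write `decode_properties(event.system_properties).get("iothub-connection-device-id")` and apply the same helper to `event.properties`.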

Service Bus Queue Consumer

from azure.servicebus.aio import ServiceBusClient
import json

async def process_alerts():
    """Process alerts routed to Service Bus Queue"""

    async with ServiceBusClient.from_connection_string(SERVICEBUS_CONNECTION_STRING) as client:
        receiver = client.get_queue_receiver(queue_name="alerts")

        async with receiver:
            async for message in receiver:
                body = json.loads(str(message))

                print(f"Alert received: {body}")

                # Process alert
                await handle_alert(body)

                # Complete the message
                await receiver.complete_message(message)

async def handle_alert(alert):
    """Handle an alert message"""
    if alert.get("alertType") == "highTemperature":
        # Send notification (send_notification is a placeholder for your own alerting code)
        await send_notification(
            f"High temperature alert: {alert.get('value')}°C"
        )

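Messages routed to the archiveStorage endpoint can be processed as well. With JSON encoding, each blob holds one JSON record per line. The sketch below parses a single line, assuming the record carries `SystemProperties`, `Properties`, and `Body` fields as I understand the IoT Hub storage format. It also assumes the body arrives as a base64 string when the device did not set a JSON content type and UTF encoding. The function name `parse_archived_line` is hypothetical.

```python
import base64
import json


def parse_archived_line(line):
    """Return (device_id, app_properties, body) for one archived record."""
    record = json.loads(line)
    body = record.get("Body")
    if isinstance(body, str):
        # Assumed case: body was stored as base64 because it was not
        # flagged as UTF-encoded JSON. Fall back to the raw string on failure.
        try:
            body = json.loads(base64.b64decode(body))
        except ValueError:
            pass
    device_id = record.get("SystemProperties", {}).get("connectionDeviceId")
    return device_id, record.get("Properties", {}), body
```

Setting `content_type` and `content_encoding` on the device, as in the sending example, keeps archived bodies readable as plain JSON and avoids the base64 branch altogether.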
Fallback Route

# Enable the fallback route to the built-in endpoint
az iot hub update \
    --name myIoTHub \
    --resource-group myResourceGroup \
    --set properties.routing.fallbackRoute.isEnabled=true

# Messages that match no route are then delivered to the built-in endpoint

Testing Routes

# Test a routing query
az iot hub message-route test \
    --hub-name myIoTHub \
    --resource-group myResourceGroup \
    --route-name temperatureAlerts \
    --body '{"temperature": 35}' \
    --app-properties '{"messageType": "telemetry"}'

With custom endpoints, routing queries, and a fallback route in place, message routing gives you the foundation for event-driven IoT architectures on Azure.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.