Back to Blog
4 min read

Configuring Custom Endpoints in Azure IoT Hub

Custom endpoints in IoT Hub let you route device messages to Azure services other than the built-in Event Hubs-compatible endpoint. This makes it possible to build data processing architectures tailored to your needs.

Supported Endpoint Types

IoT Hub supports four types of custom endpoints:

  1. Event Hubs: High-throughput event streaming
  2. Service Bus Queues: Reliable message queuing
  3. Service Bus Topics: Pub/sub messaging
  4. Azure Blob Storage: Long-term archival

Event Hub Endpoint

# Create the Event Hubs namespace and the hub that will receive routed telemetry
az eventhubs namespace create \
    --name myEventHubNamespace \
    --resource-group myResourceGroup \
    --location eastus \
    --sku Standard

az eventhubs eventhub create \
    --name telemetryHub \
    --namespace-name myEventHubNamespace \
    --resource-group myResourceGroup \
    --partition-count 4 \
    --message-retention 7

# Create a hub-scoped, send-only rule for IoT Hub to use.
# RootManageSharedAccessKey lives on the namespace, not on the hub, so it
# cannot be looked up through `eventhub authorization-rule`. A hub-scoped rule
# also returns a connection string that already contains EntityPath, and it
# grants IoT Hub only the Send right it needs.
az eventhubs eventhub authorization-rule create \
    --resource-group myResourceGroup \
    --namespace-name myEventHubNamespace \
    --eventhub-name telemetryHub \
    --name iotHubSend \
    --rights Send

# Get connection string
EVENTHUB_CONN=$(az eventhubs eventhub authorization-rule keys list \
    --resource-group myResourceGroup \
    --namespace-name myEventHubNamespace \
    --eventhub-name telemetryHub \
    --name iotHubSend \
    --query primaryConnectionString -o tsv)

# Keep a SUBSCRIPTION_ID that is already set; otherwise use the active subscription
SUBSCRIPTION_ID="${SUBSCRIPTION_ID:-$(az account show --query id -o tsv)}"

# Add as IoT Hub endpoint
az iot hub routing-endpoint create \
    --hub-name myIoTHub \
    --resource-group myResourceGroup \
    --endpoint-name telemetryEventHub \
    --endpoint-type eventhub \
    --endpoint-resource-group myResourceGroup \
    --endpoint-subscription-id "$SUBSCRIPTION_ID" \
    --connection-string "$EVENTHUB_CONN"

Service Bus Queue Endpoint

# Create Service Bus namespace and queue
az servicebus namespace create \
    --name myServiceBusNamespace \
    --resource-group myResourceGroup \
    --location eastus \
    --sku Standard

az servicebus queue create \
    --name alertsQueue \
    --namespace-name myServiceBusNamespace \
    --resource-group myResourceGroup \
    --max-size 1024 \
    --default-message-time-to-live P14D

# Get the namespace-level connection string (it has no EntityPath; one is
# appended below when the endpoint is registered)
SERVICEBUS_CONN=$(az servicebus namespace authorization-rule keys list \
    --resource-group myResourceGroup \
    --namespace-name myServiceBusNamespace \
    --name RootManageSharedAccessKey \
    --query primaryConnectionString -o tsv)

# Keep a SUBSCRIPTION_ID that is already set; otherwise use the active subscription.
# Left unset and unquoted, it would expand to nothing and break argument parsing.
SUBSCRIPTION_ID="${SUBSCRIPTION_ID:-$(az account show --query id -o tsv)}"

# Add as IoT Hub endpoint
az iot hub routing-endpoint create \
    --hub-name myIoTHub \
    --resource-group myResourceGroup \
    --endpoint-name alertsQueue \
    --endpoint-type servicebusqueue \
    --endpoint-resource-group myResourceGroup \
    --endpoint-subscription-id "$SUBSCRIPTION_ID" \
    --connection-string "$SERVICEBUS_CONN;EntityPath=alertsQueue"

Service Bus Topic Endpoint

# Create Service Bus topic
az servicebus topic create \
    --name deviceEvents \
    --namespace-name myServiceBusNamespace \
    --resource-group myResourceGroup

# Create subscriptions for different consumers
az servicebus topic subscription create \
    --name analyticsSubscription \
    --topic-name deviceEvents \
    --namespace-name myServiceBusNamespace \
    --resource-group myResourceGroup

az servicebus topic subscription create \
    --name alertingSubscription \
    --topic-name deviceEvents \
    --namespace-name myServiceBusNamespace \
    --resource-group myResourceGroup

# Filters are configured as rules on the subscription, not on
# `subscription create` itself.
az servicebus topic subscription rule create \
    --name highPriority \
    --subscription-name alertingSubscription \
    --topic-name deviceEvents \
    --namespace-name myServiceBusNamespace \
    --resource-group myResourceGroup \
    --filter-sql-expression "priority = 'high'"

# A new subscription starts with a match-everything rule named $Default.
# Remove it, otherwise the subscription still receives every message.
az servicebus topic subscription rule delete \
    --name '$Default' \
    --subscription-name alertingSubscription \
    --topic-name deviceEvents \
    --namespace-name myServiceBusNamespace \
    --resource-group myResourceGroup

# Reuse SERVICEBUS_CONN if it is already set; otherwise fetch the
# namespace-level connection string
SERVICEBUS_CONN="${SERVICEBUS_CONN:-$(az servicebus namespace authorization-rule keys list \
    --resource-group myResourceGroup \
    --namespace-name myServiceBusNamespace \
    --name RootManageSharedAccessKey \
    --query primaryConnectionString -o tsv)}"

# Keep a SUBSCRIPTION_ID that is already set; otherwise use the active subscription
SUBSCRIPTION_ID="${SUBSCRIPTION_ID:-$(az account show --query id -o tsv)}"

# Add as IoT Hub endpoint
az iot hub routing-endpoint create \
    --hub-name myIoTHub \
    --resource-group myResourceGroup \
    --endpoint-name deviceEventsTopic \
    --endpoint-type servicebustopic \
    --endpoint-resource-group myResourceGroup \
    --endpoint-subscription-id "$SUBSCRIPTION_ID" \
    --connection-string "$SERVICEBUS_CONN;EntityPath=deviceEvents"

Blob Storage Endpoint

# Create storage account and container
az storage account create \
    --name myiotarchive \
    --resource-group myResourceGroup \
    --location eastus \
    --sku Standard_LRS

az storage container create \
    --name telemetry-archive \
    --account-name myiotarchive

# Get connection string
STORAGE_CONN=$(az storage account show-connection-string \
    --name myiotarchive \
    --resource-group myResourceGroup \
    --query connectionString -o tsv)

# Keep a SUBSCRIPTION_ID that is already set; otherwise use the active subscription
SUBSCRIPTION_ID="${SUBSCRIPTION_ID:-$(az account show --query id -o tsv)}"

# Add as IoT Hub endpoint with custom file naming.
# The CLI flags are --batch-frequency (seconds) and --chunk-size (megabytes);
# batchFrequencyInSeconds / maxChunkSizeInBytes are the ARM property names and
# are not accepted as CLI options. 100 MB equals the original 104857600 bytes.
az iot hub routing-endpoint create \
    --hub-name myIoTHub \
    --resource-group myResourceGroup \
    --endpoint-name archiveStorage \
    --endpoint-type azurestoragecontainer \
    --endpoint-resource-group myResourceGroup \
    --endpoint-subscription-id "$SUBSCRIPTION_ID" \
    --connection-string "$STORAGE_CONN" \
    --container-name telemetry-archive \
    --encoding json \
    --batch-frequency 300 \
    --chunk-size 100 \
    --file-name-format "{iothub}/{partition}/{YYYY}/{MM}/{DD}/{HH}/{mm}"

Processing Messages from Custom Endpoints

Event Hub Consumer

from azure.eventhub.aio import EventHubConsumerClient
import json

async def process_eventhub_messages():
    """Receive events from the ``telemetryHub`` Event Hub and process each one.

    Every event body is parsed as JSON and handed to ``process_telemetry``
    together with the device id read from the event's system properties.
    The checkpoint is updated after each event has been processed.
    ``EVENTHUB_CONNECTION_STRING`` and ``process_telemetry`` are expected to
    be defined elsewhere; they are not part of this snippet.
    """

    async def on_event(partition_context, event):
        # Parse the payload first so a malformed body fails before any lookup.
        payload = json.loads(event.body_as_str())

        # The device id is looked up under a bytes key; a missing key yields
        # b'' and therefore an empty string after decoding.
        raw_device_id = event.system_properties.get(
            b'iothub-connection-device-id', b''
        )
        device_id = raw_device_id.decode()

        # Process telemetry, then record progress for this partition.
        await process_telemetry(device_id, payload)
        await partition_context.update_checkpoint(event)

    consumer = EventHubConsumerClient.create_from_connection_string(
        conn_str=EVENTHUB_CONNECTION_STRING,
        consumer_group="$Default",
        eventhub_name="telemetryHub",
    )

    async with consumer:
        await consumer.receive(on_event=on_event, starting_position="-1")

Service Bus Queue Consumer

from azure.servicebus.aio import ServiceBusClient
import json

async def process_queue_messages():
    """Receive messages from the ``alertsQueue`` Service Bus queue and handle them.

    Each message is parsed as JSON, passed to ``handle_alert`` with the device
    id taken from the message's application properties, and then completed.
    ``SERVICEBUS_CONNECTION_STRING`` and ``handle_alert`` are expected to be
    defined elsewhere; they are not part of this snippet.
    """
    async with ServiceBusClient.from_connection_string(
        SERVICEBUS_CONNECTION_STRING
    ) as sb_client:
        queue_receiver = sb_client.get_queue_receiver(queue_name="alertsQueue")

        async with queue_receiver:
            async for msg in queue_receiver:
                # str(msg) is used as the JSON text of the message.
                alert = json.loads(str(msg))

                # The device id is looked up under a bytes key; a missing key
                # yields b'' and therefore an empty string after decoding.
                raw_device_id = msg.application_properties.get(
                    b'iothub-connection-device-id', b''
                )
                device_id = raw_device_id.decode()

                await handle_alert(device_id, alert)

                # Complete only after the alert handler has returned.
                await queue_receiver.complete_message(msg)

Service Bus Topic Consumer

async def process_topic_messages():
    """Receive messages from a Service Bus topic subscription and process them.

    Reads from the ``analyticsSubscription`` subscription of the
    ``deviceEvents`` topic. Each message is parsed as JSON, passed to
    ``process_analytics``, and then completed.
    ``SERVICEBUS_CONNECTION_STRING`` and ``process_analytics`` are expected to
    be defined elsewhere; they are not part of this snippet.
    """
    async with ServiceBusClient.from_connection_string(
        SERVICEBUS_CONNECTION_STRING
    ) as sb_client:
        subscription_receiver = sb_client.get_subscription_receiver(
            topic_name="deviceEvents",
            subscription_name="analyticsSubscription",
        )

        async with subscription_receiver:
            async for msg in subscription_receiver:
                # str(msg) is used as the JSON text of the message.
                payload = json.loads(str(msg))
                await process_analytics(payload)
                # Complete only after processing has returned.
                await subscription_receiver.complete_message(msg)

Reading from Blob Storage

from azure.storage.blob import BlobServiceClient
import json

def read_archived_data(date, hour, hub_name="myIoTHub", partition=0):
    """Read archived IoT messages for one hour from blob storage.

    Blobs are looked up in the ``telemetry-archive`` container under the
    prefix ``{hub_name}/{partition}/{YYYY}/{MM}/{DD}/{HH}``. Each blob is
    decoded as UTF-8 and parsed as one JSON document per line.

    Args:
        date: Object exposing integer ``year``, ``month`` and ``day``
            attributes (for example ``datetime.date``).
        hour: Hour of the day as an integer; zero-padded to two digits.
        hub_name: First path segment of the blob names. Defaults to the
            value that was previously hard-coded.
        partition: Partition path segment to read. Defaults to ``0``, the
            value that was previously hard-coded; call once per partition
            to collect data from all of them.

    Returns:
        A list with one parsed JSON value per non-blank line, in the order
        the blobs are listed.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    blob_service = BlobServiceClient.from_connection_string(STORAGE_CONNECTION_STRING)
    container = blob_service.get_container_client("telemetry-archive")

    # List blobs for specific time period
    prefix = (
        f"{hub_name}/{partition}/"
        f"{date.year:04d}/{date.month:02d}/{date.day:02d}/{hour:02d}"
    )

    messages = []
    for blob in container.list_blobs(name_starts_with=prefix):
        blob_client = container.get_blob_client(blob)
        content = blob_client.download_blob().readall().decode('utf-8')

        # Parse JSON lines format. Strip each line before the emptiness check
        # so whitespace-only lines (e.g. a stray '\r' from CRLF endings) are
        # skipped instead of raising in json.loads.
        for line in content.split('\n'):
            line = line.strip()
            if line:
                messages.append(json.loads(line))

    return messages

Custom endpoints enable you to build flexible, scalable IoT data processing architectures tailored to your specific requirements.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.