Back to Blog
4 min read

Working with IoT Hub Built-in Endpoints

IoT Hub’s built-in endpoints provide Event Hub-compatible interfaces for consuming device-to-cloud messages. Understanding these endpoints is essential for building real-time IoT data processing pipelines.

Understanding Built-in Endpoints

IoT Hub exposes two types of built-in endpoints:

  1. Events Endpoint: Event Hub-compatible endpoint for D2C messages
  2. Operations Monitoring: Endpoint for operational logs (deprecated in favor of diagnostic settings)

Getting Connection Information

# Get built-in endpoint connection string
# (--default-eventhub selects the hub's default Event Hub-compatible endpoint;
#  --output tsv prints the result as plain tab-separated text)
az iot hub connection-string show \
    --hub-name myIoTHub \
    --default-eventhub \
    --output tsv

# Get Event Hub-compatible endpoint details
# (--query narrows the output to properties.eventHubEndpoints.events)
az iot hub show \
    --name myIoTHub \
    --query properties.eventHubEndpoints.events

Consuming Messages with Event Hub SDK

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
import json
import asyncio

class IoTMessageConsumer:
    """Consume device-to-cloud messages from an Event Hub-compatible endpoint.

    Every received event is summarised on stdout, passed to
    ``_handle_message`` (the extension point for business logic), and then
    checkpointed in the blob-backed checkpoint store.
    """

    def __init__(self, connection_string, storage_connection_string, container_name):
        """Remember the endpoint connection string and build the checkpoint store.

        Args:
            connection_string: Event Hub-compatible connection string of the
                built-in endpoint.
            storage_connection_string: Connection string of the storage
                account that holds the checkpoints.
            container_name: Blob container used by the checkpoint store.
        """
        self.connection_string = connection_string
        # NOTE(review): start_consuming() uses the asyncio client, which awaits
        # the checkpoint store's methods. Confirm that the BlobCheckpointStore
        # in scope is the async variant
        # (azure.eventhub.extensions.checkpointstoreblobaio), not the
        # synchronous one from azure.eventhub.extensions.checkpointstoreblob.
        self.checkpoint_store = BlobCheckpointStore.from_connection_string(
            storage_connection_string,
            container_name
        )

    async def start_consuming(self):
        """Start consuming messages from IoT Hub.

        Opens a consumer client on the ``$Default`` consumer group and
        receives events, dispatching them to this object's callbacks.
        """
        client = EventHubConsumerClient.create_from_connection_string(
            conn_str=self.connection_string,
            consumer_group="$Default",
            checkpoint_store=self.checkpoint_store
        )

        async with client:
            await client.receive(
                on_event=self._process_event,
                on_error=self._handle_error,
                on_partition_initialize=self._on_partition_initialize,
                on_partition_close=self._on_partition_close
            )

    @staticmethod
    def _decode_body(event):
        """Return the event body parsed as JSON, or as plain text if it is not JSON."""
        # Decode the payload once and reuse the text for the fallback.
        text = event.body_as_str()
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            return text

    async def _process_event(self, partition_context, event):
        """Process a single event.

        Args:
            partition_context: Context of the partition the event came from;
                used for its ``partition_id`` and for checkpointing.
            event: The received event, or a falsy value when there is
                nothing to process.
        """
        if not event:
            return

        # IoT Hub metadata is exposed through byte-keyed system properties.
        # Other entries (e.g. b'iothub-enqueuedtime') can be read the same way.
        device_id = event.system_properties.get(b'iothub-connection-device-id', b'').decode()

        body = self._decode_body(event)

        # Application properties set by the device; may be absent.
        custom_props = dict(event.properties) if event.properties else {}

        print(f"Device: {device_id}")
        print(f"Partition: {partition_context.partition_id}")
        print(f"Sequence: {event.sequence_number}")
        print(f"Body: {body}")

        await self._handle_message(device_id, body, custom_props)

        # Checkpoint only after the handler returned, so an event whose
        # handler raised is not recorded as processed.
        await partition_context.update_checkpoint(event)

    async def _handle_error(self, partition_context, error):
        """Handle errors during event processing.

        ``partition_context`` may be falsy when the error is not tied to a
        specific partition; the message is adjusted accordingly.
        """
        if partition_context:
            print(f"Error in partition {partition_context.partition_id}: {error}")
        else:
            print(f"Error: {error}")

    async def _on_partition_initialize(self, partition_context):
        """Called when a partition is initialized."""
        print(f"Partition {partition_context.partition_id} initialized")

    async def _on_partition_close(self, partition_context, reason):
        """Called when a partition is closed; ``reason`` is printed as given."""
        print(f"Partition {partition_context.partition_id} closed: {reason}")

    async def _handle_message(self, device_id, body, properties):
        """Custom message handling logic.

        Override to implement business logic. ``body`` is the parsed JSON
        value, or the raw text when the payload is not valid JSON;
        ``properties`` is a dict of the event's custom properties.
        """
        # Implement your business logic here
        pass

# Usage
async def main():
    """Create a consumer from the placeholder connection strings and run it."""
    consumer = IoTMessageConsumer(
        connection_string="Endpoint=sb://...;SharedAccessKeyName=...;SharedAccessKey=...;EntityPath=...",
        storage_connection_string="DefaultEndpointsProtocol=https;AccountName=...;AccountKey=...;EndpointSuffix=...",
        container_name="checkpoints"
    )
    await consumer.start_consuming()


# Only start consuming when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())

Partition Management

from azure.eventhub import EventHubConsumerClient

def get_partition_info(connection_string):
    """Print the partition IDs of the built-in endpoint and details of each.

    For every partition the beginning sequence number, last enqueued
    sequence number, last enqueued offset and last enqueued time are
    printed. Nothing is returned.
    """
    consumer = EventHubConsumerClient.create_from_connection_string(
        conn_str=connection_string,
        consumer_group="$Default",
    )

    with consumer:
        ids = consumer.get_partition_ids()
        print(f"Partition IDs: {ids}")

        # One properties lookup per partition, reported in listing order.
        for pid in ids:
            details = consumer.get_partition_properties(pid)
            print(f"\nPartition {pid}:")
            print(f"  Begin sequence: {details['beginning_sequence_number']}")
            print(f"  Last sequence: {details['last_enqueued_sequence_number']}")
            print(f"  Last offset: {details['last_enqueued_offset']}")
            print(f"  Last enqueued: {details['last_enqueued_time_utc']}")

Consumer Groups

# Create a consumer group named processingGroup on the hub
az iot hub consumer-group create \
    --hub-name myIoTHub \
    --name processingGroup

# List the hub's consumer groups (--output table prints them as a table)
az iot hub consumer-group list \
    --hub-name myIoTHub \
    --output table
# Use custom consumer group: pass its name as consumer_group instead of "$Default".
# `connection_string` is the Event Hub-compatible connection string and must
# already be defined.
client = EventHubConsumerClient.create_from_connection_string(
    conn_str=connection_string,
    consumer_group="processingGroup"  # Custom consumer group
)

Real-time Processing with Azure Functions

import json
import logging
from typing import List

import azure.functions as func

def main(events: List[func.EventHubEvent]):
    """Azure Function triggered by IoT Hub events.

    The trigger binding uses ``"cardinality": "many"``, so ``events`` is a
    batch of events rather than a single one. Each event's body and device
    metadata are logged, and JSON payloads are passed to
    ``process_telemetry``.
    """

    for event in events:
        # Get message body
        body = event.get_body().decode('utf-8')
        logging.info(f'Event body: {body}')

        # Get system properties
        device_id = event.iothub_metadata.get('connection-device-id')
        enqueued_time = event.iothub_metadata.get('enqueuedtime')

        logging.info(f'Device: {device_id}, Time: {enqueued_time}')

        # Only the parse is guarded: an error raised inside
        # process_telemetry must not be reported as invalid device JSON.
        try:
            data = json.loads(body)
        except json.JSONDecodeError:
            logging.warning(f'Invalid JSON from device {device_id}')
        else:
            process_telemetry(device_id, data)

def process_telemetry(device_id, data):
    """Handle one decoded telemetry payload from ``device_id``.

    Placeholder: add the application-specific processing here.
    """
    return None

Function bindings in function.json:

{
  "bindings": [
    {
      "type": "eventHubTrigger",
      "name": "events",
      "direction": "in",
      "eventHubName": "%IoTHubName%",
      "connection": "IoTHubConnection",
      "consumerGroup": "processingGroup",
      "cardinality": "many"
    }
  ]
}

Monitoring Built-in Endpoint

from azure.monitor.query import MetricsQueryClient
from azure.identity import DefaultAzureCredential

def query_endpoint_metrics():
    """Query metrics for the built-in endpoint.

    Requests the built-in endpoint's egress and latency metrics for the
    last hour and prints, per metric, the data of its first time series
    (an empty list when the metric has no time series).
    """
    from datetime import timedelta

    credential = DefaultAzureCredential()
    client = MetricsQueryClient(credential)

    response = client.query_resource(
        resource_uri="/subscriptions/.../resourceGroups/.../providers/Microsoft.Devices/IotHubs/myIoTHub",
        metric_names=["d2c.endpoints.egress.builtIn.events", "d2c.endpoints.latency.builtIn.events"],
        # The client takes a timedelta (or a datetime-based tuple) here,
        # not an ISO-8601 duration string such as "PT1H".
        timespan=timedelta(hours=1)
    )

    for metric in response.metrics:
        # Avoid an IndexError when a metric has no time series.
        series = metric.timeseries
        data = series[0].data if series else []
        print(f"{metric.name}: {data}")

The built-in endpoint provides a reliable, scalable way to consume IoT device telemetry for real-time processing.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.