1 min read
Enhancing IoT Data with Message Enrichments
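This article offers practical, production-minded guidance on using Azure IoT Hub message enrichments to attach device context to telemetry as it is routed, so downstream consumers don't have to look that context up themselves.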
Understanding Message Enrichments
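Message enrichments are key-value pairs that IoT Hub adds to messages, as application properties, when it routes them to the endpoints you choose. An enrichment's value can come from: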
- Static values
- Device twin tags
- Device twin desired properties
- Device twin reported properties
- IoT Hub properties
Creating Message Enrichments
# Notes for all commands below:
# - `az iot hub message-enrichment` identifies the hub with --name/-n (not --hub-name).
# - "storageEndpoint" and "eventHubEndpoint" must be names of routing endpoints that
#   already exist on the hub; use "events" to target the built-in endpoint.
# - Inside double quotes, \$ stops the shell from expanding $twin / $iothubname.

# Add static enrichment
az iot hub message-enrichment create \
  --name myIoTHub \
  --resource-group myResourceGroup \
  --key "region" \
  --value "us-east" \
  --endpoints "storageEndpoint" "eventHubEndpoint"

# Add device twin tag enrichment
az iot hub message-enrichment create \
  --name myIoTHub \
  --resource-group myResourceGroup \
  --key "deviceLocation" \
  --value "\$twin.tags.location" \
  --endpoints "storageEndpoint" "eventHubEndpoint"

# Add the twin tags the message processor reads (deviceType, criticality)
az iot hub message-enrichment create \
  --name myIoTHub \
  --resource-group myResourceGroup \
  --key "deviceType" \
  --value "\$twin.tags.deviceType" \
  --endpoints "storageEndpoint" "eventHubEndpoint"
az iot hub message-enrichment create \
  --name myIoTHub \
  --resource-group myResourceGroup \
  --key "criticality" \
  --value "\$twin.tags.criticality" \
  --endpoints "storageEndpoint" "eventHubEndpoint"

# Add device twin property enrichment
# Applied to storageEndpoint only: consumers of eventHubEndpoint will not see
# firmwareVersion unless that endpoint is added to the list.
az iot hub message-enrichment create \
  --name myIoTHub \
  --resource-group myResourceGroup \
  --key "firmwareVersion" \
  --value "\$twin.properties.reported.firmwareVersion" \
  --endpoints "storageEndpoint"

# Add IoT Hub name
az iot hub message-enrichment create \
  --name myIoTHub \
  --resource-group myResourceGroup \
  --key "hubName" \
  --value "\$iothubname" \
  --endpoints "storageEndpoint" "eventHubEndpoint"
Setting Up Device Twins for Enrichment
import json
import os

from azure.iot.hub import IoTHubRegistryManager
# Service-side registry client used to read and update device twins.
# The hub connection string is a secret and is not defined anywhere in this code, so it
# is read from the IOT_HUB_CONNECTION_STRING environment variable (KeyError if unset)
# instead of referencing an undefined constant.
registry_manager = IoTHubRegistryManager.from_connection_string(
    os.environ["IOT_HUB_CONNECTION_STRING"]
)
def setup_device_twin(device_id, *, tags=None, desired=None):
    """Write the tags and desired properties that message enrichments read.

    Enrichments such as ``$twin.tags.location`` resolve against these twin values,
    so they must be on the twin before telemetry is routed.

    Args:
        device_id: ID of the device whose twin is updated.
        tags: Twin tags to send. Defaults to the sample tags below when omitted.
        desired: Desired properties to send. Defaults to the sample values below
            when omitted.

    Returns:
        The twin returned by ``registry_manager.update_twin``.
    """
    # Function-scope import: the module only imports IoTHubRegistryManager at top level.
    from azure.iot.hub.models import Twin, TwinProperties

    # Fresh dicts per call (no mutable defaults shared between calls).
    if tags is None:
        tags = {
            "location": "building-1-floor-2",
            "deviceType": "temperature-sensor",
            "owner": "facilities",
            "criticality": "high",
            "installDate": "2022-01-15",
        }
    if desired is None:
        desired = {
            "telemetryInterval": 60,
            "alertThreshold": 30,
        }

    # Fetch the current twin only for its etag, so the service can reject the update
    # if the twin changed between this read and the write.
    current_twin = registry_manager.get_twin(device_id)

    # Send a patch with just the back-end-writable sections (tags and desired
    # properties) rather than echoing back the whole fetched twin, whose reported
    # properties belong to the device.
    twin_patch = Twin(tags=tags, properties=TwinProperties(desired=desired))
    updated_twin = registry_manager.update_twin(device_id, twin_patch, current_twin.etag)
    print(f"Twin updated for {device_id}")
    return updated_twin
# Update device reported properties from device side
async def report_device_properties(device_client, reported_properties=None):
    """Patch the device twin's reported properties from the device.

    Enrichments that reference ``$twin.properties.reported.*`` (for example
    ``firmwareVersion``) pick up the values written here.

    Args:
        device_client: Device-side client exposing an awaitable
            ``patch_twin_reported_properties`` (presumably an
            ``azure.iot.device.aio.IoTHubDeviceClient`` — not shown here).
        reported_properties: The reported-property patch to send. When omitted, the
            sample values below are sent.
    """
    if reported_properties is None:
        # Sample values; a real device would report its actual state.
        reported_properties = {
            "firmwareVersion": "1.2.3",
            "ipAddress": "192.168.1.100",
            "status": "online",
            "lastBootTime": "2022-07-27T08:00:00Z",
        }
    await device_client.patch_twin_reported_properties(reported_properties)
Available Enrichment Values
# Example values for the --value argument of `az iot hub message-enrichment create`.
# These are argument fragments, not standalone commands. Inside double quotes, \$ keeps
# the shell from expanding $twin / $iothubname.
# Static value
--value "my-static-value"
# Device twin tags
--value "\$twin.tags.location"
--value "\$twin.tags.deviceType"
# Device twin desired properties
--value "\$twin.properties.desired.telemetryInterval"
# Device twin reported properties
--value "\$twin.properties.reported.firmwareVersion"
--value "\$twin.properties.reported.status"
# IoT Hub properties
--value "\$iothubname"
# NOTE(review): $connectionDeviceId / $connectionModuleId are not among the value sources
# the message-enrichment documentation lists ($iothubname, $twin.tags.*,
# $twin.properties.desired.*, $twin.properties.reported.*). The device/module IDs already
# arrive as the iothub-connection-device-id / iothub-connection-module-id system
# properties. Verify these two work before relying on them.
--value "\$connectionDeviceId"
--value "\$connectionModuleId"
Processing Enriched Messages
from azure.eventhub.aio import EventHubConsumerClient
import json
async def process_enriched_message(partition_context, event):
    """Handle one telemetry event and act on its message enrichments.

    IoT Hub delivers enrichments as application properties on the event, so device
    context (location, type, firmware, ...) is available here without a twin lookup.

    ``deviceType`` and ``criticality`` are only present if enrichments with those keys
    (e.g. ``$twin.tags.deviceType`` / ``$twin.tags.criticality``) are configured for the
    endpoint this consumer reads. Otherwise they are ``None`` and the alert never fires.

    Args:
        partition_context: Partition context from the Event Hubs consumer; used to
            checkpoint the event after it is handled.
        event: The received event; its body is expected to be JSON text.
    """
    body = json.loads(event.body_as_str())
    # A payload that decodes to something other than a JSON object (array, number, ...)
    # has no .get(); treat it as carrying no fields instead of raising.
    if not isinstance(body, dict):
        body = {}

    # Application-property keys and values can arrive as bytes or str; normalise to str.
    enrichments = {
        (key.decode() if isinstance(key, bytes) else key): (
            value.decode() if isinstance(value, bytes) else value
        )
        for key, value in (event.properties or {}).items()
    }

    # Now you have device context without additional lookups
    device_location = enrichments.get('deviceLocation')
    device_type = enrichments.get('deviceType')
    firmware = enrichments.get('firmwareVersion')
    temperature = body.get('temperature')

    print(f"Telemetry from {device_location} ({device_type}):")
    print(f" Firmware: {firmware}")
    print(f" Temperature: {temperature}")

    # Route based on enriched data. Only real numbers are compared: a null or string
    # temperature would make `> 30` raise TypeError, and a missing one never alerts.
    is_number = isinstance(temperature, (int, float))
    if enrichments.get('criticality') == 'high' and is_number and temperature > 30:
        await send_high_priority_alert(body, enrichments)

    await partition_context.update_checkpoint(event)
Enrichment Scenarios
Multi-tenant Data Routing
async def route_by_tenant(event):
    """Store an event in the storage backend belonging to its tenant.

    Reads the ``tenantId`` enrichment from the event's application properties and picks
    the tenant-specific storage, falling back to ``DefaultStorage`` when the tenant is
    missing or unknown. This requires a ``tenantId`` enrichment to be configured on the
    hub (for example one whose value is ``$twin.tags.tenantId``).
    """
    # Keys and values may arrive as bytes or str; normalise to str so the lookup finds
    # the key either way and .decode() is never called on a str.
    enrichments = {
        (key.decode() if isinstance(key, bytes) else key): (
            value.decode() if isinstance(value, bytes) else value
        )
        for key, value in (event.properties or {}).items()
    }
    tenant_id = enrichments.get('tenantId', '')

    # Route to tenant-specific storage. Map tenants to storage classes and instantiate
    # only the selected one, instead of building every client (and the default) per event.
    storage_classes = {
        'tenant-a': TenantAStorage,
        'tenant-b': TenantBStorage,
    }
    client = storage_classes.get(tenant_id, DefaultStorage)()
    await client.store(event.body_as_json())
Geo-distributed Processing
async def process_by_region(event):
    """Dispatch an event to the processor for its ``region`` enrichment.

    Events without a ``region`` enrichment, or with a region not listed below, go to
    ``DefaultProcessor``.
    """
    # Keys and values may arrive as bytes or str; normalise to str so the lookup finds
    # the key either way and .decode() is never called on a str.
    enrichments = {
        (key.decode() if isinstance(key, bytes) else key): (
            value.decode() if isinstance(value, bytes) else value
        )
        for key, value in (event.properties or {}).items()
    }
    region = enrichments.get('region', 'default')

    # Select the processor class first and instantiate only that one, instead of
    # constructing every regional processor (and the default) for each event.
    processor_classes = {
        'us-east': USEastProcessor,
        'us-west': USWestProcessor,
        'eu-west': EUWestProcessor,
    }
    processor = processor_classes.get(region, DefaultProcessor)()
    await processor.process(event)
Compliance Tagging
async def tag_for_compliance(event):
    """Store an event together with compliance metadata taken from its enrichments.

    Uses the ``owner``, ``dataClassification`` and ``retentionPolicy`` enrichments,
    falling back to ``''``, ``'general'`` and ``'standard'`` when they are absent. Those
    keys only appear if matching enrichments are configured on the hub (e.g.
    ``owner`` -> ``$twin.tags.owner``).
    """
    # Keys and values may arrive as bytes or str; normalise to str so lookups find the
    # key either way and .decode() is never called on a str.
    enrichments = {
        (key.decode() if isinstance(key, bytes) else key): (
            value.decode() if isinstance(value, bytes) else value
        )
        for key, value in (event.properties or {}).items()
    }
    # Event Hubs system-property keys are bytes (e.g. b'iothub-enqueuedtime').
    system_properties = event.system_properties or {}

    compliance_record = {
        'data': event.body_as_json(),
        'device_owner': enrichments.get('owner', ''),
        'data_classification': enrichments.get('dataClassification', 'general'),
        'retention_policy': enrichments.get('retentionPolicy', 'standard'),
        'collected_at': system_properties.get(b'iothub-enqueuedtime'),
    }
    await store_with_compliance_metadata(compliance_record)
Managing Enrichments
# `az iot hub message-enrichment` commands identify the hub with --name/-n.

# List all enrichments
az iot hub message-enrichment list \
  --name myIoTHub \
  --resource-group myResourceGroup

# Update an enrichment
# The enrichment's endpoints are set to exactly the list given here, so list every
# endpoint that should keep receiving it (this example leaves only storageEndpoint).
az iot hub message-enrichment update \
  --name myIoTHub \
  --resource-group myResourceGroup \
  --key "region" \
  --value "us-west" \
  --endpoints "storageEndpoint"

# Delete an enrichment
az iot hub message-enrichment delete \
  --name myIoTHub \
  --resource-group myResourceGroup \
  --key "region"
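Message enrichments simplify downstream processing by providing device context at the point of ingestion.

## Takeaways

- Put slowly changing context (twin tags, the hub name, static values such as region) in enrichments, and keep per-reading data in the message body.
- Create an enrichment for every key your consumers read, on every endpoint they read from; a missing enrichment simply shows up as an absent property.
- Treat application-property keys and values as possibly bytes and decode them defensively.
- Next step: send telemetry from a test device through each endpoint and confirm the enriched properties arrive as expected before building routing or alerting logic on them.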