Back to Blog
5 min read

Building Live Video Analytics Pipelines on Azure

Live Video Analytics (LVA) on IoT Edge enables building intelligent video applications that process live video feeds in real time. It combines video capture, AI inference, and business logic at the edge while integrating with cloud services.

Live Video Analytics Architecture

+----------+     +----------------+     +------------------+
|  Camera  | --> |   IoT Edge     | --> |   Azure Cloud    |
|  (RTSP)  |     |   Device       |     |                  |
+----------+     +----------------+     +------------------+
                 |  LVA Module    |     |  - Media Services|
                 |  AI Module     |     |  - Storage       |
                 |  Custom Module |     |  - Event Hub     |
                 +----------------+     +------------------+

Setting Up LVA on IoT Edge

{
  "modulesContent": {
    "$edgeAgent": {
      "properties.desired": {
        "modules": {
          "lvaEdge": {
            "type": "docker",
            "status": "running",
            "restartPolicy": "always",
            "settings": {
              "image": "mcr.microsoft.com/media/live-video-analytics:2",
              "createOptions": {
                "HostConfig": {
                  "LogConfig": {
                    "Type": "json-file",
                    "Config": {
                      "max-size": "10m",
                      "max-file": "10"
                    }
                  },
                  "Binds": [
                    "/var/media/:/var/media/",
                    "/var/lib/azuremediaservices/:/var/lib/azuremediaservices/"
                  ]
                }
              }
            }
          }
        }
      }
    }
  }
}

Graph Topology for Live Analysis

import json

def create_live_inference_topology():
    """Build the "LiveInference" graph topology definition.

    The returned dict connects an RTSP source to a frame-rate filter, then to
    an HTTP extension processor, and finally to an IoT Hub message sink.
    Values written as ``${name}`` match the names declared under
    ``parameters``.

    Returns:
        A new dict on every call, ready to be passed as a direct-method payload.
    """
    unsecured_endpoint = "#Microsoft.Media.MediaGraphUnsecuredEndpoint"

    declared_parameters = [
        {"name": "rtspUrl", "type": "String"},
        {"name": "rtspUserName", "type": "String"},
        {"name": "rtspPassword", "type": "String"},
        {"name": "inferenceUrl", "type": "String"},
        {"name": "hubSinkOutputName", "type": "String", "default": "inferenceOutput"},
    ]

    # Camera feed, pulled over TCP with username/password credentials.
    rtsp_source = {
        "@type": "#Microsoft.Media.MediaGraphRtspSource",
        "name": "rtspSource",
        "transport": "tcp",
        "endpoint": {
            "@type": unsecured_endpoint,
            "url": "${rtspUrl}",
            "credentials": {
                "@type": "#Microsoft.Media.MediaGraphUsernamePasswordCredentials",
                "username": "${rtspUserName}",
                "password": "${rtspPassword}",
            },
        },
    }

    # Caps the rate of frames forwarded downstream at 2 per second.
    frame_rate_filter = {
        "@type": "#Microsoft.Media.MediaGraphFrameRateFilterProcessor",
        "name": "frameRateFilter",
        "inputs": [{"nodeName": "rtspSource"}],
        "maximumFps": 2,
    }

    # Sends frames (padded to 640x480, BMP-encoded) to the inference endpoint.
    http_extension = {
        "@type": "#Microsoft.Media.MediaGraphHttpExtension",
        "name": "httpExtension",
        "inputs": [{"nodeName": "frameRateFilter", "outputSelectors": []}],
        "endpoint": {
            "@type": unsecured_endpoint,
            "url": "${inferenceUrl}",
        },
        "image": {
            "scale": {
                "mode": "pad",
                "width": "640",
                "height": "480",
            },
            "format": {
                "@type": "#Microsoft.Media.MediaGraphImageFormatBmp",
            },
        },
    }

    # Publishes whatever the HTTP extension node emits to the named hub output.
    hub_sink = {
        "@type": "#Microsoft.Media.MediaGraphIoTHubMessageSink",
        "name": "hubSink",
        "inputs": [{"nodeName": "httpExtension"}],
        "hubOutputName": "${hubSinkOutputName}",
    }

    return {
        "@apiVersion": "1.0",
        "name": "LiveInference",
        "properties": {
            "description": "Real-time object detection on live video",
            "parameters": declared_parameters,
            "sources": [rtsp_source],
            "processors": [frame_rate_filter, http_extension],
            "sinks": [hub_sink],
        },
    }

Custom AI Module for Inference

# inference_server.py
from flask import Flask, request, jsonify
import cv2
import numpy as np
import onnxruntime as ort

# Flask application object; the route decorators further down register their
# handlers on it.
app = Flask(__name__)

# Load ONNX model
# The session is created once at import time and reused by every request.
# "model.onnx" is a relative path, so it is resolved against the process's
# working directory.
session = ort.InferenceSession("model.onnx")
# Name of the model's first declared input; used as the feed key for session.run().
input_name = session.get_inputs()[0].name

# Labels looked up by class id when building detections.
# NOTE(review): order and count must match the classes the model was trained
# on - confirm against the model.
CLASSES = ["person", "car", "truck", "bicycle", "motorcycle"]

def preprocess_image(image_bytes):
    """Decode an encoded image and turn it into a model input tensor.

    Args:
        image_bytes: Encoded image data (bytes-like), e.g. a raw request body.

    Returns:
        A float32 array of shape (1, 3, 480, 640) with values scaled to [0, 1].

    Raises:
        ValueError: If the payload is empty or cannot be decoded as an image.
    """
    if not image_bytes:
        raise ValueError("Empty image payload")

    nparr = np.frombuffer(image_bytes, np.uint8)
    img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    if img is None:
        # cv2.imdecode reports an undecodable buffer by returning None instead
        # of raising; fail here with a clear message rather than letting
        # cv2.resize raise an opaque error on None.
        raise ValueError("Could not decode image payload")

    img = cv2.resize(img, (640, 480))  # dsize is (width, height)
    img = img.astype(np.float32) / 255.0
    # NOTE(review): channels stay in OpenCV's decoded order (BGR) - confirm
    # this is what the model expects.
    img = np.transpose(img, (2, 0, 1))  # HWC -> CHW
    img = np.expand_dims(img, axis=0)  # add leading batch axis
    return img

def postprocess_results(outputs, threshold=0.5, classes=None):
    """Convert raw model outputs into a list of entity detections.

    Each prediction row is read as ``[x, y, w, h, objectness, score_0, ...]``
    (YOLO-style: box given by centre and size). A row is kept only when both
    its objectness and its best class score exceed ``threshold``.

    Args:
        outputs: Sequence of model outputs; only ``outputs[0]`` is read. It may
            be shaped ``(num_predictions, 5 + num_classes)`` or carry extra
            leading axes (such as a batch axis), which are flattened.
        threshold: Value that objectness and the best class score must exceed.
        classes: Optional list of labels indexed by class id. Defaults to the
            module-level ``CLASSES``.

    Returns:
        A list of dicts of the form
        ``{"type": "entity", "entity": {"tag": {...}, "box": {...}}}`` where
        ``box`` holds the left/top corner (``l``, ``t``) and size (``w``, ``h``).
        Box values are passed through in the units the model emits.
        NOTE(review): confirm whether the consumer expects normalised values.

    Raises:
        ValueError: If prediction rows are too short to contain class scores.
    """
    labels = CLASSES if classes is None else classes
    detections = []

    predictions = np.asarray(outputs[0])
    if predictions.ndim == 0 or predictions.size == 0:
        return detections

    row_width = predictions.shape[-1]
    if row_width < 6:
        raise ValueError(
            f"Expected at least 6 values per prediction, got {row_width}"
        )

    # Many exported detectors emit (batch, num_predictions, values). Collapsing
    # the leading axes makes every iteration see one prediction row rather
    # than a whole batch entry.
    predictions = predictions.reshape(-1, row_width)

    for detection in predictions:
        confidence = detection[4]
        if confidence <= threshold:
            continue

        class_scores = detection[5:]
        class_id = int(np.argmax(class_scores))
        class_conf = class_scores[class_id]
        if class_conf <= threshold:
            continue

        # The model may know more classes than the label list; keep the
        # detection with a generic label instead of raising IndexError.
        if class_id < len(labels):
            label = labels[class_id]
        else:
            label = f"class_{class_id}"

        x, y, w, h = detection[0:4]
        detections.append({
            "type": "entity",
            "entity": {
                "tag": {
                    "value": label,
                    "confidence": float(class_conf)
                },
                "box": {
                    # Convert centre-based box to left/top + size.
                    "l": float(x - w / 2),
                    "t": float(y - h / 2),
                    "w": float(w),
                    "h": float(h)
                }
            }
        })

    return detections

@app.route('/score', methods=['POST'])
def score():
    """Run inference on the image in the request body.

    Returns:
        200 with ``{"inferences": [...]}`` on success,
        400 with ``{"error": ...}`` when the request body is empty,
        500 with ``{"error": ...}`` when decoding or inference fails.
    """
    try:
        image_bytes = request.get_data()
        if not image_bytes:
            # An empty body can never decode to a frame; report it as a client
            # error instead of letting the decoder fail with a 500.
            return jsonify({"error": "Request body is empty"}), 400

        input_tensor = preprocess_image(image_bytes)

        outputs = session.run(None, {input_name: input_tensor})
        detections = postprocess_results(outputs)

        return jsonify({"inferences": detections})

    except Exception as e:
        # Top-level request boundary: keep the JSON error contract for the
        # caller, but record the traceback so failures are diagnosable.
        app.logger.exception("Inference request failed")
        return jsonify({"error": str(e)}), 500

@app.route('/health', methods=['GET'])
def health():
    """Answer health probes with a fixed JSON status body."""
    status_body = {"status": "healthy"}
    return jsonify(status_body)

if __name__ == '__main__':
    # Runs only when this file is executed directly, not when it is imported.
    # host='0.0.0.0' listens on all interfaces; port 5001 is the port used in
    # the example inference URL ("http://inference:5001/score").
    # NOTE(review): app.run() starts Flask's built-in server - consider a
    # production WSGI server for real deployments.
    app.run(host='0.0.0.0', port=5001)

Managing Graph Instances

import asyncio
from azure.iot.hub import IoTHubRegistryManager
from azure.iot.hub.models import CloudToDeviceMethod

class LVAController:
    """Send graph-management direct methods to an LVA module on an edge device.

    Every public method builds a JSON-style payload and forwards it through
    :meth:`invoke_method`, returning that call's response payload.
    """

    # API version stamped on every payload this class builds itself.
    _API_VERSION = "1.0"

    def __init__(self, connection_string: str, device_id: str, module_id: str = "lvaEdge"):
        """Store the target device/module and open a registry manager.

        Args:
            connection_string: IoT Hub connection string passed to the registry manager.
            device_id: Id of the edge device hosting the module.
            module_id: Id of the module that receives the direct methods.
        """
        self.registry = IoTHubRegistryManager(connection_string)
        self.device_id = device_id
        self.module_id = module_id

    def _request_body(self, **fields) -> dict:
        """Return a payload that starts with ``@apiVersion`` followed by *fields*."""
        body = {"@apiVersion": self._API_VERSION}
        body.update(fields)
        return body

    def invoke_method(self, method_name: str, payload: dict) -> dict:
        """Invoke the named direct method on the module and return the response payload."""
        direct_method = CloudToDeviceMethod(method_name=method_name, payload=payload)
        result = self.registry.invoke_device_module_method(
            self.device_id, self.module_id, direct_method
        )
        return result.payload

    def set_topology(self, topology: dict) -> dict:
        """Send *topology* unchanged as the payload of ``GraphTopologySet``."""
        return self.invoke_method("GraphTopologySet", topology)

    def create_instance(self, instance_name: str, topology_name: str,
                       parameters: dict) -> dict:
        """Define a graph instance of *topology_name* via ``GraphInstanceSet``.

        Args:
            instance_name: Name given to the instance.
            topology_name: Name of the topology the instance refers to.
            parameters: Mapping of parameter name to value; sent as a list of
                ``{"name": ..., "value": ...}`` entries in mapping order.
        """
        parameter_entries = [
            {"name": key, "value": value} for key, value in parameters.items()
        ]
        properties = {
            "topologyName": topology_name,
            "description": f"Instance of {topology_name}",
            "parameters": parameter_entries,
        }
        payload = self._request_body(name=instance_name, properties=properties)
        return self.invoke_method("GraphInstanceSet", payload)

    def activate_instance(self, instance_name: str) -> dict:
        """Call ``GraphInstanceActivate`` for the named instance."""
        payload = self._request_body(name=instance_name)
        return self.invoke_method("GraphInstanceActivate", payload)

    def deactivate_instance(self, instance_name: str) -> dict:
        """Call ``GraphInstanceDeactivate`` for the named instance."""
        payload = self._request_body(name=instance_name)
        return self.invoke_method("GraphInstanceDeactivate", payload)

    def delete_instance(self, instance_name: str) -> dict:
        """Call ``GraphInstanceDelete`` for the named instance."""
        payload = self._request_body(name=instance_name)
        return self.invoke_method("GraphInstanceDelete", payload)

    def list_instances(self) -> dict:
        """Call ``GraphInstanceList`` with a payload holding only the API version."""
        return self.invoke_method("GraphInstanceList", self._request_body())

# Use the controller
# NOTE: the connection string and device id are placeholders to be replaced
# with real values; module_id is left at its "lvaEdge" default.
controller = LVAController(
    "your-iot-hub-connection-string",
    "your-edge-device-id"
)

# Set up topology
# The topology is sent to the module first so that the instance created below
# can refer to it by name ("LiveInference").
topology = create_live_inference_topology()
controller.set_topology(topology)

# Create and start instance
# The dict supplies values for the topology's parameters. hubSinkOutputName is
# not given here; presumably the topology's declared default applies.
# NOTE(review): the RTSP credentials are hard-coded sample values - load real
# ones from configuration or a secret store instead of committing them.
controller.create_instance(
    "camera1-inference",
    "LiveInference",
    {
        "rtspUrl": "rtsp://camera1.local/stream",
        "rtspUserName": "admin",
        "rtspPassword": "password",
        "inferenceUrl": "http://inference:5001/score"
    }
)

# Activation is a separate call, issued after the instance has been defined.
controller.activate_instance("camera1-inference")

Event-Based Recording

def create_event_recording_topology():
    """Build the "EventTriggeredRecording" graph topology definition.

    Frames from the RTSP source are rate-limited and sent to the inference
    endpoint. The HTTP extension output feeds both an IoT Hub message sink and
    a signal gate; the gate's other input is the RTSP source, and its output
    goes to an asset sink.

    Returns:
        A new dict on every call, ready to be passed as a direct-method payload.
    """
    unsecured_endpoint = "#Microsoft.Media.MediaGraphUnsecuredEndpoint"

    declared_parameters = [
        {"name": "rtspUrl", "type": "String"},
        {"name": "inferenceUrl", "type": "String"},
        {"name": "assetNamePattern", "type": "String", "default": "event-${System.DateTime}"},
    ]

    rtsp_source = {
        "@type": "#Microsoft.Media.MediaGraphRtspSource",
        "name": "rtspSource",
        "endpoint": {
            "@type": unsecured_endpoint,
            "url": "${rtspUrl}",
        },
    }

    # Caps the rate of frames forwarded to inference at 5 per second.
    frame_rate_filter = {
        "@type": "#Microsoft.Media.MediaGraphFrameRateFilterProcessor",
        "name": "frameRateFilter",
        "inputs": [{"nodeName": "rtspSource"}],
        "maximumFps": 5,
    }

    # Sends JPEG-encoded frames to the inference endpoint.
    http_extension = {
        "@type": "#Microsoft.Media.MediaGraphHttpExtension",
        "name": "httpExtension",
        "inputs": [{"nodeName": "frameRateFilter"}],
        "endpoint": {
            "@type": unsecured_endpoint,
            "url": "${inferenceUrl}",
        },
        "image": {
            "format": {"@type": "#Microsoft.Media.MediaGraphImageFormatJpeg"},
        },
    }

    # Gate fed by the inference node and the camera source. The durations are
    # ISO 8601 strings; the signal offset is negative (-10 s).
    signal_gate = {
        "@type": "#Microsoft.Media.MediaGraphSignalGateProcessor",
        "name": "signalGate",
        "inputs": [
            {"nodeName": "httpExtension"},
            {"nodeName": "rtspSource"},
        ],
        "activationEvaluationWindow": "PT1S",
        "activationSignalOffset": "-PT10S",
        "minimumActivationTime": "PT30S",
        "maximumActivationTime": "PT5M",
    }

    # Receives whatever the gate lets through, using a local cache directory.
    asset_sink = {
        "@type": "#Microsoft.Media.MediaGraphAssetSink",
        "name": "assetSink",
        "inputs": [{"nodeName": "signalGate"}],
        "assetNamePattern": "${assetNamePattern}",
        "localMediaCachePath": "/var/lib/azuremediaservices/tmp/",
        "localMediaCacheMaximumSizeMiB": "2048",
    }

    # Publishes the inference node's output to a fixed hub output name.
    hub_sink = {
        "@type": "#Microsoft.Media.MediaGraphIoTHubMessageSink",
        "name": "hubSink",
        "inputs": [{"nodeName": "httpExtension"}],
        "hubOutputName": "inferenceOutput",
    }

    return {
        "@apiVersion": "1.0",
        "name": "EventTriggeredRecording",
        "properties": {
            "parameters": declared_parameters,
            "sources": [rtsp_source],
            "processors": [frame_rate_filter, http_extension, signal_gate],
            "sinks": [asset_sink, hub_sink],
        },
    }

Best Practices

  1. Frame Rate Control: Limit FPS to reduce compute load
  2. Resolution Tuning: Balance quality with processing speed
  3. Buffering: Use signal gates for pre-event recording
  4. Error Handling: Implement retry logic for camera disconnects
  5. Resource Management: Monitor edge device CPU/memory
  6. Inference Optimization: Use INT8 quantized models for edge

Live Video Analytics enables real-time video intelligence at the edge, perfect for security, retail analytics, and industrial monitoring applications.

Michael John Pena

Michael John Pena

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.