Real-Time AI: Streaming Inference for Live Data Applications

Real-time AI processes data as it arrives, enabling immediate insights and actions. For data applications, this means AI-powered alerts, live dashboards, and automated responses. Let’s explore streaming inference patterns.

Real-Time AI Architecture

Event Source          Processing              AI Inference           Action
    │                     │                       │                    │
    ▼                     ▼                       ▼                    ▼
┌────────┐         ┌───────────┐           ┌───────────┐        ┌──────────┐
│ Event  │────────►│ Stream    │──────────►│ AI Model  │───────►│ Response │
│ Hubs   │         │ Processing│           │ Endpoint  │        │ Action   │
└────────┘         └───────────┘           └───────────┘        └──────────┘
                         │                       │
                         ▼                       ▼
                   ┌───────────┐           ┌───────────┐
                   │ Feature   │           │ Context   │
                   │ Store     │           │ (RAG)     │
                   └───────────┘           └───────────┘
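
On the ingestion side, producers simply push JSON events into Event Hubs, where the stream-processing stage picks them up. Here is a minimal sketch using the azure-eventhub SDK (the connection-string setting name and the payload shape are placeholders), which feeds the function shown in the next section:

import asyncio
import json
import os

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient

async def publish_transaction(transaction: dict):
    """Send one transaction event to Event Hubs (illustrative only)."""
    producer = EventHubProducerClient.from_connection_string(
        os.environ["EVENT_HUB_CONNECTION"],  # placeholder setting name
        eventhub_name="transactions"
    )
    async with producer:
        batch = await producer.create_batch()
        batch.add(EventData(json.dumps(transaction)))
        await producer.send_batch(batch)

# asyncio.run(publish_transaction({"id": "tx-001", "customer_id": "c-42", "amount": 129.50}))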

Streaming with Azure Functions

import azure.functions as func
from azure.ai.foundry import AIFoundryClient
from azure.identity import DefaultAzureCredential
from datetime import datetime
import json

app = func.FunctionApp()

# Initialize AI client outside handler for connection reuse
ai_client = AIFoundryClient(
    project="realtime-ai",
    credential=DefaultAzureCredential()
)

@app.event_hub_message_trigger(
    arg_name="events",
    event_hub_name="transactions",
    connection="EventHubConnection",
    cardinality="many"  # receive a batch of events per invocation
)
async def process_transactions(events: list[func.EventHubEvent]):
    """Process transactions in real-time with AI."""

    for event in events:
        transaction = json.loads(event.get_body().decode())

        # Fast classification using smaller model
        classification = await classify_transaction(transaction)

        if classification["risk_score"] > 0.8:
            # High risk - detailed analysis
            analysis = await analyze_high_risk(transaction)
            await send_alert(analysis)

        # Store result (store_result and send_alert are app-specific helpers, not shown)
        await store_result(transaction["id"], classification)

async def classify_transaction(transaction: dict) -> dict:
    """Quick classification for risk scoring."""

    response = await ai_client.chat.complete_async(
        deployment="gpt-4o-mini",  # Fast model for real-time
        messages=[{
            "role": "system",
            "content": "Classify transaction risk. Return JSON: {risk_score: 0-1, reason: string}"
        }, {
            "role": "user",
            "content": json.dumps(transaction)
        }],
        max_tokens=100
    )

    return json.loads(response.choices[0].message.content)

async def analyze_high_risk(transaction: dict) -> dict:
    """Detailed analysis for high-risk transactions."""

    # Get customer history for context
    history = await get_customer_history(transaction["customer_id"])

    response = await ai_client.chat.complete_async(
        deployment="gpt-4o",  # Full model for detailed analysis
        messages=[{
            "role": "system",
            "content": """Analyze this potentially fraudulent transaction.
            Consider customer history and transaction patterns.
            Provide detailed risk assessment and recommended actions."""
        }, {
            "role": "user",
            "content": f"""
            Transaction: {json.dumps(transaction)}
            Customer History: {json.dumps(history)}
            """
        }]
    )

    return {
        "transaction_id": transaction["id"],
        "analysis": response.choices[0].message.content,
        "timestamp": datetime.utcnow().isoformat()
    }

Real-Time Anomaly Detection

from collections import deque
import json
import numpy as np

class StreamingAnomalyDetector:
    def __init__(self, ai_client, window_size: int = 100):
        self.ai_client = ai_client
        self.window_size = window_size
        self.data_windows = {}  # Per-metric windows

    async def process_metric(self, metric_name: str, value: float, metadata: dict) -> dict:
        """Process incoming metric and detect anomalies."""

        # Update window
        if metric_name not in self.data_windows:
            self.data_windows[metric_name] = deque(maxlen=self.window_size)

        window = self.data_windows[metric_name]
        window.append(value)

        # Statistical anomaly detection (fast)
        if len(window) >= 10:
            mean = np.mean(window)
            std = np.std(window)
            z_score = (value - mean) / (std + 0.0001)

            if abs(z_score) > 3:  # Statistical anomaly
                # AI-powered analysis
                analysis = await self._analyze_anomaly(
                    metric_name, value, list(window), metadata
                )
                return {
                    "anomaly": True,
                    "metric": metric_name,
                    "value": value,
                    "z_score": z_score,
                    "analysis": analysis
                }

        return {"anomaly": False, "metric": metric_name, "value": value}

    async def _analyze_anomaly(
        self,
        metric_name: str,
        value: float,
        history: list,
        metadata: dict
    ) -> dict:
        """AI analysis of detected anomaly."""

        response = await self.ai_client.chat.complete_async(
            deployment="gpt-4o-mini",
            messages=[{
                "role": "user",
                "content": f"""Analyze this metric anomaly:

                Metric: {metric_name}
                Current Value: {value}
                Recent History: {history[-20:]}
                Metadata: {json.dumps(metadata)}

                Determine:
                1. Is this a real anomaly or noise?
                2. Possible causes
                3. Recommended action
                4. Severity (critical/high/medium/low)

                Return as JSON."""
            }],
            max_tokens=200
        )

        return json.loads(response.choices[0].message.content)
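
Wiring the detector into a consumer is a short loop. A minimal usage sketch (the metric_events source and its field names are hypothetical):

detector = StreamingAnomalyDetector(ai_client, window_size=100)

async def consume_metrics(metric_events):
    """Run each incoming metric through the detector (illustrative only)."""
    async for event in metric_events:  # hypothetical async iterator of metric dicts
        result = await detector.process_metric(
            metric_name=event["name"],
            value=event["value"],
            metadata=event.get("metadata", {})
        )
        if result["anomaly"]:
            print(f"Anomaly on {result['metric']}: z-score {result['z_score']:.1f}")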

Streaming RAG for Live Context

from typing import AsyncIterator

class StreamingRAG:
    def __init__(self, ai_client, vector_store, cache):
        self.ai_client = ai_client
        self.vector_store = vector_store
        self.cache = cache  # Redis or similar

    async def process_query_stream(
        self,
        query: str,
        context_stream: AsyncIterator[dict]
    ) -> AsyncIterator[str]:
        """Process query with streaming context and response."""

        # Retrieve static context
        static_context = await self.vector_store.search(query, top_k=3)

        # Start response generation
        messages = [{
            "role": "system",
            "content": f"""Answer based on context. Context:
            {self._format_context(static_context)}"""
        }, {
            "role": "user",
            "content": query
        }]

        # Stream response
        async with self.ai_client.chat.stream_async(
            deployment="gpt-4o",
            messages=messages
        ) as stream:
            async for chunk in stream:
                if chunk.choices[0].delta.content:
                    yield chunk.choices[0].delta.content

                # Note: consuming context_stream here would block delivery of the
                # next chunk. In practice, read it in a separate task (e.g. via an
                # asyncio queue) and inject new context between generations.

    async def realtime_qa(self, query: str, live_data: dict) -> AsyncIterator[str]:
        """Q&A with real-time data context."""

        # Check cache first
        cache_key = self._cache_key(query, live_data)
        cached = await self.cache.get(cache_key)
        if cached:
            yield cached
            return

        # Generate with live context
        response_parts = []

        async for part in self._generate_response(query, live_data):
            response_parts.append(part)
            yield part

        # Cache the full response
        full_response = "".join(response_parts)
        await self.cache.set(cache_key, full_response, ttl=60)  # 1 minute cache
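
Consuming the stream is a plain async for loop. A minimal sketch, assuming the vector store and cache clients are already constructed and the class's private helpers are filled in:

rag = StreamingRAG(ai_client, vector_store, cache)

async def answer_live_question():
    """Stream an answer that blends retrieved and live context (illustrative only)."""
    live_data = {"active_sessions": 1243, "error_rate": 0.07}  # example live snapshot
    async for token in rag.realtime_qa("Why did the error rate spike?", live_data):
        print(token, end="", flush=True)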

Real-Time Dashboard AI

from datetime import datetime
from typing import AsyncIterator
from fastapi import FastAPI, WebSocket
import asyncio
import json

app = FastAPI()

class DashboardAI:
    def __init__(self, ai_client, data_service):
        self.ai_client = ai_client
        self.data_service = data_service

    async def generate_insight_stream(
        self,
        dashboard_id: str
    ) -> AsyncIterator[dict]:
        """Generate real-time insights for a dashboard."""

        while True:
            # Get current dashboard data
            data = await self.data_service.get_dashboard_data(dashboard_id)

            # Generate insight
            insight = await self._generate_insight(data)

            yield {
                "type": "insight",
                "content": insight,
                "timestamp": datetime.utcnow().isoformat()
            }

            await asyncio.sleep(30)  # Generate new insight every 30 seconds

    async def _generate_insight(self, data: dict) -> str:
        """Generate a single insight from dashboard data."""

        response = await self.ai_client.chat.complete_async(
            deployment="gpt-4o-mini",
            messages=[{
                "role": "user",
                "content": f"""Based on this dashboard data, provide ONE actionable insight.
                Be specific and concise (1-2 sentences).

                Data: {json.dumps(data)}"""
            }],
            max_tokens=100
        )

        return response.choices[0].message.content

@app.websocket("/ws/dashboard/{dashboard_id}")
async def dashboard_websocket(websocket: WebSocket, dashboard_id: str):
    """WebSocket endpoint for real-time dashboard insights."""

    await websocket.accept()

    # ai_client and data_service are assumed to be initialized at module scope
    dashboard_ai = DashboardAI(ai_client, data_service)

    try:
        async for insight in dashboard_ai.generate_insight_stream(dashboard_id):
            await websocket.send_json(insight)
    except Exception:
        await websocket.close(code=1011)  # 1011 = internal error
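
On the client side, any WebSocket library can subscribe to the insight stream. A minimal sketch using the websockets package (host and port are placeholders):

import asyncio
import json
import websockets  # pip install websockets

async def watch_dashboard(dashboard_id: str):
    """Print insights as the server pushes them (illustrative only)."""
    uri = f"ws://localhost:8000/ws/dashboard/{dashboard_id}"  # placeholder host/port
    async with websockets.connect(uri) as ws:
        while True:
            insight = json.loads(await ws.recv())
            print(f"[{insight['timestamp']}] {insight['content']}")

# asyncio.run(watch_dashboard("sales-overview"))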

Optimizing for Latency

import asyncio

class LatencyOptimizedAI:
    def __init__(self, ai_client):
        self.ai_client = ai_client
        self.model_router = ModelRouter()

    async def infer(self, request: dict, max_latency_ms: int = 500) -> dict:
        """Route to appropriate model based on latency requirements."""

        # Select model based on latency budget
        model = self.model_router.select_model(
            task_complexity=request.get("complexity", "medium"),
            max_latency_ms=max_latency_ms
        )

        # Set timeout
        timeout = max_latency_ms / 1000

        try:
            # _call_model wraps the chat completion call for the selected deployment (not shown)
            response = await asyncio.wait_for(
                self._call_model(model, request),
                timeout=timeout
            )
            return response
        except asyncio.TimeoutError:
            # Fallback to faster model
            fallback_model = self.model_router.get_fallback(model)
            return await self._call_model(fallback_model, request)

class ModelRouter:
    """Route requests to appropriate models based on requirements."""

    MODELS = {
        "fast": {"name": "gpt-4o-mini", "avg_latency_ms": 200},
        "balanced": {"name": "gpt-4o", "avg_latency_ms": 500},
        "powerful": {"name": "gpt-4o", "avg_latency_ms": 1000, "reasoning": True}
    }

    def select_model(self, task_complexity: str, max_latency_ms: int) -> str:
        if max_latency_ms < 300 or task_complexity == "simple":
            return "fast"
        elif max_latency_ms < 700 or task_complexity == "medium":
            return "balanced"
        else:
            return "powerful"

    def get_fallback(self, model: str) -> str:
        fallbacks = {"powerful": "balanced", "balanced": "fast", "fast": "fast"}
        return fallbacks.get(model, "fast")
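
In use, callers pass a latency budget per request. A short sketch (the request shape here is hypothetical):

optimizer = LatencyOptimizedAI(ai_client)

async def score_event(event: dict) -> dict:
    """Classify an event within a 300 ms budget, falling back if the primary model is slow."""
    return await optimizer.infer(
        {"task": "classify", "payload": event, "complexity": "simple"},
        max_latency_ms=300
    )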

Best Practices

  1. Use appropriate models: Smaller models for real-time, larger for batch
  2. Implement timeouts: Always have fallback for slow responses
  3. Cache aggressively: Cache frequent queries and context
  4. Pre-compute features: Prepare data before inference
  5. Monitor latency: Track P99 latency and optimize bottlenecks (see the sketch after this list)
  6. Scale horizontally: Use multiple model endpoints for throughput
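
For item 5, a lightweight option is a rolling window of latency samples with P99 read off on demand. A minimal sketch (one approach among many, using numpy percentiles):

import time
from collections import deque
import numpy as np

class LatencyTracker:
    """Rolling window of latency samples for P50/P99 monitoring (illustrative only)."""

    def __init__(self, window_size: int = 1000):
        self.samples = deque(maxlen=window_size)

    async def timed(self, coro):
        """Await any coroutine and record its wall-clock latency in milliseconds."""
        start = time.perf_counter()
        try:
            return await coro
        finally:
            self.samples.append((time.perf_counter() - start) * 1000)

    def percentile(self, p: float) -> float:
        return float(np.percentile(list(self.samples), p)) if self.samples else 0.0

# tracker = LatencyTracker()
# result = await tracker.timed(classify_transaction(transaction))
# print(f"P99: {tracker.percentile(99):.0f} ms")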

Real-time AI requires careful architecture to balance latency, cost, and quality. Start with simple use cases and optimize based on actual performance data.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.