Real-Time AI: Streaming Inference for Live Data Applications
Real-time AI processes data as it arrives, enabling immediate insights and actions. For data applications, this means AI-powered alerts, live dashboards, and automated responses. Let's walk through the core streaming-inference patterns: event-driven classification, streaming anomaly detection, live-context RAG, and latency-aware model routing.
Real-Time AI Architecture
Event Source    Stream Processing     AI Inference        Action
     │                  │                  │                │
     ▼                  ▼                  ▼                ▼
┌──────────┐     ┌─────────────┐     ┌────────────┐    ┌──────────┐
│  Event   │────►│   Stream    │────►│  AI Model  │───►│ Response │
│   Hubs   │     │ Processing  │     │  Endpoint  │    │  Action  │
└──────────┘     └─────────────┘     └────────────┘    └──────────┘
                        │                  │
                        ▼                  ▼
                 ┌─────────────┐     ┌────────────┐
                 │   Feature   │     │  Context   │
                 │    Store    │     │   (RAG)    │
                 └─────────────┘     └────────────┘
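The feature-store hop in the diagram happens between stream processing and inference: each event is enriched with precomputed features (and, optionally, retrieved context) before the model sees it. The later sections don't show that step, so here is a minimal sketch of what it might look like; get_features and the in-memory FEATURE_STORE are hypothetical stand-ins for a real feature store such as Redis or a managed service.

import asyncio

# Hypothetical stand-in for a real feature store (e.g. Redis or a managed feature service)
FEATURE_STORE = {
    "cust-42": {"avg_txn_amount": 83.10, "txn_count_24h": 7},
}

async def get_features(customer_id: str) -> dict:
    """Look up precomputed features for a customer (assumed to be maintained upstream)."""
    return FEATURE_STORE.get(customer_id, {})

async def enrich_event(event: dict) -> dict:
    """Attach feature-store data to the raw event before it reaches the model."""
    features = await get_features(event["customer_id"])
    return {**event, "features": features}

if __name__ == "__main__":
    raw = {"id": "txn-1", "customer_id": "cust-42", "amount": 912.50}
    print(asyncio.run(enrich_event(raw)))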
Streaming with Azure Functions
import azure.functions as func
from azure.ai.foundry import AIFoundryClient
from azure.identity import DefaultAzureCredential
from datetime import datetime
import json

app = func.FunctionApp()

# Initialize the AI client outside the handler for connection reuse
ai_client = AIFoundryClient(
    project="realtime-ai",
    credential=DefaultAzureCredential()
)

@app.event_hub_message_trigger(
    arg_name="events",
    event_hub_name="transactions",
    connection="EventHubConnection"
)
async def process_transactions(events: func.EventHubEvent):
    """Process transactions in real time with AI."""
    for event in events:
        transaction = json.loads(event.get_body().decode())

        # Fast classification using a smaller model
        classification = await classify_transaction(transaction)

        if classification["risk_score"] > 0.8:
            # High risk - run detailed analysis
            analysis = await analyze_high_risk(transaction)
            await send_alert(analysis)

        # Store the result
        await store_result(transaction["id"], classification)

async def classify_transaction(transaction: dict) -> dict:
    """Quick classification for risk scoring."""
    response = await ai_client.chat.complete_async(
        deployment="gpt-4o-mini",  # Fast model for real-time
        messages=[{
            "role": "system",
            "content": "Classify transaction risk. Return JSON: {risk_score: 0-1, reason: string}"
        }, {
            "role": "user",
            "content": json.dumps(transaction)
        }],
        max_tokens=100
    )
    return json.loads(response.choices[0].message.content)

async def analyze_high_risk(transaction: dict) -> dict:
    """Detailed analysis for high-risk transactions."""
    # Get customer history for context
    history = await get_customer_history(transaction["customer_id"])

    response = await ai_client.chat.complete_async(
        deployment="gpt-4o",  # Full model for detailed analysis
        messages=[{
            "role": "system",
            "content": """Analyze this potentially fraudulent transaction.
            Consider customer history and transaction patterns.
            Provide a detailed risk assessment and recommended actions."""
        }, {
            "role": "user",
            "content": f"""
            Transaction: {json.dumps(transaction)}
            Customer History: {json.dumps(history)}
            """
        }]
    )

    return {
        "transaction_id": transaction["id"],
        "analysis": response.choices[0].message.content,
        "timestamp": datetime.utcnow().isoformat()
    }
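The handler above leans on three helpers that aren't shown: send_alert, store_result, and get_customer_history. One possible shape for them is sketched below; the webhook URL and the in-memory stores are placeholders I'm assuming for illustration (in production these would typically be an alerting service and a database or cache).

import aiohttp

# Placeholder sinks; swap for Cosmos DB, Redis, etc. in a real deployment
RESULTS: dict[str, dict] = {}
CUSTOMER_HISTORY: dict[str, list] = {}
ALERT_WEBHOOK_URL = "https://example.com/alerts"  # hypothetical endpoint

async def send_alert(analysis: dict) -> None:
    """POST the high-risk analysis to an alerting webhook."""
    async with aiohttp.ClientSession() as session:
        await session.post(ALERT_WEBHOOK_URL, json=analysis)

async def store_result(transaction_id: str, classification: dict) -> None:
    """Persist the classification; an in-memory dict stands in for a real store."""
    RESULTS[transaction_id] = classification

async def get_customer_history(customer_id: str) -> list:
    """Fetch recent transactions for the customer; empty if unknown."""
    return CUSTOMER_HISTORY.get(customer_id, [])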
Real-Time Anomaly Detection
from collections import deque
import json
import numpy as np

class StreamingAnomalyDetector:
    def __init__(self, ai_client, window_size: int = 100):
        self.ai_client = ai_client
        self.window_size = window_size
        self.data_windows = {}  # Per-metric rolling windows

    async def process_metric(self, metric_name: str, value: float, metadata: dict) -> dict:
        """Process an incoming metric and detect anomalies."""
        # Update the metric's rolling window
        if metric_name not in self.data_windows:
            self.data_windows[metric_name] = deque(maxlen=self.window_size)

        window = self.data_windows[metric_name]
        window.append(value)

        # Statistical anomaly detection (fast path)
        if len(window) >= 10:
            mean = np.mean(window)
            std = np.std(window)
            z_score = (value - mean) / (std + 0.0001)

            if abs(z_score) > 3:  # Statistical anomaly
                # AI-powered analysis
                analysis = await self._analyze_anomaly(
                    metric_name, value, list(window), metadata
                )
                return {
                    "anomaly": True,
                    "metric": metric_name,
                    "value": value,
                    "z_score": z_score,
                    "analysis": analysis
                }

        return {"anomaly": False, "metric": metric_name, "value": value}

    async def _analyze_anomaly(
        self,
        metric_name: str,
        value: float,
        history: list,
        metadata: dict
    ) -> dict:
        """AI analysis of a detected anomaly."""
        response = await self.ai_client.chat.complete_async(
            deployment="gpt-4o-mini",
            messages=[{
                "role": "user",
                "content": f"""Analyze this metric anomaly:
                Metric: {metric_name}
                Current Value: {value}
                Recent History: {history[-20:]}
                Metadata: {json.dumps(metadata)}

                Determine:
                1. Is this a real anomaly or noise?
                2. Possible causes
                3. Recommended action
                4. Severity (critical/high/medium/low)

                Return as JSON."""
            }],
            max_tokens=200
        )
        return json.loads(response.choices[0].message.content)
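A quick way to smoke-test the detector locally is to feed it a synthetic stream and stub out the AI call so only the statistical path runs. The subclass below is a test harness I'm assuming for illustration, not part of the detector itself.

import asyncio
import random

class OfflineAnomalyDetector(StreamingAnomalyDetector):
    """Test harness: replaces the AI analysis with a canned response."""

    async def _analyze_anomaly(self, metric_name, value, history, metadata):
        return {"severity": "unknown", "note": "stubbed for local testing"}

async def main():
    detector = OfflineAnomalyDetector(ai_client=None, window_size=100)

    # 200 normal readings around 100, then one obvious spike
    for _ in range(200):
        await detector.process_metric("orders_per_sec", random.gauss(100, 5), {})
    result = await detector.process_metric("orders_per_sec", 180.0, {"region": "westus"})
    print(result)  # expect anomaly=True with a large z_score

asyncio.run(main())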
Streaming RAG for Live Context
from typing import AsyncIterator

class StreamingRAG:
    def __init__(self, ai_client, vector_store, cache):
        self.ai_client = ai_client
        self.vector_store = vector_store
        self.cache = cache  # Redis or similar

    async def process_query_stream(
        self,
        query: str,
        context_stream: AsyncIterator[dict]
    ) -> AsyncIterator[str]:
        """Process a query with streaming context and a streaming response."""
        # Retrieve static context
        static_context = await self.vector_store.search(query, top_k=3)

        # Start response generation
        messages = [{
            "role": "system",
            "content": f"""Answer based on context. Context:
            {self._format_context(static_context)}"""
        }, {
            "role": "user",
            "content": query
        }]

        # Stream the response
        async with self.ai_client.chat.stream_async(
            deployment="gpt-4o",
            messages=messages
        ) as stream:
            async for chunk in stream:
                if chunk.choices[0].delta.content:
                    yield chunk.choices[0].delta.content

        # Optionally incorporate streaming context
        async for context_update in context_stream:
            # Inject new context mid-stream if needed
            pass

    async def realtime_qa(self, query: str, live_data: dict) -> AsyncIterator[str]:
        """Q&A with real-time data context."""
        # Check the cache first
        cache_key = self._cache_key(query, live_data)
        cached = await self.cache.get(cache_key)
        if cached:
            yield cached
            return

        # Generate with live context
        response_parts = []
        async for part in self._generate_response(query, live_data):
            response_parts.append(part)
            yield part

        # Cache the full response
        full_response = "".join(response_parts)
        await self.cache.set(cache_key, full_response, ttl=60)  # 1-minute cache
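StreamingRAG relies on three private helpers that the snippet doesn't define: _format_context, _cache_key, and _generate_response. Here is one plausible version, written as a subclass so it stays self-contained; it assumes vector-store hits are dicts with a "text" field, which is an assumption rather than anything the original specifies.

import hashlib
import json

class StreamingRAGWithHelpers(StreamingRAG):
    """StreamingRAG plus plausible bodies for its undefined helpers."""

    def _format_context(self, chunks: list) -> str:
        # Assumes each vector-store hit is a dict with a "text" field
        return "\n---\n".join(chunk["text"] for chunk in chunks)

    def _cache_key(self, query: str, live_data: dict) -> str:
        # Stable key over the query plus a snapshot of the live data
        payload = json.dumps({"q": query, "d": live_data}, sort_keys=True)
        return "rag:" + hashlib.sha256(payload.encode()).hexdigest()

    async def _generate_response(self, query: str, live_data: dict) -> AsyncIterator[str]:
        # Stream an answer grounded in the live data snapshot
        async with self.ai_client.chat.stream_async(
            deployment="gpt-4o",
            messages=[
                {"role": "system", "content": f"Live data: {json.dumps(live_data)}"},
                {"role": "user", "content": query},
            ],
        ) as stream:
            async for chunk in stream:
                if chunk.choices[0].delta.content:
                    yield chunk.choices[0].delta.content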
Real-Time Dashboard AI
from datetime import datetime
from fastapi import FastAPI, WebSocket
from typing import AsyncIterator
import asyncio
import json

app = FastAPI()

class DashboardAI:
    def __init__(self, ai_client, data_service):
        self.ai_client = ai_client
        self.data_service = data_service

    async def generate_insight_stream(
        self,
        dashboard_id: str
    ) -> AsyncIterator[dict]:
        """Generate real-time insights for a dashboard."""
        while True:
            # Get current dashboard data
            data = await self.data_service.get_dashboard_data(dashboard_id)

            # Generate an insight
            insight = await self._generate_insight(data)

            yield {
                "type": "insight",
                "content": insight,
                "timestamp": datetime.utcnow().isoformat()
            }

            await asyncio.sleep(30)  # Generate a new insight every 30 seconds

    async def _generate_insight(self, data: dict) -> str:
        """Generate a single insight from dashboard data."""
        response = await self.ai_client.chat.complete_async(
            deployment="gpt-4o-mini",
            messages=[{
                "role": "user",
                "content": f"""Based on this dashboard data, provide ONE actionable insight.
                Be specific and concise (1-2 sentences).
                Data: {json.dumps(data)}"""
            }],
            max_tokens=100
        )
        return response.choices[0].message.content

@app.websocket("/ws/dashboard/{dashboard_id}")
async def dashboard_websocket(websocket: WebSocket, dashboard_id: str):
    """WebSocket endpoint for real-time dashboard insights."""
    await websocket.accept()
    # ai_client and data_service are assumed to be initialized at app startup
    dashboard_ai = DashboardAI(ai_client, data_service)
    try:
        async for insight in dashboard_ai.generate_insight_stream(dashboard_id):
            await websocket.send_json(insight)
    except Exception:
        await websocket.close(code=1000)
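On the consuming side, any WebSocket client can subscribe to the insight stream. A minimal Python client using the websockets package is shown below; the localhost URL and dashboard ID are assumptions based on running the FastAPI app locally (for example with uvicorn on port 8000).

import asyncio
import json
import websockets

async def watch_dashboard(dashboard_id: str):
    # Assumes the FastAPI app above is running locally, e.g. `uvicorn app:app --port 8000`
    uri = f"ws://localhost:8000/ws/dashboard/{dashboard_id}"
    async with websockets.connect(uri) as ws:
        while True:
            insight = json.loads(await ws.recv())
            print(f"[{insight['timestamp']}] {insight['content']}")

asyncio.run(watch_dashboard("sales-overview"))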
Optimizing for Latency
import asyncio

class LatencyOptimizedAI:
    def __init__(self, ai_client):
        self.ai_client = ai_client
        self.model_router = ModelRouter()

    async def infer(self, request: dict, max_latency_ms: int = 500) -> dict:
        """Route to an appropriate model based on latency requirements."""
        # Select a model tier based on the latency budget
        model = self.model_router.select_model(
            task_complexity=request.get("complexity", "medium"),
            max_latency_ms=max_latency_ms
        )

        # Set a timeout in seconds
        timeout = max_latency_ms / 1000

        try:
            response = await asyncio.wait_for(
                self._call_model(model, request),
                timeout=timeout
            )
            return response
        except asyncio.TimeoutError:
            # Fall back to a faster model
            fallback_model = self.model_router.get_fallback(model)
            return await self._call_model(fallback_model, request)

class ModelRouter:
    """Route requests to appropriate models based on requirements."""

    MODELS = {
        "fast": {"name": "gpt-4o-mini", "avg_latency_ms": 200},
        "balanced": {"name": "gpt-4o", "avg_latency_ms": 500},
        "powerful": {"name": "gpt-4o", "avg_latency_ms": 1000, "reasoning": True}
    }

    def select_model(self, task_complexity: str, max_latency_ms: int) -> str:
        if max_latency_ms < 300 or task_complexity == "simple":
            return "fast"
        elif max_latency_ms < 700 or task_complexity == "medium":
            return "balanced"
        else:
            return "powerful"

    def get_fallback(self, model: str) -> str:
        fallbacks = {"powerful": "balanced", "balanced": "fast", "fast": "fast"}
        return fallbacks.get(model, "fast")
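_call_model is referenced above but not defined. A plausible version simply resolves the tier name to a deployment from ModelRouter.MODELS and issues the chat call; the sketch below, including the {"prompt": ...} request shape, is my assumption for illustration rather than the article's canonical implementation.

class RoutedAI(LatencyOptimizedAI):
    """LatencyOptimizedAI plus one plausible _call_model implementation."""

    async def _call_model(self, model: str, request: dict) -> dict:
        # Resolve the tier name ("fast"/"balanced"/"powerful") to a deployment name
        deployment = ModelRouter.MODELS[model]["name"]
        response = await self.ai_client.chat.complete_async(
            deployment=deployment,
            messages=[{"role": "user", "content": request.get("prompt", "")}],
            max_tokens=request.get("max_tokens", 200),
        )
        return {"model": model, "content": response.choices[0].message.content}

# Usage: route a simple request with a tight latency budget
# optimized = RoutedAI(ai_client)
# result = await optimized.infer(
#     {"prompt": "Summarize today's error spike", "complexity": "simple"},
#     max_latency_ms=250,
# )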
Best Practices
- Use appropriate models: Smaller models for real-time, larger for batch
- Implement timeouts: Always have fallback for slow responses
- Cache aggressively: Cache frequent queries and context
- Pre-compute features: Prepare data before inference
- Monitor latency: Track P99 latency and optimize bottlenecks (see the tracker sketch after this list)
- Scale horizontally: Use multiple model endpoints for throughput
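For the latency-monitoring point above, a lightweight in-process tracker is often enough to get started. The sketch below records per-call durations and reports P99; treat it as an illustration rather than a replacement for proper telemetry such as Application Insights.

import time
from collections import deque
import numpy as np

class LatencyTracker:
    """Rolling P99 tracker for inference calls (in-process sketch)."""

    def __init__(self, window: int = 1000):
        self.samples = deque(maxlen=window)

    def record(self, duration_ms: float) -> None:
        self.samples.append(duration_ms)

    def p99(self) -> float:
        return float(np.percentile(list(self.samples), 99)) if self.samples else 0.0

tracker = LatencyTracker()

async def timed_infer(optimizer, request: dict) -> dict:
    """Wrap LatencyOptimizedAI.infer and record how long each call took."""
    start = time.perf_counter()
    try:
        return await optimizer.infer(request)
    finally:
        tracker.record((time.perf_counter() - start) * 1000)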
Real-time AI requires careful architecture to balance latency, cost, and quality. Start with simple use cases and optimize based on actual performance data.