
Real-Time AI Patterns: Building Low-Latency Intelligent Systems

Real-time AI requires careful architecture to minimize latency while maintaining quality. Let’s explore patterns for building responsive AI systems.

Real-Time AI Architecture Patterns

Pattern 1: Stream Processing
Event → Filter → Enrich (Feature Store) → AI Inference → Action

Pattern 2: Request-Response with Caching
Request → Cache Check → [Hit: Return] / [Miss: AI → Cache → Return]

Pattern 3: Speculative Processing
Input → Fast Model → Return Quick Result
Background: Slow Model → Update if Different

Pattern 4: Edge-Cloud Hybrid
Input → Edge AI (fast, simple) → [Complex: Cloud AI]
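
As a concrete illustration of Pattern 4, the sketch below answers simple requests with a local edge model and escalates to a cloud model only when the edge result looks uncertain. The edge_model and cloud_client interfaces and the 0.7 confidence threshold are assumptions made for this example, not a prescribed implementation.

class EdgeCloudRouter:
    """Pattern 4 sketch: try a small edge model first, fall back to cloud AI."""

    CONFIDENCE_THRESHOLD = 0.7  # assumed cutoff for escalating to the cloud

    def __init__(self, edge_model, cloud_client):
        self.edge_model = edge_model      # small local model (assumed async interface)
        self.cloud_client = cloud_client  # remote inference client (assumed async interface)

    async def infer(self, input_data: dict) -> dict:
        # Fast path: run the lightweight model at the edge.
        edge_result = await self.edge_model.predict(input_data)

        # Simple, high-confidence cases are answered locally.
        if edge_result.get("confidence", 0.0) >= self.CONFIDENCE_THRESHOLD:
            return {**edge_result, "source": "edge"}

        # Complex or uncertain cases escalate to the larger cloud model.
        cloud_result = await self.cloud_client.predict(input_data)
        return {**cloud_result, "source": "cloud"}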

Low-Latency Inference Service

from fastapi import FastAPI, BackgroundTasks
import asyncio
from typing import Optional
import time

app = FastAPI()

class RealtimeAIService:
    def __init__(self):
        # load_model, LRUCache, and MetricsCollector stand in for your own
        # model-loading, in-memory caching, and metrics helpers (not shown here).
        self.fast_model = load_model("fast")  # Small, quick model
        self.full_model = load_model("full")  # Large, accurate model
        self.cache = LRUCache(maxsize=10000)
        self.metrics = MetricsCollector()

    async def infer(
        self,
        input_data: dict,
        max_latency_ms: int = 100,
        background_tasks: Optional[BackgroundTasks] = None
    ) -> dict:
        """Inference with latency budget."""
        start = time.time()

        # Check cache
        cache_key = self._cache_key(input_data)
        cached = self.cache.get(cache_key)
        if cached:
            self.metrics.record("cache_hit")
            return cached

        # Fast inference
        fast_result = await self._fast_inference(input_data)

        elapsed_ms = (time.time() - start) * 1000
        remaining_budget = max_latency_ms - elapsed_ms

        if remaining_budget > 50:
            # Time for enhanced inference
            try:
                full_result = await asyncio.wait_for(
                    self._full_inference(input_data),
                    timeout=remaining_budget / 1000
                )
                self.cache.set(cache_key, full_result)
                return full_result
            except asyncio.TimeoutError:
                pass

        # Return fast result, optionally enhance in background
        if background_tasks:
            background_tasks.add_task(
                self._background_enhance, input_data, cache_key
            )

        return fast_result

    async def _fast_inference(self, input_data: dict) -> dict:
        """Quick inference with small model."""
        return await self.fast_model.predict(input_data)

    async def _full_inference(self, input_data: dict) -> dict:
        """Full inference with large model."""
        return await self.full_model.predict(input_data)

    async def _background_enhance(self, input_data: dict, cache_key: str):
        """Enhance result in background for future requests."""
        result = await self._full_inference(input_data)
        self.cache.set(cache_key, result)

service = RealtimeAIService()

@app.post("/predict")
async def predict(
    input_data: dict,
    background_tasks: BackgroundTasks,
    max_latency_ms: int = 100
):
    return await service.infer(input_data, max_latency_ms, background_tasks)
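
For reference, here is one way to call this endpoint once the app is running, assuming uvicorn serves it on localhost:8000 and using httpx as the client (neither is part of the post's code); the JSON payload fields are made up for illustration.

import asyncio
import httpx

async def call_predict():
    # POST the input as the JSON body; the latency budget is a query parameter.
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://localhost:8000/predict",
            params={"max_latency_ms": 80},
            json={"user_id": "u-123", "amount": 42.0},
        )
        print(response.json())

asyncio.run(call_predict())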

Streaming AI Responses

from fastapi.responses import StreamingResponse
import json

class StreamingAI:
    def __init__(self, ai_client):
        # ai_client is assumed to be an async chat client wrapper that exposes
        # chat.stream_async for a given deployment (e.g., around Azure OpenAI).
        self.ai_client = ai_client

    async def stream_response(self, prompt: str):
        """Stream AI response token by token."""

        async with self.ai_client.chat.stream_async(
            deployment="gpt-4o",
            messages=[{"role": "user", "content": prompt}]
        ) as stream:
            async for chunk in stream:
                if chunk.choices[0].delta.content:
                    yield chunk.choices[0].delta.content

    async def stream_with_tools(self, prompt: str, tools: list):
        """Stream response with tool calls."""

        events = []

        async with self.ai_client.chat.stream_async(
            deployment="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            tools=tools
        ) as stream:
            async for chunk in stream:
                delta = chunk.choices[0].delta

                if delta.tool_calls:
                    for tc in delta.tool_calls:
                        events.append({
                            "type": "tool_call",
                            "tool": tc.function.name,
                            "args": tc.function.arguments
                        })
                        yield f"data: {json.dumps(events[-1])}\n\n"

                if delta.content:
                    events.append({
                        "type": "content",
                        "content": delta.content
                    })
                    yield f"data: {json.dumps(events[-1])}\n\n"

@app.get("/stream")
async def stream_endpoint(prompt: str):
    streaming_ai = StreamingAI(ai_client)
    return StreamingResponse(
        streaming_ai.stream_response(prompt),
        media_type="text/event-stream"
    )
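
On the client side, the stream can be consumed incrementally as it arrives. The sketch below is an illustrative example using httpx against a local instance of the /stream endpoint (both assumptions); it simply prints each chunk as soon as it is received.

import asyncio
import httpx

async def consume_stream(prompt: str):
    # Stream the response body instead of waiting for it to complete.
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "GET", "http://localhost:8000/stream", params={"prompt": prompt}
        ) as response:
            async for chunk in response.aiter_text():
                print(chunk, end="", flush=True)

asyncio.run(consume_stream("Summarise the last hour of alerts"))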

Event-Driven Real-Time AI

from azure.eventhub.aio import EventHubConsumerClient

class RealtimeEventProcessor:
    def __init__(self, eventhub_conn: str, ai_service: RealtimeAIService):
        self.consumer = EventHubConsumerClient.from_connection_string(
            eventhub_conn,
            consumer_group="$Default",
            eventhub_name="events"
        )
        self.ai_service = ai_service
        self.output_queue = asyncio.Queue()

    async def process_events(self):
        """Process events with AI in real-time."""

        async def on_event(partition_context, event):
            # Parse event
            data = json.loads(event.body_as_str())

            # AI inference with tight latency
            result = await self.ai_service.infer(
                data,
                max_latency_ms=50  # Very tight budget
            )

            # Handle result
            await self._handle_result(data, result)

            # Checkpoint
            await partition_context.update_checkpoint(event)

        # Use the consumer as an async context manager so it closes cleanly.
        async with self.consumer:
            await self.consumer.receive(
                on_event=on_event,
                starting_position="-1"
            )

    async def _handle_result(self, input_data: dict, result: dict):
        """Handle AI result - alert, store, or forward."""

        if result.get("anomaly_score", 0) > 0.8:
            await self._send_alert(input_data, result)

        await self.output_queue.put(result)
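
Wiring this up is mostly a matter of constructing the processor and running it inside an asyncio entrypoint. The sketch below assumes the Event Hubs connection string is supplied via an EVENTHUB_CONN environment variable, a name chosen here purely for illustration.

import asyncio
import os

async def main():
    ai_service = RealtimeAIService()
    processor = RealtimeEventProcessor(
        eventhub_conn=os.environ["EVENTHUB_CONN"],  # assumed environment variable
        ai_service=ai_service,
    )
    await processor.process_events()  # runs until the task is cancelled

if __name__ == "__main__":
    asyncio.run(main())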

Real-Time Feature Computation

class RealtimeFeatureService:
    """Compute features for AI inference in real-time."""

    def __init__(self, feature_store):
        self.feature_store = feature_store
        self.window_aggregates = {}  # In-memory windowed aggregates

    async def get_features(
        self,
        entity_id: str,
        event: dict
    ) -> dict:
        """Get features combining historical and real-time."""

        # Historical features from store
        historical = await self.feature_store.get_features(
            entity_id,
            feature_names=["avg_transaction_30d", "max_transaction_30d"]
        )

        # Real-time window features
        window_key = f"{entity_id}_1h"
        window = self.window_aggregates.get(window_key, {
            "count": 0,
            "sum": 0,
            "max": 0
        })

        # Update window with new event
        window["count"] += 1
        window["sum"] += event.get("amount", 0)
        window["max"] = max(window["max"], event.get("amount", 0))
        self.window_aggregates[window_key] = window

        # Combine features
        return {
            **historical,
            "transactions_1h": window["count"],
            "total_amount_1h": window["sum"],
            "max_amount_1h": window["max"],
            "current_amount": event.get("amount", 0)
        }

    async def cleanup_windows(self):
        """Periodically clean up old windows."""
        while True:
            await asyncio.sleep(60)
            # Remove windows older than 1 hour
            # Implementation depends on your windowing strategy
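
One possible windowing strategy, sketched below, is to stamp each window with its last update time and evict anything untouched for an hour. The last_updated field, the touch helper, and the one-hour horizon are all assumptions for illustration rather than part of the service above.

import asyncio
import time

class TimestampedWindowCleanup:
    """Illustrative cleanup loop: evict windows not updated within max_age_seconds."""

    def __init__(self, window_aggregates: dict, max_age_seconds: int = 3600):
        self.window_aggregates = window_aggregates
        self.max_age_seconds = max_age_seconds

    def touch(self, window_key: str):
        # Call this wherever a window is updated (e.g., inside get_features).
        window = self.window_aggregates.setdefault(window_key, {})
        window["last_updated"] = time.time()

    async def run(self):
        while True:
            await asyncio.sleep(60)
            cutoff = time.time() - self.max_age_seconds
            stale = [
                key for key, window in self.window_aggregates.items()
                if window.get("last_updated", 0) < cutoff
            ]
            for key in stale:
                del self.window_aggregates[key]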

Latency Monitoring

from prometheus_client import Histogram, Counter
import functools
import time

latency_histogram = Histogram(
    'ai_inference_latency_seconds',
    'AI inference latency',
    buckets=[.01, .025, .05, .075, .1, .25, .5, 1.0]
)

inference_counter = Counter(
    'ai_inference_total',
    'Total AI inferences',
    ['model', 'cache_hit']
)

def track_latency(func):
    """Decorator to track inference latency."""

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.time()
        try:
            result = await func(*args, **kwargs)
            return result
        finally:
            elapsed = time.time() - start
            latency_histogram.observe(elapsed)

    return wrapper

class MonitoredAIService:
    # self.cache and self._key are assumed helpers mirroring RealtimeAIService above.
    @track_latency
    async def infer(self, input_data: dict) -> dict:
        # Check cache
        cache_hit = self.cache.get(self._key(input_data)) is not None

        inference_counter.labels(
            model="gpt-4o-mini",
            cache_hit=str(cache_hit)
        ).inc()

        # ... inference logic
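
These metrics still need to be exposed for Prometheus to scrape. A minimal option is prometheus_client's built-in HTTP server on a separate port; 8001 below is an arbitrary choice.

from prometheus_client import start_http_server

# Serve /metrics on a separate port for Prometheus to scrape.
# Pick a port that does not clash with the API server.
start_http_server(8001)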

Best Practices

  1. Set latency budgets: Define SLAs for each use case
  2. Use tiered models: Fast model first, upgrade if time allows
  3. Stream responses: Start delivering value immediately
  4. Cache aggressively: Identical or near-identical requests can skip inference entirely
  5. Pre-compute features: Don’t compute features during inference
  6. Monitor percentiles: P99 latency matters more than average

Real-time AI requires discipline around latency. Design for the worst case and optimize for the common case.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.