Back to Blog
7 min read

Model Deployment Patterns for Azure OpenAI

Deploying LLM applications requires patterns beyond traditional API deployment. Traffic management, failover, cost control, and quality assurance all need consideration. Here are five production-proven patterns.

Pattern 1: Gateway Pattern

Centralize LLM access through an API gateway:

import asyncio
import logging
import math
import time
from datetime import datetime, timezone
from typing import Optional

from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import HTTPBearer
from pydantic import BaseModel

# ASGI application object; the /api/chat route further down is registered on it.
app = FastAPI()
# Bearer-token security scheme, injected into route handlers via Depends(security).
security = HTTPBearer()

# Request body for the chat endpoint. Documented with comments rather than a
# class docstring so the generated schema description is left untouched.
class ChatRequest(BaseModel):
    # Conversation messages; presumably OpenAI-style {"role": ..., "content": ...}
    # dicts — TODO confirm against the endpoint client.
    messages: list[dict]
    # Model name requested by the caller.
    model: str = "gpt-35-turbo"
    # Sampling temperature requested by the caller.
    temperature: float = 0.7
    # Upper bound on completion tokens requested by the caller.
    max_tokens: int = 1000

class LLMGateway:
    """Centralized LLM gateway with routing and management.

    Endpoints are registered by name and chosen by ascending ``priority``
    (the lowest number is tried first).

    ``route_request`` calls several hooks that this class does not define
    and that are expected to be provided elsewhere (for example by a
    subclass): ``_check_rate_limit``, ``_check_circuit``,
    ``_call_endpoint``, ``_record_success`` and ``_record_failure``.
    """

    def __init__(self):
        # name -> endpoint record; see register_endpoint for the record shape.
        self.endpoints = {}
        # Created for rate-limiter / circuit-breaker state; nothing in this
        # class reads or writes them.
        self.rate_limiters = {}
        self.circuit_breakers = {}

    def register_endpoint(
        self,
        name: str,
        endpoint: str,
        key: str,
        priority: int = 1
    ):
        """Register an LLM endpoint.

        Args:
            name: Unique endpoint name; re-registering a name replaces it.
            endpoint: Endpoint URL.
            key: API key for the endpoint.
            priority: Routing priority; lower values are preferred.
        """
        self.endpoints[name] = {
            # The record carries its own name because route_request reads
            # endpoint["name"] for circuit-breaker and failover bookkeeping.
            "name": name,
            "endpoint": endpoint,
            "key": key,
            "priority": priority,
            "healthy": True,
        }

    async def route_request(
        self,
        request: "ChatRequest",
        user_id: str
    ) -> dict:
        """Route request to appropriate endpoint.

        Args:
            request: The chat request to forward.
            user_id: Caller identity used for the rate-limit check.

        Returns:
            Whatever ``_call_endpoint`` returns for the endpoint that served
            the request.

        Raises:
            HTTPException: 429 when the rate-limit check fails; 503 when no
                healthy endpoint exists, the selected endpoint's circuit
                check fails, or every failover candidate fails.
        """
        if not await self._check_rate_limit(user_id):
            raise HTTPException(429, "Rate limit exceeded")

        endpoint = self._select_endpoint(request.model)
        if not endpoint:
            raise HTTPException(503, "No healthy endpoints available")

        if not self._check_circuit(endpoint["name"]):
            raise HTTPException(503, f"Endpoint {endpoint['name']} is unavailable")

        try:
            response = await self._call_endpoint(endpoint, request)
        except Exception:
            self._record_failure(endpoint["name"])
            return await self._failover(request, endpoint["name"])
        # Recorded outside the try so a bookkeeping error is not mistaken
        # for an endpoint failure (which would trigger a second upstream call).
        self._record_success(endpoint["name"])
        return response

    def _select_endpoint(self, model: str) -> Optional[dict]:
        """Select best available endpoint.

        ``model`` is accepted for interface compatibility but is not used
        for selection. Returns the healthy endpoint record with the lowest
        ``priority`` (first registered wins ties), or ``None`` when no
        endpoint is healthy.
        """
        return min(
            (e for e in self.endpoints.values() if e["healthy"]),
            key=lambda e: e["priority"],
            default=None,
        )

    async def _failover(
        self,
        request: "ChatRequest",
        failed_endpoint: str
    ) -> dict:
        """Attempt failover to another endpoint.

        Tries every other healthy endpoint in priority order and returns the
        first successful response.

        Raises:
            HTTPException: 503 when every candidate fails.
        """
        # Snapshot and sort up front: the dict may be mutated while we await,
        # and failover should honour the same priority order as selection.
        candidates = sorted(
            (
                endpoint
                for name, endpoint in self.endpoints.items()
                if name != failed_endpoint and endpoint["healthy"]
            ),
            key=lambda endpoint: endpoint["priority"],
        )
        for endpoint in candidates:
            try:
                return await self._call_endpoint(endpoint, request)
            except Exception:
                # Not a bare except: task cancellation must propagate.
                continue

        raise HTTPException(503, "All endpoints failed")

# Module-level gateway instance used by the route handler below.
gateway = LLMGateway()

# POST /api/chat: resolves the bearer token to a user id via validate_token
# (not defined in this listing) and forwards the request body plus that id to
# gateway.route_request, returning its result as the response.
@app.post("/api/chat")
async def chat(
    request: ChatRequest,
    token: str = Depends(security)
):
    # NOTE(review): FastAPI's HTTPBearer dependency normally yields a
    # credentials object rather than a bare str — confirm what validate_token
    # expects before relying on the `str` annotation above.
    user_id = validate_token(token)
    return await gateway.route_request(request, user_id)

Pattern 2: Blue-Green Deployment

Deploy new versions alongside the existing one:

class BlueGreenDeployer:
    """Blue-green deployment for LLM configurations.

    Two slots, ``"blue"`` and ``"green"``, each hold a config and a health
    flag. New configs are deployed to whichever slot is not active, health
    checked, and only then may traffic be switched to them.

    Relies on ``_configure_slot`` and ``_test_request`` hooks and on
    ``gateway.update_routing``, none of which are defined in this class.
    """

    def __init__(self, gateway: "LLMGateway"):
        self.gateway = gateway
        self.active_slot = "blue"
        self.slots = {
            "blue": {"config": None, "healthy": False},
            "green": {"config": None, "healthy": False},
        }

    def _inactive_slot(self) -> str:
        """Return the name of the slot that is not currently active."""
        return "green" if self.active_slot == "blue" else "blue"

    async def deploy_to_inactive(self, config: dict) -> str:
        """Deploy to inactive slot.

        Args:
            config: Configuration to install in the inactive slot.

        Returns:
            The name of the slot that was deployed to.

        Raises:
            RuntimeError: If the slot fails its health check. Errors raised
                by ``_configure_slot`` propagate unchanged.
        """
        inactive = self._inactive_slot()
        slot = self.slots[inactive]

        # Drop any earlier health verdict first. Otherwise a failure while
        # configuring would leave the slot flagged healthy for a config that
        # was never verified, and switch_traffic would accept it.
        slot["healthy"] = False
        slot["config"] = config
        await self._configure_slot(inactive, config)

        slot["healthy"] = await self._health_check(inactive)
        if not slot["healthy"]:
            raise RuntimeError(f"Deployment to {inactive} failed health check")

        return inactive

    async def switch_traffic(self) -> str:
        """Switch traffic to inactive slot.

        Returns:
            The name of the newly active slot.

        Raises:
            RuntimeError: If the inactive slot is not marked healthy.
        """
        inactive = self._inactive_slot()

        if not self.slots[inactive]["healthy"]:
            raise RuntimeError(f"Cannot switch to unhealthy {inactive} slot")

        # Update routing first; active_slot only changes once that succeeded.
        await self.gateway.update_routing(inactive)
        self.active_slot = inactive

        return self.active_slot

    async def rollback(self) -> str:
        """Rollback to previous slot.

        The "previous" slot is simply the one that is not active now.

        Returns:
            The name of the newly active slot.

        Raises:
            RuntimeError: If that slot is not marked healthy.
        """
        previous = self._inactive_slot()

        if not self.slots[previous]["healthy"]:
            raise RuntimeError("Previous slot not healthy, cannot rollback")

        await self.gateway.update_routing(previous)
        self.active_slot = previous

        return self.active_slot

    async def _health_check(self, slot: str) -> bool:
        """Health check a slot.

        Sends a test request with the slot's config and reports whether the
        response's ``status`` equals ``"success"``. Any ordinary error counts
        as unhealthy.
        """
        config = self.slots[slot]["config"]

        try:
            response = await self._test_request(config)
            return response.status == "success"
        except Exception:
            # Not a bare except: cancellation and interpreter exit must not
            # be reported as a failed health check.
            return False

Pattern 3: Canary Releases

Gradual traffic migration:

import random
from dataclasses import dataclass

@dataclass
class CanaryConfig:
    """Settings for one canary rollout."""

    # Version identifier that keeps serving the non-canary share of traffic.
    stable_version: str
    # Version identifier under evaluation.
    canary_version: str
    # Share of traffic sent to the canary on a 0-100 scale (the router
    # compares it against random.random() * 100).
    canary_percent: float
    # Look-back window, in minutes, passed to the metrics client when
    # fetching error rates.
    metrics_window_minutes: int = 10
    # Absolute margin by which the canary's error rate may exceed the stable
    # error rate before the monitor rolls the canary back.
    error_threshold: float = 0.05

class CanaryDeployer:
    """Canary deployment with automatic rollback.

    A background monitor compares the canary's error rate with the stable
    version's and rolls the canary back when it is worse by more than the
    configured threshold. Relies on a ``_configure_routing`` hook that is not
    defined in this class.

    Args:
        gateway: Gateway object; stored but not used by this class directly.
        metrics_client: Object exposing
            ``await get_error_rate(version, window_minutes)``.
        check_interval_seconds: Delay between monitor checks (default 60).
    """

    def __init__(self, gateway, metrics_client, check_interval_seconds: float = 60):
        self.gateway = gateway
        self.metrics = metrics_client
        self.check_interval_seconds = check_interval_seconds
        self.active_canary: Optional["CanaryConfig"] = None
        # Strong references to running monitor tasks. The event loop only
        # keeps weak references, so an unreferenced task can be
        # garbage-collected before it finishes.
        self._monitor_tasks = set()

    async def start_canary(
        self,
        stable: str,
        canary: str,
        initial_percent: float = 5
    ) -> "CanaryConfig":
        """Start canary deployment.

        Args:
            stable: Identifier of the stable version.
            canary: Identifier of the canary version.
            initial_percent: Initial canary traffic share on a 0-100 scale.

        Returns:
            The new active canary configuration.
        """
        config = CanaryConfig(
            stable_version=stable,
            canary_version=canary,
            canary_percent=initial_percent
        )

        self.active_canary = config
        await self._configure_routing(config)

        # The monitor is bound to this config, so a monitor left over from an
        # earlier rollout stops by itself instead of watching the new one.
        task = asyncio.create_task(self._monitor_canary(config))
        self._monitor_tasks.add(task)
        task.add_done_callback(self._monitor_tasks.discard)

        return config

    async def increase_canary(self, new_percent: float):
        """Increase canary traffic.

        Args:
            new_percent: New canary traffic share on a 0-100 scale.

        Raises:
            RuntimeError: If there is no active canary.
        """
        if not self.active_canary:
            raise RuntimeError("No active canary")

        self.active_canary.canary_percent = new_percent
        await self._configure_routing(self.active_canary)

    async def promote_canary(self):
        """Promote canary to stable.

        Routes 100% of traffic to the canary version and clears the active
        canary.

        Raises:
            RuntimeError: If there is no active canary.
        """
        config = self.active_canary
        if not config:
            raise RuntimeError("No active canary")

        config.canary_percent = 100
        await self._configure_routing(config)
        # Only clear what we promoted; a new canary may have been started
        # while routing was being reconfigured.
        if self.active_canary is config:
            self.active_canary = None

    async def rollback_canary(self):
        """Rollback canary deployment.

        Routes 0% of traffic to the canary version and clears the active
        canary. Does nothing when no canary is active.
        """
        config = self.active_canary
        if not config:
            return

        config.canary_percent = 0
        await self._configure_routing(config)
        if self.active_canary is config:
            self.active_canary = None

    async def _monitor_canary(self, config: Optional["CanaryConfig"] = None):
        """Monitor canary health and auto-rollback if needed.

        Args:
            config: The canary to watch; defaults to the currently active
                one. The loop ends as soon as that canary is no longer the
                active one (promoted, rolled back or replaced).
        """
        if config is None:
            config = self.active_canary

        while config is not None and self.active_canary is config:
            await asyncio.sleep(self.check_interval_seconds)

            # The canary may have been promoted or rolled back while asleep.
            if self.active_canary is not config:
                return

            try:
                canary_errors = await self.metrics.get_error_rate(
                    config.canary_version,
                    config.metrics_window_minutes
                )
                stable_errors = await self.metrics.get_error_rate(
                    config.stable_version,
                    config.metrics_window_minutes
                )
            except Exception:
                # A transient metrics failure must not silently end the
                # monitoring of a live canary; retry on the next interval.
                logging.exception("Canary metrics check failed; retrying on next interval")
                continue

            # Re-check after the awaits so we never roll back a canary other
            # than the one these numbers belong to.
            if self.active_canary is not config:
                return

            # Check if canary is significantly worse
            if canary_errors > stable_errors + config.error_threshold:
                logging.warning(
                    "Canary error rate %.2f%% exceeds threshold, rolling back",
                    canary_errors * 100,
                )
                await self.rollback_canary()
                return

    def route_request(self) -> str:
        """Route request to appropriate version.

        Returns:
            The literal ``"stable"`` when no canary is active; otherwise the
            canary version with probability ``canary_percent`` / 100 and the
            stable version the rest of the time.
        """
        canary = self.active_canary
        if not canary:
            return "stable"

        if random.random() * 100 < canary.canary_percent:
            return canary.canary_version
        return canary.stable_version

Pattern 4: A/B Testing

Compare model versions:

import hashlib
from dataclasses import dataclass
from typing import Optional

@dataclass
class ABTest:
    """Definition of one A/B test between two model/prompt configurations."""

    # Test name; embedded in the id generated when the test is created.
    name: str
    variant_a: dict  # Model/prompt config
    variant_b: dict
    # Fraction (0.0-1.0) of users assigned to variant B: the assignment code
    # compares a 0-99 bucket against traffic_split * 100.
    traffic_split: float = 0.5
    metrics: Optional[list[str]] = None  # Metrics to track

class ABTestManager:
    """Manage A/B tests for LLM configurations.

    Args:
        metrics_client: Object exposing ``await record(test_id, payload)``
            and ``await query(test_id, filter=...)``.
    """

    def __init__(self, metrics_client):
        self.metrics = metrics_client
        self.active_tests: dict[str, "ABTest"] = {}

    def create_test(self, test: "ABTest") -> str:
        """Create a new A/B test.

        The id is ``ab_<name>_<YYYYMMDD>``, so creating two tests with the
        same name on the same day yields the same id and the later test
        replaces the earlier one.

        Returns:
            The generated test id.
        """
        test_id = f"ab_{test.name}_{datetime.now().strftime('%Y%m%d')}"
        self.active_tests[test_id] = test
        return test_id

    def assign_variant(
        self,
        test_id: str,
        user_id: str
    ) -> str:
        """Deterministically assign user to variant.

        Returns:
            ``"control"`` when ``test_id`` is unknown; otherwise ``"B"`` for
            the share of users given by the test's ``traffic_split`` fraction
            and ``"A"`` for the rest. The same (test, user) pair always maps
            to the same variant.
        """
        test = self.active_tests.get(test_id)
        if not test:
            return "control"

        # md5 is used purely as a stable bucketing hash, not for security;
        # flagging that keeps it usable on FIPS-restricted builds without
        # changing the digest.
        digest = hashlib.md5(
            f"{test_id}:{user_id}".encode(), usedforsecurity=False
        ).hexdigest()
        bucket = int(digest, 16) % 100

        if bucket < test.traffic_split * 100:
            return "B"
        return "A"

    async def record_outcome(
        self,
        test_id: str,
        user_id: str,
        variant: str,
        metrics: dict
    ):
        """Record test outcome.

        Sends ``metrics`` merged with ``user_id`` and ``variant`` to the
        metrics client under ``test_id``. Keys in ``metrics`` named
        ``user_id`` or ``variant`` override the arguments.
        """
        await self.metrics.record(
            test_id,
            {
                "user_id": user_id,
                "variant": variant,
                **metrics
            }
        )

    async def get_results(self, test_id: str) -> Optional[dict]:
        """Get test results with statistical analysis.

        Returns:
            ``None`` when ``test_id`` is unknown; otherwise a dict with
            ``test_id``, per-variant stats under ``variant_a`` /
            ``variant_b``, and ``statistical_significance``.
        """
        if test_id not in self.active_tests or not self.active_tests[test_id]:
            return None

        # Get metrics for each variant
        variant_a_metrics = await self.metrics.query(test_id, filter={"variant": "A"})
        variant_b_metrics = await self.metrics.query(test_id, filter={"variant": "B"})

        return {
            "test_id": test_id,
            "variant_a": self._calculate_stats(variant_a_metrics),
            "variant_b": self._calculate_stats(variant_b_metrics),
            "statistical_significance": self._calculate_significance(
                variant_a_metrics,
                variant_b_metrics
            )
        }

    def _calculate_stats(self, metrics: list) -> dict:
        """Calculate statistics for a variant.

        Returns an empty dict for no samples; otherwise the sample size, the
        mean of each sample's ``latency`` (missing counts as 0) and the
        fraction of samples with a truthy ``satisfied`` value.
        """
        if not metrics:
            return {}

        sample_size = len(metrics)
        return {
            "sample_size": sample_size,
            "mean_latency": sum(m.get("latency", 0) for m in metrics) / sample_size,
            "satisfaction_rate": sum(1 for m in metrics if m.get("satisfied")) / sample_size
        }

    def _calculate_significance(
        self,
        variant_a_metrics: list,
        variant_b_metrics: list,
        alpha: float = 0.05
    ) -> dict:
        """Two-sided two-proportion z-test on the ``satisfied`` flag.

        Args:
            variant_a_metrics: Samples recorded for variant A.
            variant_b_metrics: Samples recorded for variant B.
            alpha: Significance level used for the ``significant`` verdict.

        Returns:
            Dict with ``metric``, ``z_score`` (B minus A), ``p_value`` and
            ``significant``. ``z_score`` and ``p_value`` are ``None`` when
            either variant has no samples or the pooled rate is exactly 0
            or 1, where the test is undefined.
        """
        samples_a = variant_a_metrics or []
        samples_b = variant_b_metrics or []
        n_a, n_b = len(samples_a), len(samples_b)
        result = {
            "metric": "satisfaction_rate",
            "z_score": None,
            "p_value": None,
            "significant": False,
        }
        if not n_a or not n_b:
            return result

        satisfied_a = sum(1 for m in samples_a if m.get("satisfied"))
        satisfied_b = sum(1 for m in samples_b if m.get("satisfied"))
        pooled = (satisfied_a + satisfied_b) / (n_a + n_b)
        std_err = math.sqrt(pooled * (1 - pooled) * (1 / n_a + 1 / n_b))
        if std_err == 0:
            return result

        z_score = (satisfied_b / n_b - satisfied_a / n_a) / std_err
        # Two-sided p-value of a standard normal: 2 * (1 - Phi(|z|)).
        p_value = math.erfc(abs(z_score) / math.sqrt(2))
        result.update(z_score=z_score, p_value=p_value, significant=p_value < alpha)
        return result

Pattern 5: Shadow Deployment

Test new versions without affecting production:

class ShadowDeployer:
    """Shadow deployment for safe testing.

    Every request is served by the production client; a copy is sent to the
    shadow client in the background and the two responses are compared.

    Args:
        production_client: Object exposing
            ``await chat_completion(messages=..., model=...)``.
        shadow_client: Object with the same interface, under evaluation.

    Note:
        ``comparison_results`` grows by one entry per successful shadow call
        and is never trimmed by this class.
    """

    def __init__(self, production_client, shadow_client):
        self.production = production_client
        self.shadow = shadow_client
        self.comparison_results = []
        # Strong references to in-flight shadow tasks. The event loop only
        # keeps weak references, so an unreferenced task can be
        # garbage-collected before it finishes.
        self._shadow_tasks = set()

    async def process_with_shadow(
        self,
        request: "ChatRequest"
    ) -> dict:
        """Process request in production and shadow.

        Awaits the production response, schedules the shadow call without
        waiting for it, and returns the production response.
        """
        # perf_counter is monotonic, so the measured latency cannot be
        # skewed by wall-clock adjustments.
        prod_start = time.perf_counter()
        prod_response = await self.production.chat_completion(
            messages=request.messages,
            model=request.model
        )
        prod_latency = time.perf_counter() - prod_start

        # Shadow request (non-blocking, fire and forget)
        task = asyncio.create_task(
            self._shadow_request(request, prod_response, prod_latency)
        )
        self._shadow_tasks.add(task)
        task.add_done_callback(self._shadow_tasks.discard)

        return prod_response

    async def _shadow_request(
        self,
        request: "ChatRequest",
        prod_response: dict,
        prod_latency: float
    ):
        """Make shadow request and compare.

        Appends one comparison record to ``comparison_results`` on success.
        Any ordinary error is logged as a warning and otherwise ignored.
        """
        try:
            shadow_start = time.perf_counter()
            shadow_response = await self.shadow.chat_completion(
                messages=request.messages,
                model=request.model
            )
            shadow_latency = time.perf_counter() - shadow_start

            comparison = await self._compare_responses(
                prod_response,
                shadow_response,
                prod_latency,
                shadow_latency
            )

            self.comparison_results.append(comparison)

        except Exception as e:
            # Shadow failures don't affect production
            logging.warning("Shadow request failed: %s", e)

    async def _compare_responses(
        self,
        prod: dict,
        shadow: dict,
        prod_latency: float,
        shadow_latency: float
    ) -> dict:
        """Compare production and shadow responses.

        Returns:
            Dict with a timezone-aware UTC ISO-8601 ``timestamp``, both
            latencies, ``latency_diff`` (shadow minus production),
            ``response_match`` (equality of the ``content`` values) and the
            ``total_tokens`` usage of each response (``None`` when absent).
        """
        # `or {}` also covers an explicit "usage": None in a response.
        prod_usage = prod.get("usage") or {}
        shadow_usage = shadow.get("usage") or {}
        return {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "prod_latency": prod_latency,
            "shadow_latency": shadow_latency,
            "latency_diff": shadow_latency - prod_latency,
            "response_match": prod.get("content") == shadow.get("content"),
            "prod_tokens": prod_usage.get("total_tokens"),
            "shadow_tokens": shadow_usage.get("total_tokens")
        }

    def get_comparison_report(self) -> dict:
        """Get shadow comparison report.

        Returns:
            ``{"message": "No comparisons yet"}`` when nothing has been
            recorded; otherwise the comparison count, the fraction of
            matching responses, the mean latency difference and the fraction
            of comparisons in which the shadow was faster.
        """
        results = self.comparison_results
        if not results:
            return {"message": "No comparisons yet"}

        total = len(results)
        return {
            "total_comparisons": total,
            "match_rate": sum(1 for c in results if c["response_match"]) / total,
            "avg_latency_diff": sum(c["latency_diff"] for c in results) / total,
            "shadow_faster_rate": sum(1 for c in results if c["latency_diff"] < 0) / total
        }

These deployment patterns enable safe, controlled rollouts of LLM application changes. Start with gateway and blue-green patterns, then add canary and A/B testing as your operations mature.

Michael John Pena

Michael John Pena

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.