Skip to content
Back to Blog
1 min read

Model Deployment Patterns for Azure OpenAI

This post shares practical, production-minded guidance on deploying Azure OpenAI models, covering five patterns: a gateway, blue-green deployment, canary releases, A/B testing, and shadow deployment.

Pattern 1: Gateway Pattern

Centralize LLM access through an API gateway:

import asyncio
import logging
import time
from datetime import datetime, timezone

from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import HTTPBearer
from pydantic import BaseModel

# FastAPI application; the /api/chat route further down is registered on it.
app = FastAPI()
# HTTP Bearer security scheme, injected into the chat handler via Depends(security).
security = HTTPBearer()

# Request body accepted by the /api/chat endpoint and passed on to the gateway.
class ChatRequest(BaseModel):
    # Conversation so far, one dict per message.
    # NOTE(review): presumably {"role": ..., "content": ...} entries — confirm
    # against the client that builds these requests.
    messages: list[dict]
    # Model/deployment name; handed to the gateway's endpoint selection.
    model: str = "gpt-35-turbo"
    # Sampling temperature; defaults to 0.7 when the caller omits it.
    temperature: float = 0.7
    # Completion token limit; defaults to 1000 when the caller omits it.
    max_tokens: int = 1000

class LLMGateway:
    """Centralized LLM gateway with routing and management.

    Endpoints are registered by name and preferred in ascending ``priority``
    order (the lowest value wins).

    ``route_request`` relies on ``_check_rate_limit``, ``_check_circuit``,
    ``_call_endpoint``, ``_record_success`` and ``_record_failure``. They are
    not defined in this class and must be supplied, e.g. by a subclass.
    """

    def __init__(self):
        # name -> {"name", "endpoint", "key", "priority", "healthy"}
        self.endpoints = {}
        self.rate_limiters = {}
        self.circuit_breakers = {}

    def register_endpoint(
        self,
        name: str,
        endpoint: str,
        key: str,
        priority: int = 1
    ):
        """Register (or replace) an LLM endpoint.

        Args:
            name: Unique endpoint name; also stored inside the record so that
                routing code can identify the endpoint it selected.
            endpoint: Endpoint URL.
            key: API key for the endpoint.
            priority: Routing preference; lower values are tried first.
        """
        self.endpoints[name] = {
            # route_request() reads endpoint["name"]; without this key every
            # routed request failed with KeyError.
            "name": name,
            "endpoint": endpoint,
            "key": key,
            "priority": priority,
            "healthy": True
        }

    async def route_request(
        self,
        request: "ChatRequest",
        user_id: str
    ) -> dict:
        """Route a request to the preferred endpoint, failing over on error.

        Args:
            request: The chat request to forward.
            user_id: Caller identity used for rate limiting.

        Returns:
            The response produced by the endpoint that served the request.

        Raises:
            HTTPException: 429 when the caller is rate limited; 503 when no
                healthy endpoint exists, the selected endpoint's circuit is
                open, or every endpoint failed.
        """
        if not await self._check_rate_limit(user_id):
            raise HTTPException(429, "Rate limit exceeded")

        endpoint = self._select_endpoint(request.model)
        if not endpoint:
            raise HTTPException(503, "No healthy endpoints available")

        name = endpoint["name"]
        if not self._check_circuit(name):
            raise HTTPException(503, f"Endpoint {name} is unavailable")

        try:
            response = await self._call_endpoint(endpoint, request)
        except Exception:
            # Any failure of the primary call counts against the endpoint and
            # triggers failover to the remaining endpoints.
            self._record_failure(name)
            return await self._failover(request, name)
        else:
            self._record_success(name)
            return response

    def _select_endpoint(self, model: str) -> "dict | None":
        """Return the healthy endpoint with the lowest priority value.

        Args:
            model: Requested model name. Currently unused: every healthy
                endpoint is a candidate regardless of model.

        Returns:
            The endpoint record, or ``None`` when no endpoint is healthy.
            Ties keep registration order.
        """
        healthy = [e for e in self.endpoints.values() if e["healthy"]]
        return min(healthy, key=lambda e: e["priority"], default=None)

    async def _failover(
        self,
        request: "ChatRequest",
        failed_endpoint: str
    ) -> dict:
        """Try the remaining healthy endpoints in priority order.

        Args:
            request: The chat request to forward.
            failed_endpoint: Name of the endpoint that already failed; skipped.

        Returns:
            The first successful response.

        Raises:
            HTTPException: 503 when every remaining endpoint fails.
        """
        # Snapshot first: iterating the live dict across awaits would raise
        # RuntimeError if an endpoint were registered meanwhile.
        candidates = sorted(
            (
                ep for name, ep in self.endpoints.items()
                if name != failed_endpoint
            ),
            key=lambda ep: ep["priority"],
        )

        for endpoint in candidates:
            # Health is re-read per attempt because it may change while an
            # earlier attempt is awaited.
            if not endpoint["healthy"]:
                continue
            try:
                return await self._call_endpoint(endpoint, request)
            except Exception:
                # Deliberate best effort: move on to the next candidate.
                # (A bare ``except`` would also swallow task cancellation.)
                continue

        raise HTTPException(503, "All endpoints failed")

# Single gateway instance shared by every request this app handles.
gateway = LLMGateway()


@app.post("/api/chat")
async def chat(request: ChatRequest, token: str = Depends(security)):
    # Resolve the caller from the bearer token, then let the gateway apply
    # rate limiting, endpoint selection and failover for that caller.
    # NOTE(review): validate_token is not defined in this file — confirm what
    # it expects to receive from the HTTPBearer dependency.
    caller_id = validate_token(token)
    return await gateway.route_request(request, caller_id)

Pattern 2: Blue-Green Deployment

Deploy new versions alongside existing:

class BlueGreenDeployer:
    """Blue-green deployment for LLM configurations.

    Two slots ("blue" and "green") each hold a config and a health flag. New
    configs go to the slot that is not serving traffic; traffic is only ever
    switched to a slot whose last health check passed.

    ``_configure_slot`` and ``_test_request`` are called but not defined in
    this class, and the gateway must provide an ``update_routing(slot)``
    coroutine; supply them, e.g. in a subclass.
    """

    def __init__(self, gateway: "LLMGateway"):
        self.gateway = gateway
        self.active_slot = "blue"
        self.slots = {
            "blue": {"config": None, "healthy": False},
            "green": {"config": None, "healthy": False}
        }

    def _inactive_slot(self) -> str:
        """Return the name of the slot that is not serving traffic."""
        return "green" if self.active_slot == "blue" else "blue"

    async def _switch_to_inactive(self, unhealthy_message: str) -> str:
        """Route traffic to the inactive slot, refusing if it is unhealthy."""
        target = self._inactive_slot()
        if not self.slots[target]["healthy"]:
            raise RuntimeError(unhealthy_message)

        # active_slot is updated only after the gateway accepted the change,
        # so a routing failure leaves the recorded state untouched.
        await self.gateway.update_routing(target)
        self.active_slot = target
        return self.active_slot

    async def deploy_to_inactive(self, config: dict) -> str:
        """Deploy ``config`` to the inactive slot and health-check it.

        Args:
            config: Configuration to apply to the inactive slot.

        Returns:
            The name of the slot that was deployed to.

        Raises:
            RuntimeError: If the slot fails its health check. Errors raised
                while configuring the slot propagate unchanged.
        """
        inactive = self._inactive_slot()
        slot = self.slots[inactive]

        # Invalidate the previous health result first: if configuration fails
        # midway, the slot must not stay eligible for switch/rollback.
        slot["healthy"] = False
        slot["config"] = config
        await self._configure_slot(inactive, config)

        healthy = await self._health_check(inactive)
        slot["healthy"] = healthy

        if not healthy:
            raise RuntimeError(f"Deployment to {inactive} failed health check")

        return inactive

    async def switch_traffic(self) -> str:
        """Switch traffic to the inactive slot.

        Returns:
            The name of the slot that is now active.

        Raises:
            RuntimeError: If the inactive slot is not healthy.
        """
        inactive = self._inactive_slot()
        return await self._switch_to_inactive(
            f"Cannot switch to unhealthy {inactive} slot"
        )

    async def rollback(self) -> str:
        """Roll back by routing traffic to the other (previous) slot.

        Returns:
            The name of the slot that is now active.

        Raises:
            RuntimeError: If the previous slot is not healthy.
        """
        return await self._switch_to_inactive(
            "Previous slot not healthy, cannot rollback"
        )

    async def _health_check(self, slot: str) -> bool:
        """Send a test request using the slot's config.

        Returns:
            ``True`` only when the response status equals ``"success"``; any
            error raised by the test request counts as unhealthy.
        """
        config = self.slots[slot]["config"]

        try:
            response = await self._test_request(config)
            return response.status == "success"
        except Exception:
            # Not a bare ``except``: task cancellation must still propagate.
            return False

Pattern 3: Canary Releases

Gradual traffic migration:

import random
from dataclasses import dataclass

@dataclass
class CanaryConfig:
    """Settings describing one canary rollout."""
    # Version name that serves the non-canary share of traffic.
    stable_version: str
    # Version name being evaluated.
    canary_version: str
    # Share of traffic sent to the canary, on a 0-100 scale (the router
    # compares it against random.random() * 100; promotion sets it to 100).
    canary_percent: float
    # Look-back window handed to the metrics client when reading error rates.
    metrics_window_minutes: int = 10
    # Absolute margin: the monitor rolls back when the canary error rate
    # exceeds the stable error rate by more than this amount.
    error_threshold: float = 0.05

class CanaryDeployer:
    """Canary deployment with automatic rollback.

    A background task compares the canary's error rate with the stable
    version's and rolls the canary back when it is worse by more than the
    configured threshold.

    ``_configure_routing`` is called but not defined in this class; supply
    it, e.g. in a subclass.
    """

    def __init__(self, gateway, metrics_client, check_interval_seconds: float = 60):
        """Create a deployer.

        Args:
            gateway: Gateway whose routing is being managed.
            metrics_client: Object exposing an awaitable
                ``get_error_rate(version, window_minutes)``.
            check_interval_seconds: Delay between monitor checks (default 60,
                i.e. once a minute).
        """
        self.gateway = gateway
        self.metrics = metrics_client
        self.check_interval_seconds = check_interval_seconds
        self.active_canary: "CanaryConfig | None" = None
        # Strong reference to the monitor task: the event loop only keeps weak
        # references, so an unreferenced task can be garbage collected mid-run.
        self._monitor_task = None

    @staticmethod
    def _validate_percent(percent: float) -> None:
        """Reject canary percentages outside the 0-100 scale."""
        if not 0 <= percent <= 100:
            raise ValueError(
                f"canary percent must be between 0 and 100, got {percent}"
            )

    async def start_canary(
        self,
        stable: str,
        canary: str,
        initial_percent: float = 5
    ) -> "CanaryConfig":
        """Start a canary deployment and begin monitoring it.

        Args:
            stable: Name of the stable version.
            canary: Name of the canary version.
            initial_percent: Initial share of traffic (0-100) for the canary.

        Returns:
            The configuration of the canary that was started.

        Raises:
            ValueError: If ``initial_percent`` is outside 0-100.
        """
        self._validate_percent(initial_percent)
        config = CanaryConfig(
            stable_version=stable,
            canary_version=canary,
            canary_percent=initial_percent
        )

        self.active_canary = config
        await self._configure_routing(config)

        # The monitor is bound to this config, so a monitor left over from an
        # earlier canary stops itself instead of judging this one.
        self._monitor_task = asyncio.create_task(self._monitor_canary(config))

        return config

    async def increase_canary(self, new_percent: float):
        """Set the canary's traffic share to ``new_percent``.

        Raises:
            RuntimeError: If no canary is active.
            ValueError: If ``new_percent`` is outside 0-100.
        """
        config = self.active_canary
        if not config:
            raise RuntimeError("No active canary")
        self._validate_percent(new_percent)

        config.canary_percent = new_percent
        await self._configure_routing(config)

    async def promote_canary(self):
        """Promote the canary: route 100% of traffic to it and end the rollout.

        Raises:
            RuntimeError: If no canary is active.
        """
        config = self.active_canary
        if not config:
            raise RuntimeError("No active canary")

        config.canary_percent = 100
        await self._configure_routing(config)
        # Only clear the canary we promoted; another may have started while
        # routing was being updated.
        if self.active_canary is config:
            self.active_canary = None

    async def rollback_canary(self):
        """Roll back the canary: route 0% to it and end the rollout.

        Does nothing when no canary is active.
        """
        config = self.active_canary
        if not config:
            return

        config.canary_percent = 0
        await self._configure_routing(config)
        if self.active_canary is config:
            self.active_canary = None

    async def _monitor_canary(self, config=None):
        """Monitor canary health and auto-rollback if needed.

        Args:
            config: The canary to watch; defaults to the currently active one.

        The loop ends as soon as ``config`` is no longer the active canary
        (promoted, rolled back or replaced) or after it triggers a rollback.
        """
        if config is None:
            config = self.active_canary

        while config is not None and self.active_canary is config:
            await asyncio.sleep(self.check_interval_seconds)

            # The canary may have been promoted or rolled back while we slept;
            # reading self.active_canary blindly here raised AttributeError.
            if self.active_canary is not config:
                break

            canary_errors = await self.metrics.get_error_rate(
                config.canary_version,
                config.metrics_window_minutes
            )
            stable_errors = await self.metrics.get_error_rate(
                config.stable_version,
                config.metrics_window_minutes
            )

            # State can also change while the metrics calls are awaited.
            if self.active_canary is not config:
                break

            # Roll back only when the canary is worse than stable by more than
            # the allowed margin.
            if canary_errors > stable_errors + config.error_threshold:
                print(f"Canary error rate {canary_errors:.2%} exceeds threshold, rolling back")
                await self.rollback_canary()
                break

    def route_request(self) -> str:
        """Pick the version that should serve the next request.

        Returns:
            The literal ``"stable"`` when no canary is active; otherwise the
            canary version name with probability ``canary_percent`` / 100 and
            the stable version name the rest of the time.
        """
        config = self.active_canary
        if not config:
            return "stable"

        if random.random() * 100 < config.canary_percent:
            return config.canary_version
        return config.stable_version

Pattern 4: A/B Testing

Compare model versions:

import hashlib
from dataclasses import dataclass
from typing import Optional

@dataclass
class ABTest:
    """Definition of one A/B test between two model/prompt configurations."""
    # Test name; becomes part of the generated test id.
    name: str
    variant_a: dict  # Model/prompt config
    variant_b: dict
    # Fraction (0.0-1.0) of users assigned to variant B; the assignment code
    # multiplies it by 100 before comparing against a 0-99 bucket.
    traffic_split: float = 0.5
    metrics: Optional[list[str]] = None  # Metrics to track

class ABTestManager:
    """Manage A/B tests for LLM configurations.

    ``get_results`` calls ``_calculate_significance``, which is not defined in
    this class; supply it, e.g. in a subclass.
    """

    def __init__(self, metrics_client):
        """Create a manager.

        Args:
            metrics_client: Object exposing awaitable ``record(test_id, data)``
                and ``query(test_id, filter=...)``.
        """
        self.metrics = metrics_client
        self.active_tests: dict[str, ABTest] = {}

    def create_test(self, test: "ABTest") -> str:
        """Register a new A/B test.

        Args:
            test: The test definition.

        Returns:
            The generated id, ``ab_<name>_<YYYYMMDD>`` (local date). Creating a
            test with the same name on the same day replaces the earlier one.
        """
        test_id = f"ab_{test.name}_{datetime.now().strftime('%Y%m%d')}"
        self.active_tests[test_id] = test
        return test_id

    def assign_variant(
        self,
        test_id: str,
        user_id: str
    ) -> str:
        """Deterministically assign a user to a variant.

        Args:
            test_id: Id returned by ``create_test``.
            user_id: User to assign.

        Returns:
            ``"A"`` or ``"B"``; ``"control"`` when the test id is unknown.
            The same ``(test_id, user_id)`` pair always yields the same value.
        """
        test = self.active_tests.get(test_id)
        if not test:
            return "control"

        # MD5 is used only to spread users evenly over 100 buckets, not for
        # security; flagging that keeps it usable on FIPS-restricted builds.
        digest = hashlib.md5(
            f"{test_id}:{user_id}".encode(),
            usedforsecurity=False,
        ).hexdigest()
        bucket = int(digest, 16) % 100

        return "B" if bucket < test.traffic_split * 100 else "A"

    async def record_outcome(
        self,
        test_id: str,
        user_id: str,
        variant: str,
        metrics: dict
    ):
        """Record one outcome for a test.

        Args:
            test_id: Test the outcome belongs to.
            user_id: User that produced the outcome.
            variant: Variant the user was assigned to.
            metrics: Measured values; merged into the recorded payload.
        """
        payload = {"user_id": user_id, "variant": variant, **metrics}
        await self.metrics.record(test_id, payload)

    async def get_results(self, test_id: str) -> Optional[dict]:
        """Get test results with statistical analysis.

        Args:
            test_id: Id returned by ``create_test``.

        Returns:
            A dict with per-variant statistics and a significance value, or
            ``None`` when the test id is unknown.
        """
        if not self.active_tests.get(test_id):
            return None

        variant_a_metrics = await self.metrics.query(test_id, filter={"variant": "A"})
        variant_b_metrics = await self.metrics.query(test_id, filter={"variant": "B"})

        return {
            "test_id": test_id,
            "variant_a": self._calculate_stats(variant_a_metrics),
            "variant_b": self._calculate_stats(variant_b_metrics),
            "statistical_significance": self._calculate_significance(
                variant_a_metrics,
                variant_b_metrics
            )
        }

    def _calculate_stats(self, metrics: list) -> dict:
        """Summarize the recorded outcomes of one variant.

        Args:
            metrics: Outcome dicts; a missing ``"latency"`` counts as 0 and a
                missing or falsy ``"satisfied"`` counts as not satisfied.

        Returns:
            ``{}`` for no outcomes, otherwise sample size, mean latency and
            satisfaction rate.
        """
        if not metrics:
            return {}

        sample_size = len(metrics)
        total_latency = sum(m.get("latency", 0) for m in metrics)
        satisfied = sum(1 for m in metrics if m.get("satisfied"))

        return {
            "sample_size": sample_size,
            "mean_latency": total_latency / sample_size,
            "satisfaction_rate": satisfied / sample_size
        }

Pattern 5: Shadow Deployment

Test new versions without affecting production:

class ShadowDeployer:
    """Shadow deployment for safe testing.

    Every request is answered by the production client. A copy is sent to the
    shadow client in the background and the two responses are compared;
    shadow failures never reach the caller.

    NOTE: ``comparison_results`` grows without bound; long-running services
    should drain or trim it periodically.
    """

    def __init__(self, production_client, shadow_client):
        """Create a deployer.

        Args:
            production_client: Client whose response is returned to callers;
                must expose an awaitable ``chat_completion(messages=, model=)``.
            shadow_client: Client under test, with the same interface.
        """
        self.production = production_client
        self.shadow = shadow_client
        self.comparison_results = []
        # Strong references to in-flight shadow tasks: the event loop keeps
        # only weak references, so unreferenced tasks can be collected early.
        self._shadow_tasks = set()

    async def process_with_shadow(
        self,
        request: "ChatRequest"
    ) -> dict:
        """Serve the request from production and mirror it to the shadow.

        Args:
            request: Chat request; only ``messages`` and ``model`` are sent.

        Returns:
            The production response. The shadow call is not awaited.
        """
        # perf_counter is monotonic, so latency cannot be skewed by wall-clock
        # adjustments.
        prod_start = time.perf_counter()
        prod_response = await self.production.chat_completion(
            messages=request.messages,
            model=request.model
        )
        prod_latency = time.perf_counter() - prod_start

        # Fire and forget, but keep the task referenced until it finishes.
        task = asyncio.create_task(
            self._shadow_request(request, prod_response, prod_latency)
        )
        self._shadow_tasks.add(task)
        task.add_done_callback(self._shadow_tasks.discard)

        return prod_response

    async def _shadow_request(
        self,
        request: "ChatRequest",
        prod_response: dict,
        prod_latency: float
    ):
        """Send the request to the shadow client and record the comparison.

        Args:
            request: The original chat request.
            prod_response: Response already returned by production.
            prod_latency: Production latency in seconds.
        """
        try:
            shadow_start = time.perf_counter()
            shadow_response = await self.shadow.chat_completion(
                messages=request.messages,
                model=request.model
            )
            shadow_latency = time.perf_counter() - shadow_start

            comparison = await self._compare_responses(
                prod_response,
                shadow_response,
                prod_latency,
                shadow_latency
            )
            self.comparison_results.append(comparison)

        except Exception as e:
            # Shadow failures don't affect production
            logging.warning("Shadow request failed: %s", e)

    async def _compare_responses(
        self,
        prod: dict,
        shadow: dict,
        prod_latency: float,
        shadow_latency: float
    ) -> dict:
        """Build a comparison record for one production/shadow response pair.

        Returns:
            A dict with a UTC ISO-8601 timestamp (explicit ``+00:00`` offset),
            both latencies and their difference (shadow minus production),
            whether the ``"content"`` values are equal, and each side's
            ``usage.total_tokens`` (``None`` when absent).
        """
        return {
            # Timezone-aware replacement for the deprecated datetime.utcnow().
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "prod_latency": prod_latency,
            "shadow_latency": shadow_latency,
            "latency_diff": shadow_latency - prod_latency,
            "response_match": prod.get("content") == shadow.get("content"),
            "prod_tokens": prod.get("usage", {}).get("total_tokens"),
            "shadow_tokens": shadow.get("usage", {}).get("total_tokens")
        }

    def get_comparison_report(self) -> dict:
        """Summarize all comparisons recorded so far.

        Returns:
            ``{"message": "No comparisons yet"}`` when nothing was recorded,
            otherwise the total count, match rate, average latency difference
            and the share of requests where the shadow was faster.
        """
        results = self.comparison_results
        total = len(results)
        if not total:
            return {"message": "No comparisons yet"}

        matches = sum(1 for c in results if c["response_match"])
        shadow_faster = sum(1 for c in results if c["latency_diff"] < 0)

        return {
            "total_comparisons": total,
            "match_rate": matches / total,
            "avg_latency_diff": sum(c["latency_diff"] for c in results) / total,
            "shadow_faster_rate": shadow_faster / total
        }

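These deployment patterns enable safe, controlled rollouts of LLM application changes. Start with the gateway and blue-green patterns, then add canary releases, A/B testing, and shadow deployments as your operations mature.

## Takeaways

- Put a gateway in front of every LLM endpoint so that rate limiting, health tracking, and failover live in one place.
- Switch traffic only to a slot that has passed its health check, and keep the previous slot available for rollback.
- Let canaries roll back automatically when their error rate exceeds the stable version's by more than a set threshold.
- Assign A/B variants deterministically per user so that results stay comparable, and use shadow traffic to evaluate a new version without affecting production responses.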

Michael John Pena

Michael John Pena

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.