
Azure OpenAI Best Practices: Production-Ready AI Applications

Azure OpenAI provides enterprise-grade access to OpenAI models. Building production applications requires understanding deployment options, security considerations, and operational best practices. Let’s dive in.

Deployment Architecture

Multi-Region Setup

import os

from openai import AsyncAzureOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential

class AzureOpenAIRouter:
    """Route requests across multiple Azure OpenAI deployments."""

    def __init__(self, deployments: list[dict]):
        self.clients = []
        for deployment in deployments:
            client = AsyncAzureOpenAI(
                api_key=deployment["api_key"],
                api_version="2024-06-01",
                azure_endpoint=deployment["endpoint"]
            )
            self.clients.append({
                "client": client,
                "deployment": deployment["deployment_name"],
                "region": deployment["region"],
                "priority": deployment.get("priority", 1)
            })
        # Sort by priority
        self.clients.sort(key=lambda x: x["priority"])

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
    async def complete(self, messages: list, **kwargs) -> dict:
        """Route completion request with automatic failover."""

        last_error = None
        for client_info in self.clients:
            try:
                response = await client_info["client"].chat.completions.create(
                    model=client_info["deployment"],
                    messages=messages,
                    **kwargs
                )
                return {
                    "response": response,
                    "region": client_info["region"]
                }
            except Exception as e:
                last_error = e
                print(f"Failed on {client_info['region']}: {e}")
                continue

        raise last_error

# Configure multi-region
router = AzureOpenAIRouter([
    {
        "endpoint": "https://aoai-eastus.openai.azure.com/",
        "api_key": os.environ["AZURE_OPENAI_KEY_EASTUS"],
        "deployment_name": "gpt-4o",
        "region": "eastus",
        "priority": 1
    },
    {
        "endpoint": "https://aoai-westus.openai.azure.com/",
        "api_key": os.environ["AZURE_OPENAI_KEY_WESTUS"],
        "deployment_name": "gpt-4o",
        "region": "westus",
        "priority": 2
    }
])
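
With both regions configured, calls look the same as with a single deployment; the router transparently fails over to the lower-priority region when the primary errors. A minimal usage sketch (the question and parameters are illustrative):

async def answer(question: str) -> str:
    result = await router.complete(
        messages=[{"role": "user", "content": question}],
        temperature=0,
        max_tokens=500
    )
    print(f"Served from region: {result['region']}")
    return result["response"].choices[0].message.content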

Managed Identity Authentication

from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from openai import AzureOpenAI

# Use managed identity instead of API keys
credential = DefaultAzureCredential()
token_provider = get_bearer_token_provider(
    credential,
    "https://cognitiveservices.azure.com/.default"
)

client = AzureOpenAI(
    api_version="2024-06-01",
    azure_endpoint="https://my-aoai.openai.azure.com/",
    azure_ad_token_provider=token_provider
)
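
The identity you run under needs the Cognitive Services OpenAI User role on each Azure OpenAI resource. The same token provider also works with the async client used by the router above; a brief sketch (endpoint is illustrative):

from openai import AsyncAzureOpenAI

async_client = AsyncAzureOpenAI(
    api_version="2024-06-01",
    azure_endpoint="https://aoai-eastus.openai.azure.com/",
    azure_ad_token_provider=token_provider  # reusable across regions
)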

Rate Limiting and Throttling

from asyncio import Semaphore
from datetime import datetime, timedelta
import asyncio

class RateLimiter:
    """Manage Azure OpenAI rate limits."""

    def __init__(self, tpm_limit: int, rpm_limit: int):
        self.tpm_limit = tpm_limit  # Tokens per minute
        self.rpm_limit = rpm_limit  # Requests per minute
        self.request_semaphore = Semaphore(rpm_limit)
        self.token_count = 0
        self.request_count = 0
        self.window_start = datetime.utcnow()

    async def acquire(self, estimated_tokens: int):
        """Acquire permission to make a request."""
        async with self.request_semaphore:
            # Check if we need to reset the window
            now = datetime.utcnow()
            if (now - self.window_start) > timedelta(minutes=1):
                self.token_count = 0
                self.request_count = 0
                self.window_start = now

            # Check token limit
            if self.token_count + estimated_tokens > self.tpm_limit:
                wait_time = 60 - (now - self.window_start).seconds
                print(f"Token limit reached, waiting {wait_time}s")
                await asyncio.sleep(wait_time)
                self.token_count = 0
                self.request_count = 0
                self.window_start = datetime.utcnow()

            # Check request limit
            if self.request_count >= self.rpm_limit:
                wait_time = 60 - (now - self.window_start).seconds
                print(f"Request limit reached, waiting {wait_time}s")
                await asyncio.sleep(wait_time)
                self.token_count = 0
                self.request_count = 0
                self.window_start = datetime.utcnow()

            self.request_count += 1

    def record_usage(self, tokens_used: int):
        """Record actual token usage."""
        self.token_count += tokens_used

# Usage
rate_limiter = RateLimiter(tpm_limit=80000, rpm_limit=480)

async def make_request(messages, **kwargs):
    # estimate_tokens is a prompt-size helper (a tiktoken-based sketch follows below);
    # client is the async Azure OpenAI client configured earlier.
    estimated_tokens = estimate_tokens(messages)
    await rate_limiter.acquire(estimated_tokens)

    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        **kwargs
    )

    rate_limiter.record_usage(response.usage.total_tokens)
    return response
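
The estimate_tokens helper is not defined above; a rough approximation can be built with tiktoken. A minimal sketch, assuming a fixed per-message overhead rather than an exact accounting:

import tiktoken

def estimate_tokens(messages: list[dict], model: str = "gpt-4o") -> int:
    """Roughly estimate prompt tokens for a chat request."""
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        encoding = tiktoken.get_encoding("o200k_base")

    total = 0
    for message in messages:
        total += 4  # approximate per-message formatting overhead
        total += len(encoding.encode(message.get("content", "")))
    return total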

Content Filtering and Safety

class SafetyWrapper:
    """Wrapper for Azure OpenAI with safety controls."""

    def __init__(self, client: AsyncAzureOpenAI):
        self.client = client

    async def safe_complete(self, messages: list, **kwargs) -> dict:
        """Make request with safety handling."""

        try:
            response = await self.client.chat.completions.create(
                model="gpt-4o",
                messages=messages,
                **kwargs
            )

            # Check content filter results
            if hasattr(response, 'choices') and response.choices:
                choice = response.choices[0]
                if hasattr(choice, 'content_filter_results'):
                    self._handle_content_filter(choice.content_filter_results)

            return {"success": True, "response": response}

        except Exception as e:
            # Handle content filter blocks
            if "content_filter" in str(e).lower():
                return {
                    "success": False,
                    "error": "content_filtered",
                    "message": "Response blocked by content filter"
                }
            raise

    def _handle_content_filter(self, filter_results):
        """Log per-category content filter annotations."""
        categories = ["hate", "self_harm", "sexual", "violence"]
        for category in categories:
            # Azure returns these annotations as a per-category mapping
            result = filter_results.get(category) if isinstance(filter_results, dict) else getattr(filter_results, category, None)
            if result is None:
                continue
            filtered = result["filtered"] if isinstance(result, dict) else result.filtered
            severity = result["severity"] if isinstance(result, dict) else result.severity
            if filtered:
                print(f"Content filtered: {category} - {severity}")

Caching for Cost Optimization

import hashlib
import json
from azure.cosmos import CosmosClient

class ResponseCache:
    """Cache Azure OpenAI responses for cost savings."""

    def __init__(self, cosmos_client: CosmosClient, database: str, container: str):
        self.container = cosmos_client.get_database_client(database).get_container_client(container)
        self.ttl_hours = 24

    def _cache_key(self, messages: list, model: str, temperature: float) -> str:
        """Generate cache key from request parameters."""
        content = json.dumps({
            "messages": messages,
            "model": model,
            "temperature": temperature
        }, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()

    async def get(self, messages: list, model: str, temperature: float = 0) -> dict | None:
        """Get cached response if available."""
        # Only cache deterministic requests
        if temperature > 0:
            return None

        key = self._cache_key(messages, model, temperature)
        try:
            item = self.container.read_item(item=key, partition_key=key)
            return item.get("response")
        except:
            return None

    async def set(self, messages: list, model: str, temperature: float, response: dict):
        """Cache a response."""
        if temperature > 0:
            return

        key = self._cache_key(messages, model, temperature)
        # Assumes the container's partition key path is /partitionKey and that
        # TTL is enabled on the container so the per-item ttl (in seconds) applies.
        self.container.upsert_item({
            "id": key,
            "partitionKey": key,
            "response": response,
            "ttl": self.ttl_hours * 3600
        })

# Usage (cosmos_client is an existing CosmosClient instance)
cache = ResponseCache(cosmos_client, "ai_cache", "responses")

async def cached_complete(messages, model="gpt-4o", temperature=0, **kwargs):
    # Check cache
    cached = await cache.get(messages, model, temperature)
    if cached:
        return cached

    # Make request
    response = await client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=temperature,
        **kwargs
    )

    # Cache response
    await cache.set(messages, model, temperature, response.model_dump())

    return response

Monitoring and Observability

import os
import time

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter

# Setup tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
exporter = AzureMonitorTraceExporter(connection_string=os.environ["APPINSIGHTS_CONNECTION_STRING"])
trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(exporter))

class MonitoredClient:
    """Azure OpenAI client with observability."""

    def __init__(self, client: AsyncAzureOpenAI):
        self.client = client

    async def complete(self, messages: list, **kwargs) -> dict:
        with tracer.start_as_current_span("azure_openai_completion") as span:
            start_time = time.time()

            span.set_attribute("model", kwargs.get("model", "gpt-4o"))
            span.set_attribute("message_count", len(messages))

            try:
                response = await self.client.chat.completions.create(
                    messages=messages,
                    **kwargs
                )

                # Record metrics
                duration = time.time() - start_time
                span.set_attribute("duration_ms", duration * 1000)
                span.set_attribute("prompt_tokens", response.usage.prompt_tokens)
                span.set_attribute("completion_tokens", response.usage.completion_tokens)
                span.set_attribute("total_tokens", response.usage.total_tokens)

                return response

            except Exception as e:
                span.set_attribute("error", str(e))
                span.record_exception(e)
                raise
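
Wrapping the client keeps call sites unchanged while every completion emits a span to Application Insights. A brief usage sketch:

monitored = MonitoredClient(client)

response = await monitored.complete(
    messages=[{"role": "user", "content": "Summarise our SLA policy."}],
    model="gpt-4o",
    temperature=0
)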

Cost Management

class CostTracker:
    """Track Azure OpenAI costs."""

    # Pricing per 1K tokens (example - check current pricing)
    PRICING = {
        "gpt-4o": {"input": 0.005, "output": 0.015},
        "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
        "text-embedding-3-large": {"input": 0.00013, "output": 0}
    }

    def __init__(self):
        self.usage = {}

    def record(self, model: str, prompt_tokens: int, completion_tokens: int):
        """Record usage for cost tracking."""
        if model not in self.usage:
            self.usage[model] = {"prompt_tokens": 0, "completion_tokens": 0}

        self.usage[model]["prompt_tokens"] += prompt_tokens
        self.usage[model]["completion_tokens"] += completion_tokens

    def get_cost(self) -> dict:
        """Calculate costs from usage."""
        costs = {}
        total = 0

        for model, tokens in self.usage.items():
            if model in self.PRICING:
                pricing = self.PRICING[model]
                input_cost = (tokens["prompt_tokens"] / 1000) * pricing["input"]
                output_cost = (tokens["completion_tokens"] / 1000) * pricing["output"]
                model_cost = input_cost + output_cost
                costs[model] = {
                    "input_cost": input_cost,
                    "output_cost": output_cost,
                    "total": model_cost
                }
                total += model_cost

        costs["total"] = total
        return costs

# Usage
cost_tracker = CostTracker()

response = await client.chat.completions.create(...)
cost_tracker.record(
    model="gpt-4o",
    prompt_tokens=response.usage.prompt_tokens,
    completion_tokens=response.usage.completion_tokens
)

print(f"Current costs: ${cost_tracker.get_cost()['total']:.4f}")

Best Practices Summary

  1. Multi-region deployment: Ensure availability and handle regional outages
  2. Managed identity: Avoid API keys in production
  3. Rate limiting: Implement client-side limits to avoid throttling
  4. Caching: Cache deterministic requests to reduce costs
  5. Monitoring: Track latency, tokens, and errors
  6. Content filtering: Handle filtered responses gracefully
  7. Cost tracking: Monitor and budget for AI costs

Azure OpenAI provides enterprise capabilities, but you need to build robust infrastructure around it for production use.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.