
Scaling AI Systems: From Prototype to Enterprise

Scaling AI systems presents unique challenges: capacity, latency, cost, and reliability all tighten as usage grows. Let’s explore patterns and strategies for taking AI from proof-of-concept to enterprise scale.

The Scaling Journey

Prototype (1-10 users)
    ↓ Single deployment, no caching
Pilot (10-100 users)
    ↓ Add caching, basic monitoring
Production (100-1K users)
    ↓ Multiple deployments, load balancing
Scale (1K-10K users)
    ↓ Global distribution, advanced caching
Enterprise (10K+ users)
    ↓ Full platform, optimization at every layer

Pattern 1: Request Queuing and Prioritization

import asyncio
import time
from dataclasses import dataclass
from enum import Enum
from heapq import heappush, heappop
from typing import Optional

class Priority(Enum):
    CRITICAL = 1  # Customer-facing, real-time
    HIGH = 2      # Important but can wait seconds
    NORMAL = 3    # Standard requests
    LOW = 4       # Background, batch
    BULK = 5      # Lowest priority, batch jobs

@dataclass
class AIRequest:
    id: str
    priority: Priority
    payload: dict
    created_at: float
    deadline: Optional[float] = None

    def __lt__(self, other):
        # Priority first, then FIFO
        if self.priority.value != other.priority.value:
            return self.priority.value < other.priority.value
        return self.created_at < other.created_at

class PriorityRequestQueue:
    """Priority-based request queue with deadline awareness."""

    def __init__(self, max_concurrent: int = 100):
        self.queue = []
        self.max_concurrent = max_concurrent
        self.active_count = 0
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def enqueue(self, request: AIRequest):
        # Check if deadline already passed
        if request.deadline and time.time() > request.deadline:
            return {"status": "expired", "request_id": request.id}

        heappush(self.queue, request)
        return {"status": "queued", "position": len(self.queue)}

    async def process_next(self):
        async with self.semaphore:
            if not self.queue:
                return None

            request = heappop(self.queue)

            # Skip expired requests
            if request.deadline and time.time() > request.deadline:
                return {"status": "expired", "request_id": request.id}

            result = await self.execute(request)
            return result

    async def run_processor(self, num_workers: int = 10):
        """Run multiple workers processing the queue."""
        workers = [
            asyncio.create_task(self.worker(i))
            for i in range(num_workers)
        ]
        await asyncio.gather(*workers)

    async def worker(self, worker_id: int):
        while True:
            result = await self.process_next()
            if result:
                await self.publish_result(result)
            else:
                await asyncio.sleep(0.1)
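
A quick sketch of wiring the queue into a service, assuming execute() and publish_result() are implemented as noted above; the request IDs and payloads here are purely illustrative:

async def main():
    queue = PriorityRequestQueue(max_concurrent=50)

    # A customer-facing request with a 5-second deadline
    await queue.enqueue(AIRequest(
        id="req-001",
        priority=Priority.CRITICAL,
        payload={"prompt": "Summarize this support ticket"},
        created_at=time.time(),
        deadline=time.time() + 5,
    ))

    # A background batch job with no deadline
    await queue.enqueue(AIRequest(
        id="req-002",
        priority=Priority.BULK,
        payload={"prompt": "Re-embed the knowledge base"},
        created_at=time.time(),
    ))

    # Workers drain the queue in priority order for the life of the process
    await queue.run_processor(num_workers=10)

asyncio.run(main())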

Pattern 2: Multi-Region Deployment

class CapacityException(Exception):
    """Raised when no region has spare capacity."""

class GlobalAIRouter:
    """Route requests to optimal region."""

    def __init__(self):
        self.regions = {
            "eastus": {
                "endpoint": "https://ai-eastus.openai.azure.com",
                "capacity_tpm": 80000,
                "current_load": 0.3
            },
            "westus": {
                "endpoint": "https://ai-westus.openai.azure.com",
                "capacity_tpm": 80000,
                "current_load": 0.5
            },
            "westeurope": {
                "endpoint": "https://ai-westeurope.openai.azure.com",
                "capacity_tpm": 60000,
                "current_load": 0.4
            },
            "eastasia": {
                "endpoint": "https://ai-eastasia.openai.azure.com",
                "capacity_tpm": 40000,
                "current_load": 0.2
            }
        }

    async def route_request(self, request, user_region: str) -> str:
        # Strategy 1: Prefer user's region if capacity available
        if user_region in self.regions:
            region = self.regions[user_region]
            if region["current_load"] < 0.8:
                return user_region

        # Strategy 2: Find lowest load region
        available = [
            (name, r) for name, r in self.regions.items()
            if r["current_load"] < 0.9
        ]

        if not available:
            raise CapacityException("All regions at capacity")

        # Sort by load, prefer closer regions
        available.sort(key=lambda x: (
            x[1]["current_load"],
            self.region_distance(user_region, x[0])
        ))

        return available[0][0]

    def region_distance(self, from_region: str, to_region: str) -> int:
        # Simplified distance scoring; keys are stored as sorted tuples
        # so the lookup below matches regardless of argument order
        distances = {
            ("eastus", "westus"): 1,
            ("eastus", "westeurope"): 2,
            ("eastasia", "eastus"): 3,
            # ... more combinations
        }
        key = tuple(sorted([from_region, to_region]))
        return distances.get(key, 2)
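
In practice current_load would be refreshed from real telemetry rather than hard-coded. A minimal sketch of using the router inside a request handler, where call_region is a hypothetical wrapper around the regional client call:

async def handle_request(router: GlobalAIRouter, request: dict, user_region: str) -> dict:
    # Pick a region, then resolve it to the endpoint to call
    region = await router.route_request(request, user_region)
    endpoint = router.regions[region]["endpoint"]

    # call_region is hypothetical: it would invoke the Azure OpenAI
    # deployment behind the chosen regional endpoint
    return await call_region(endpoint, request)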

Pattern 3: Intelligent Caching

import asyncio
import hashlib
import numpy as np
from typing import Optional

class SemanticCache:
    """Cache with semantic similarity matching."""

    def __init__(self, similarity_threshold: float = 0.95):
        self.cache = {}  # hash -> response
        self.embeddings = {}  # hash -> embedding
        self.threshold = similarity_threshold
        self.embedding_model = EmbeddingModel()  # wrapper around your embedding endpoint

    async def get(self, query: str) -> Optional[dict]:
        # Exact match first
        query_hash = self.hash_query(query)
        if query_hash in self.cache:
            return {"response": self.cache[query_hash], "cache_type": "exact"}

        # Semantic match
        query_embedding = await self.embedding_model.embed(query)

        best_match = None
        best_similarity = 0

        for cached_hash, cached_embedding in self.embeddings.items():
            similarity = self.cosine_similarity(query_embedding, cached_embedding)
            if similarity > best_similarity:
                best_similarity = similarity
                best_match = cached_hash

        if best_match and best_similarity >= self.threshold:
            return {
                "response": self.cache[best_match],
                "cache_type": "semantic",
                "similarity": best_similarity
            }

        return None

    async def set(self, query: str, response: str, ttl: int = 3600):
        query_hash = self.hash_query(query)
        query_embedding = await self.embedding_model.embed(query)

        self.cache[query_hash] = response
        self.embeddings[query_hash] = query_embedding

        # Set expiration
        asyncio.create_task(self.expire_after(query_hash, ttl))

    def hash_query(self, query: str) -> str:
        return hashlib.md5(query.encode()).hexdigest()

    def cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

    async def expire_after(self, query_hash: str, ttl: int):
        """Evict a cached entry once its TTL elapses."""
        await asyncio.sleep(ttl)
        self.cache.pop(query_hash, None)
        self.embeddings.pop(query_hash, None)

# Cache hit rates we achieved:
# - Exact match: 15-20%
# - Semantic match: Additional 25-35%
# - Total: 40-55% cache hit rate
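
A cache-aside sketch showing where the cache sits in the call path; call_model is a hypothetical wrapper around the actual completion call:

async def answer(cache: SemanticCache, query: str) -> str:
    cached = await cache.get(query)
    if cached:
        return cached["response"]

    # Miss: call the model, then populate the cache for future queries
    response = await call_model(query)  # hypothetical model call
    await cache.set(query, response, ttl=3600)
    return response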

Pattern 4: Request Batching

class RequestBatcher:
    """Batch multiple requests for efficiency."""

    def __init__(self, client, max_batch_size: int = 32, max_wait_ms: int = 100):
        self.client = client  # async OpenAI / Azure OpenAI client used for batch calls
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        self.pending = []
        self.lock = asyncio.Lock()

    async def add_request(self, request: dict) -> asyncio.Future:
        """Add request to batch, return future for result."""
        future = asyncio.Future()

        async with self.lock:
            self.pending.append({"request": request, "future": future})

            if len(self.pending) >= self.max_batch_size:
                # Batch is full, process immediately
                await self.process_batch()
            elif len(self.pending) == 1:
                # First request, schedule batch processing
                asyncio.create_task(self.schedule_batch())

        return future

    async def schedule_batch(self):
        """Wait for more requests or timeout."""
        await asyncio.sleep(self.max_wait_ms / 1000)
        async with self.lock:
            if self.pending:
                await self.process_batch()

    async def process_batch(self):
        """Process all pending requests as a batch."""
        if not self.pending:
            return

        batch = self.pending[:]
        self.pending = []

        # Execute batch request
        requests = [item["request"] for item in batch]
        results = await self.execute_batch(requests)

        # Resolve futures
        for item, result in zip(batch, results):
            item["future"].set_result(result)

    async def execute_batch(self, requests: list) -> list:
        """Execute batch of requests."""
        # For embeddings - native batch support
        if requests[0]["type"] == "embedding":
            response = await self.client.embeddings.create(
                model="text-embedding-3-large",
                input=[r["text"] for r in requests]
            )
            return [e.embedding for e in response.data]

        # For completions - parallel execution (execute_single issues one
        # completion via self.client; implementation omitted here)
        tasks = [self.execute_single(r) for r in requests]
        return await asyncio.gather(*tasks)
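
Callers submit independently and simply await their own futures; the batcher coalesces them behind the scenes. A sketch, assuming the batcher was constructed with an async OpenAI/Azure OpenAI client as noted in __init__:

async def embed_texts(batcher: RequestBatcher, texts: list[str]) -> list:
    futures = [
        await batcher.add_request({"type": "embedding", "text": t})
        for t in texts
    ]
    # Each future resolves once its batch has been executed
    return await asyncio.gather(*futures)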

Pattern 5: Model Routing

class ModelRouter:
    """Route to appropriate model based on request."""

    MODELS = {
        "simple": {
            "name": "gpt-4o-mini",
            "cost_per_1k": 0.00015,
            "latency_ms": 200,
            "quality": 0.85
        },
        "standard": {
            "name": "gpt-4o",
            "cost_per_1k": 0.005,
            "latency_ms": 500,
            "quality": 0.95
        },
        "reasoning": {
            "name": "o1-preview",
            "cost_per_1k": 0.015,
            "latency_ms": 2000,
            "quality": 0.98
        }
    }

    async def select_model(self, request: dict) -> str:
        """Select best model for request."""

        # Rule-based selection
        if request.get("require_reasoning"):
            return "reasoning"

        if request.get("max_latency_ms", 1000) < 300:
            return "simple"

        # Complexity-based selection
        complexity = await self.estimate_complexity(request)

        if complexity < 0.3:
            return "simple"
        elif complexity < 0.7:
            return "standard"
        else:
            return "reasoning"

    async def estimate_complexity(self, request: dict) -> float:
        """Estimate request complexity 0-1."""
        factors = []

        # Length factor
        text_length = len(request.get("content", ""))
        factors.append(min(text_length / 5000, 1.0))

        # Keyword factor
        complex_keywords = ["analyze", "compare", "design", "optimize", "debug"]
        keyword_count = sum(1 for kw in complex_keywords if kw in request.get("content", "").lower())
        factors.append(min(keyword_count / 3, 1.0))

        # Context factor
        if request.get("context_length", 0) > 10000:
            factors.append(0.8)
        else:
            factors.append(0.2)

        return sum(factors) / len(factors)
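
A sketch of dispatching through the router; client is assumed to be an async OpenAI/Azure OpenAI client created elsewhere:

async def complete(router: ModelRouter, client, request: dict) -> str:
    tier = await router.select_model(request)
    model = ModelRouter.MODELS[tier]["name"]

    response = await client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": request["content"]}]
    )
    return response.choices[0].message.content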

Pattern 6: Horizontal Scaling

# Kubernetes deployment and autoscaler for the AI service
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ai-service
spec:
  replicas: 5
  selector:
    matchLabels:
      app: ai-service
  template:
    metadata:
      labels:
        app: ai-service
    spec:
      containers:
      - name: ai-service
        image: ai-service:latest
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
        env:
        - name: AZURE_OPENAI_ENDPOINT
          valueFrom:
            secretKeyRef:
              name: ai-secrets
              key: endpoint
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ai-service-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ai-service
  minReplicas: 3
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: External
    external:
      metric:
        name: queue_length
      target:
        type: AverageValue
        averageValue: 100
"""

Scaling Metrics to Track

scaling_metrics = {
    "capacity": [
        "requests_per_second",
        "tokens_per_minute",
        "concurrent_requests",
        "queue_depth"
    ],
    "performance": [
        "latency_p50",
        "latency_p95",
        "latency_p99",
        "cache_hit_rate"
    ],
    "cost": [
        "cost_per_request",
        "cost_per_user",
        "total_daily_cost"
    ],
    "reliability": [
        "error_rate",
        "availability",
        "circuit_breaker_status"
    ]
}
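
Most of these roll up from per-request samples; a small sketch of the latency rollup, assuming latencies are collected in milliseconds:

import numpy as np

def latency_summary(samples_ms: list[float]) -> dict:
    return {
        "latency_p50": float(np.percentile(samples_ms, 50)),
        "latency_p95": float(np.percentile(samples_ms, 95)),
        "latency_p99": float(np.percentile(samples_ms, 99)),
    }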

Scaling AI requires thinking about capacity, latency, cost, and reliability together. No single optimization solves everything.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.