
Provisioned Throughput Deep Dive: Architecture and Implementation

Deep dive into implementing and operating provisioned throughput deployments for Azure OpenAI at scale.

Deployment Architecture

Creating a PTU deployment is a management-plane call. With the azure-mgmt-cognitiveservices SDK, the ProvisionedManaged SKU does the work, and its capacity is the PTU count:

from azure.identity import DefaultAzureCredential
from azure.mgmt.cognitiveservices import CognitiveServicesManagementClient
from azure.mgmt.cognitiveservices.models import (
    Deployment,
    DeploymentModel,
    DeploymentProperties,
    Sku,
)

def create_ptu_deployment(
    subscription_id: str,
    resource_group: str,
    account_name: str,
    deployment_name: str,
    model: str,
    model_version: str,
    ptu_count: int
) -> Deployment:
    """Create a provisioned throughput (PTU) deployment."""

    client = CognitiveServicesManagementClient(
        credential=DefaultAzureCredential(),
        subscription_id=subscription_id
    )

    # ProvisionedManaged SKU: capacity is the number of PTUs
    deployment = Deployment(
        sku=Sku(name="ProvisionedManaged", capacity=ptu_count),
        properties=DeploymentProperties(
            model=DeploymentModel(
                format="OpenAI",
                name=model,
                version=model_version
            )
        )
    )

    # Long-running operation; block until provisioning completes
    return client.deployments.begin_create_or_update(
        resource_group,
        account_name,
        deployment_name,
        deployment
    ).result()
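
A minimal call with hypothetical names; the operation is long-running and can take several minutes:

# All names below are placeholders - substitute your own
result = create_ptu_deployment(
    subscription_id="00000000-0000-0000-0000-000000000000",
    resource_group="rg-ai-prod",
    account_name="aoai-prod-east",
    deployment_name="gpt-4o-ptu",
    model="gpt-4o",
    model_version="2024-08-06",
    ptu_count=100
)
print(result.sku.capacity)  # should echo the provisioned PTU count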

Load Balancing Multiple PTU Deployments

A single PTU deployment is a single point of failure. A simple round-robin balancer with failover spreads traffic across deployments in different regions:

class PTULoadBalancer:
    def __init__(self, deployments: list[dict]):
        self.deployments = deployments
        self.current_index = 0

    def get_deployment(self) -> dict:
        """Round-robin across deployments."""
        deployment = self.deployments[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.deployments)
        return deployment

    async def call_with_failover(self, messages: list[dict]) -> str:
        """Call with automatic failover to the next deployment."""
        last_error: Exception | None = None

        # Try each deployment at most once before giving up
        for _ in range(len(self.deployments)):
            deployment = self.get_deployment()
            try:
                return await self._call_deployment(deployment, messages)
            except Exception as e:
                last_error = e
                print(f"Deployment {deployment['name']} failed: {e}")

        raise RuntimeError("All deployments failed") from last_error

# Configure multiple deployments
deployments = [
    {"name": "ptu-east", "endpoint": "https://east.openai.azure.com/", "capacity": 100},
    {"name": "ptu-west", "endpoint": "https://west.openai.azure.com/", "capacity": 100}
]

balancer = PTULoadBalancer(deployments)
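
The balancer leans on a _call_deployment helper that isn't defined above. A minimal sketch of that method, assuming the openai package's AsyncAzureOpenAI client with Entra ID auth; the API version and the use of the entry's name as the Azure deployment name are assumptions:

from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from openai import AsyncAzureOpenAI

# Method to add to PTULoadBalancer
async def _call_deployment(self, deployment: dict, messages: list[dict]) -> str:
    """Send a chat completion to a single PTU deployment."""
    token_provider = get_bearer_token_provider(
        DefaultAzureCredential(),
        "https://cognitiveservices.azure.com/.default"
    )
    client = AsyncAzureOpenAI(
        azure_endpoint=deployment["endpoint"],
        azure_ad_token_provider=token_provider,
        api_version="2024-06-01"  # assumption: use the version you target
    )
    response = await client.chat.completions.create(
        model=deployment["name"],  # assumes this matches the Azure deployment name
        messages=messages
    )
    return response.choices[0].message.content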

Capacity Monitoring

Azure Monitor exposes per-account metrics for tracking PTU usage. Metric names vary by model and API version, so verify them against your resource:

from datetime import timedelta

from azure.identity import DefaultAzureCredential
from azure.monitor.query import MetricsQueryClient

class PTUCapacityMonitor:
    def __init__(self, resource_id: str):
        self.client = MetricsQueryClient(DefaultAzureCredential())
        self.resource_id = resource_id

    def get_utilization(self, timespan_minutes: int = 60) -> dict:
        """Get PTU utilization metrics."""

        response = self.client.query_resource(
            self.resource_id,
            metric_names=["TokensProcessed", "Requests"],  # adjust to your account's metric names
            timespan=timedelta(minutes=timespan_minutes)
        )

        metrics = {}
        for metric in response.metrics:
            data = metric.timeseries[0].data if metric.timeseries else []
            totals = [p.total for p in data if p.total is not None]
            averages = [p.average for p in data if p.average is not None]
            metrics[metric.name] = {
                "total": sum(totals),
                "average": sum(averages) / len(averages) if averages else 0.0
            }

        return metrics

    def check_capacity_alert(self, threshold_percent: float = 80) -> bool:
        """Check if PTU utilization exceeds the threshold."""
        # Provisioned-managed deployments report a percentage utilization
        # metric; the name below is the documented V2 metric at the time of
        # writing - verify it against your resource.
        response = self.client.query_resource(
            self.resource_id,
            metric_names=["AzureOpenAIProvisionedManagedUtilizationV2"],
            timespan=timedelta(minutes=5)
        )
        for metric in response.metrics:
            for series in metric.timeseries:
                for point in series.data:
                    if point.average is not None and point.average > threshold_percent:
                        return True
        return False
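
Wiring it up (the resource ID is a placeholder):

monitor = PTUCapacityMonitor(
    "/subscriptions/<sub-id>/resourceGroups/rg-ai-prod/providers"
    "/Microsoft.CognitiveServices/accounts/aoai-prod-east"
)

if monitor.check_capacity_alert(threshold_percent=80):
    print("PTU utilization above 80% - scale up or enable spillover")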

Scaling Strategy

Capacity changes go through quota and take time, so compute a recommendation against a target utilization instead of reacting to every spike:

class PTUScaler:
    def __init__(self, min_ptu: int, max_ptu: int, target_utilization: float = 0.7):
        self.min_ptu = min_ptu
        self.max_ptu = max_ptu
        self.target_utilization = target_utilization

    def calculate_optimal_capacity(self, current_ptu: int, current_utilization: float) -> int:
        """Calculate the PTU count that brings utilization back toward target."""

        # Size so projected utilization lands at the target
        projected = int(current_ptu * current_utilization / self.target_utilization)

        if current_utilization > 0.9:
            # Scale up, capped at the ceiling
            new_ptu = min(self.max_ptu, max(projected, current_ptu + 1))
        elif current_utilization < 0.5:
            # Scale down, never below the floor
            new_ptu = max(self.min_ptu, projected)
        else:
            new_ptu = current_ptu

        # Note: PTU purchases come in model-specific minimums and increments;
        # round the result to a valid size before applying it.
        return new_ptu
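
Fed by the monitor above, the scaler emits a recommendation rather than resizing directly, since PTU changes aren't instant (the values are illustrative):

scaler = PTUScaler(min_ptu=50, max_ptu=300, target_utilization=0.7)

# Illustrative reading: 92% utilization on a 100 PTU deployment
recommended = scaler.calculate_optimal_capacity(current_ptu=100, current_utilization=0.92)
if recommended != 100:
    print(f"Recommend resizing to {recommended} PTU")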

Best Practices

  1. Multi-region deployment - Distribute for resilience
  2. Monitor continuously - Track utilization and latency
  3. Plan scaling windows - PTU changes aren’t instant
  4. Budget for headroom - 20-30% over baseline
  5. Combine with PAYG - Handle overflow gracefully (see the spillover sketch below)
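
For item 5, the usual pattern is PTU-first with spillover: send traffic to the provisioned deployment and fall back to a standard pay-as-you-go deployment when it returns 429. A minimal sketch, assuming the openai package and hypothetical deployment names:

from openai import AsyncAzureOpenAI, RateLimitError

async def call_with_spillover(
    ptu_client: AsyncAzureOpenAI,
    paygo_client: AsyncAzureOpenAI,
    messages: list[dict]
) -> str:
    """Prefer the PTU deployment; spill over to PAYG on 429."""
    try:
        response = await ptu_client.chat.completions.create(
            model="gpt-4o-ptu",  # hypothetical PTU deployment name
            messages=messages
        )
    except RateLimitError:
        # PTU is saturated - fall back to pay-as-you-go
        response = await paygo_client.chat.completions.create(
            model="gpt-4o-paygo",  # hypothetical standard deployment name
            messages=messages
        )
    return response.choices[0].message.content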

Conclusion

Provisioned throughput requires careful planning and monitoring but delivers consistent performance for production workloads.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.