Provisioned Throughput Deep Dive: Architecture and Implementation
Deep dive into implementing and operating provisioned throughput deployments for Azure OpenAI at scale.
Deployment Architecture
Provisioned throughput units (PTUs) reserve dedicated model-processing capacity, so PTU deployments are created through the management plane with the ProvisionedManaged SKU rather than the standard pay-as-you-go SKU.

from azure.identity import DefaultAzureCredential
from azure.mgmt.cognitiveservices import CognitiveServicesManagementClient
from azure.mgmt.cognitiveservices.models import (
    Deployment,
    DeploymentModel,
    DeploymentProperties,
    Sku,
)

def create_ptu_deployment(
    subscription_id: str,
    resource_group: str,
    account_name: str,
    deployment_name: str,
    model: str,
    model_version: str,
    ptu_count: int,
) -> dict:
    """Create a PTU deployment on an Azure OpenAI resource."""
    client = CognitiveServicesManagementClient(
        credential=DefaultAzureCredential(),
        subscription_id=subscription_id,
    )

    # ProvisionedManaged SKU with capacity expressed in PTUs
    deployment = Deployment(
        sku=Sku(name="ProvisionedManaged", capacity=ptu_count),
        properties=DeploymentProperties(
            model=DeploymentModel(format="OpenAI", name=model, version=model_version)
        ),
    )

    # Create the deployment and block until the long-running operation completes
    result = client.deployments.begin_create_or_update(
        resource_group,
        account_name,
        deployment_name,
        deployment,
    ).result()
    return result.as_dict()
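A minimal usage sketch: the subscription, resource group, account, and model version are placeholders, and the PTU count has to match one of the capacity increments available for the chosen model.

result = create_ptu_deployment(
    subscription_id="<subscription-id>",
    resource_group="<resource-group>",
    account_name="<aoai-account>",
    deployment_name="gpt-4o-ptu",
    model="gpt-4o",
    model_version="<model-version>",
    ptu_count=100,
)
print(result["sku"])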
Load Balancing Multiple PTU Deployments
A single PTU deployment is tied to one region and one fixed capacity, so production setups usually spread traffic across several deployments and fail over between them.

from azure.identity import DefaultAzureCredential, get_bearer_token_provider
from openai import AsyncAzureOpenAI

# Microsoft Entra ID token provider shared by the per-endpoint clients
token_provider = get_bearer_token_provider(
    DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default"
)

class PTULoadBalancer:
    def __init__(self, deployments: list[dict]):
        self.deployments = deployments
        self.current_index = 0

    def get_deployment(self) -> dict:
        """Round-robin across deployments."""
        deployment = self.deployments[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.deployments)
        return deployment

    async def _call_deployment(self, deployment: dict, messages: list[dict]) -> str:
        """Send a chat completion to one deployment's endpoint."""
        client = AsyncAzureOpenAI(
            azure_endpoint=deployment["endpoint"],
            azure_ad_token_provider=token_provider,
            api_version="2024-06-01",
        )
        response = await client.chat.completions.create(
            model=deployment["name"],  # the deployment name on that resource
            messages=messages,
        )
        return response.choices[0].message.content

    async def call_with_failover(self, messages: list[dict]) -> str:
        """Call with automatic failover to the next deployment."""
        last_error: Exception | None = None
        for _ in range(len(self.deployments)):
            deployment = self.get_deployment()
            try:
                return await self._call_deployment(deployment, messages)
            except Exception as e:
                last_error = e
                print(f"Deployment {deployment['name']} failed: {e}")
        raise RuntimeError("All deployments failed") from last_error
# Configure multiple deployments
deployments = [
{"name": "ptu-east", "endpoint": "https://east.openai.azure.com/", "capacity": 100},
{"name": "ptu-west", "endpoint": "https://west.openai.azure.com/", "capacity": 100}
]
balancer = PTULoadBalancer(deployments)
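A minimal driver, assuming an async entry point; the prompt content is illustrative only.

import asyncio

async def main() -> None:
    reply = await balancer.call_with_failover(
        [{"role": "user", "content": "Summarize the current deployment status."}]
    )
    print(reply)

asyncio.run(main())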
Capacity Monitoring
Azure Monitor exposes per-resource metrics for Azure OpenAI accounts, which you can query to see how hard the provisioned capacity is being driven.

from datetime import timedelta

from azure.identity import DefaultAzureCredential
from azure.monitor.query import MetricAggregationType, MetricsQueryClient

class PTUCapacityMonitor:
    def __init__(self, resource_id: str):
        self.client = MetricsQueryClient(DefaultAzureCredential())
        self.resource_id = resource_id

    def get_utilization(self, timespan_minutes: int = 60) -> dict:
        """Get token and request metrics for the account."""
        # Metric names vary by resource type; confirm them in the portal's metrics blade.
        response = self.client.query_resource(
            self.resource_id,
            metric_names=["TokensProcessed", "Requests"],
            timespan=timedelta(minutes=timespan_minutes),
            aggregations=[MetricAggregationType.TOTAL, MetricAggregationType.AVERAGE],
        )
        metrics = {}
        for metric in response.metrics:
            points = metric.timeseries[0].data
            totals = [p.total for p in points if p.total is not None]
            averages = [p.average for p in points if p.average is not None]
            metrics[metric.name] = {
                "total": sum(totals),
                "average": sum(averages) / len(averages) if averages else 0.0,
            }
        return metrics

    def check_capacity_alert(self, threshold_percent: float = 80) -> bool:
        """Check whether provisioned utilization exceeds the threshold."""
        # The provisioned-managed utilization metric reports a percentage directly;
        # confirm the exact metric name exposed for your resource.
        response = self.client.query_resource(
            self.resource_id,
            metric_names=["AzureOpenAIProvisionedManagedUtilizationV2"],
            timespan=timedelta(minutes=5),
            aggregations=[MetricAggregationType.MAXIMUM],
        )
        points = response.metrics[0].timeseries[0].data
        peaks = [p.maximum for p in points if p.maximum is not None]
        return bool(peaks) and max(peaks) >= threshold_percent
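The monitor takes the full Azure resource ID of the Azure OpenAI account; the identifiers below are placeholders.

resource_id = (
    "/subscriptions/<subscription-id>/resourceGroups/<resource-group>"
    "/providers/Microsoft.CognitiveServices/accounts/<aoai-account>"
)
monitor = PTUCapacityMonitor(resource_id)
print(monitor.get_utilization(timespan_minutes=30))
if monitor.check_capacity_alert(threshold_percent=80):
    print("PTU utilization above 80% - consider resizing or spilling over")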
Scaling Strategy
PTU capacity stays fixed until you change it, so scaling is a deliberate operation: compare observed utilization against a target and resize when it drifts far enough.

class PTUScaler:
    def __init__(self, min_ptu: int, max_ptu: int, target_utilization: float = 0.7):
        self.min_ptu = min_ptu
        self.max_ptu = max_ptu
        self.target_utilization = target_utilization

    def calculate_optimal_capacity(self, current_ptu: int, current_utilization: float) -> int:
        """Calculate the PTU count that brings utilization back toward the target."""
        desired = int(current_ptu * current_utilization / self.target_utilization) + 1
        if current_utilization > 0.9:
            # Scale up so the observed load lands at the target utilization
            new_ptu = min(self.max_ptu, desired)
        elif current_utilization < 0.5:
            # Scale down, keeping enough headroom to stay at the target
            new_ptu = max(self.min_ptu, desired)
        else:
            new_ptu = current_ptu
        # PTU deployments only accept capacities that match the model's minimum
        # and increment sizes, so round the result before applying it.
        return new_ptu
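A sketch of how the scaler's result could feed back into the deployment from the first section; the identifiers are placeholders, resizes are not instant, and updating the deployment's SKU capacity through create_ptu_deployment should change the provisioned throughput in place.

scaler = PTUScaler(min_ptu=50, max_ptu=300, target_utilization=0.7)

# Suppose the monitor reports 92% provisioned utilization at 100 PTUs
new_ptu = scaler.calculate_optimal_capacity(current_ptu=100, current_utilization=0.92)
print(new_ptu)  # 132 with a 0.7 target, before rounding to the model's increment

if new_ptu != 100:
    create_ptu_deployment(
        subscription_id="<subscription-id>",
        resource_group="<resource-group>",
        account_name="<aoai-account>",
        deployment_name="gpt-4o-ptu",
        model="gpt-4o",
        model_version="<model-version>",
        ptu_count=new_ptu,
    )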
Best Practices
- Multi-region deployment - Distribute for resilience
- Monitor continuously - Track utilization and latency
- Plan scaling windows - PTU changes aren’t instant
- Budget for headroom - 20-30% over baseline
- Combine with PAYG - Handle overflow gracefully by spilling excess traffic to a standard pay-as-you-go deployment (see the sketch after this list)
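One way to implement spillover with the openai package: send traffic to the PTU deployment first and retry on a standard deployment when the provisioned capacity returns 429. The endpoint, key handling, and deployment names below are placeholders.

import os

from openai import AzureOpenAI, RateLimitError

client = AzureOpenAI(
    azure_endpoint="https://east.openai.azure.com/",
    api_key=os.environ["AZURE_OPENAI_API_KEY"],
    api_version="2024-06-01",
)

def chat_with_spillover(messages: list[dict]) -> str:
    """Prefer the PTU deployment; spill over to pay-as-you-go on 429."""
    try:
        response = client.chat.completions.create(
            model="gpt-4o-ptu",  # provisioned deployment name (placeholder)
            messages=messages,
        )
    except RateLimitError:
        # Provisioned capacity is saturated; overflow to the standard deployment
        response = client.chat.completions.create(
            model="gpt-4o-paygo",  # pay-as-you-go deployment name (placeholder)
            messages=messages,
        )
    return response.choices[0].message.content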
Conclusion
Provisioned throughput requires careful planning and monitoring but delivers consistent performance for production workloads.