8 min read
Rate Limiting and Throttling with Azure OpenAI Service
Rate limiting is crucial for managing costs and ensuring fair resource distribution when using Azure OpenAI Service. Today, let’s explore Azure’s rate limiting mechanisms and how to build robust applications that handle them gracefully.
Understanding Azure OpenAI Quotas
Azure OpenAI uses Tokens Per Minute (TPM) as the primary quota metric, and derives a Requests Per Minute (RPM) limit from it (roughly 6 RPM per 1,000 TPM):
| Model | Default TPM | Max TPM (with increase) |
|---|---|---|
| GPT-3.5-Turbo | 120K | 300K+ |
| Text-Davinci-003 | 120K | 240K |
| Text-Embedding-Ada-002 | 120K | 350K |
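That ratio is where the 720 requests-per-minute figure used later in this post comes from; a quick check (the helper below is purely illustrative):

```python
# Illustrative only: approximate RPM allowance derived from a TPM quota,
# assuming Azure's ~6 RPM per 1,000 TPM rule of thumb.
def rpm_from_tpm(tokens_per_minute: int) -> int:
    return tokens_per_minute // 1000 * 6

print(rpm_from_tpm(120_000))  # 720
```

With those numbers in hand, a small client-side tracker can mirror the quota window: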
from dataclasses import dataclass
from typing import Optional
import time
@dataclass
class QuotaConfig:
"""Configuration for Azure OpenAI quotas."""
tokens_per_minute: int = 120000
requests_per_minute: int = 720 # Approximate based on TPM
model: str = "gpt-35-turbo"
class QuotaTracker:
"""Track quota usage in real-time."""
def __init__(self, config: QuotaConfig):
self.config = config
self.tokens_used_this_minute = 0
self.requests_this_minute = 0
self.minute_start = time.time()
def _reset_if_new_minute(self):
"""Reset counters if a minute has passed."""
now = time.time()
if now - self.minute_start >= 60:
self.tokens_used_this_minute = 0
self.requests_this_minute = 0
self.minute_start = now
def can_make_request(self, estimated_tokens: int) -> tuple[bool, float]:
"""
Check if request can be made.
Returns (can_proceed, wait_time_if_not)
"""
self._reset_if_new_minute()
# Check token limit
if self.tokens_used_this_minute + estimated_tokens > self.config.tokens_per_minute:
wait_time = 60 - (time.time() - self.minute_start)
return False, wait_time
# Check request limit
if self.requests_this_minute >= self.config.requests_per_minute:
wait_time = 60 - (time.time() - self.minute_start)
return False, wait_time
return True, 0
def record_usage(self, tokens_used: int):
"""Record token usage from a completed request."""
self._reset_if_new_minute()
self.tokens_used_this_minute += tokens_used
self.requests_this_minute += 1
def get_remaining(self) -> dict:
"""Get remaining quota for this minute."""
self._reset_if_new_minute()
return {
"tokens_remaining": self.config.tokens_per_minute - self.tokens_used_this_minute,
"requests_remaining": self.config.requests_per_minute - self.requests_this_minute,
"seconds_until_reset": max(0, 60 - (time.time() - self.minute_start))
}
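A minimal usage sketch for the tracker; the token figures here are made up for illustration:

```python
# Hypothetical usage of QuotaTracker; the numbers are illustrative.
tracker = QuotaTracker(QuotaConfig(tokens_per_minute=120_000))

estimated = 800  # rough guess for prompt + completion tokens
ok, wait_time = tracker.can_make_request(estimated)
if not ok:
    time.sleep(wait_time)  # wait out the current minute window

# ... make the API call here, then record what was actually consumed ...
tracker.record_usage(tokens_used=750)
print(tracker.get_remaining())
```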
Implementing Retry Logic
Handle rate limit errors with exponential backoff:
import openai
import time
import random
from typing import Callable, Optional, TypeVar
from functools import wraps
T = TypeVar('T')
class RateLimitError(Exception):
"""Custom rate limit error with retry info."""
def __init__(self, message: str, retry_after: Optional[float] = None):
super().__init__(message)
self.retry_after = retry_after
def with_retry(
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
jitter: bool = True
) -> Callable:
"""Decorator for retry logic with exponential backoff."""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
@wraps(func)
def wrapper(*args, **kwargs) -> T:
last_exception = None
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except openai.error.RateLimitError as e:
last_exception = e
if attempt == max_retries:
raise
# Parse retry-after header if available
retry_after = getattr(e, 'retry_after', None)
if retry_after:
delay = float(retry_after)
else:
# Calculate exponential backoff
delay = min(
base_delay * (exponential_base ** attempt),
max_delay
)
# Add jitter
if jitter:
delay = delay * (0.5 + random.random())
print(f"Rate limited. Retrying in {delay:.2f}s (attempt {attempt + 1}/{max_retries})")
time.sleep(delay)
except openai.error.ServiceUnavailableError as e:
last_exception = e
if attempt == max_retries:
raise
delay = base_delay * (exponential_base ** attempt)
if jitter:
delay = delay * (0.5 + random.random())
print(f"Service unavailable. Retrying in {delay:.2f}s")
time.sleep(delay)
raise last_exception
return wrapper
return decorator
# Usage
@with_retry(max_retries=5, base_delay=1.0)
def call_openai(prompt: str, deployment: str) -> str:
"""Make OpenAI call with automatic retry."""
response = openai.Completion.create(
engine=deployment,
prompt=prompt,
max_tokens=500
)
return response.choices[0].text
Client-Side Rate Limiting
Implement proactive rate limiting to avoid hitting Azure limits:
import threading
import time
class TokenBucket:
"""Token bucket rate limiter."""
def __init__(self, rate: float, capacity: float):
"""
Args:
rate: Tokens added per second
capacity: Maximum tokens in bucket
"""
self.rate = rate
self.capacity = capacity
self.tokens = capacity
self.last_update = time.time()
self.lock = threading.Lock()
def _refill(self):
"""Refill tokens based on elapsed time."""
now = time.time()
elapsed = now - self.last_update
self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
self.last_update = now
    def acquire(self, tokens: int = 1, blocking: bool = True) -> bool:
        """
        Acquire tokens from the bucket.
        Args:
            tokens: Number of tokens to acquire
            blocking: If True, wait until tokens are available; if False, return immediately
        Returns:
            True if tokens acquired, False if not (non-blocking mode only)
        """
        # A request larger than the bucket capacity could never be satisfied;
        # clamp it so it drains the bucket instead of waiting forever.
        tokens = min(tokens, self.capacity)
        while True:
            with self.lock:
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return True
                if not blocking:
                    return False
                # How long until enough tokens have been refilled
                needed = tokens - self.tokens
                wait_time = needed / self.rate
            # Sleep outside the lock, then re-check: another thread may have
            # consumed tokens in the meantime.
            time.sleep(wait_time)
def get_wait_time(self, tokens: int) -> float:
"""Get estimated wait time for acquiring tokens."""
with self.lock:
self._refill()
if self.tokens >= tokens:
return 0
needed = tokens - self.tokens
return needed / self.rate
class RateLimitedOpenAIClient:
"""OpenAI client with client-side rate limiting."""
def __init__(
self,
deployment: str,
tokens_per_minute: int = 120000,
requests_per_minute: int = 720
):
self.deployment = deployment
# Token bucket for TPM (tokens per second = TPM / 60)
self.token_bucket = TokenBucket(
rate=tokens_per_minute / 60,
capacity=tokens_per_minute / 60 * 10 # 10 second burst
)
# Token bucket for RPM
self.request_bucket = TokenBucket(
rate=requests_per_minute / 60,
capacity=requests_per_minute / 60 * 10
)
def estimate_tokens(self, prompt: str, max_tokens: int) -> int:
"""Estimate total tokens for a request."""
# Rough estimation: 1 token ≈ 4 characters
prompt_tokens = len(prompt) // 4
return prompt_tokens + max_tokens
@with_retry(max_retries=3)
def complete(self, prompt: str, max_tokens: int = 500, **kwargs) -> dict:
"""Make rate-limited completion request."""
estimated_tokens = self.estimate_tokens(prompt, max_tokens)
# Acquire from both buckets
self.request_bucket.acquire(1)
self.token_bucket.acquire(estimated_tokens)
response = openai.Completion.create(
engine=self.deployment,
prompt=prompt,
max_tokens=max_tokens,
**kwargs
)
# Return actual usage
actual_tokens = response.usage.total_tokens
return {
"text": response.choices[0].text.strip(),
"tokens_used": actual_tokens,
"estimated_tokens": estimated_tokens
}
def get_status(self) -> dict:
"""Get current rate limiter status."""
return {
"token_bucket_available": self.token_bucket.tokens,
"request_bucket_available": self.request_bucket.tokens,
"token_wait_time": self.token_bucket.get_wait_time(1000),
"request_wait_time": self.request_bucket.get_wait_time(1)
}
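The 4-characters-per-token heuristic in estimate_tokens is deliberately rough. If tighter estimates matter, a tokenizer can count the prompt exactly; the sketch below assumes the tiktoken package and its cl100k_base encoding (used by gpt-3.5-turbo models), neither of which appears elsewhere in this post:

```python
# Optional refinement: count prompt tokens with tiktoken instead of the
# 4-characters-per-token heuristic. Assumes `pip install tiktoken`.
import tiktoken

def estimate_tokens_exact(prompt: str, max_tokens: int) -> int:
    encoding = tiktoken.get_encoding("cl100k_base")  # gpt-3.5-turbo tokenizer
    return len(encoding.encode(prompt)) + max_tokens
```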
Request Queuing
For high-throughput applications, implement a request queue:
import asyncio
from asyncio import PriorityQueue
from dataclasses import dataclass, field
from typing import Awaitable, Callable
import uuid
@dataclass(order=True)
class PrioritizedRequest:
priority: int
request_id: str = field(compare=False)
prompt: str = field(compare=False)
callback: Callable[[str], Awaitable[None]] = field(compare=False)
max_tokens: int = field(compare=False, default=500)
class AsyncRequestQueue:
"""Async request queue with priority support."""
def __init__(
self,
client: RateLimitedOpenAIClient,
max_concurrent: int = 10,
queue_size: int = 1000
):
self.client = client
self.queue: PriorityQueue = PriorityQueue(maxsize=queue_size)
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)
self.running = False
async def enqueue(
self,
prompt: str,
callback: Callable[[str], Awaitable[None]],
priority: int = 5, # 1 = highest, 10 = lowest
max_tokens: int = 500
) -> str:
"""Add request to queue."""
request_id = str(uuid.uuid4())
request = PrioritizedRequest(
priority=priority,
request_id=request_id,
prompt=prompt,
callback=callback,
max_tokens=max_tokens
)
await self.queue.put(request)
return request_id
async def _process_request(self, request: PrioritizedRequest):
"""Process a single request."""
async with self.semaphore:
try:
# Run sync client in thread pool
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None,
lambda: self.client.complete(
request.prompt,
max_tokens=request.max_tokens
)
)
await request.callback(result["text"])
except Exception as e:
await request.callback(f"Error: {str(e)}")
async def start(self):
"""Start processing queue."""
self.running = True
while self.running:
try:
request = await asyncio.wait_for(
self.queue.get(),
timeout=1.0
)
# Process in background
asyncio.create_task(self._process_request(request))
except asyncio.TimeoutError:
continue
def stop(self):
"""Stop processing queue."""
self.running = False
def get_queue_size(self) -> int:
"""Get current queue size."""
return self.queue.qsize()
# Usage
async def example_usage():
client = RateLimitedOpenAIClient(
deployment="gpt35",
tokens_per_minute=120000
)
queue = AsyncRequestQueue(client, max_concurrent=10)
# Start queue processor
processor_task = asyncio.create_task(queue.start())
# Define callback
async def handle_response(response: str):
print(f"Got response: {response[:100]}...")
# Enqueue requests
for i in range(100):
await queue.enqueue(
prompt=f"Question {i}: What is cloud computing?",
callback=handle_response,
priority=5 if i % 10 != 0 else 1 # Every 10th request is high priority
)
# Wait for queue to empty
while queue.get_queue_size() > 0:
await asyncio.sleep(1)
print(f"Queue size: {queue.get_queue_size()}")
queue.stop()
await processor_task
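To run the example end to end, hand the coroutine to the event loop:

```python
# Run the async example (Python 3.7+).
asyncio.run(example_usage())
```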
Monitoring and Alerting
Track rate limiting events for monitoring:
from prometheus_client import Counter, Gauge, Histogram
import logging
# Prometheus metrics
rate_limit_hits = Counter(
'openai_rate_limit_hits_total',
'Total rate limit hits',
['deployment']
)
request_latency = Histogram(
'openai_request_latency_seconds',
'Request latency in seconds',
['deployment']
)
queue_size = Gauge(
'openai_queue_size',
'Current request queue size',
['deployment']
)
tokens_used = Counter(
'openai_tokens_used_total',
'Total tokens used',
['deployment', 'type']
)
class MetricsCollector:
"""Collect and export rate limiting metrics."""
def __init__(self, deployment: str):
self.deployment = deployment
self.logger = logging.getLogger("openai_metrics")
def record_request(self, latency: float, token_count: int, token_type: str):
"""Record a completed request."""
request_latency.labels(deployment=self.deployment).observe(latency)
tokens_used.labels(
deployment=self.deployment,
type=token_type
).inc(token_count)
def record_rate_limit(self):
"""Record a rate limit hit."""
rate_limit_hits.labels(deployment=self.deployment).inc()
self.logger.warning(f"Rate limit hit for {self.deployment}")
def update_queue_size(self, size: int):
"""Update queue size metric."""
queue_size.labels(deployment=self.deployment).set(size)
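Wiring the collector into the request path is straightforward. A sketch, reusing the RateLimitedOpenAIClient from earlier; the timing and exception handling here are illustrative rather than a prescribed pattern:

```python
# Illustrative wiring of MetricsCollector around a rate-limited call.
metrics = MetricsCollector(deployment="gpt35")
client = RateLimitedOpenAIClient(deployment="gpt35")

def complete_with_metrics(prompt: str) -> str:
    start = time.time()
    try:
        result = client.complete(prompt)
    except openai.error.RateLimitError:
        # Only reached after the retry decorator has given up
        metrics.record_rate_limit()
        raise
    metrics.record_request(
        latency=time.time() - start,
        token_count=result["tokens_used"],
        token_type="total"
    )
    return result["text"]
```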
Best Practices
- Implement client-side rate limiting: don't rely solely on Azure's server-side limits
- Use exponential backoff with jitter: spreads retries out and avoids a thundering herd
- Queue requests: enables batch processing and priority handling under load
- Monitor usage: track tokens, requests, and rate limit events
- Request quota increases: file a quota increase request with Azure if the default limits aren't enough
- Estimate tokens before each request: manage quota proactively instead of reacting to 429 errors