Rate Limit Handling: Maximizing Throughput Within Constraints
Rate limits are a reality of working with AI APIs. Effective rate limit handling maximizes throughput while avoiding errors and maintaining good API citizenship.
Understanding Rate Limits
from dataclasses import dataclass
from typing import Dict, Optional
import time
from collections import deque
import threading
@dataclass
class RateLimitConfig:
"""Rate limit configuration"""
requests_per_minute: int
tokens_per_minute: int
requests_per_day: Optional[int] = None
# Example OpenAI rate limits (approximate; actual limits vary by account tier and model)
RATE_LIMITS = {
"tier_1": RateLimitConfig(
requests_per_minute=500,
tokens_per_minute=30_000,
requests_per_day=10_000
),
"tier_2": RateLimitConfig(
requests_per_minute=5_000,
tokens_per_minute=450_000,
requests_per_day=None
),
"tier_3": RateLimitConfig(
requests_per_minute=5_000,
tokens_per_minute=800_000,
requests_per_day=None
),
"o1_preview": RateLimitConfig(
requests_per_minute=20,
tokens_per_minute=30_000,
requests_per_day=100
)
}
class RateLimitTracker:
"""Track and enforce rate limits"""
def __init__(self, config: RateLimitConfig):
self.config = config
self.request_times: deque = deque()
self.token_counts: deque = deque()
self.daily_requests = 0
self.last_reset = time.time()
self._lock = threading.Lock()
def can_make_request(self, estimated_tokens: int = 1000) -> tuple[bool, float]:
"""Check if request can be made, return (can_proceed, wait_time)"""
with self._lock:
now = time.time()
minute_ago = now - 60
# Clean old entries
while self.request_times and self.request_times[0] < minute_ago:
self.request_times.popleft()
while self.token_counts and self.token_counts[0][0] < minute_ago:
self.token_counts.popleft()
# Check request rate
if len(self.request_times) >= self.config.requests_per_minute:
wait_time = self.request_times[0] - minute_ago
return False, wait_time
            # Check token rate
            current_tokens = sum(tc[1] for tc in self.token_counts)
            if current_tokens + estimated_tokens > self.config.tokens_per_minute:
                if not self.token_counts:
                    # The request alone exceeds the per-minute budget; wait out a full window
                    return False, 60.0
                wait_time = self.token_counts[0][0] - minute_ago
                return False, wait_time
# Check daily limit
if self.config.requests_per_day:
self._check_daily_reset()
if self.daily_requests >= self.config.requests_per_day:
return False, self._time_until_daily_reset()
return True, 0
def record_request(self, tokens_used: int):
"""Record a completed request"""
with self._lock:
now = time.time()
self.request_times.append(now)
self.token_counts.append((now, tokens_used))
self.daily_requests += 1
def _check_daily_reset(self):
"""Reset daily counter if needed"""
now = time.time()
if now - self.last_reset > 86400: # 24 hours
self.daily_requests = 0
self.last_reset = now
def _time_until_daily_reset(self) -> float:
"""Calculate time until daily reset"""
return 86400 - (time.time() - self.last_reset)
def get_usage_stats(self) -> dict:
"""Get current usage statistics"""
with self._lock:
now = time.time()
minute_ago = now - 60
current_requests = len([t for t in self.request_times if t > minute_ago])
current_tokens = sum(tc[1] for tc in self.token_counts if tc[0] > minute_ago)
return {
"rpm_used": current_requests,
"rpm_limit": self.config.requests_per_minute,
"rpm_utilization": current_requests / self.config.requests_per_minute,
"tpm_used": current_tokens,
"tpm_limit": self.config.tokens_per_minute,
"tpm_utilization": current_tokens / self.config.tokens_per_minute,
"daily_used": self.daily_requests,
"daily_limit": self.config.requests_per_day
}
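Putting the tracker to work means checking before each call and recording real usage afterwards. A minimal sketch, assuming the tier-1 config above and the OpenAI v1 SDK (the 500-token completion allowance is an illustrative guess):
from openai import OpenAI

client = OpenAI()
tracker = RateLimitTracker(RATE_LIMITS["tier_1"])

def tracked_call(prompt: str) -> str:
    # Rough estimate: ~4 characters per token, plus headroom for the completion
    estimated = len(prompt) // 4 + 500
    can_proceed, wait_time = tracker.can_make_request(estimated)
    while not can_proceed:
        time.sleep(wait_time)
        can_proceed, wait_time = tracker.can_make_request(estimated)
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    # Record what the API actually billed, not the estimate
    tracker.record_request(response.usage.total_tokens)
    return response.choices[0].message.content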
Token Bucket Algorithm
class TokenBucket:
"""Token bucket rate limiter"""
def __init__(self, capacity: int, refill_rate: float):
"""
capacity: Maximum tokens in bucket
refill_rate: Tokens added per second
"""
self.capacity = capacity
self.refill_rate = refill_rate
self.tokens = capacity
self.last_refill = time.time()
self._lock = threading.Lock()
def acquire(self, tokens: int = 1, blocking: bool = True) -> bool:
"""
Acquire tokens from bucket
Returns True if acquired, False if not (when non-blocking)
"""
while True:
with self._lock:
self._refill()
if self.tokens >= tokens:
self.tokens -= tokens
return True
if not blocking:
return False
# Calculate wait time
tokens_needed = tokens - self.tokens
wait_time = tokens_needed / self.refill_rate
time.sleep(min(wait_time, 1.0)) # Wait in small increments
def _refill(self):
"""Refill tokens based on elapsed time"""
now = time.time()
elapsed = now - self.last_refill
new_tokens = elapsed * self.refill_rate
self.tokens = min(self.capacity, self.tokens + new_tokens)
self.last_refill = now
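The bucket allows bursts up to capacity, after which throughput settles at refill_rate per second. A quick sketch with illustrative numbers:
# Illustrative numbers: bursts of up to 10 calls, then ~2 calls per second sustained
bucket = TokenBucket(capacity=10, refill_rate=2.0)

def try_send(payload: dict) -> bool:
    # Non-blocking: callers can drop or defer work when the bucket is empty
    if not bucket.acquire(tokens=1, blocking=False):
        return False
    # ... make the actual call here ...
    return True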
# Dual token bucket for requests and tokens
class DualTokenBucket:
"""Separate buckets for request rate and token rate"""
def __init__(self, rpm: int, tpm: int):
# Requests per second = rpm / 60
self.request_bucket = TokenBucket(rpm, rpm / 60)
# Tokens per second = tpm / 60
self.token_bucket = TokenBucket(tpm, tpm / 60)
def acquire(self, estimated_tokens: int = 1000) -> bool:
"""Acquire from both buckets"""
# First check if we can get request slot
if not self.request_bucket.acquire(1, blocking=True):
return False
# Then acquire token budget
return self.token_bucket.acquire(estimated_tokens, blocking=True)
# Usage
rate_limiter = DualTokenBucket(rpm=500, tpm=30_000)
def rate_limited_call(prompt: str) -> str:
# Estimate tokens
estimated_tokens = len(prompt) // 4 + 1000 # rough estimate
# Wait for rate limit
rate_limiter.acquire(estimated_tokens)
# Make the call
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return response.choices[0].message.content
Adaptive Rate Limiting
import logging

logger = logging.getLogger(__name__)

class AdaptiveRateLimiter:
"""Adjust rate limiting based on actual API responses"""
    def __init__(self, initial_rpm: int, initial_tpm: int):
        self.rpm = initial_rpm
        self.tpm = initial_tpm
        # Ceilings: successes should never push assumed limits past what we know is allowed
        self.max_rpm = initial_rpm
        self.max_tpm = initial_tpm
        self.rate_limit_hits = 0
        self.successful_requests = 0
def adjust_from_headers(self, headers: dict):
"""Adjust limits based on API response headers"""
# OpenAI includes rate limit info in headers
        if 'x-ratelimit-limit-requests' in headers:
            self.rpm = self.max_rpm = int(headers['x-ratelimit-limit-requests'])
        if 'x-ratelimit-limit-tokens' in headers:
            self.tpm = self.max_tpm = int(headers['x-ratelimit-limit-tokens'])
# Track remaining
remaining_requests = headers.get('x-ratelimit-remaining-requests')
remaining_tokens = headers.get('x-ratelimit-remaining-tokens')
return {
"rpm_limit": self.rpm,
"tpm_limit": self.tpm,
"remaining_requests": remaining_requests,
"remaining_tokens": remaining_tokens
}
def record_rate_limit(self, retry_after: float):
"""Record when we hit a rate limit"""
self.rate_limit_hits += 1
# Reduce our assumed limits
if self.rate_limit_hits > 3:
self.rpm = int(self.rpm * 0.8)
self.tpm = int(self.tpm * 0.8)
self.rate_limit_hits = 0
logger.warning(f"Reducing rate limits to RPM={self.rpm}, TPM={self.tpm}")
    def record_success(self):
        """Record successful request"""
        self.successful_requests += 1
        # Gradually increase after a streak of successes, capped at the known limits
        if self.successful_requests > 100:
            self.rpm = min(self.max_rpm, int(self.rpm * 1.1))
            self.tpm = min(self.max_tpm, int(self.tpm * 1.1))
            self.successful_requests = 0
            logger.info(f"Increasing rate limits to RPM={self.rpm}, TPM={self.tpm}")
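Feeding the limiter real header data requires access to raw HTTP headers; with the OpenAI v1 SDK that is exposed through with_raw_response. A sketch (model name and limiter settings are illustrative), reusing the client defined earlier:
adaptive = AdaptiveRateLimiter(initial_rpm=500, initial_tpm=30_000)

def adaptive_call(prompt: str) -> str:
    # with_raw_response exposes HTTP headers alongside the parsed completion
    raw = client.chat.completions.with_raw_response.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    adaptive.adjust_from_headers(dict(raw.headers))
    adaptive.record_success()
    return raw.parse().choices[0].message.content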
Request Queue with Rate Limiting
from typing import Any, Callable, Dict, List
from dataclasses import dataclass, field
import heapq
@dataclass(order=True)
class PrioritizedRequest:
"""Request with priority for queue ordering"""
priority: int
request_id: str = field(compare=False)
func: Callable = field(compare=False)
args: tuple = field(compare=False)
kwargs: dict = field(compare=False)
class RateLimitedQueue:
"""Queue that processes requests within rate limits"""
def __init__(self, rpm: int, tpm: int):
self.rate_limiter = DualTokenBucket(rpm, tpm)
self.queue: List = []
self.results: Dict[str, Any] = {}
self.running = False
self._lock = threading.Lock()
def submit(self, func: Callable, priority: int = 5,
estimated_tokens: int = 1000, *args, **kwargs) -> str:
"""Submit request to queue"""
import uuid
request_id = str(uuid.uuid4())
request = PrioritizedRequest(
priority=priority,
request_id=request_id,
func=func,
args=args,
kwargs={**kwargs, "_estimated_tokens": estimated_tokens}
)
with self._lock:
heapq.heappush(self.queue, request)
return request_id
def get_result(self, request_id: str, timeout: float = 60) -> Any:
"""Get result for a submitted request"""
start = time.time()
while time.time() - start < timeout:
if request_id in self.results:
return self.results.pop(request_id)
time.sleep(0.1)
raise TimeoutError(f"Request {request_id} timed out")
def start_processing(self):
"""Start background processing"""
self.running = True
threading.Thread(target=self._process_loop, daemon=True).start()
    def _process_loop(self):
        """Process requests from queue"""
        while self.running:
            # Pop the next request under the lock, but never sleep while holding it
            with self._lock:
                request = heapq.heappop(self.queue) if self.queue else None
            if request is None:
                time.sleep(0.1)
                continue
            estimated_tokens = request.kwargs.pop("_estimated_tokens", 1000)
            # Wait for rate limit
            self.rate_limiter.acquire(estimated_tokens)
            # Execute
            try:
                result = request.func(*request.args, **request.kwargs)
                self.results[request.request_id] = {"success": True, "result": result}
            except Exception as e:
                self.results[request.request_id] = {"success": False, "error": str(e)}
# Usage
queue = RateLimitedQueue(rpm=500, tpm=30_000)
queue.start_processing()
# Submit multiple requests
request_ids = []
for prompt in prompts:
req_id = queue.submit(
lambda p=prompt: call_openai(p),
priority=5,
estimated_tokens=2000
)
request_ids.append(req_id)
# Collect results
results = [queue.get_result(rid) for rid in request_ids]
Rate Limit Error Handling
from openai import RateLimitError
def handle_rate_limit_error(error: RateLimitError) -> float:
"""Extract wait time from rate limit error"""
# Check for retry-after header
retry_after = error.response.headers.get('retry-after')
if retry_after:
return float(retry_after)
# Check error message for hints
message = str(error)
if "Please retry after" in message:
# Parse time from message
import re
match = re.search(r'(\d+)\s*seconds?', message)
if match:
return float(match.group(1))
# Default backoff
return 60
def call_with_rate_limit_handling(func: Callable, max_retries: int = 5) -> Any:
"""Call function with rate limit handling"""
for attempt in range(max_retries):
try:
return func()
except RateLimitError as e:
if attempt == max_retries - 1:
raise
wait_time = handle_rate_limit_error(e)
logger.warning(f"Rate limited. Waiting {wait_time}s before retry...")
time.sleep(wait_time)
raise RuntimeError("Max retries exceeded")
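The proactive and reactive pieces compose: the token bucket keeps you under the limit most of the time, and the retry wrapper absorbs the occasional 429 that still slips through. A sketch combining the rate_limiter and client defined earlier:
def safe_call(prompt: str) -> str:
    def _do_call() -> str:
        # Proactive: wait for request and token budget before sending
        rate_limiter.acquire(len(prompt) // 4 + 1000)
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
    # Reactive: back off and retry if we still get rate limited
    return call_with_rate_limit_handling(_do_call, max_retries=5)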
Effective rate limit handling is about working with the API, not against it. Use token buckets, queuing, and adaptive limits to maximize throughput while maintaining reliability.