Skip to content
Back to Blog
1 min read

Rate Limit Handling: Maximizing Throughput Within Constraints

This post collects practical, production-minded guidance on rate limit handling: tracking your limits, token buckets, adaptive limits, prioritized request queues, and retrying when you do get rate limited.

Understanding Rate Limits

import itertools
import logging
import threading
import time
from collections import deque
from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class RateLimitConfig:
    """Rate limit configuration.

    requests_per_minute / tokens_per_minute apply to a rolling 60-second
    window; requests_per_day caps requests per 24 hours (None = no daily cap).
    """
    requests_per_minute: int
    tokens_per_minute: int
    requests_per_day: Optional[int] = None

# OpenAI rate limits by tier (approximate)
# NOTE(review): provider limits change over time -- confirm against the limits
# shown for your own organization before relying on these numbers.
RATE_LIMITS = {
    "tier_1": RateLimitConfig(
        requests_per_minute=500,
        tokens_per_minute=30_000,
        requests_per_day=10_000
    ),
    "tier_2": RateLimitConfig(
        requests_per_minute=5_000,
        tokens_per_minute=450_000,
        requests_per_day=None
    ),
    "tier_3": RateLimitConfig(
        requests_per_minute=5_000,
        tokens_per_minute=800_000,
        requests_per_day=None
    ),
    "o1_preview": RateLimitConfig(
        requests_per_minute=20,
        tokens_per_minute=30_000,
        requests_per_day=100
    )
}

class RateLimitTracker:
    """Track and enforce rate limits using 60-second sliding windows.

    Request timestamps and (timestamp, tokens) pairs are kept for the last
    minute; a separate counter tracks requests since the last daily reset.
    Public methods take an internal lock, so one tracker can be shared
    between threads. Note that can_make_request() does not reserve capacity:
    call record_request() once the request has actually been made.
    """

    def __init__(self, config: RateLimitConfig):
        self.config = config
        self.request_times: deque = deque()  # timestamps of recorded requests
        self.token_counts: deque = deque()   # (timestamp, tokens_used) pairs
        self.daily_requests = 0
        self.last_reset = time.time()
        self._lock = threading.Lock()

    def can_make_request(self, estimated_tokens: int = 1000) -> tuple[bool, float]:
        """Check if request can be made, return (can_proceed, wait_time).

        wait_time is the number of seconds until the blocking limit should
        have room (0 when the request is allowed). A request larger than the
        whole per-minute token budget can never fit next to other traffic,
        so it is allowed once the token window is empty rather than being
        rejected forever.
        """
        with self._lock:
            now = time.time()
            minute_ago = now - 60
            self._evict_older_than(minute_ago)

            # Request-count limit: wait until the oldest request leaves the window.
            if len(self.request_times) >= self.config.requests_per_minute:
                return False, self.request_times[0] - minute_ago

            # Token limit
            token_wait = self._token_wait(estimated_tokens, minute_ago)
            if token_wait is not None:
                return False, token_wait

            # Daily limit (None means "no daily cap"; 0 is a real cap)
            if self.config.requests_per_day is not None:
                self._check_daily_reset()
                if self.daily_requests >= self.config.requests_per_day:
                    return False, self._time_until_daily_reset()

            return True, 0

    def _evict_older_than(self, cutoff: float) -> None:
        """Drop request and token entries whose timestamp is before cutoff."""
        while self.request_times and self.request_times[0] < cutoff:
            self.request_times.popleft()
        while self.token_counts and self.token_counts[0][0] < cutoff:
            self.token_counts.popleft()

    def _token_wait(self, estimated_tokens: int, minute_ago: float) -> Optional[float]:
        """Return seconds to wait for token budget, or None if it fits now."""
        limit = self.config.tokens_per_minute
        in_window = sum(tokens for _, tokens in self.token_counts)
        if not self.token_counts or in_window + estimated_tokens <= limit:
            return None

        # Expire entries oldest-first until the new request fits. Waiting only
        # for the oldest entry under-estimates the delay when it is small.
        remaining = in_window
        for timestamp, tokens in self.token_counts:
            remaining -= tokens
            if remaining + estimated_tokens <= limit:
                return timestamp - minute_ago

        # Oversized request: wait until the window has fully drained.
        return self.token_counts[-1][0] - minute_ago

    def record_request(self, tokens_used: int):
        """Record a completed request and the tokens it consumed."""
        with self._lock:
            now = time.time()
            self.request_times.append(now)
            self.token_counts.append((now, tokens_used))
            # Apply a due reset first so this request is not wiped by a later one.
            self._check_daily_reset()
            self.daily_requests += 1

    def _check_daily_reset(self):
        """Reset daily counter if more than 24 hours passed since the last reset."""
        now = time.time()
        if now - self.last_reset > 86400:  # 24 hours
            self.daily_requests = 0
            self.last_reset = now

    def _time_until_daily_reset(self) -> float:
        """Seconds remaining until the daily counter resets."""
        return 86400 - (time.time() - self.last_reset)

    def get_usage_stats(self) -> dict:
        """Get current usage statistics for the last 60 seconds and the day."""
        with self._lock:
            now = time.time()
            minute_ago = now - 60

            current_requests = sum(1 for t in self.request_times if t > minute_ago)
            current_tokens = sum(tc[1] for tc in self.token_counts if tc[0] > minute_ago)

            return {
                "rpm_used": current_requests,
                "rpm_limit": self.config.requests_per_minute,
                "rpm_utilization": current_requests / self.config.requests_per_minute,
                "tpm_used": current_tokens,
                "tpm_limit": self.config.tokens_per_minute,
                "tpm_utilization": current_tokens / self.config.tokens_per_minute,
                "daily_used": self.daily_requests,
                "daily_limit": self.config.requests_per_day
            }

Token Bucket Algorithm

class TokenBucket:
    """Thread-safe token bucket rate limiter.

    The bucket starts full, holds at most ``capacity`` tokens and refills
    continuously at ``refill_rate`` tokens per second.
    """

    def __init__(self, capacity: int, refill_rate: float):
        """
        capacity: Maximum tokens in bucket (must be > 0)
        refill_rate: Tokens added per second (must be > 0)

        Raises ValueError for non-positive values, which would otherwise make
        a blocking acquire() divide by zero or wait forever.
        """
        if capacity <= 0:
            raise ValueError(f"capacity must be positive, got {capacity}")
        if refill_rate <= 0:
            raise ValueError(f"refill_rate must be positive, got {refill_rate}")
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity
        # Monotonic clock: wall-clock adjustments must not add or remove tokens.
        self.last_refill = time.monotonic()
        self._lock = threading.Lock()

    def acquire(self, tokens: int = 1, blocking: bool = True) -> bool:
        """
        Acquire tokens from bucket.
        Returns True if acquired, False if not (when non-blocking).

        A request for more than ``capacity`` tokens can never be covered by a
        bucket that tops out at ``capacity``. It is granted once the bucket is
        full and leaves the balance negative, so later callers wait until the
        overdraft has been refilled.
        """
        # The balance we must reach before granting; oversized requests wait for a full bucket.
        required = min(tokens, self.capacity)
        while True:
            with self._lock:
                self._refill()

                if self.tokens >= required:
                    self.tokens -= tokens
                    return True

                if not blocking:
                    return False

                # Calculate wait time
                wait_time = (required - self.tokens) / self.refill_rate

            time.sleep(min(wait_time, 1.0))  # Wait in small increments

    def _refill(self):
        """Refill tokens based on elapsed time (caller holds the lock)."""
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now

# Dual token bucket for requests and tokens
class DualTokenBucket:
    """Pair of token buckets: one metering requests, one metering model tokens.

    Each bucket holds a full minute's allowance and refills at 1/60 of it
    per second.
    """

    def __init__(self, rpm: int, tpm: int):
        seconds_per_minute = 60
        # One unit in this bucket is one API request.
        self.request_bucket = TokenBucket(rpm, rpm / seconds_per_minute)
        # One unit in this bucket is one model token.
        self.token_bucket = TokenBucket(tpm, tpm / seconds_per_minute)

    def acquire(self, estimated_tokens: int = 1000) -> bool:
        """Wait (blocking) for a request slot, then for the token budget.

        The request slot is taken first and is not returned if the token
        acquisition is slow.
        """
        got_slot = self.request_bucket.acquire(1, blocking=True)
        if got_slot:
            return self.token_bucket.acquire(estimated_tokens, blocking=True)
        return False

# Usage
rate_limiter = DualTokenBucket(rpm=500, tpm=30_000)

def rate_limited_call(prompt: str) -> str:
    """Send ``prompt`` to the chat completions API once the limiter allows it.

    Token estimate: roughly four characters per prompt token, plus a flat
    1000-token allowance for the completion.
    """
    prompt_tokens = len(prompt) // 4
    completion_allowance = 1000

    # Blocks until both the request and token buckets have room.
    rate_limiter.acquire(prompt_tokens + completion_allowance)

    # NOTE(review): `client` is not defined in this snippet; presumably an
    # OpenAI client created elsewhere.
    completion = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
    )
    first_choice = completion.choices[0]
    return first_choice.message.content

Adaptive Rate Limiting

class AdaptiveRateLimiter:
    """Adjust rate limiting based on actual API responses.

    Every 4th rate-limit hit shrinks the working RPM/TPM by 20%; every run of
    more than 100 successes grows them by ~10% (at least +1). Limits reported
    by the API's response headers act as a ceiling for that growth, and the
    working limits never drop below 1.
    """

    def __init__(self, initial_rpm: int, initial_tpm: int):
        self.rpm = initial_rpm
        self.tpm = initial_tpm
        self.rate_limit_hits = 0
        self.successful_requests = 0
        # Hard limits advertised by the server; None until a header has been seen.
        self._rpm_ceiling = None
        self._tpm_ceiling = None

    def adjust_from_headers(self, headers: dict):
        """Adjust limits based on API response headers.

        Returns the current limits plus the raw "remaining" header values
        (None when a header is absent).
        """

        # OpenAI includes rate limit info in headers
        if 'x-ratelimit-limit-requests' in headers:
            self.rpm = self._rpm_ceiling = int(headers['x-ratelimit-limit-requests'])

        if 'x-ratelimit-limit-tokens' in headers:
            self.tpm = self._tpm_ceiling = int(headers['x-ratelimit-limit-tokens'])

        # Track remaining
        remaining_requests = headers.get('x-ratelimit-remaining-requests')
        remaining_tokens = headers.get('x-ratelimit-remaining-tokens')

        return {
            "rpm_limit": self.rpm,
            "tpm_limit": self.tpm,
            "remaining_requests": remaining_requests,
            "remaining_tokens": remaining_tokens
        }

    def record_rate_limit(self, retry_after: float):
        """Record when we hit a rate limit.

        ``retry_after`` is accepted for interface compatibility; it is not
        used to size the reduction.
        """
        self.rate_limit_hits += 1

        # Reduce our assumed limits, but never to 0: a zero limit could never
        # grow back through record_success().
        if self.rate_limit_hits > 3:
            self.rpm = max(1, int(self.rpm * 0.8))
            self.tpm = max(1, int(self.tpm * 0.8))
            self.rate_limit_hits = 0
            logging.getLogger(__name__).warning(
                "Reducing rate limits to RPM=%s, TPM=%s", self.rpm, self.tpm
            )

    def record_success(self):
        """Record successful request; grow limits after a run of successes."""
        self.successful_requests += 1

        # Gradually increase if we've been successful
        if self.successful_requests > 100:
            self.rpm = self._grow(self.rpm, self._rpm_ceiling)
            self.tpm = self._grow(self.tpm, self._tpm_ceiling)
            self.successful_requests = 0
            logging.getLogger(__name__).info(
                "Increasing rate limits to RPM=%s, TPM=%s", self.rpm, self.tpm
            )

    @staticmethod
    def _grow(value: int, ceiling: Optional[int]) -> int:
        """Increase value by ~10% (at least 1), capped at ceiling when known."""
        # int(value * 1.1) alone never increases values below 10.
        grown = max(value + 1, int(value * 1.1))
        return grown if ceiling is None else min(grown, ceiling)

Request Queue with Rate Limiting

import asyncio
from typing import List, Callable
from dataclasses import dataclass, field
from queue import PriorityQueue
import heapq

@dataclass(order=True)
class PrioritizedRequest:
    """Request with priority for queue ordering.

    Items compare by (priority, sequence): lower ``priority`` sorts first,
    and among equal priorities the earlier-constructed request sorts first.
    heapq alone does not preserve insertion order for equal keys, so
    ``sequence`` provides that tie-break.
    """
    priority: int
    request_id: str = field(compare=False)
    func: Callable = field(compare=False)
    args: tuple = field(compare=False)
    kwargs: dict = field(compare=False)
    # Auto-assigned, ever-increasing tie-breaker (one shared counter per class).
    sequence: int = field(default_factory=itertools.count().__next__, repr=False)

class RateLimitedQueue:
    """Queue that processes requests within rate limits.

    Requests are executed one at a time on a background thread, lowest
    ``priority`` value first, each after the rate limiter grants capacity.
    Results are stored as {"success": True, "result": ...} or
    {"success": False, "error": "<message>"} and fetched with get_result().
    """

    def __init__(self, rpm: int, tpm: int):
        self.rate_limiter = DualTokenBucket(rpm, tpm)
        self.queue: List = []
        self.results: Dict[str, Any] = {}
        self.running = False
        # Guards both the heap and the results dict.
        self._lock = threading.Lock()

    def submit(self, func: Callable, priority: int = 5,
               estimated_tokens: int = 1000, *args, **kwargs) -> str:
        """Submit request to queue and return its request id.

        Positional arguments after ``func`` bind to ``priority`` and
        ``estimated_tokens`` first, so pass those by keyword when also
        passing positional ``*args`` for ``func``.
        """
        import uuid
        request_id = str(uuid.uuid4())

        request = PrioritizedRequest(
            priority=priority,
            request_id=request_id,
            func=func,
            args=args,
            # Carried in kwargs and stripped again before func is called.
            kwargs={**kwargs, "_estimated_tokens": estimated_tokens}
        )

        with self._lock:
            heapq.heappush(self.queue, request)

        return request_id

    def get_result(self, request_id: str, timeout: float = 60) -> Any:
        """Wait up to ``timeout`` seconds for a result and remove it.

        Raises TimeoutError if the result does not arrive in time.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            with self._lock:
                if request_id in self.results:
                    return self.results.pop(request_id)
            time.sleep(0.1)
        raise TimeoutError(f"Request {request_id} timed out")

    def start_processing(self):
        """Start background processing on a daemon thread."""
        self.running = True
        threading.Thread(target=self._process_loop, daemon=True).start()

    def _process_loop(self):
        """Process requests from queue until ``running`` is cleared."""
        while self.running:
            with self._lock:
                request = heapq.heappop(self.queue) if self.queue else None

            if request is None:
                # Idle poll happens outside the lock so submit() is not blocked.
                time.sleep(0.1)
                continue

            estimated_tokens = request.kwargs.pop("_estimated_tokens", 1000)

            # Wait for rate limit
            self.rate_limiter.acquire(estimated_tokens)

            # Execute; any failure is reported to the caller via the result entry.
            try:
                outcome = {"success": True, "result": request.func(*request.args, **request.kwargs)}
            except Exception as e:
                outcome = {"success": False, "error": str(e)}

            with self._lock:
                self.results[request.request_id] = outcome

# Usage
queue = RateLimitedQueue(rpm=500, tpm=30_000)
queue.start_processing()

# Submit one request per prompt. Binding `p=prompt` as a default pins each
# lambda to its own prompt instead of late-binding the loop variable.
# NOTE(review): `prompts` and `call_openai` are not defined in this snippet.
request_ids = [
    queue.submit(
        lambda p=prompt: call_openai(p),
        priority=5,
        estimated_tokens=2000,
    )
    for prompt in prompts
]

# Collect results in submission order
results = [queue.get_result(rid) for rid in request_ids]

Rate Limit Error Handling

from openai import RateLimitError

def _parse_retry_after(value: str) -> Optional[float]:
    """Parse a Retry-After value (delay in seconds or an HTTP-date) into seconds."""
    import email.utils

    try:
        return max(0.0, float(value))
    except ValueError:
        pass
    # RFC 9110 also allows an absolute HTTP-date; convert it to a delay from now.
    parsed = email.utils.parsedate_tz(value)
    if parsed is None:
        return None
    return max(0.0, email.utils.mktime_tz(parsed) - time.time())


def handle_rate_limit_error(error: RateLimitError) -> float:
    """Extract wait time (seconds) from rate limit error.

    Checks, in order: the ``retry-after-ms`` header, the ``retry-after``
    header (seconds or HTTP-date), then a delay mentioned in the error
    message ("retry after 20 seconds", "try again in 1.5s", "... in 20ms").
    Falls back to 60 seconds.
    """
    import re

    response = getattr(error, "response", None)
    headers = getattr(response, "headers", None) or {}

    # Millisecond-precision hint, when the server provides one
    retry_after_ms = headers.get('retry-after-ms')
    if retry_after_ms:
        try:
            return max(0.0, float(retry_after_ms) / 1000)
        except ValueError:
            pass

    # Check for retry-after header
    retry_after = headers.get('retry-after')
    if retry_after:
        delay = _parse_retry_after(retry_after)
        if delay is not None:
            return delay

    # Check error message for hints
    match = re.search(
        r'(?:retry after|try again in)\s+(\d+(?:\.\d+)?)\s*(ms|s|seconds?)\b',
        str(error),
        re.IGNORECASE,
    )
    if match:
        amount = float(match.group(1))
        return amount / 1000 if match.group(2).lower() == "ms" else amount

    # Default backoff
    return 60.0

def call_with_rate_limit_handling(func: Callable, max_retries: int = 5) -> Any:
    """Call ``func()`` (no arguments), retrying when it raises RateLimitError.

    Between attempts it sleeps for the delay suggested by
    handle_rate_limit_error(). Exceptions other than RateLimitError
    propagate immediately.

    Raises:
        RateLimitError: re-raised from the final attempt if every attempt is
            rate limited.
        RuntimeError: if ``max_retries`` < 1, in which case ``func`` is never called.
    """
    # Module logger; the snippet previously referenced an undefined `logger`,
    # which turned the first retry into a NameError.
    log = logging.getLogger(__name__)

    for attempt in range(max_retries):
        try:
            return func()

        except RateLimitError as e:
            if attempt == max_retries - 1:
                raise

            wait_time = handle_rate_limit_error(e)
            log.warning("Rate limited. Waiting %ss before retry...", wait_time)
            time.sleep(wait_time)

    raise RuntimeError("Max retries exceeded")

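Effective rate limit handling is about working with the API, not against it. Use token buckets, queuing, and adaptive limits to maximize throughput while maintaining reliability.

## Takeaways

- Track both requests per minute and tokens per minute; whichever runs out first is your real limit.
- Throttle proactively on the client (token buckets, prioritized queues) instead of relying on 429 responses and retries.
- When you are throttled anyway, honor the server's `retry-after` hint before falling back to a fixed backoff.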

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.