
Azure OpenAI Quotas: Understanding and Managing Limits

Understanding Azure OpenAI quotas and rate limits is essential for production applications. Today we explore how to work within these constraints.

Quota Types

quota_types = {
    "tokens_per_minute": {
        "description": "TPM - Total tokens processed per minute",
        "includes": "Input + output tokens",
        "default": "Varies by model and region"
    },
    "requests_per_minute": {
        "description": "RPM - API calls per minute",
        "default": "Varies by deployment"
    },
    "tokens_per_day": {
        "description": "Daily token limit (some regions)",
        "resets": "Daily at midnight UTC"
    }
}

# Default quotas (as of August 2023)
default_quotas = {
    "gpt-4": {"tpm": 10000, "rpm": 60},
    "gpt-4-32k": {"tpm": 30000, "rpm": 60},
    "gpt-35-turbo": {"tpm": 120000, "rpm": 720},
    "text-embedding-ada-002": {"tpm": 350000, "rpm": 720}
}

Checking Current Quotas

from azure.mgmt.cognitiveservices import CognitiveServicesManagementClient
from azure.identity import DefaultAzureCredential

def get_deployment_quotas(subscription_id, resource_group, account_name):
    credential = DefaultAzureCredential()
    client = CognitiveServicesManagementClient(credential, subscription_id)

    # List model deployments under the Azure OpenAI resource

    # List deployments
    deployments = client.deployments.list(resource_group, account_name)

    quotas = []
    for deployment in deployments:
        quotas.append({
            "name": deployment.name,
            "model": deployment.properties.model.name,
            "capacity": deployment.sku.capacity,
            "tpm": deployment.sku.capacity * 1000  # Rough estimate
        })

    return quotas
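
A quick way to try this is shown below; the subscription ID, resource group, and account name are placeholders for your own resources.

# Example usage -- the identifiers below are placeholders
quotas = get_deployment_quotas(
    subscription_id="00000000-0000-0000-0000-000000000000",
    resource_group="my-openai-rg",
    account_name="my-openai-account"
)

for q in quotas:
    print(f"{q['name']}: {q['model']} ({q['tpm']} TPM)")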

Handling Rate Limits

import time
from openai import AzureOpenAI, RateLimitError

class RateLimitHandler:
    def __init__(self, client, max_retries=5, base_delay=1):
        self.client = client
        self.max_retries = max_retries
        self.base_delay = base_delay

    def chat_completion(self, **kwargs):
        for attempt in range(self.max_retries):
            try:
                return self.client.chat.completions.create(**kwargs)
            except RateLimitError as e:
                if attempt == self.max_retries - 1:
                    raise

                # Exponential backoff
                delay = self.base_delay * (2 ** attempt)

                # Honor the Retry-After header if the API returned one
                retry_after = None
                if getattr(e, "response", None) is not None:
                    retry_after = e.response.headers.get("retry-after")
                if retry_after:
                    delay = max(delay, float(retry_after))

                print(f"Rate limited. Retrying in {delay}s...")
                time.sleep(delay)

# Usage
client = AzureOpenAI(
    azure_endpoint="https://your-resource.openai.azure.com/",
    api_key="your-key",
    api_version="2024-02-15-preview"
)

handler = RateLimitHandler(client)
response = handler.chat_completion(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello"}]
)

Token Budgeting

import time

import tiktoken

class TokenBudget:
    def __init__(self, tpm_limit, safety_margin=0.9):
        self.tpm_limit = int(tpm_limit * safety_margin)
        self.tokens_used = 0
        self.window_start = time.time()

    def can_send(self, estimated_tokens):
        self._maybe_reset_window()
        return (self.tokens_used + estimated_tokens) <= self.tpm_limit

    def record_usage(self, tokens):
        self.tokens_used += tokens

    def wait_time(self, estimated_tokens):
        if self.can_send(estimated_tokens):
            return 0
        return 60 - (time.time() - self.window_start)

    def _maybe_reset_window(self):
        if time.time() - self.window_start >= 60:
            self.tokens_used = 0
            self.window_start = time.time()

def estimate_tokens(text, model="gpt-4"):
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

# Usage
budget = TokenBudget(tpm_limit=10000)

for request in requests:
    estimated = estimate_tokens(request["prompt"]) + request["max_tokens"]

    wait = budget.wait_time(estimated)
    if wait > 0:
        time.sleep(wait)

    response = client.chat.completions.create(...)
    budget.record_usage(response.usage.total_tokens)

Request Queuing

import asyncio
import time
from collections import deque

class RateLimitedQueue:
    def __init__(self, tpm_limit, rpm_limit):
        self.tpm_limit = tpm_limit
        self.rpm_limit = rpm_limit
        self.queue = deque()
        self.tokens_this_minute = 0
        self.requests_this_minute = 0
        self.window_start = time.time()

    async def add_request(self, request):
        # Enqueue the request and wait until the processing loop resolves it
        future = asyncio.get_running_loop().create_future()
        self.queue.append((request, future))
        return await future

    async def process_loop(self):
        while True:
            self._maybe_reset_window()

            if self.queue:
                request, future = self.queue[0]
                estimated_tokens = estimate_tokens(request["prompt"])

                if (self.tokens_this_minute + estimated_tokens <= self.tpm_limit
                        and self.requests_this_minute < self.rpm_limit):

                    self.queue.popleft()
                    try:
                        response = await self._send_request(request)
                    except Exception as exc:
                        future.set_exception(exc)
                        continue

                    self.tokens_this_minute += response.usage.total_tokens
                    self.requests_this_minute += 1
                    future.set_result(response)

            await asyncio.sleep(0.01)

    def _maybe_reset_window(self):
        # Roll the one-minute window, same approach as TokenBudget
        if time.time() - self.window_start >= 60:
            self.tokens_this_minute = 0
            self.requests_this_minute = 0
            self.window_start = time.time()

    async def _send_request(self, request):
        # Plug in your actual call here, e.g. via an async Azure OpenAI client
        raise NotImplementedError
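
A minimal sketch of driving the queue, assuming _send_request has been wired up to your async client; the prompt and limits below are illustrative.

# Example usage sketch -- assumes _send_request is implemented
async def main():
    queue = RateLimitedQueue(tpm_limit=10000, rpm_limit=60)
    worker = asyncio.create_task(queue.process_loop())

    response = await queue.add_request({"prompt": "Hello", "max_tokens": 100})
    print(response.usage.total_tokens)

    worker.cancel()

asyncio.run(main())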

Requesting Quota Increases

quota_increase_tips = {
    "when_to_request": [
        "Consistent quota utilization > 80%",
        "Production workload requirements",
        "Predictable usage patterns"
    ],
    "how_to_request": [
        "Azure Portal > Azure OpenAI > Quotas",
        "Submit support ticket with justification",
        "Provide usage data and projections"
    ],
    "what_to_include": [
        "Current usage patterns",
        "Expected growth",
        "Business justification",
        "SLA requirements"
    ]
}
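
Before filing a request, a rough check like the sketch below can confirm whether sustained utilization is actually above the 80% threshold; the per-minute usage samples here are hypothetical and would come from your own monitoring.

# Hypothetical per-minute token usage samples from your monitoring
tpm_limit = 10000
usage_samples = [8500, 9200, 8800, 9100, 8700]

utilization = sum(usage_samples) / (len(usage_samples) * tpm_limit)
if utilization > 0.8:
    print(f"Average utilization {utilization:.0%} -- consider requesting an increase")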

Tomorrow we’ll explore rate limit management strategies.


Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.