
GPT-4o Speed Improvements: Building Low-Latency Applications

GPT-4o is 2x faster than GPT-4 Turbo. Today I’m exploring how to leverage this speed for responsive applications.

Latency Comparison

Based on my benchmarks:

| Model | First Token (ms) | Total (500 tokens) |
| --- | --- | --- |
| GPT-4 Turbo | 800-1200 | 8-12s |
| GPT-4o | 200-400 | 4-6s |
| GPT-3.5 Turbo | 150-300 | 2-4s |

GPT-4o gets you GPT-4-level intelligence at near-GPT-3.5 speeds. (The code below assumes `client` is an already-initialized AzureOpenAI client; the Connection Pooling section later in this post shows how I create it.)

Streaming for Perceived Speed

import time

def stream_response(prompt: str) -> tuple[str, dict]:
    """Stream response and measure time-to-first-token."""
    start_time = time.time()
    first_token_time = None
    full_response = ""

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )

    for chunk in response:
        # Skip chunks without choices (e.g. Azure's initial content-filter chunk)
        if chunk.choices and chunk.choices[0].delta.content:
            if first_token_time is None:
                first_token_time = time.time() - start_time

            content = chunk.choices[0].delta.content
            full_response += content
            print(content, end="", flush=True)

    total_time = time.time() - start_time

    return full_response, {
        "first_token_ms": first_token_time * 1000,
        "total_ms": total_time * 1000,
        "tokens": len(full_response) // 4  # Rough estimate
    }

# Usage
response, metrics = stream_response("Explain microservices architecture")
print(f"\n\nFirst token: {metrics['first_token_ms']:.0f}ms")
print(f"Total time: {metrics['total_ms']:.0f}ms")

Parallel Processing

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

class ParallelProcessor:
    def __init__(self, client, max_workers: int = 10):
        self.client = client
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def _single_request(self, prompt: str, request_id: int) -> Dict:
        start = time.time()

        response = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=500
        )

        return {
            "id": request_id,
            "response": response.choices[0].message.content,
            "latency_ms": (time.time() - start) * 1000,
            "tokens": response.usage.total_tokens
        }

    async def process_batch(self, prompts: List[str]) -> List[Dict]:
        loop = asyncio.get_running_loop()

        tasks = [
            loop.run_in_executor(
                self.executor,
                self._single_request,
                prompt,
                i
            )
            for i, prompt in enumerate(prompts)
        ]

        results = await asyncio.gather(*tasks)
        return sorted(results, key=lambda x: x["id"])

# Usage
processor = ParallelProcessor(client, max_workers=10)

prompts = [
    "Summarize: " + doc for doc in documents[:10]
]

start = time.time()
results = asyncio.run(processor.process_batch(prompts))
total_time = time.time() - start

print(f"Processed {len(prompts)} requests in {total_time:.2f}s")
print(f"Average latency: {sum(r['latency_ms'] for r in results) / len(results):.0f}ms")

Optimizing for First Token Latency

def optimize_for_speed(prompt: str, context: str | None = None) -> str:
    """Optimize request for fastest first token."""

    messages = []

    # Shorter system prompt = faster
    messages.append({
        "role": "system",
        "content": "Be concise."
    })

    # Put context in user message if needed
    if context:
        messages.append({
            "role": "user",
            "content": f"Context: {context[:2000]}"  # Limit context
        })
        messages.append({
            "role": "assistant",
            "content": "I understand the context."
        })

    messages.append({
        "role": "user",
        "content": prompt
    })

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        max_tokens=300,  # Limit output
        temperature=0,   # Deterministic output
        stream=True
    )

    result = ""
    for chunk in response:
        if chunk.choices and chunk.choices[0].delta.content:
            result += chunk.choices[0].delta.content

    return result
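
A quick usage example; the question and the context string here are just illustrative:

answer = optimize_for_speed(
    "What are the main trade-offs of this design?",
    context="We run an event-driven architecture where services communicate via a message broker..."
)
print(answer)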

Regional Latency Optimization

import statistics
import time
from typing import Dict

from openai import AzureOpenAI

class RegionalOptimizer:
    REGIONS = [
        "https://eastus.api.cognitive.microsoft.com",
        "https://westus.api.cognitive.microsoft.com",
        "https://westeurope.api.cognitive.microsoft.com",
    ]

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.latencies = {region: [] for region in self.REGIONS}

    def benchmark_regions(self, test_prompt: str = "Hello") -> Dict:
        results = {}

        for region in self.REGIONS:
            client = AzureOpenAI(
                api_version="2024-05-01-preview",
                azure_endpoint=region,
                api_key=self.api_key
            )

            latencies = []
            for _ in range(3):
                start = time.time()
                response = client.chat.completions.create(
                    model="gpt-4o",
                    messages=[{"role": "user", "content": test_prompt}],
                    max_tokens=10
                )
                latencies.append((time.time() - start) * 1000)

            results[region] = {
                "avg_ms": statistics.mean(latencies),
                "min_ms": min(latencies),
                "max_ms": max(latencies)
            }

        return results

    def get_fastest_region(self, results: Dict) -> str:
        return min(results, key=lambda r: results[r]["avg_ms"])
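
A usage sketch, assuming the API key lives in the AZURE_OPENAI_KEY environment variable and each endpoint hosts a gpt-4o deployment:

import os

optimizer = RegionalOptimizer(api_key=os.environ["AZURE_OPENAI_KEY"])
results = optimizer.benchmark_regions()

fastest = optimizer.get_fastest_region(results)
print(f"Fastest region: {fastest} ({results[fastest]['avg_ms']:.0f}ms average)")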

Client-Side Optimizations

Connection Pooling

import os

import httpx
from openai import AzureOpenAI

# Configure client with connection pooling
http_client = httpx.Client(
    limits=httpx.Limits(
        max_keepalive_connections=20,
        max_connections=100,
        keepalive_expiry=30.0
    ),
    timeout=httpx.Timeout(60.0, connect=10.0)
)

client = AzureOpenAI(
    api_version="2024-05-01-preview",
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    api_key=os.environ["AZURE_OPENAI_KEY"],
    http_client=http_client
)

Retry with Backoff

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
def resilient_completion(messages: list) -> str:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages
    )
    return response.choices[0].message.content
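
A refinement I'd consider (a sketch, assuming the openai v1 SDK's exception classes): retry only transient failures such as rate limits and timeouts, so malformed requests fail fast instead of burning retries.

import openai
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

@retry(
    retry=retry_if_exception_type((openai.RateLimitError, openai.APITimeoutError, openai.APIConnectionError)),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
def resilient_completion(messages: list) -> str:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages
    )
    return response.choices[0].message.content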

Real-Time Application Pattern

import os

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import StreamingResponse
from openai import AsyncAzureOpenAI

app = FastAPI()

# Use the async client so streaming never blocks the event loop
async_client = AsyncAzureOpenAI(
    api_version="2024-05-01-preview",
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    api_key=os.environ["AZURE_OPENAI_KEY"]
)

@app.websocket("/chat")
async def websocket_chat(websocket: WebSocket):
    await websocket.accept()

    try:
        while True:
            # Receive user message
            data = await websocket.receive_text()

            # Stream response back, token by token
            response = await async_client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": data}],
                stream=True
            )

            async for chunk in response:
                if chunk.choices and chunk.choices[0].delta.content:
                    await websocket.send_text(chunk.choices[0].delta.content)

            # Signal end of response
            await websocket.send_text("[END]")
    except WebSocketDisconnect:
        pass

@app.get("/stream")
async def stream_endpoint(prompt: str):
    async def generate():
        response = await async_client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )

        async for chunk in response:
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

    return StreamingResponse(generate(), media_type="text/plain")
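
Consuming the streaming endpoint from the client side is just a matter of reading the body incrementally. A sketch, assuming the app is running locally on port 8000:

import httpx

with httpx.stream(
    "GET",
    "http://localhost:8000/stream",
    params={"prompt": "Explain microservices architecture"},
    timeout=60.0
) as r:
    for text in r.iter_text():
        print(text, end="", flush=True)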

Benchmarking Framework

import statistics
import time
from dataclasses import dataclass
from typing import List

@dataclass
class BenchmarkResult:
    model: str
    prompt_tokens: int
    completion_tokens: int
    first_token_ms: float
    total_ms: float
    tokens_per_second: float

class ModelBenchmark:
    def __init__(self, client):
        self.client = client
        self.results: List[BenchmarkResult] = []

    def run_benchmark(self, prompt: str, model: str, iterations: int = 5) -> dict:
        results = []

        for _ in range(iterations):
            start = time.time()
            first_token = None
            tokens = 0

            response = self.client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                stream=True
            )

            for chunk in response:
                if chunk.choices and chunk.choices[0].delta.content:
                    if first_token is None:
                        first_token = time.time() - start
                    tokens += 1

            total = time.time() - start

            results.append(BenchmarkResult(
                model=model,
                prompt_tokens=len(prompt) // 4,
                completion_tokens=tokens,
                first_token_ms=first_token * 1000,
                total_ms=total * 1000,
                tokens_per_second=tokens / total
            ))

        return {
            "model": model,
            "iterations": iterations,
            "avg_first_token_ms": statistics.mean(r.first_token_ms for r in results),
            "avg_total_ms": statistics.mean(r.total_ms for r in results),
            "avg_tokens_per_second": statistics.mean(r.tokens_per_second for r in results),
            "p95_total_ms": sorted([r.total_ms for r in results])[int(iterations * 0.95)]
        }

# Usage
benchmark = ModelBenchmark(client)
gpt4o_results = benchmark.run_benchmark(
    "Explain the concept of eventual consistency in distributed systems.",
    "gpt-4o"
)
print(f"GPT-4o avg first token: {gpt4o_results['avg_first_token_ms']:.0f}ms")

Summary

| Optimization | Impact |
| --- | --- |
| Streaming | Better perceived latency |
| Parallel processing | Higher throughput |
| Regional selection | 20-50% latency reduction |
| Connection pooling | 10-20% improvement |
| Shorter prompts | Faster processing |

What’s Next

Tomorrow I’ll cover Microsoft Copilot+ PCs and their AI capabilities.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.