
Response Streaming: Real-Time LLM Output

Streaming responses reduce perceived latency and improve the user experience in LLM applications. Today, I will cover how to implement effective streaming patterns.

Why Streaming Matters

streaming_benefits = {
    "perceived_latency": "Users see content immediately instead of waiting",
    "time_to_first_token": "Response starts in ~100ms vs seconds for full response",
    "user_engagement": "Progressive display keeps users engaged",
    "cancelability": "Users can stop generation mid-stream"
}

Basic Streaming

from openai import AzureOpenAI

client = AzureOpenAI(
    api_key="your-key",
    api_version="2023-07-01-preview",
    azure_endpoint="https://your-resource.openai.azure.com"
)

def stream_chat(messages: list, model: str = "gpt-4"):
    """Basic streaming chat"""

    response = client.chat.completions.create(
        model=model,
        messages=messages,
        stream=True
    )

    full_response = ""

    for chunk in response:
        if chunk.choices and chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            full_response += content
            yield content

    return full_response

# Usage
for token in stream_chat([{"role": "user", "content": "Explain quantum computing"}]):
    print(token, end="", flush=True)
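
Because stream_chat is a generator, the return full_response at the end is not visible to a plain for loop; Python delivers it via StopIteration (or to a caller using yield from). A minimal sketch of a helper that keeps the streaming behaviour and still hands back the accumulated text (print_stream is a hypothetical name, not part of the SDK):

def print_stream(messages: list) -> str:
    """Print tokens as they arrive, then return the accumulated text."""
    gen = stream_chat(messages)
    while True:
        try:
            token = next(gen)
        except StopIteration as stop:
            # A generator's return value is carried on StopIteration.value
            return stop.value
        print(token, end="", flush=True)

answer = print_stream([{"role": "user", "content": "Explain quantum computing"}])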

Async Streaming

import asyncio
from openai import AsyncAzureOpenAI

async_client = AsyncAzureOpenAI(
    api_key="your-key",
    api_version="2023-07-01-preview",
    azure_endpoint="https://your-resource.openai.azure.com"
)

async def async_stream_chat(messages: list, model: str = "gpt-4"):
    """Async streaming chat"""

    response = await async_client.chat.completions.create(
        model=model,
        messages=messages,
        stream=True
    )

    async for chunk in response:
        if chunk.choices and chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content

# Usage with asyncio
async def main():
    async for token in async_stream_chat([{"role": "user", "content": "Hello"}]):
        print(token, end="", flush=True)

asyncio.run(main())
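
The async client also makes it easy to consume several streams concurrently. A minimal sketch, assuming the async_stream_chat generator above, that collects two responses in parallel with asyncio.gather:

async def collect(prompt: str) -> str:
    """Consume one stream and return the full text."""
    parts = []
    async for token in async_stream_chat([{"role": "user", "content": prompt}]):
        parts.append(token)
    return "".join(parts)

async def run_concurrent():
    # Both streams are consumed at the same time on one event loop
    results = await asyncio.gather(
        collect("Explain quantum computing"),
        collect("Explain transformers"),
    )
    for result in results:
        print(result[:80], "...")

asyncio.run(run_concurrent())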

FastAPI Streaming Endpoint

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import json

app = FastAPI()

async def generate_stream(messages: list):
    """Generate streaming response"""

    response = await async_client.chat.completions.create(
        model="gpt-4",
        messages=messages,
        stream=True
    )

    async for chunk in response:
        if chunk.choices and chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            # Send as Server-Sent Event
            yield f"data: {json.dumps({'content': content})}\n\n"

    yield "data: [DONE]\n\n"

@app.post("/chat/stream")
async def chat_stream(request: Request):
    """Streaming chat endpoint"""
    data = await request.json()
    messages = data.get("messages", [])

    return StreamingResponse(
        generate_stream(messages),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive"
        }
    )
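
You can also consume the endpoint from Python to sanity-check it. A minimal sketch using httpx (the localhost URL assumes the app is running locally, e.g. under uvicorn):

import httpx

async def consume_stream(messages: list) -> str:
    full_response = ""
    async with httpx.AsyncClient(timeout=None) as http:
        # Stream the POST response and read the SSE events line by line
        async with http.stream(
            "POST",
            "http://localhost:8000/chat/stream",  # assumed local dev server
            json={"messages": messages},
        ) as response:
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue
                data = line[len("data: "):]
                if data == "[DONE]":
                    break
                content = json.loads(data)["content"]
                full_response += content
                print(content, end="", flush=True)
    return full_response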

Frontend Consumption

// JavaScript/TypeScript client for streaming
async function streamChat(messages: Array<{role: string, content: string}>) {
    const response = await fetch('/chat/stream', {
        method: 'POST',
        headers: {'Content-Type': 'application/json'},
        body: JSON.stringify({messages})
    });

    const reader = response.body?.getReader();
    if (!reader) return '';

    const decoder = new TextDecoder();
    let fullResponse = '';
    let buffer = '';

    while (true) {
        const {done, value} = await reader.read();
        if (done) break;

        // Buffer partial lines: one SSE event can be split across read() chunks
        buffer += decoder.decode(value, {stream: true});
        const lines = buffer.split('\n');
        buffer = lines.pop() ?? '';

        for (const line of lines) {
            if (line.startsWith('data: ')) {
                const data = line.slice(6);
                if (data === '[DONE]') continue;

                try {
                    const parsed = JSON.parse(data);
                    fullResponse += parsed.content;
                    // Update UI
                    document.getElementById('response').innerText = fullResponse;
                } catch (e) {
                    // Handle parse errors
                }
            }
        }
    }

    return fullResponse;
}

Streaming with Function Calls

async def stream_with_functions(messages: list, functions: list):
    """Stream response that may include function calls"""

    response = await async_client.chat.completions.create(
        model="gpt-4",
        messages=messages,
        functions=functions,
        stream=True
    )

    full_content = ""
    function_call = {"name": "", "arguments": ""}
    current_mode = "content"  # or "function"

    async for chunk in response:
        choice = chunk.choices[0] if chunk.choices else None
        if not choice:
            continue

        delta = choice.delta

        # Check for function call
        if delta.function_call:
            current_mode = "function"
            if delta.function_call.name:
                function_call["name"] = delta.function_call.name
            if delta.function_call.arguments:
                function_call["arguments"] += delta.function_call.arguments

        # Regular content
        elif delta.content:
            current_mode = "content"
            full_content += delta.content
            yield {"type": "content", "content": delta.content}

        # Check finish reason
        if choice.finish_reason == "function_call":
            yield {
                "type": "function_call",
                "name": function_call["name"],
                "arguments": json.loads(function_call["arguments"])
            }
        elif choice.finish_reason == "stop":
            yield {"type": "done", "content": full_content}

Streaming with Progress Tracking

import time

class StreamMetrics:
    """Track streaming progress and latency metrics"""

    def __init__(self):
        self.tokens = []
        self.start_time = None
        self.first_token_time = None
        self.end_time = None

    async def stream(self, messages: list, model: str = "gpt-4"):
        """Stream with metrics"""
        self.start_time = time.time()

        response = await async_client.chat.completions.create(
            model=model,
            messages=messages,
            stream=True
        )

        async for chunk in response:
            if chunk.choices and chunk.choices[0].delta.content:
                if self.first_token_time is None:
                    self.first_token_time = time.time()

                content = chunk.choices[0].delta.content
                self.tokens.append(content)
                yield content

        self.end_time = time.time()

    def get_metrics(self) -> dict:
        """Get streaming metrics"""
        if not self.start_time:
            return {}

        return {
            "time_to_first_token": self.first_token_time - self.start_time if self.first_token_time else None,
            "total_time": self.end_time - self.start_time if self.end_time else None,
            "token_count": len(self.tokens),
            "tokens_per_second": len(self.tokens) / (self.end_time - self.start_time) if self.end_time else None
        }

# Usage
async def run_with_metrics():
    streamer = StreamMetrics()
    async for token in streamer.stream([{"role": "user", "content": "Hello"}]):
        print(token, end="", flush=True)

    print(f"\nMetrics: {streamer.get_metrics()}")

asyncio.run(run_with_metrics())

Cancellation Support

import asyncio

class CancellableStream:
    """Support cancelling streaming response"""

    def __init__(self, client):
        self.client = client
        self.cancelled = False

    def cancel(self):
        """Cancel the stream"""
        self.cancelled = True

    async def stream(self, messages: list, model: str = "gpt-4"):
        """Stream with cancellation support"""

        response = await self.client.chat.completions.create(
            model=model,
            messages=messages,
            stream=True
        )

        async for chunk in response:
            if self.cancelled:
                # Close the stream to release the connection before exiting
                await response.close()
                yield {"type": "cancelled"}
                return

            if chunk.choices and chunk.choices[0].delta.content:
                yield {"type": "content", "content": chunk.choices[0].delta.content}

        yield {"type": "done"}

# Usage
streamer = CancellableStream(async_client)

# In another task/thread
# streamer.cancel()
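
To illustrate the cancel path end to end, a minimal sketch that cancels the stream from a separate asyncio task after one second (the cutoff is arbitrary):

async def demo_cancellation():
    streamer = CancellableStream(async_client)

    async def cancel_after(delay: float):
        await asyncio.sleep(delay)
        streamer.cancel()

    # Schedule the cancellation alongside the stream
    asyncio.create_task(cancel_after(1.0))

    async for event in streamer.stream([{"role": "user", "content": "Write a long story"}]):
        if event["type"] == "content":
            print(event["content"], end="", flush=True)
        else:
            print(f"\n[{event['type']}]")

asyncio.run(demo_cancellation())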

Streaming improves the user experience significantly. Tomorrow, I will cover error handling for AI applications.

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.