Back to Blog
7 min read

Streaming Responses with Azure OpenAI: Real-Time AI Output

Streaming responses dramatically improve user experience by showing AI output in real time instead of making users wait for the complete response. Let’s explore how to implement streaming with Azure OpenAI across different platforms.

Why Streaming Matters

Without streaming, users wait for the entire response:

  • 500 tokens at ~50 tokens/second = 10 seconds of waiting
  • Users see nothing, then everything at once

With streaming:

  • First token appears in ~200ms
  • Users see progress as it generates
  • Perceived performance is much better

Python Streaming

import openai
from typing import Generator, Optional

def stream_completion(
    prompt: str,
    deployment: str,
    max_tokens: int = 500,
    temperature: float = 0.7
) -> Generator[str, None, None]:
    """Stream completion text from a deployment, one fragment at a time.

    Args:
        prompt: Text the model should continue.
        deployment: Deployment name, passed to the API as ``engine``.
        max_tokens: Upper bound on the number of generated tokens.
        temperature: Sampling temperature forwarded to the API.

    Yields:
        Each non-empty text fragment, in the order it is received.
    """
    response = openai.Completion.create(
        engine=deployment,
        prompt=prompt,
        max_tokens=max_tokens,
        temperature=temperature,
        stream=True
    )

    for chunk in response:
        # A streamed chunk may carry an empty ``choices`` list (Azure can send
        # chunks that hold only content-filter metadata); indexing [0] on
        # such a chunk would raise IndexError and abort the whole stream.
        if not chunk.choices:
            continue

        text = chunk.choices[0].text
        if text:
            yield text

def stream_chat(
    messages: list,
    deployment: str,
    max_tokens: int = 500
) -> Generator[str, None, None]:
    """Stream chat completion text from a deployment, one fragment at a time.

    Args:
        messages: Chat history as a list of ``{"role": ..., "content": ...}``
            dicts, forwarded unchanged to the API.
        deployment: Deployment name, passed to the API as ``engine``.
        max_tokens: Upper bound on the number of generated tokens.

    Yields:
        Each non-empty ``delta.content`` fragment, in the order it is received.
    """
    response = openai.ChatCompletion.create(
        engine=deployment,
        messages=messages,
        max_tokens=max_tokens,
        stream=True
    )

    for chunk in response:
        # A streamed chunk may carry an empty ``choices`` list (Azure can send
        # chunks that hold only content-filter metadata); skip those instead
        # of crashing on ``choices[0]``.
        if not chunk.choices:
            continue

        # Some deltas (e.g. the role-only opening delta) have no ``content``.
        content = getattr(chunk.choices[0].delta, 'content', None)
        if content:
            yield content

# Usage
# 1) Plain completion: echo every fragment the moment it is yielded.
#    flush=True pushes each fragment to the terminal immediately instead of
#    waiting for a newline.
print("Streaming completion:")
completion_tokens = stream_completion("Tell me about Azure:", "gpt35")
for token in completion_tokens:
    print(token, end="", flush=True)

# 2) Chat completion: same pattern, driven by a role-tagged message list.
messages = [
    {"role": "system", "content": "You are helpful."},
    {"role": "user", "content": "What is cloud computing?"}
]
print("\n\nStreaming chat:")
chat_tokens = stream_chat(messages, "gpt-35-turbo")
for token in chat_tokens:
    print(token, end="", flush=True)

Collecting Streamed Response

from dataclasses import dataclass
from typing import Generator, List, Optional

@dataclass
class StreamedResponse:
    """Snapshot of a fully consumed stream, as built by StreamCollector.collect."""
    # All streamed fragments joined into one string.
    content: str
    # The individual fragments, in arrival order.
    chunks: List[str]
    # Number of fragments received (len(chunks) at the time of construction).
    total_chunks: int
    # Finish reason copied from the collector; None when none was recorded.
    finish_reason: Optional[str] = None

class StreamCollector:
    """Collect and process streamed responses.

    One instance can be reused for several streams; per-stream state
    (``chunks``, ``finish_reason``) is reset at the start of each call so a
    result never reports values left over from an earlier stream.
    """

    def __init__(self):
        # Fragments gathered by the most recent collect() call.
        self.chunks: List[str] = []
        # Finish reason seen by the most recent stream_with_callback() call.
        self.finish_reason: Optional[str] = None

    def collect(
        self,
        stream: Generator[str, None, None],
        on_chunk: Optional[callable] = None
    ) -> StreamedResponse:
        """Drain ``stream`` and return everything it produced.

        Args:
            stream: Iterable of text fragments (e.g. a streaming generator).
            on_chunk: Optional callback invoked with each fragment as it
                arrives, after the fragment has been recorded.

        Returns:
            A StreamedResponse holding the joined text, the fragment list and
            the fragment count. ``finish_reason`` is ``None`` unless something
            sets ``self.finish_reason`` while the stream is being consumed,
            because a plain text stream carries no finish reason.
        """
        self.chunks = []
        # Clear the value a previous call may have left behind; otherwise the
        # returned response would report a finish reason from another stream.
        self.finish_reason = None

        for chunk in stream:
            self.chunks.append(chunk)
            if on_chunk:
                on_chunk(chunk)

        return StreamedResponse(
            content="".join(self.chunks),
            chunks=self.chunks,
            total_chunks=len(self.chunks),
            finish_reason=self.finish_reason
        )

    def stream_with_callback(
        self,
        messages: list,
        deployment: str,
        on_token: callable,
        on_complete: Optional[callable] = None
    ) -> str:
        """Run a streaming chat completion, reporting progress via callbacks.

        Args:
            messages: Chat history forwarded unchanged to the API.
            deployment: Deployment name, passed to the API as ``engine``.
            on_token: Called with every non-empty content fragment.
            on_complete: Optional; called once with the full text after the
                stream has ended.

        Returns:
            The complete response text. The last finish reason reported by the
            stream is stored on ``self.finish_reason``.
        """
        full_response = []
        # Forget the finish reason of any earlier call on this collector.
        self.finish_reason = None

        response = openai.ChatCompletion.create(
            engine=deployment,
            messages=messages,
            stream=True
        )

        for chunk in response:
            # A streamed chunk may carry an empty ``choices`` list (Azure can
            # send chunks that hold only content-filter metadata); skip it
            # rather than crash on ``choices[0]``.
            if not chunk.choices:
                continue

            choice = chunk.choices[0]

            # Some deltas (e.g. the role-only opening delta) have no content.
            token = getattr(choice.delta, 'content', None)
            if token:
                full_response.append(token)
                on_token(token)

            if choice.finish_reason:
                self.finish_reason = choice.finish_reason

        complete_text = "".join(full_response)

        if on_complete:
            on_complete(complete_text)

        return complete_text

# Usage with callbacks
def on_complete(text: str):
    """Report the size of the finished response once streaming has ended."""
    print(f"\n\nComplete! Total length: {len(text)}")


def print_token(token: str):
    """Echo one fragment immediately, without a trailing newline."""
    print(token, end="", flush=True)


collector = StreamCollector()

# Arguments are given positionally in the method's declared order:
# messages, deployment, on_token, on_complete.
result = collector.stream_with_callback(
    [{"role": "user", "content": "Explain APIs"}],
    "gpt-35-turbo",
    print_token,
    on_complete
)

Flask Server-Sent Events (SSE)

from flask import Flask, Response, request, stream_with_context
import json

# Flask application object; the streaming route below is registered on it.
app = Flask(__name__)

@app.route('/chat/stream', methods=['POST'])
def stream_chat_endpoint():
    """Stream chat responses via SSE.

    Expects a JSON object body with an optional ``messages`` list (defaults to
    an empty list). Responds with a ``text/event-stream`` body made of
    ``data: {"content": ...}`` events followed by ``data: [DONE]``. If the
    upstream call fails, a single ``data: {"error": ...}`` event is sent
    instead of ``[DONE]``. A body that is not a JSON object, or whose
    ``messages`` is not a list, is rejected with a 400 JSON error.
    """
    data = request.json
    # The body is client-controlled: a JSON list or scalar has no .get(), which
    # would otherwise surface as an unhandled AttributeError (HTTP 500).
    messages = data.get('messages', []) if isinstance(data, dict) else None
    if not isinstance(messages, list):
        return Response(
            json.dumps({'error': 'Request body must be a JSON object with a "messages" list.'}),
            status=400,
            mimetype='application/json'
        )

    def generate():
        try:
            response = openai.ChatCompletion.create(
                engine="gpt-35-turbo",
                messages=messages,
                stream=True
            )

            for chunk in response:
                # Chunks with an empty ``choices`` list (Azure can send ones
                # that hold only content-filter metadata) have nothing to emit.
                if not chunk.choices:
                    continue

                content = getattr(chunk.choices[0].delta, 'content', None)
                if content:
                    # SSE format
                    yield f"data: {json.dumps({'content': content})}\n\n"

            yield "data: [DONE]\n\n"

        except Exception as e:
            # The response has already started, so the status code can no
            # longer change; report the failure in-band as an SSE event.
            # NOTE(review): str(e) may expose upstream error details to the
            # client -- confirm that is acceptable for this deployment.
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            # Asks an nginx reverse proxy not to buffer the event stream.
            'X-Accel-Buffering': 'no'
        }
    )

# Client-side JavaScript
"""
// NOTE: the browser EventSource API cannot be used with this endpoint. It only
// issues GET requests: its constructor takes a URL and an optional
// { withCredentials } flag, so `method`, `headers` and `body` are ignored.
// Use fetch with a ReadableStream to POST the messages instead:
async function streamChat(messages) {
    const response = await fetch('/chat/stream', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ messages })
    });

    if (!response.ok || !response.body) {
        throw new Error(`Stream request failed with status ${response.status}`);
    }

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // { stream: true } keeps multi-byte characters that are split across
        // two reads intact instead of decoding them as replacement characters.
        buffer += decoder.decode(value, { stream: true });

        // A read can end in the middle of an SSE line; keep the unfinished
        // tail in the buffer and only process complete lines.
        const lines = buffer.split('\\n');
        buffer = lines.pop();

        for (const line of lines) {
            if (!line.startsWith('data: ')) continue;

            const data = line.slice(6);
            if (data === '[DONE]') return;

            const parsed = JSON.parse(data);
            if (parsed.error) {
                // The server reports upstream failures as an in-band event.
                throw new Error(parsed.error);
            }
            if (parsed.content) {
                document.getElementById('output').textContent += parsed.content;
            }
        }
    }
}
"""

FastAPI Streaming

import asyncio
import json
from typing import AsyncGenerator, List

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

# FastAPI application instance; the streaming endpoint below is registered on
# it through the @app.post decorator.
app = FastAPI()

class ChatMessage(BaseModel):
    # One chat turn from the request body. Each instance is converted with
    # .dict() and forwarded to the chat completion call.
    # Speaker of the turn.
    role: str
    # Text of the turn.
    content: str

class ChatRequest(BaseModel):
    # Request body accepted by the streaming chat endpoint.
    # Conversation so far, in order.
    messages: List[ChatMessage]
    # Cap on generated tokens, forwarded to the completion call; defaults to 500.
    max_tokens: int = 500

@app.post("/chat/stream")
async def stream_chat(request: ChatRequest):
    """Stream chat responses as Server-Sent Events.

    Args:
        request: Validated request body with the chat history and token cap.

    Returns:
        A ``text/event-stream`` StreamingResponse that emits one
        ``data: {"content": ...}`` event per content fragment, followed by
        ``data: [DONE]``.
    """

    async def generate():
        # Use the awaitable ``acreate`` call and ``async for`` so waiting on
        # the network yields control to the event loop. The synchronous
        # ``create`` iterator blocks the loop between chunks, and an
        # ``asyncio.sleep(0)`` after each chunk does not change that.
        response = await openai.ChatCompletion.acreate(
            engine="gpt-35-turbo",
            messages=[m.dict() for m in request.messages],
            max_tokens=request.max_tokens,
            stream=True
        )

        async for chunk in response:
            # Chunks with an empty ``choices`` list (Azure can send ones that
            # hold only content-filter metadata) have nothing to emit.
            if not chunk.choices:
                continue

            content = getattr(chunk.choices[0].delta, 'content', None)
            if content:
                yield f"data: {json.dumps({'content': content})}\n\n"

        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream"
    )

# Async version with httpx
import httpx

async def async_stream_chat(
    messages: list,
    deployment: str = "gpt35",
    api_version: str = "2023-03-15-preview"
) -> AsyncGenerator[str, None]:
    """Stream chat completion text over raw HTTP with httpx.

    Args:
        messages: Chat history as a list of ``{"role": ..., "content": ...}``
            dicts, sent as the ``messages`` field of the request body.
        deployment: Deployment name used in the request path.
        api_version: Value of the ``api-version`` query parameter.

    Yields:
        Each non-empty ``delta.content`` fragment, in arrival order.

    Raises:
        httpx.HTTPStatusError: If the service answers with a 4xx/5xx status.
    """
    url = f"{ENDPOINT}/openai/deployments/{deployment}/chat/completions"

    # NOTE(review): the client is created with httpx's default timeouts; a
    # slow first token may need a longer read timeout -- confirm for your
    # deployment.
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            url,
            headers={"api-key": API_KEY, "Content-Type": "application/json"},
            params={"api-version": api_version},
            json={"messages": messages, "stream": True}
        ) as response:
            # An error response is a JSON document, not an SSE stream. Without
            # this check no line would match "data: " and the generator would
            # end silently as if the model had produced nothing.
            response.raise_for_status()

            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue

                data = line[6:]
                if data == "[DONE]":
                    break

                chunk = json.loads(data)
                # Skip chunks without choices (e.g. metadata-only chunks).
                choices = chunk.get("choices")
                if not choices:
                    continue

                content = choices[0].get("delta", {}).get("content")
                if content:
                    yield content

.NET Streaming

using Azure.AI.OpenAI;

/// <summary>
/// Streams chat completion text from a single deployment through
/// <see cref="OpenAIClient"/>.
/// </summary>
public class StreamingChatService
{
    private readonly OpenAIClient _client;
    private readonly string _deployment;

    /// <param name="client">Configured client used for every request.</param>
    /// <param name="deployment">Deployment name passed to each streaming call.</param>
    public StreamingChatService(OpenAIClient client, string deployment)
    {
        _client = client;
        _deployment = deployment;
    }

    /// <summary>
    /// Sends <paramref name="messages"/> and yields each non-empty content
    /// fragment as it arrives.
    /// </summary>
    /// <param name="messages">Chat history to send, in order.</param>
    /// <param name="cancellationToken">Stops the request and the enumeration.</param>
    /// <returns>An async sequence of content fragments.</returns>
    public async IAsyncEnumerable<string> StreamChatAsync(
        IEnumerable<ChatMessage> messages,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var options = new ChatCompletionsOptions();
        foreach (var message in messages)
        {
            options.Messages.Add(message);
        }

        // GetChatCompletionsStreamingAsync returns one awaitable
        // Response<StreamingChatCompletions>, not an async sequence: await it
        // once, then enumerate the choices of its Value. The streaming object
        // holds the open connection, so dispose it when enumeration ends.
        var response = await _client.GetChatCompletionsStreamingAsync(
            _deployment, options, cancellationToken);
        using StreamingChatCompletions streaming = response.Value;

        await foreach (var choice in streaming.GetChoicesStreaming(cancellationToken))
        {
            await foreach (var message in choice.GetMessageStreaming(cancellationToken))
            {
                if (!string.IsNullOrEmpty(message.Content))
                {
                    yield return message.Content;
                }
            }
        }
    }
}

// ASP.NET Core Controller
/// <summary>
/// Exposes the streaming chat service as a Server-Sent Events endpoint.
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class ChatController : ControllerBase
{
    private readonly StreamingChatService _chatService;

    /// <summary>
    /// Receives the chat service from dependency injection. Without this
    /// constructor the field is never assigned and every request fails with
    /// a NullReferenceException.
    /// </summary>
    public ChatController(StreamingChatService chatService)
    {
        _chatService = chatService;
    }

    /// <summary>
    /// Streams the reply to <paramref name="request"/> as
    /// <c>data: {"content": ...}</c> events followed by <c>data: [DONE]</c>.
    /// </summary>
    [HttpPost("stream")]
    public async Task StreamChat([FromBody] ChatRequest request)
    {
        Response.ContentType = "text/event-stream";
        Response.Headers.Add("Cache-Control", "no-cache");

        // Cancelled when the client disconnects, so generation stops instead
        // of running to completion for nobody.
        var cancellationToken = HttpContext.RequestAborted;

        var messages = new[]
        {
            new ChatMessage(ChatRole.System, request.SystemPrompt ?? "You are helpful."),
            new ChatMessage(ChatRole.User, request.Message)
        };

        await foreach (var token in _chatService.StreamChatAsync(messages, cancellationToken))
        {
            await Response.WriteAsync(
                $"data: {JsonSerializer.Serialize(new { content = token })}\n\n",
                cancellationToken);
            // Flush per event so the client sees each token immediately.
            await Response.Body.FlushAsync(cancellationToken);
        }

        await Response.WriteAsync("data: [DONE]\n\n", cancellationToken);
        await Response.Body.FlushAsync(cancellationToken);
    }
}

React Frontend

import React, { useState, useCallback } from 'react';

/** One entry of the chat transcript rendered by the component. */
interface Message {
  /** Who produced the entry. */
  role: 'user' | 'assistant';
  /** Text of the entry; grows while an assistant reply is streaming. */
  content: string;
}

/**
 * Chat UI that posts the conversation to `/api/chat/stream` and renders the
 * assistant reply incrementally as Server-Sent Events arrive.
 */
const ChatComponent: React.FC = () => {
  const [messages, setMessages] = useState<Message[]>([]);
  const [input, setInput] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);

  const streamMessage = useCallback(async (userMessage: string) => {
    setIsStreaming(true);

    // Conversation including the new user turn; this is what the server gets.
    const newMessages: Message[] = [...messages, { role: 'user', content: userMessage }];

    // Show the user turn plus an empty assistant turn that we stream into.
    setMessages([...newMessages, { role: 'assistant', content: '' }]);

    // Appends text to the last (assistant) message without mutating state.
    const appendToAssistant = (text: string) => {
      setMessages(prev => {
        const updated = [...prev];
        const lastIdx = updated.length - 1;
        updated[lastIdx] = {
          ...updated[lastIdx],
          content: updated[lastIdx].content + text
        };
        return updated;
      });
    };

    try {
      const response = await fetch('/api/chat/stream', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          messages: newMessages.map(m => ({ role: m.role, content: m.content }))
        })
      });

      // An error response is not an event stream; surface it instead of
      // silently rendering an empty reply.
      if (!response.ok) {
        throw new Error(`Request failed with status ${response.status}`);
      }

      const reader = response.body?.getReader();
      const decoder = new TextDecoder();

      if (!reader) return;

      // Holds the unfinished tail of the last read: a network chunk can end
      // in the middle of an SSE line.
      let buffer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // { stream: true } keeps multi-byte characters split across two
        // reads intact.
        buffer += decoder.decode(value, { stream: true });

        const lines = buffer.split('\n');
        buffer = lines.pop() || '';

        for (const line of lines) {
          if (!line.startsWith('data: ')) continue;

          const data = line.slice(6);
          if (data === '[DONE]') continue;

          try {
            const parsed = JSON.parse(data);
            if (parsed.content) {
              appendToAssistant(parsed.content);
            }
          } catch (e) {
            // Skip invalid JSON
          }
        }
      }
    } catch (error) {
      console.error('Streaming error:', error);
    } finally {
      setIsStreaming(false);
    }
  }, [messages]);

  const handleSubmit = (e: React.FormEvent) => {
    e.preventDefault();
    if (input.trim() && !isStreaming) {
      streamMessage(input.trim());
      setInput('');
    }
  };

  return (
    <div className="chat-container">
      <div className="messages">
        {messages.map((msg, idx) => (
          <div key={idx} className={`message ${msg.role}`}>
            <strong>{msg.role}:</strong>
            <p>{msg.content}{isStreaming && idx === messages.length - 1 && '|'}</p>
          </div>
        ))}
      </div>
      <form onSubmit={handleSubmit}>
        <input
          value={input}
          onChange={e => setInput(e.target.value)}
          disabled={isStreaming}
          placeholder="Type a message..."
        />
        <button type="submit" disabled={isStreaming}>
          {isStreaming ? 'Streaming...' : 'Send'}
        </button>
      </form>
    </div>
  );
};

export default ChatComponent;

Best Practices

  1. Handle connection drops: Implement reconnection logic
  2. Show typing indicator: Display cursor or animation while streaming
  3. Buffer for display: A small buffer can smooth the output
  4. Track completion: Know when the stream is done
  5. Error handling: Gracefully handle mid-stream errors
  6. Cancel support: Allow users to stop generation

Resources

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.