Streaming Responses with Azure OpenAI: Real-Time AI Output
Streaming responses dramatically improve user experience by showing AI output in real-time instead of waiting for the complete response. Let’s explore how to implement streaming with Azure OpenAI across different platforms.
Why Streaming Matters
Without streaming, users wait for the entire response:
- 500 tokens at ~50 tokens/second = 10 seconds of waiting
- Users see nothing, then everything at once
With streaming:
- First token appears in ~200ms
- Users see progress as it generates
- Perceived performance is much better
Python Streaming
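The examples below use the legacy openai Python SDK (pre-1.0), which is what the engine= parameter implies, and they assume the client has already been pointed at your Azure resource. A minimal setup sketch (the endpoint, key, and API version are placeholders for your own values):
import os
import openai

# Point the legacy SDK at an Azure OpenAI resource
openai.api_type = "azure"
openai.api_base = os.environ["AZURE_OPENAI_ENDPOINT"]  # e.g. https://my-resource.openai.azure.com
openai.api_key = os.environ["AZURE_OPENAI_KEY"]
openai.api_version = "2023-03-15-preview"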
import openai
from typing import Generator, Optional
def stream_completion(
prompt: str,
deployment: str,
max_tokens: int = 500,
temperature: float = 0.7
) -> Generator[str, None, None]:
"""Stream completion tokens."""
response = openai.Completion.create(
engine=deployment,
prompt=prompt,
max_tokens=max_tokens,
temperature=temperature,
stream=True
)
for chunk in response:
if chunk.choices[0].text:
yield chunk.choices[0].text
def stream_chat(
messages: list,
deployment: str,
max_tokens: int = 500
) -> Generator[str, None, None]:
"""Stream chat completion tokens."""
response = openai.ChatCompletion.create(
engine=deployment,
messages=messages,
max_tokens=max_tokens,
stream=True
)
for chunk in response:
delta = chunk.choices[0].delta
if hasattr(delta, 'content') and delta.content:
yield delta.content
# Usage
print("Streaming completion:")
for token in stream_completion("Tell me about Azure:", "gpt35"):
print(token, end="", flush=True)
print("\n\nStreaming chat:")
messages = [
{"role": "system", "content": "You are helpful."},
{"role": "user", "content": "What is cloud computing?"}
]
for token in stream_chat(messages, "gpt-35-turbo"):
print(token, end="", flush=True)
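To see the latency benefit concretely, you can time the first token against the full response. A small sketch built on the stream_chat generator above (actual timings depend on your deployment and prompt):
import time

def time_stream(messages: list, deployment: str) -> None:
    """Print time-to-first-token versus total streaming time."""
    start = time.perf_counter()
    first_token_at = None
    tokens = []
    for token in stream_chat(messages, deployment):
        if first_token_at is None:
            first_token_at = time.perf_counter() - start
        tokens.append(token)
    total = time.perf_counter() - start
    print(f"First token after {first_token_at:.2f}s, "
          f"{len(tokens)} chunks in {total:.2f}s")

time_stream(messages, "gpt-35-turbo")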
Collecting Streamed Response
from dataclasses import dataclass
from typing import Callable, Generator, List, Optional
@dataclass
class StreamedResponse:
"""Complete response from streaming."""
content: str
chunks: List[str]
total_chunks: int
finish_reason: Optional[str] = None
class StreamCollector:
"""Collect and process streamed responses."""
def __init__(self):
self.chunks: List[str] = []
self.finish_reason: Optional[str] = None
def collect(
self,
stream: Generator[str, None, None],
        on_chunk: Optional[Callable[[str], None]] = None
) -> StreamedResponse:
"""Collect all chunks from a stream."""
self.chunks = []
for chunk in stream:
self.chunks.append(chunk)
if on_chunk:
on_chunk(chunk)
return StreamedResponse(
content="".join(self.chunks),
chunks=self.chunks,
total_chunks=len(self.chunks),
finish_reason=self.finish_reason
)
def stream_with_callback(
self,
messages: list,
deployment: str,
        on_token: Callable[[str], None],
        on_complete: Optional[Callable[[str], None]] = None
) -> str:
"""Stream with callbacks for each token."""
full_response = []
response = openai.ChatCompletion.create(
engine=deployment,
messages=messages,
stream=True
)
for chunk in response:
delta = chunk.choices[0].delta
if hasattr(delta, 'content') and delta.content:
token = delta.content
full_response.append(token)
on_token(token)
if chunk.choices[0].finish_reason:
self.finish_reason = chunk.choices[0].finish_reason
complete_text = "".join(full_response)
if on_complete:
on_complete(complete_text)
return complete_text
# Usage with callbacks
collector = StreamCollector()
def print_token(token: str):
print(token, end="", flush=True)
def on_complete(text: str):
print(f"\n\nComplete! Total length: {len(text)}")
result = collector.stream_with_callback(
messages=[{"role": "user", "content": "Explain APIs"}],
deployment="gpt-35-turbo",
on_token=print_token,
on_complete=on_complete
)
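collect() works with any of the generators defined earlier; for example, it can wrap the stream_chat generator while echoing each token through the same print_token callback:
streamed = collector.collect(
    stream_chat([{"role": "user", "content": "Explain APIs"}], "gpt-35-turbo"),
    on_chunk=print_token
)
print(f"\nReceived {streamed.total_chunks} chunks, {len(streamed.content)} characters")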
Flask Server-Sent Events (SSE)
from flask import Flask, Response, request, stream_with_context
import json
import openai  # assumes the client is configured for Azure as shown earlier
app = Flask(__name__)
@app.route('/chat/stream', methods=['POST'])
def stream_chat_endpoint():
"""Stream chat responses via SSE."""
data = request.json
messages = data.get('messages', [])
def generate():
try:
response = openai.ChatCompletion.create(
engine="gpt-35-turbo",
messages=messages,
stream=True
)
for chunk in response:
delta = chunk.choices[0].delta
if hasattr(delta, 'content') and delta.content:
# SSE format
yield f"data: {json.dumps({'content': delta.content})}\n\n"
yield "data: [DONE]\n\n"
except Exception as e:
yield f"data: {json.dumps({'error': str(e)})}\n\n"
return Response(
stream_with_context(generate()),
mimetype='text/event-stream',
headers={
'Cache-Control': 'no-cache',
'X-Accel-Buffering': 'no'
}
)
# Client-side JavaScript
"""
// Note: the browser's EventSource API only supports GET requests and cannot
// send a JSON body, so for this POST endpoint read the stream with fetch and
// a ReadableStream instead:
async function streamChat(messages) {
const response = await fetch('/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ messages })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
    const chunk = decoder.decode(value, { stream: true });
const lines = chunk.split('\\n').filter(line => line.startsWith('data: '));
for (const line of lines) {
const data = line.slice(6);
if (data === '[DONE]') return;
const parsed = JSON.parse(data);
if (parsed.content) {
document.getElementById('output').textContent += parsed.content;
}
}
}
}
"""
FastAPI Streaming
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import AsyncGenerator, List
import asyncio
import json
import openai  # assumes the client is configured for Azure as shown earlier
app = FastAPI()
class ChatMessage(BaseModel):
role: str
content: str
class ChatRequest(BaseModel):
messages: List[ChatMessage]
max_tokens: int = 500
@app.post("/chat/stream")
async def stream_chat(request: ChatRequest):
"""Stream chat responses."""
async def generate():
response = openai.ChatCompletion.create(
engine="gpt-35-turbo",
messages=[m.dict() for m in request.messages],
max_tokens=request.max_tokens,
stream=True
)
for chunk in response:
delta = chunk.choices[0].delta
if hasattr(delta, 'content') and delta.content:
yield f"data: {json.dumps({'content': delta.content})}\n\n"
await asyncio.sleep(0) # Allow other tasks to run
yield "data: [DONE]\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream"
)
# Async version with httpx
import httpx

# ENDPOINT and API_KEY hold your Azure OpenAI endpoint URL and API key
async def async_stream_chat(messages: list) -> AsyncGenerator[str, None]:
"""Async streaming with httpx."""
async with httpx.AsyncClient() as client:
async with client.stream(
"POST",
f"{ENDPOINT}/openai/deployments/gpt35/chat/completions",
headers={"api-key": API_KEY, "Content-Type": "application/json"},
params={"api-version": "2023-03-15-preview"},
json={"messages": messages, "stream": True}
) as response:
async for line in response.aiter_lines():
if line.startswith("data: "):
data = line[6:]
if data == "[DONE]":
break
chunk = json.loads(data)
content = chunk["choices"][0].get("delta", {}).get("content")
if content:
yield content
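Consuming the async generator is straightforward. A minimal sketch, assuming the same message format as above:
import asyncio

async def main() -> None:
    # Print tokens from the raw REST stream as they arrive
    async for token in async_stream_chat(
        [{"role": "user", "content": "What is cloud computing?"}]
    ):
        print(token, end="", flush=True)

asyncio.run(main())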
.NET Streaming
using System.Runtime.CompilerServices;
using System.Text.Json;
using Azure;
using Azure.AI.OpenAI;
public class StreamingChatService
{
private readonly OpenAIClient _client;
private readonly string _deployment;
public StreamingChatService(OpenAIClient client, string deployment)
{
_client = client;
_deployment = deployment;
}
public async IAsyncEnumerable<string> StreamChatAsync(
IEnumerable<ChatMessage> messages,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var options = new ChatCompletionsOptions();
foreach (var message in messages)
{
options.Messages.Add(message);
}
        // The beta Azure.AI.OpenAI SDK returns a Response<StreamingChatCompletions>
        var response = await _client.GetChatCompletionsStreamingAsync(
            _deployment, options, cancellationToken);
        using StreamingChatCompletions streaming = response.Value;

        await foreach (var choice in streaming.GetChoicesStreaming(cancellationToken))
        {
            await foreach (var message in choice.GetMessageStreaming(cancellationToken))
            {
                if (!string.IsNullOrEmpty(message.Content))
                {
                    yield return message.Content;
                }
            }
        }
}
}
// ASP.NET Core Controller
public record ChatRequest(string Message, string? SystemPrompt = null);

[ApiController]
[Route("api/[controller]")]
public class ChatController : ControllerBase
{
    private readonly StreamingChatService _chatService;

    public ChatController(StreamingChatService chatService)
    {
        _chatService = chatService;
    }
[HttpPost("stream")]
public async Task StreamChat([FromBody] ChatRequest request)
{
Response.ContentType = "text/event-stream";
Response.Headers.Add("Cache-Control", "no-cache");
var messages = new[]
{
new ChatMessage(ChatRole.System, request.SystemPrompt ?? "You are helpful."),
new ChatMessage(ChatRole.User, request.Message)
};
await foreach (var token in _chatService.StreamChatAsync(messages))
{
await Response.WriteAsync($"data: {JsonSerializer.Serialize(new { content = token })}\n\n");
await Response.Body.FlushAsync();
}
await Response.WriteAsync("data: [DONE]\n\n");
}
}
React Frontend
import React, { useState, useCallback } from 'react';
interface Message {
role: 'user' | 'assistant';
content: string;
}
const ChatComponent: React.FC = () => {
const [messages, setMessages] = useState<Message[]>([]);
const [input, setInput] = useState('');
const [isStreaming, setIsStreaming] = useState(false);
const streamMessage = useCallback(async (userMessage: string) => {
setIsStreaming(true);
// Add user message
const newMessages = [...messages, { role: 'user' as const, content: userMessage }];
setMessages(newMessages);
// Add empty assistant message that we'll stream into
setMessages([...newMessages, { role: 'assistant' as const, content: '' }]);
try {
const response = await fetch('/api/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
messages: newMessages.map(m => ({ role: m.role, content: m.content }))
})
});
const reader = response.body?.getReader();
const decoder = new TextDecoder();
if (!reader) return;
while (true) {
const { done, value } = await reader.read();
if (done) break;
        const chunk = decoder.decode(value, { stream: true });
const lines = chunk.split('\n').filter(line => line.startsWith('data: '));
for (const line of lines) {
const data = line.slice(6);
if (data === '[DONE]') continue;
try {
const parsed = JSON.parse(data);
if (parsed.content) {
setMessages(prev => {
const updated = [...prev];
const lastIdx = updated.length - 1;
updated[lastIdx] = {
...updated[lastIdx],
content: updated[lastIdx].content + parsed.content
};
return updated;
});
}
} catch (e) {
// Skip invalid JSON
}
}
}
} catch (error) {
console.error('Streaming error:', error);
} finally {
setIsStreaming(false);
}
}, [messages]);
const handleSubmit = (e: React.FormEvent) => {
e.preventDefault();
if (input.trim() && !isStreaming) {
streamMessage(input.trim());
setInput('');
}
};
return (
<div className="chat-container">
<div className="messages">
{messages.map((msg, idx) => (
<div key={idx} className={`message ${msg.role}`}>
<strong>{msg.role}:</strong>
<p>{msg.content}{isStreaming && idx === messages.length - 1 && '|'}</p>
</div>
))}
</div>
<form onSubmit={handleSubmit}>
<input
value={input}
onChange={e => setInput(e.target.value)}
disabled={isStreaming}
placeholder="Type a message..."
/>
<button type="submit" disabled={isStreaming}>
{isStreaming ? 'Streaming...' : 'Send'}
</button>
</form>
</div>
);
};
export default ChatComponent;
Best Practices
- Handle connection drops: Implement reconnection logic
- Show typing indicator: Display cursor or animation while streaming
- Buffer for display: Small buffer can smooth output
- Track completion: Know when stream is done
- Error handling: Gracefully handle mid-stream errors
- Cancel support: Allow users to stop generation (both sketched below)
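For the last two points, here is a sketch of how a caller could wrap the stream_chat generator from the Python examples above so that mid-stream errors degrade gracefully and users can stop generation; should_stop is a hypothetical hook your UI would provide:
from typing import Callable, Generator, Optional

def guarded_stream(
    stream: Generator[str, None, None],
    should_stop: Optional[Callable[[], bool]] = None
) -> Generator[str, None, None]:
    """Yield tokens, stopping cleanly on user cancel or mid-stream errors."""
    try:
        for token in stream:
            if should_stop and should_stop():
                stream.close()  # stop pulling from the underlying stream
                break
            yield token
    except Exception as exc:
        # Surface a marker instead of crashing the UI mid-response
        yield f"\n[stream interrupted: {exc}]"

for token in guarded_stream(stream_chat(messages, "gpt-35-turbo")):
    print(token, end="", flush=True)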