6 min read
OpenAI Assistants API: Production Patterns and Best Practices
The OpenAI Assistants API, released in November 2023, provides a managed runtime for building AI assistants with persistent threads, file handling, and tool execution. Here’s how to use it effectively in production.
Assistants API Fundamentals
The API introduces three key concepts:
- Assistants: Configured AI agents with instructions and tools
- Threads: Conversation sessions that persist messages
- Runs: Execution instances that process messages
from openai import OpenAI
import time

client = OpenAI()

# System prompt shared by every run of the data-analyst assistant.
DATA_ANALYST_INSTRUCTIONS = """You are a data analyst assistant. You help users:
- Analyze data files they upload
- Generate insights and visualizations
- Answer questions about their data
Always explain your methodology and be precise with numbers."""

# Register the assistant once; the returned id is reused for every run.
assistant = client.beta.assistants.create(
    name="Data Analyst",
    instructions=DATA_ANALYST_INSTRUCTIONS,
    model="gpt-4-turbo",
    tools=[{"type": "code_interpreter"}, {"type": "retrieval"}],
)

print(f"Created assistant: {assistant.id}")
Production Architecture
Thread Management
Threads persist conversations, but you need a strategy for managing them:
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
import json
@dataclass
class ThreadMetadata:
    """Bookkeeping record for one cached Assistants-API thread."""
    thread_id: str      # id returned by client.beta.threads.create()
    user_id: str        # owner of the conversation; part of the storage key
    created_at: datetime    # when the thread was first created
    last_activity: datetime  # refreshed on reuse; drives freshness/expiry checks
    purpose: str        # logical conversation type; part of the storage key
class ThreadManager:
    """Caches one Assistants-API thread per (user, purpose) pair in external storage."""

    def __init__(self, client, storage):
        self.client = client
        self.storage = storage  # Redis, database, etc. — must expose get() and set(..., ex=seconds)

    def get_or_create_thread(
        self,
        user_id: str,
        purpose: str,
        max_age_hours: int = 24
    ) -> str:
        """Return a thread id, reusing a cached thread when it is still fresh.

        A cached thread is reused while its last activity is within
        ``max_age_hours``; otherwise a new thread is created and cached
        with a matching TTL.
        """
        key = f"thread:{user_id}:{purpose}"
        ttl_seconds = max_age_hours * 3600
        now = datetime.now(timezone.utc)  # timezone-aware; utcnow() is deprecated

        existing = self.storage.get(key)
        if existing:
            raw = json.loads(existing)
            # The datetimes were serialized with default=str, so they come
            # back as ISO strings and MUST be parsed before date arithmetic
            # (subtracting a str from a datetime raises TypeError).
            raw["created_at"] = datetime.fromisoformat(raw["created_at"])
            raw["last_activity"] = datetime.fromisoformat(raw["last_activity"])
            metadata = ThreadMetadata(**raw)

            if now - metadata.last_activity < timedelta(hours=max_age_hours):
                metadata.last_activity = now
                # Re-set WITH ex=: on Redis a plain SET clears any existing
                # TTL, which would make refreshed keys live forever.
                self.storage.set(
                    key,
                    json.dumps(metadata.__dict__, default=str),
                    ex=ttl_seconds
                )
                return metadata.thread_id

        # No fresh thread cached — create a new one and cache its metadata.
        thread = self.client.beta.threads.create()
        metadata = ThreadMetadata(
            thread_id=thread.id,
            user_id=user_id,
            created_at=now,
            last_activity=now,
            purpose=purpose
        )
        self.storage.set(
            key,
            json.dumps(metadata.__dict__, default=str),
            ex=ttl_seconds
        )
        return thread.id

    def cleanup_old_threads(self, max_age_days: int = 7):
        """Delete threads older than the threshold.

        Implementation depends on the storage backend: iterate stored
        metadata and call the threads delete endpoint for stale ids.
        """
        pass
Async Run Processing
Production systems need robust run handling:
import asyncio
from enum import Enum
class RunStatus(Enum):
    """Lifecycle states reported by the Assistants runs API."""
    QUEUED = "queued"
    IN_PROGRESS = "in_progress"
    REQUIRES_ACTION = "requires_action"  # assistant is waiting for tool outputs
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
    EXPIRED = "expired"

    @property
    def is_terminal(self) -> bool:
        """True when the run can make no further progress and polling may stop."""
        return self in (
            RunStatus.COMPLETED,
            RunStatus.FAILED,
            RunStatus.CANCELLED,
            RunStatus.EXPIRED,
        )
class AssistantRunner:
    """Posts messages to a thread and polls assistant runs to completion."""

    def __init__(self, client, assistant_id: str):
        self.client = client
        self.assistant_id = assistant_id

    async def run_with_message(
        self,
        thread_id: str,
        message: str,
        timeout_seconds: int = 300,
        poll_interval: float = 1.0
    ) -> str:
        """Add a user message to the thread, run the assistant, and return its reply.

        Raises:
            TimeoutError: the run did not finish within ``timeout_seconds``
                (a best-effort cancel is attempted first).
            RuntimeError: the run ended as failed/cancelled/expired.
        """
        # Add the user message to the thread.
        self.client.beta.threads.messages.create(
            thread_id=thread_id,
            role="user",
            content=message
        )

        # Start a run against this assistant.
        run = self.client.beta.threads.runs.create(
            thread_id=thread_id,
            assistant_id=self.assistant_id
        )

        # time.monotonic() is immune to wall-clock adjustments and avoids the
        # deprecated asyncio.get_event_loop() access pattern.
        start_time = time.monotonic()
        while True:
            if time.monotonic() - start_time > timeout_seconds:
                # Best-effort cancel: the run may have completed concurrently,
                # in which case the cancel call itself can fail.
                try:
                    self.client.beta.threads.runs.cancel(
                        thread_id=thread_id,
                        run_id=run.id
                    )
                except Exception:
                    pass
                raise TimeoutError(f"Run timed out after {timeout_seconds}s")

            run = self.client.beta.threads.runs.retrieve(
                thread_id=thread_id,
                run_id=run.id
            )
            status = RunStatus(run.status)

            if status == RunStatus.COMPLETED:
                return self._get_latest_response(thread_id)
            elif status == RunStatus.REQUIRES_ACTION:
                # The assistant requested tool calls; execute and submit them.
                await self._handle_tool_calls(thread_id, run)
            elif status in (RunStatus.FAILED, RunStatus.CANCELLED, RunStatus.EXPIRED):
                raise RuntimeError(f"Run failed with status: {status.value}")

            await asyncio.sleep(poll_interval)

    async def _handle_tool_calls(self, thread_id: str, run):
        """Execute every requested tool call and submit the outputs back to the run."""
        tool_outputs = []
        for tool_call in run.required_action.submit_tool_outputs.tool_calls:
            result = await self._execute_tool(
                tool_call.function.name,
                json.loads(tool_call.function.arguments)
            )
            tool_outputs.append({
                "tool_call_id": tool_call.id,
                "output": json.dumps(result)
            })
        self.client.beta.threads.runs.submit_tool_outputs(
            thread_id=thread_id,
            run_id=run.id,
            tool_outputs=tool_outputs
        )

    async def _execute_tool(self, name: str, args: dict) -> dict:
        """Execute a custom tool — override in a subclass."""
        raise NotImplementedError(f"Tool not implemented: {name}")

    def _get_latest_response(self, thread_id: str) -> str:
        """Return the newest assistant message's text, or "" when there is none."""
        messages = self.client.beta.threads.messages.list(
            thread_id=thread_id,
            order="desc",
            limit=1
        )
        if messages.data and messages.data[0].role == "assistant":
            # content is a list of blocks; the first is assumed to be text
            # here — TODO confirm for runs that return images first.
            return messages.data[0].content[0].text.value
        return ""
Custom Function Tools
Extend assistants with custom tools:
class DataAnalystRunner(AssistantRunner):
    """AssistantRunner specialized with read-only SQL analysis tools."""

    def __init__(self, client, assistant_id: str, database_connection):
        super().__init__(client, assistant_id)
        self.db = database_connection  # async driver exposing fetch_all / get_table_schema
        self.allowed_tables = ["sales", "customers", "products"]

    def get_tool_definitions(self) -> list[dict]:
        """Define custom tools for the assistant (pass when creating/updating it)."""
        return [
            {
                "type": "function",
                "function": {
                    "name": "query_database",
                    "description": "Execute a read-only SQL query against the database",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "SQL SELECT query to execute"
                            }
                        },
                        "required": ["query"]
                    }
                }
            },
            {
                "type": "function",
                "function": {
                    "name": "get_table_schema",
                    "description": "Get the schema of a database table",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "table_name": {
                                "type": "string",
                                "description": "Name of the table"
                            }
                        },
                        "required": ["table_name"]
                    }
                }
            }
        ]

    async def _execute_tool(self, name: str, args: dict) -> dict:
        """Dispatch a tool call requested by the assistant."""
        if name == "query_database":
            return await self._safe_query(args["query"])
        if name == "get_table_schema":
            return await self._get_schema(args["table_name"])
        return {"error": f"Unknown tool: {name}"}

    async def _safe_query(self, query: str) -> dict:
        """Execute a model-generated query after basic safety checks.

        NOTE(review): keyword filtering is defense-in-depth only, not real
        SQL-injection protection — these queries should also run under a
        read-only database user.
        """
        import re  # local import keeps this snippet self-contained

        query_upper = query.upper()
        # Word-boundary matching: a plain substring test would reject
        # harmless queries (e.g. selecting a column named "updated_at"
        # contains the substring "UPDATE").
        forbidden = r"\b(INSERT|UPDATE|DELETE|DROP|ALTER|CREATE|TRUNCATE|GRANT|EXEC)\b"
        if re.search(forbidden, query_upper):
            return {"error": "Only SELECT queries are allowed"}

        # The query must reference at least one allow-listed table as a
        # whole word; substring matching would also accept e.g. "sales_raw".
        if not any(
            re.search(rf"\b{table.upper()}\b", query_upper)
            for table in self.allowed_tables
        ):
            return {"error": f"Access only allowed to: {self.allowed_tables}"}

        try:
            result = await self.db.fetch_all(query)
            return {"data": result, "row_count": len(result)}
        except Exception as e:
            # Surface DB errors to the model as data rather than crashing the run.
            return {"error": str(e)}

    async def _get_schema(self, table_name: str) -> dict:
        """Return the schema of an allow-listed table."""
        if table_name not in self.allowed_tables:
            return {"error": f"Table not accessible: {table_name}"}
        schema = await self.db.get_table_schema(table_name)
        return {"schema": schema}
File Handling
Handle file uploads for code interpreter and retrieval:
class FileManager:
    """Uploads, attaches, and retrieves files exchanged with the Assistants API."""

    def __init__(self, client, storage_client):
        self.client = client
        self.storage = storage_client  # Azure Blob, S3, etc.

    async def upload_for_assistant(
        self,
        file_path: str,
        purpose: str = "assistants"
    ) -> str:
        """Upload a local file and return the OpenAI file id."""
        with open(file_path, "rb") as handle:
            uploaded = self.client.files.create(
                file=handle,
                purpose=purpose
            )
        return uploaded.id

    async def attach_files_to_thread(
        self,
        thread_id: str,
        file_ids: list[str],
        message: str
    ):
        """Post a user message on the thread carrying the given file ids."""
        self.client.beta.threads.messages.create(
            thread_id=thread_id,
            role="user",
            content=message,
            file_ids=file_ids
        )

    async def download_generated_files(self, thread_id: str) -> list[str]:
        """Save every image file produced by code interpreter; return local paths."""
        thread_messages = self.client.beta.threads.messages.list(thread_id=thread_id)
        saved_paths = []
        assistant_messages = (
            m for m in thread_messages.data if m.role == "assistant"
        )
        for msg in assistant_messages:
            for part in msg.content:
                if not hasattr(part, "image_file"):
                    continue
                file_id = part.image_file.file_id
                payload = self.client.files.content(file_id)
                # Persist locally; swap for an upload to self.storage if needed.
                local_path = f"/tmp/{file_id}.png"
                with open(local_path, "wb") as out:
                    out.write(payload.read())
                saved_paths.append(local_path)
        return saved_paths
Error Handling and Monitoring
import logging
from dataclasses import dataclass
from typing import Optional

# Module-level logger named after the module, per logging convention.
logger = logging.getLogger(__name__)
@dataclass
class RunMetrics:
    """Telemetry captured for a single assistant run."""
    run_id: str          # id of the Assistants API run
    thread_id: str       # thread the run executed on
    status: str          # run status string at recording time
    duration_seconds: float  # wall-clock time spent in run_with_message
    prompt_tokens: int       # input tokens billed (0 when usage unavailable)
    completion_tokens: int   # output tokens billed (0 when usage unavailable)
    tool_calls: int          # number of custom tool invocations observed
    error: Optional[str] = None  # exception text when the run raised, else None
class MonitoredAssistantRunner(AssistantRunner):
    """AssistantRunner that records RunMetrics for every call, success or failure."""

    def __init__(self, client, assistant_id: str, metrics_collector):
        super().__init__(client, assistant_id)
        self.metrics = metrics_collector  # must expose async record(RunMetrics)

    async def run_with_message(self, thread_id: str, message: str, **kwargs) -> str:
        """Delegate to AssistantRunner.run_with_message and record metrics.

        Metrics are recorded in ``finally`` so failed and timed-out runs are
        captured too.  The parent does not expose the run id, so the most
        recent run on the thread is fetched for usage/status data — without
        this lookup no metrics would ever be recorded.
        """
        start_time = time.monotonic()
        error: Optional[str] = None
        try:
            return await super().run_with_message(thread_id, message, **kwargs)
        except Exception as e:
            error = str(e)
            logger.exception("Run failed")
            raise
        finally:
            duration = time.monotonic() - start_time
            run = None
            try:
                # Newest run first; this is the run the parent just executed.
                runs = self.client.beta.threads.runs.list(
                    thread_id=thread_id,
                    limit=1
                )
                run = runs.data[0] if runs.data else None
            except Exception:
                # Metrics are best-effort: never mask the original outcome.
                logger.warning("Could not retrieve run for metrics", exc_info=True)
            if run is not None:
                metrics = RunMetrics(
                    run_id=run.id,
                    thread_id=thread_id,
                    status=run.status,
                    duration_seconds=duration,
                    prompt_tokens=run.usage.prompt_tokens if run.usage else 0,
                    completion_tokens=run.usage.completion_tokens if run.usage else 0,
                    # Counting tool calls would require listing run steps;
                    # not tracked here.
                    tool_calls=0,
                    error=error
                )
                await self.metrics.record(metrics)
Best Practices
- Thread lifecycle management - Don’t create threads for every message
- Timeout handling - Runs can take minutes for complex tasks
- File cleanup - Delete uploaded files when no longer needed
- Cost monitoring - Track token usage per run
- Graceful degradation - Fall back to direct API calls if needed
Conclusion
The Assistants API simplifies building conversational AI but requires careful production engineering. Focus on thread management, robust polling, and comprehensive monitoring. The patterns here provide a foundation for reliable, scalable assistant deployments.