5 min read
Assistants API Deep Dive: Threads and Messages
Threads and Messages are the backbone of conversational state in the Assistants API. Understanding how to effectively manage them is crucial for building sophisticated AI applications.
Thread Architecture
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime
import json
@dataclass
class MessageContent:
    """A single content block belonging to a message.

    ``data`` carries the payload for the block's type, for example
    ``{"value": ...}`` for text or ``{"file_id": ...}`` for an image file.
    """

    type: str  # block type as reported by the API, e.g. "text" or "image_file"
    data: dict  # type-specific payload (see class docstring)
@dataclass
class Message:
    """Local snapshot of one message inside a thread."""

    id: str  # message id assigned by the API
    role: str  # author role: "user" or "assistant"
    content: List[MessageContent]  # ordered content blocks of the message
    created_at: datetime  # creation timestamp
    thread_id: str  # id of the thread this message belongs to
    file_ids: List[str] = field(default_factory=list)  # ids of attached files
    metadata: Dict = field(default_factory=dict)  # free-form key/value metadata
@dataclass
class Thread:
    """Local snapshot of a conversation thread and, optionally, its messages."""

    id: str  # thread id assigned by the API
    created_at: datetime  # creation timestamp
    metadata: Dict = field(default_factory=dict)  # key/value metadata stored on the thread
    messages: List[Message] = field(default_factory=list)  # filled in by ThreadManager.get_thread
class ThreadManager:
    """Create, retrieve and delete Assistants API threads.

    Keeps a local cache (``self.threads``) of the Thread objects this manager
    has created or retrieved, keyed by thread id. ``client`` must expose the
    ``beta.threads`` resource used below (``create``, ``retrieve``,
    ``delete`` and ``messages.list``), as the OpenAI Python SDK client does.
    """

    def __init__(self, client):
        self.client = client
        # Local cache of threads created or retrieved through this manager.
        self.threads: Dict[str, Thread] = {}

    def create_thread(self, metadata: Optional[Dict] = None) -> Thread:
        """Create a new conversation thread and cache it locally.

        Args:
            metadata: Optional key/value metadata stored on the thread.

        Returns:
            The cached Thread, stamped with the server-reported creation time.
        """
        metadata = metadata or {}
        thread = self.client.beta.threads.create(metadata=metadata)
        local_thread = Thread(
            id=thread.id,
            # Use the server timestamp, exactly as get_thread() does, rather
            # than the local clock at the moment of the call.
            created_at=datetime.fromtimestamp(thread.created_at),
            metadata=metadata,
        )
        self.threads[thread.id] = local_thread
        return local_thread

    def get_thread(self, thread_id: str) -> Thread:
        """Retrieve a thread together with all of its messages.

        The result of ``messages.list`` is iterated directly instead of via
        ``.data``: with the OpenAI SDK this auto-paginates, so threads longer
        than a single page are no longer silently truncated. Messages are kept
        in the order the API returns them.

        Args:
            thread_id: Id of the thread to fetch.

        Returns:
            A Thread populated with its messages; it is also stored in
            ``self.threads``.
        """
        thread = self.client.beta.threads.retrieve(thread_id)
        local_thread = Thread(
            id=thread.id,
            created_at=datetime.fromtimestamp(thread.created_at),
            metadata=thread.metadata or {},
        )
        for msg in self.client.beta.threads.messages.list(thread_id):
            local_thread.messages.append(Message(
                id=msg.id,
                role=msg.role,
                content=[MessageContent(c.type, self._content_data(c)) for c in msg.content],
                created_at=datetime.fromtimestamp(msg.created_at),
                thread_id=thread_id,
                # getattr guards against message objects without file_ids
                # (NOTE(review): presumably newer API versions) - verify.
                file_ids=getattr(msg, "file_ids", None) or [],
                metadata=getattr(msg, "metadata", None) or {},
            ))
        # Keep the cache consistent with create_thread()/delete_thread().
        self.threads[thread_id] = local_thread
        return local_thread

    def delete_thread(self, thread_id: str) -> bool:
        """Delete a thread remotely and drop it from the local cache.

        Args:
            thread_id: Id of the thread to delete.

        Returns:
            The ``deleted`` flag reported by the API response, or True when
            the response carries no such flag.
        """
        result = self.client.beta.threads.delete(thread_id)
        self.threads.pop(thread_id, None)
        return bool(getattr(result, "deleted", True))

    @staticmethod
    def _content_data(block) -> Dict:
        """Return the ``{"value": ...}`` payload for one message content block.

        Text blocks contribute ``block.text.value``; blocks without text
        (e.g. image files) are stringified.
        """
        text = getattr(block, "text", None)
        return {"value": text.value if text is not None else str(block)}
Advanced Message Management
class MessageManager:
    """Add and read messages on Assistants API threads.

    ``client`` must expose ``beta.threads.messages`` with ``create`` and
    ``list``, as the OpenAI Python SDK client does.
    """

    def __init__(self, client):
        self.client = client

    def add_message(
        self,
        thread_id: str,
        content: str,
        role: str = "user",
        file_ids: Optional[List[str]] = None,
        metadata: Optional[Dict] = None
    ) -> Message:
        """Add a text message to a thread.

        Args:
            thread_id: Thread to append the message to.
            content: Message text.
            role: Author role; defaults to "user".
            file_ids: Optional ids of files attached to the message.
            metadata: Optional key/value metadata for the message.

        Returns:
            A local Message mirroring what was sent, stamped with the
            server-reported creation time.
        """
        file_ids = file_ids or []
        metadata = metadata or {}
        message = self.client.beta.threads.messages.create(
            thread_id=thread_id,
            role=role,
            content=content,
            file_ids=file_ids,
            metadata=metadata
        )
        return Message(
            id=message.id,
            role=message.role,
            content=[MessageContent("text", {"value": content})],
            # Server timestamp, consistent with messages read via get_messages().
            created_at=datetime.fromtimestamp(message.created_at),
            thread_id=thread_id,
            file_ids=file_ids,
            metadata=metadata
        )

    def get_messages(
        self,
        thread_id: str,
        limit: int = 20,
        order: str = "desc",
        after: Optional[str] = None,
        before: Optional[str] = None
    ) -> List[Message]:
        """Fetch a single page of messages from a thread.

        Args:
            thread_id: Thread to read.
            limit: Maximum number of messages in the page.
            order: Sort order passed to the API ("asc" or "desc").
            after: Optional message-id cursor; only sent when truthy.
            before: Optional message-id cursor; only sent when truthy.

        Returns:
            The page's messages converted to local Message objects.
        """
        params = {"limit": limit, "order": order}
        if after:
            params["after"] = after
        if before:
            params["before"] = before
        page = self.client.beta.threads.messages.list(
            thread_id=thread_id,
            **params
        )
        return [
            Message(
                id=msg.id,
                role=msg.role,
                content=[MessageContent(c.type, self._extract_content(c))
                         for c in msg.content],
                created_at=datetime.fromtimestamp(msg.created_at),
                thread_id=thread_id,
                file_ids=getattr(msg, "file_ids", None) or [],
                metadata=msg.metadata or {}
            )
            for msg in page.data
        ]

    def _extract_content(self, content) -> dict:
        """Convert one API content block into a plain dict payload.

        Text blocks yield ``{"value", "annotations"}``; image-file blocks
        yield ``{"file_id"}``; any other type yields an empty dict.
        """
        if content.type == "text":
            return {
                "value": content.text.value,
                "annotations": [
                    {"type": a.type, "text": a.text}
                    for a in content.text.annotations
                ] if content.text.annotations else []
            }
        elif content.type == "image_file":
            return {"file_id": content.image_file.file_id}
        return {}

    def get_conversation_context(
        self,
        thread_id: str,
        max_messages: int = 10
    ) -> str:
        """Format the most recent messages of a thread as a transcript.

        The newest ``max_messages`` messages are fetched (newest first, so the
        limit keeps the latest turns rather than the oldest) and then emitted
        in chronological order. All text parts of a message are joined.

        Args:
            thread_id: Thread to summarize.
            max_messages: How many of the most recent messages to include.

        Returns:
            Lines of the form "User: ..." / "Assistant: ...", separated by
            blank lines.
        """
        recent = self.get_messages(thread_id, limit=max_messages, order="desc")
        context_parts = []
        for msg in reversed(recent):
            role = "User" if msg.role == "user" else "Assistant"
            text = "\n".join(
                c.data.get("value", "")
                for c in msg.content
                if c.type == "text" and c.data.get("value")
            )
            context_parts.append(f"{role}: {text}")
        return "\n\n".join(context_parts)
Thread Persistence and Recovery
import sqlite3
from contextlib import contextmanager
class ThreadPersistence:
    """Persist thread metadata in SQLite for recovery and analytics.

    Every public method opens its own short-lived connection through
    ``_get_connection``, which commits on success and always closes.
    """

    def __init__(self, db_path: str = "threads.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        """Create the ``threads`` and ``thread_summaries`` tables if missing."""
        with self._get_connection() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS threads (
                    thread_id TEXT PRIMARY KEY,
                    user_id TEXT,
                    assistant_id TEXT,
                    created_at TIMESTAMP,
                    last_active TIMESTAMP,
                    metadata TEXT,
                    status TEXT DEFAULT 'active'
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS thread_summaries (
                    thread_id TEXT,
                    summary TEXT,
                    message_count INTEGER,
                    updated_at TIMESTAMP,
                    FOREIGN KEY (thread_id) REFERENCES threads(thread_id)
                )
            """)

    @contextmanager
    def _get_connection(self):
        """Yield a connection that is committed on success and always closed.

        If the ``with`` body raises, the connection is closed without a
        commit, so its pending changes are discarded.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            yield conn
            conn.commit()
        finally:
            conn.close()

    @staticmethod
    def _now() -> str:
        """Return the current local time as text for a TIMESTAMP column.

        ``isoformat(" ")`` is the same format sqlite3's default datetime
        adapter produces, so rows written before and after this change sort
        together; binding datetime objects directly relies on that adapter,
        which is deprecated since Python 3.12.
        """
        return datetime.now().isoformat(" ")

    def save_thread(
        self,
        thread_id: str,
        user_id: str,
        assistant_id: str,
        metadata: Optional[Dict] = None
    ):
        """Insert a thread, or refresh it if it already exists.

        A new row gets ``created_at == last_active == now``. An existing row
        keeps its original ``created_at``; its owner, assistant, metadata and
        ``last_active`` are overwritten and its status is reset to 'active'.
        """
        now = self._now()
        meta_json = json.dumps(metadata or {})
        with self._get_connection() as conn:
            cursor = conn.execute("""
                INSERT OR IGNORE INTO threads
                (thread_id, user_id, assistant_id, created_at, last_active, metadata, status)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (thread_id, user_id, assistant_id, now, now, meta_json, 'active'))
            if cursor.rowcount == 0:
                # Row already existed: update it in place instead of
                # replacing it, so created_at survives.
                conn.execute("""
                    UPDATE threads
                    SET user_id = ?, assistant_id = ?, last_active = ?,
                        metadata = ?, status = 'active'
                    WHERE thread_id = ?
                """, (user_id, assistant_id, now, meta_json, thread_id))

    def get_user_threads(
        self,
        user_id: str,
        status: str = 'active'
    ) -> List[Dict]:
        """Return a user's threads with the given status, most recently active first.

        Timestamps are returned as stored (text); a NULL metadata column is
        returned as an empty dict.
        """
        with self._get_connection() as conn:
            cursor = conn.execute("""
                SELECT thread_id, assistant_id, created_at, last_active, metadata
                FROM threads
                WHERE user_id = ? AND status = ?
                ORDER BY last_active DESC
            """, (user_id, status))
            return [
                {
                    "thread_id": row[0],
                    "assistant_id": row[1],
                    "created_at": row[2],
                    "last_active": row[3],
                    "metadata": json.loads(row[4]) if row[4] else {}
                }
                for row in cursor.fetchall()
            ]

    def update_activity(self, thread_id: str):
        """Set a thread's ``last_active`` timestamp to now."""
        with self._get_connection() as conn:
            conn.execute("""
                UPDATE threads SET last_active = ? WHERE thread_id = ?
            """, (self._now(), thread_id))

    def archive_thread(self, thread_id: str):
        """Mark a thread as 'archived' so it no longer appears as active."""
        with self._get_connection() as conn:
            conn.execute("""
                UPDATE threads SET status = 'archived' WHERE thread_id = ?
            """, (thread_id,))
Conversation Flow Control
class ConversationController:
    """Control conversation flow and handle multi-turn interactions.

    Combines a ThreadManager and a MessageManager around a single assistant.
    Runs are driven synchronously by polling their status.
    """

    # Run statuses that are not final yet; _run_and_wait keeps polling while
    # the run is in one of these ("cancelling" is a transitional state).
    _PENDING_RUN_STATUSES = ("queued", "in_progress", "cancelling")

    def __init__(self, client, assistant_id: str):
        self.client = client
        self.assistant_id = assistant_id
        self.thread_manager = ThreadManager(client)
        self.message_manager = MessageManager(client)

    def start_conversation(
        self,
        initial_message: str,
        metadata: Optional[Dict] = None
    ) -> Dict:
        """Create a thread, post the first user message and run the assistant.

        Returns:
            ``{"thread_id": ..., "response": ...}`` where ``response`` is the
            assistant's reply text or an "Error: ..." string.
        """
        thread = self.thread_manager.create_thread(metadata)
        self.message_manager.add_message(thread.id, initial_message)
        response = self._run_and_wait(thread.id)
        return {
            "thread_id": thread.id,
            "response": response
        }

    def continue_conversation(
        self,
        thread_id: str,
        message: str
    ) -> str:
        """Post a user message to an existing thread and return the reply."""
        self.message_manager.add_message(thread_id, message)
        return self._run_and_wait(thread_id)

    def _run_and_wait(
        self,
        thread_id: str,
        timeout: Optional[float] = None,
        poll_interval: float = 0.5
    ) -> str:
        """Start a run on ``thread_id`` and poll until it leaves a pending state.

        Args:
            thread_id: Thread to run the assistant on.
            timeout: Optional maximum wait in seconds. When exceeded, the run
                is cancelled and an error string is returned. ``None`` waits
                until the run reaches a non-pending status.
            poll_interval: Seconds to sleep between status checks.

        Returns:
            The text of the newest message if the run completed and that
            message is from the assistant; otherwise an "Error: ..." string.
            Runs that stop in "requires_action" (tool calls) are reported as
            errors because tool outputs are not submitted here.
        """
        import time
        run = self.client.beta.threads.runs.create(
            thread_id=thread_id,
            assistant_id=self.assistant_id
        )
        deadline = None if timeout is None else time.monotonic() + timeout
        while run.status in self._PENDING_RUN_STATUSES:
            if deadline is not None and time.monotonic() >= deadline:
                if run.status != "cancelling":
                    self.client.beta.threads.runs.cancel(
                        thread_id=thread_id,
                        run_id=run.id
                    )
                return f"Error: Run timed out after {timeout} seconds"
            time.sleep(poll_interval)
            run = self.client.beta.threads.runs.retrieve(
                thread_id=thread_id,
                run_id=run.id
            )
        if run.status == "completed":
            messages = self.message_manager.get_messages(thread_id, limit=1)
            # Guard against a reply with no content blocks.
            if messages and messages[0].role == "assistant" and messages[0].content:
                return messages[0].content[0].data.get("value", "")
        return f"Error: Run ended with status {run.status}"

    def get_conversation_summary(self, thread_id: str) -> str:
        """Summarize a conversation using a temporary thread.

        The temporary thread is deleted even if the summarizing run raises.
        """
        context = self.message_manager.get_conversation_context(thread_id)
        summary_thread = self.thread_manager.create_thread()
        try:
            self.message_manager.add_message(
                summary_thread.id,
                f"Summarize this conversation in 2-3 sentences:\n\n{context}"
            )
            return self._run_and_wait(summary_thread.id)
        finally:
            self.thread_manager.delete_thread(summary_thread.id)
Best Practices
- Implement thread persistence for user session recovery
- Set message limits to manage context and costs
- Use metadata to track conversation state
- Clean up old threads to manage storage
- Handle interruptions gracefully with run cancellation
Tomorrow, we’ll explore file handling and the retrieval tool in the Assistants API!