Graph-Based AI Agents: Design Patterns and Architecture
Graph-based agents represent a paradigm shift from linear chains to flexible, stateful AI systems. Understanding the design patterns helps you build more capable and maintainable agents. Let’s explore the key patterns.
The Graph Mental Model
Think of your agent as a state machine:
- Nodes: Processing steps (functions that transform state)
- Edges: Transitions between steps
- State: Accumulated context that flows through the graph
from operator import add
from typing import Annotated, Literal, TypedDict

from langgraph.graph import END, START, StateGraph
# State accumulates through the graph as nodes run.
class AgentState(TypedDict):
    """Shared state flowing through the graph.

    Fields annotated with the ``add`` operator are append-merged: the list a
    node returns is concatenated onto the existing value instead of
    replacing it.
    """

    # Accumulated messages (using Annotated with add operator)
    messages: Annotated[list[dict], add]
    # Current processing phase
    phase: str
    # Accumulated artifacts (append-merged like messages)
    artifacts: Annotated[list[str], add]
    # Final output
    output: str
Pattern 1: Sequential Pipeline
The simplest pattern - nodes execute in order:
class PipelineState(TypedDict):
    """State for the sequential pipeline; each stage fills in one field."""

    raw_data: str          # input as received
    cleaned_data: str      # written by clean_data
    transformed_data: str  # written by transform_data
    validated_data: str    # written by validate_data
    output: str            # "valid" / "invalid" verdict from validate_data
def clean_data(state: PipelineState) -> PipelineState:
    """Normalize the raw input: trim surrounding whitespace and lowercase."""
    raw = state["raw_data"]
    return {"cleaned_data": raw.strip().lower()}
def transform_data(state: PipelineState) -> PipelineState:
    """Apply the transformation stage to the cleaned data."""
    cleaned = state["cleaned_data"]
    return {"transformed_data": cleaned.upper()}
def validate_data(state: PipelineState) -> PipelineState:
    """Check the transformed data and record a valid/invalid verdict."""
    data = state["transformed_data"]
    if data:
        # Non-empty output passes validation unchanged.
        return {"validated_data": data, "output": "valid"}
    return {"validated_data": "", "output": "invalid"}
graph = StateGraph(PipelineState)
# Register the three stages, then wire them strictly in order.
for node_name, node_fn in [
    ("clean", clean_data),
    ("transform", transform_data),
    ("validate", validate_data),
]:
    graph.add_node(node_name, node_fn)
graph.set_entry_point("clean")
graph.add_edge("clean", "transform")
graph.add_edge("transform", "validate")
graph.add_edge("validate", END)
pipeline = graph.compile()
Pattern 2: Router/Dispatcher
Route to different handlers based on input:
from langchain_openai import AzureChatOpenAI
class RouterState(TypedDict):
    """State for the router/dispatcher pattern."""

    input: str     # raw user request
    category: str  # classifier output used to pick a handler
    result: str    # answer produced by the chosen handler
# Shared LLM client used by the nodes below.
# NOTE(review): presumably Azure OpenAI credentials/endpoint come from the
# environment — confirm deployment configuration before running.
llm = AzureChatOpenAI(azure_deployment="gpt-4o")
def classify_input(state: RouterState) -> RouterState:
    """Classify the input so the router can pick the matching handler."""
    prompt = f"""
Classify this input into one of: [data_query, code_request, explanation, other]
Input: {state['input']}
Return only the category name.
"""
    reply = llm.invoke(prompt)
    # Normalize so route_decision can do exact matches.
    return {"category": reply.content.strip().lower()}
def handle_data_query(state: RouterState) -> RouterState:
    """Answer a data question by having the LLM draft a SQL query."""
    reply = llm.invoke(f"Generate a SQL query for: {state['input']}")
    return {"result": f"SQL Query: {reply.content}"}
def handle_code_request(state: RouterState) -> RouterState:
    """Answer a coding request by having the LLM write code."""
    reply = llm.invoke(f"Write code for: {state['input']}")
    return {"result": f"Code: {reply.content}"}
def handle_explanation(state: RouterState) -> RouterState:
    """Answer an explanation request; the LLM reply is returned verbatim."""
    reply = llm.invoke(f"Explain: {state['input']}")
    return {"result": reply.content}
def handle_other(state: RouterState) -> RouterState:
    """Fallback for inputs that matched no known category."""
    return {"result": "I'm not sure how to handle that request."}
def route_decision(state: RouterState) -> Literal["data", "code", "explain", "other"]:
    """Map the classifier's category onto a node name, defaulting to 'other'."""
    category = state["category"]
    if category == "data_query":
        return "data"
    if category == "code_request":
        return "code"
    if category == "explanation":
        return "explain"
    # Unknown or unparseable categories fall back to the generic handler.
    return "other"
graph = StateGraph(RouterState)
# Register the classifier plus one node per category.
node_table = {
    "classify": classify_input,
    "data": handle_data_query,
    "code": handle_code_request,
    "explain": handle_explanation,
    "other": handle_other,
}
for node_name, node_fn in node_table.items():
    graph.add_node(node_name, node_fn)
graph.set_entry_point("classify")
# After classification, jump to the handler whose name route_decision returns.
graph.add_conditional_edges(
    "classify",
    route_decision,
    {"data": "data", "code": "code", "explain": "explain", "other": "other"},
)
# Every handler terminates the run.
for handler_name in ("data", "code", "explain", "other"):
    graph.add_edge(handler_name, END)
router = graph.compile()
Pattern 3: Iterative Refinement
Loop until quality threshold is met:
class RefinementState(TypedDict):
    """State for the generate/evaluate/refine loop."""

    task: str            # what to produce
    draft: str           # latest draft text
    feedback: str        # evaluator feedback on the latest draft
    score: float         # normalized quality score in [0, 1]
    iterations: int      # number of drafts produced so far
    max_iterations: int  # hard cap on refinement cycles
    final: str           # accepted output
def generate_draft(state: RefinementState) -> RefinementState:
    """Produce the first draft, or revise the current one using feedback."""
    first_pass = state["iterations"] == 0
    if first_pass:
        prompt = f"Create a first draft for: {state['task']}"
    else:
        prompt = f"""
Improve this draft based on feedback:
Draft: {state['draft']}
Feedback: {state['feedback']}
"""
    reply = llm.invoke(prompt)
    # Count this pass so should_continue can enforce the iteration cap.
    return {"draft": reply.content, "iterations": state["iterations"] + 1}
def evaluate_draft(state: RefinementState) -> RefinementState:
    """Score the current draft on a 0-1 scale and collect feedback.

    Asks the LLM for a JSON verdict. On any parse or shape failure it falls
    back to a neutral 0.5 score so the loop keeps making progress instead of
    crashing on a malformed model reply.
    """
    import json

    prompt = f"""
Rate this draft from 0-10 for the task "{state['task']}":
{state['draft']}
Return a JSON object: {{"score": N, "feedback": "..."}}
"""
    reply = llm.invoke(prompt)
    try:
        verdict = json.loads(reply.content)
        # float() also accepts a score the model returned as a string.
        return {
            "score": float(verdict["score"]) / 10,  # normalize 0-10 -> 0-1
            "feedback": verdict["feedback"],
        }
    except (json.JSONDecodeError, KeyError, TypeError, ValueError):
        # Narrowed from a bare `except:` so genuine bugs and signals such as
        # KeyboardInterrupt are no longer silently swallowed.
        return {"score": 0.5, "feedback": "Could not parse evaluation"}
def finalize(state: RefinementState) -> RefinementState:
    """Promote the accepted draft into the final output field."""
    return {"final": state["draft"]}
def should_continue(state: RefinementState) -> Literal["refine", "finalize"]:
    """Keep refining until the quality bar or the iteration budget is hit."""
    good_enough = state["score"] >= 0.8  # quality threshold
    out_of_budget = state["iterations"] >= state["max_iterations"]
    return "finalize" if good_enough or out_of_budget else "refine"
graph = StateGraph(RefinementState)
for node_name, node_fn in [
    ("generate", generate_draft),
    ("evaluate", evaluate_draft),
    ("finalize", finalize),
]:
    graph.add_node(node_name, node_fn)
graph.set_entry_point("generate")
graph.add_edge("generate", "evaluate")
# Cycle back to generate until should_continue decides the draft is done;
# the loop is bounded by max_iterations inside should_continue.
graph.add_conditional_edges(
    "evaluate",
    should_continue,
    {"refine": "generate", "finalize": "finalize"},
)
graph.add_edge("finalize", END)
refiner = graph.compile()
Pattern 4: Parallel Execution
Execute multiple paths simultaneously:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from operator import add
import asyncio
class ParallelState(TypedDict):
    """State for the multi-backend query pattern; each branch fills one field."""

    query: str            # shared input for every branch
    sql_result: str       # written by query_sql
    nosql_result: str     # written by query_nosql
    cache_result: str     # written by query_cache
    combined_result: str  # written last by combine_results
async def query_sql(state: ParallelState) -> ParallelState:
    """Query the SQL backend (simulated with a short async sleep)."""
    await asyncio.sleep(0.5)  # stand-in for real query latency
    return {"sql_result": f"SQL results for: {state['query']}"}
async def query_nosql(state: ParallelState) -> ParallelState:
    """Query the NoSQL backend (simulated with a short async sleep)."""
    await asyncio.sleep(0.3)  # stand-in for real query latency
    return {"nosql_result": f"NoSQL results for: {state['query']}"}
async def query_cache(state: ParallelState) -> ParallelState:
    """Query the cache layer (simulated with a short async sleep)."""
    await asyncio.sleep(0.1)  # stand-in for real lookup latency
    return {"cache_result": f"Cache results for: {state['query']}"}
def combine_results(state: ParallelState) -> ParallelState:
    """Merge every branch's result into one report, tolerating missing ones."""
    sql = state.get('sql_result', 'N/A')
    nosql = state.get('nosql_result', 'N/A')
    cache = state.get('cache_result', 'N/A')
    report = f"""
SQL: {sql}
NoSQL: {nosql}
Cache: {cache}
"""
    return {"combined_result": report}
# Build graph with true fan-out / fan-in parallel branches.
# The original wired sql -> nosql -> cache sequentially, which contradicts the
# "parallel execution" pattern this section demonstrates.
graph = StateGraph(ParallelState)
graph.add_node("sql", query_sql)
graph.add_node("nosql", query_nosql)
graph.add_node("cache", query_cache)
graph.add_node("combine", combine_results)
# Fan-out: all three query nodes start from START and run in the same
# superstep; each writes a distinct state key, so no reducer conflicts.
graph.add_edge(START, "sql")
graph.add_edge(START, "nosql")
graph.add_edge(START, "cache")
# Fan-in: the list-of-sources form makes combine wait for every branch and
# then run exactly once.
graph.add_edge(["sql", "nosql", "cache"], "combine")
graph.add_edge("combine", END)
parallel_agent = graph.compile()
Pattern 5: Supervisor Agent
A coordinator that delegates to specialized sub-agents:
class SupervisorState(TypedDict):
    """State for the supervisor pattern: plan, execute each subtask, synthesize."""

    task: str                           # original user task
    subtasks: list[dict]                # plan entries: {"id", "description", "agent"}
    current_subtask_index: int          # index of the next subtask to run
    results: Annotated[list[str], add]  # per-subtask outputs, append-merged
    final_answer: str                   # synthesized answer
def plan_subtasks(state: SupervisorState) -> SupervisorState:
    """Ask the LLM to break the task into subtasks.

    Falls back to a single catch-all analysis subtask when the model's reply
    is not valid JSON or is not the expected non-empty list shape.
    """
    import json

    prompt = f"""
Break this task into subtasks:
{state['task']}
Return as JSON: [{{"id": 1, "description": "...", "agent": "sql|code|analysis"}}]
"""
    reply = llm.invoke(prompt)
    try:
        subtasks = json.loads(reply.content)
        # Valid JSON can still be the wrong shape (e.g. a dict or empty list).
        if not isinstance(subtasks, list) or not subtasks:
            raise ValueError("expected a non-empty JSON list of subtasks")
    except (json.JSONDecodeError, ValueError):
        # Narrowed from a bare `except:` so unrelated errors are not hidden.
        subtasks = [{"id": 1, "description": state["task"], "agent": "analysis"}]
    return {"subtasks": subtasks, "current_subtask_index": 0}
def execute_subtask(state: SupervisorState) -> SupervisorState:
    """Run the current subtask with the agent named in its plan entry."""
    idx = state["current_subtask_index"]
    subtask = state["subtasks"][idx]
    dispatch = {
        "sql": execute_sql_subtask,
        "code": execute_code_subtask,
    }
    # Anything unrecognized falls back to the analysis agent.
    handler = dispatch.get(subtask["agent"], execute_analysis_subtask)
    outcome = handler(subtask["description"])
    # results uses the add reducer, so this single-item list is appended.
    return {"results": [outcome], "current_subtask_index": idx + 1}
def execute_sql_subtask(description: str) -> str:
    """Have the LLM draft SQL for one subtask."""
    reply = llm.invoke(f"Generate SQL for: {description}")
    return f"SQL: {reply.content}"
def execute_code_subtask(description: str) -> str:
    """Have the LLM write code for one subtask."""
    reply = llm.invoke(f"Write code for: {description}")
    return f"Code: {reply.content}"
def execute_analysis_subtask(description: str) -> str:
    """Have the LLM analyze one subtask."""
    reply = llm.invoke(f"Analyze: {description}")
    return f"Analysis: {reply.content}"
def synthesize_results(state: SupervisorState) -> SupervisorState:
    """Fuse all subtask results into one coherent answer."""
    # Join outside the f-string (f-strings can't contain a backslash
    # expression on older Pythons; this replaces the chr(10) workaround).
    joined_results = "\n".join(state["results"])
    prompt = f"""
Synthesize these results into a coherent answer:
Original task: {state['task']}
Results:
{joined_results}
"""
    reply = llm.invoke(prompt)
    return {"final_answer": reply.content}
def should_continue_subtasks(state: SupervisorState) -> Literal["execute", "synthesize"]:
    """Loop back to execute while any planned subtask remains."""
    remaining = len(state["subtasks"]) - state["current_subtask_index"]
    return "execute" if remaining > 0 else "synthesize"
graph = StateGraph(SupervisorState)
for node_name, node_fn in [
    ("plan", plan_subtasks),
    ("execute", execute_subtask),
    ("synthesize", synthesize_results),
]:
    graph.add_node(node_name, node_fn)
graph.set_entry_point("plan")
graph.add_edge("plan", "execute")
# Self-loop on execute until the plan is exhausted, then synthesize;
# the loop is bounded by the number of planned subtasks.
graph.add_conditional_edges(
    "execute",
    should_continue_subtasks,
    {"execute": "execute", "synthesize": "synthesize"},
)
graph.add_edge("synthesize", END)
supervisor = graph.compile()
Pattern 6: Human-in-the-Loop
Pause for human input:
from langgraph.checkpoint.sqlite import SqliteSaver
class HITLState(TypedDict):
    """State for the human-in-the-loop pattern."""

    request: str           # what the user asked for
    proposed_action: str   # LLM's suggested action, pending approval
    human_approved: bool   # set externally via update_state while paused
    execution_result: str  # execution outcome or rejection notice
def propose_action(state: HITLState) -> HITLState:
    """Draft an action for the human to approve or reject."""
    reply = llm.invoke(f"Propose an action for: {state['request']}")
    return {"proposed_action": reply.content}
def await_human_approval(state: HITLState) -> HITLState:
    """Pause point: execution stops here until a human responds.

    Returns no updates; the human's decision is written into state
    externally (via ``update_state``) before the run is resumed.
    """
    # In practice, this would wait for external input
    # The graph execution pauses here
    return {}  # State updated externally
def execute_action(state: HITLState) -> HITLState:
    """Carry out the proposed action only if the human approved it."""
    if not state["human_approved"]:
        return {"execution_result": "Action rejected by human"}
    return {"execution_result": f"Executed: {state['proposed_action']}"}
def route_approval(state: HITLState) -> Literal["execute", "end"]:
    """Proceed to execution only on approval; a missing flag means stop."""
    if state.get("human_approved"):
        return "execute"
    return "end"
graph = StateGraph(HITLState)
graph.add_node("propose", propose_action)
graph.add_node("await_approval", await_human_approval)
graph.add_node("execute", execute_action)
graph.set_entry_point("propose")
graph.add_edge("propose", "await_approval")
graph.add_conditional_edges(
    "await_approval",
    route_approval,
    {"execute": "execute", "end": END}
)
graph.add_edge("execute", END)
# Use a checkpointer so the paused run can be persisted and resumed.
# NOTE(review): in recent langgraph releases SqliteSaver.from_conn_string
# returns a context manager rather than a saver instance — confirm against
# the pinned langgraph version.
checkpointer = SqliteSaver.from_conn_string(":memory:")
# interrupt_before is what actually pauses the run for the human. Without it
# the graph falls straight through await_approval on the first invoke
# (human_approved is unset, so route_approval returns "end") and the run
# finishes before anyone can weigh in.
hitl_agent = graph.compile(
    checkpointer=checkpointer,
    interrupt_before=["await_approval"],
)
# Start execution (pauses just before await_approval)
config = {"configurable": {"thread_id": "user-123"}}
result = hitl_agent.invoke(
    {"request": "Delete all records from staging table"},
    config
)
# Later: record the human decision, then resume from the checkpoint.
hitl_agent.update_state(config, {"human_approved": True})
final_result = hitl_agent.invoke(None, config)  # Resume from checkpoint
Composition Best Practices
- Single Responsibility: Each node does one thing
- Explicit State: All data flows through typed state
- Idempotent Nodes: Nodes should be safe to retry
- Clear Routing Logic: Decision functions should be testable
- Bounded Iterations: Always limit cycles
# Good: Clear, testable routing
def route_decision(state: State) -> Literal["a", "b", "c"]:
    """Pure lookup on precomputed flags — trivial to unit test in isolation."""
    if state["condition_a"]:
        return "a"
    elif state["condition_b"]:
        return "b"
    return "c"
# Bad: Complex logic in routing
def route_decision(state: State) -> str:
    # Don't do complex processing here
    # (expensive or stateful work belongs in a node; a router should only
    # read fields that earlier nodes already computed)
    result = complex_calculation(state)
    return result
Conclusion
Graph-based agents provide the flexibility needed for real-world AI applications. Master these patterns, and you can build agents that handle complex, multi-step tasks reliably.
Start with simple patterns, combine as needed, and always keep your graphs testable and observable.