Back to Blog
6 min read

Multi-Agent Frameworks: Orchestrating AI Agent Collaboration

Single agents are powerful, but complex enterprise tasks often require multiple specialized agents working together. Let’s explore how to build multi-agent systems that can tackle sophisticated workflows.

Multi-Agent Architecture Patterns

Pattern 1: Sequential Pipeline

Agent A → Agent B → Agent C → Result
(Research)  (Analyze)  (Report)
from azure.ai.foundry.agents import Agent, Pipeline

class SequentialPipeline:
    """Chain agents so that each one consumes the previous agent's output."""

    def __init__(self, agents: list[Agent]):
        # Stages run in list order.
        self.agents = agents

    async def run(self, initial_input: str) -> dict:
        """Run every stage in order and report what each one saw and produced.

        Args:
            initial_input: Text handed to the first agent.

        Returns:
            A dict with ``"final_output"`` (the last stage's output, or
            ``initial_input`` when there are no agents) and
            ``"pipeline_trace"`` (one ``{"agent", "input", "output"}`` entry
            per stage, in execution order).
        """
        trace = []
        payload = initial_input

        for stage in self.agents:
            outcome = await stage.run(payload)
            step = {"agent": stage.name, "input": payload, "output": outcome.output}
            trace.append(step)
            # This stage's output becomes the next stage's input.
            payload = outcome.output

        return {"final_output": payload, "pipeline_trace": trace}

# Create specialized agents
# One agent per pipeline stage; all three use the same model and differ only
# in their instructions.
research_agent = Agent(
    name="Researcher",
    model="gpt-4o",
    instructions="Research the given topic thoroughly. Output key findings."
)

analysis_agent = Agent(
    name="Analyst",
    model="gpt-4o",
    instructions="Analyze the research findings. Identify patterns and insights."
)

writer_agent = Agent(
    name="Writer",
    model="gpt-4o",
    instructions="Create a professional report from the analysis."
)

# List order is execution order: SequentialPipeline.run feeds each agent's
# output to the next agent in the list.
pipeline = SequentialPipeline([research_agent, analysis_agent, writer_agent])
# NOTE: a bare top-level `await` is only valid where an event loop is already
# running (e.g. a notebook cell). In a plain script this call has to live
# inside an `async def` that is started with asyncio.run().
result = await pipeline.run("Azure Fabric adoption trends in 2024")

Pattern 2: Parallel Execution

         ┌→ Agent A ──┐
Input ───┼→ Agent B ──┼→ Aggregator → Result
         └→ Agent C ──┘
import asyncio

class ParallelAgents:
    """Fan one task out to several agents at once, then merge their answers."""

    def __init__(self, agents: list[Agent], aggregator: Agent):
        self.agents = agents
        self.aggregator = aggregator

    async def run(self, task: str) -> str:
        """Send ``task`` to every agent concurrently and synthesize the replies.

        Args:
            task: The prompt given, unchanged, to each agent.

        Returns:
            The aggregator's output for a prompt that lists every agent's
            answer under a ``=== <name> Analysis ===`` heading.
        """
        # gather() preserves argument order, so answers line up with self.agents.
        answers = await asyncio.gather(*(expert.run(task) for expert in self.agents))

        sections = []
        for expert, answer in zip(self.agents, answers):
            sections.append(f"=== {expert.name} Analysis ===\n{answer.output}")
        combined_input = "\n\n".join(sections)

        aggregation_prompt = (
            "Multiple experts have analyzed the following task:\n"
            f"Task: {task}\n"
            "\n"
            "Expert Analyses:\n"
            f"{combined_input}\n"
            "\n"
            "Synthesize these perspectives into a comprehensive response."
        )

        synthesis = await self.aggregator.run(aggregation_prompt)
        return synthesis.output

# Create parallel experts
# Each expert receives the same task and answers from its own perspective.
# NOTE(review): unlike the pipeline agents, these are created without
# `model=`; presumably Agent falls back to a default model — confirm.
technical_agent = Agent(
    name="TechnicalExpert",
    instructions="Analyze from a technical architecture perspective."
)

business_agent = Agent(
    name="BusinessExpert",
    instructions="Analyze from a business value and ROI perspective."
)

security_agent = Agent(
    name="SecurityExpert",
    instructions="Analyze from a security and compliance perspective."
)

# The synthesizer is the aggregator: it sees all three analyses in one prompt.
synthesizer = Agent(
    name="Synthesizer",
    instructions="Combine multiple expert perspectives into actionable recommendations."
)

parallel_system = ParallelAgents(
    agents=[technical_agent, business_agent, security_agent],
    aggregator=synthesizer
)

# NOTE: top-level `await` needs an already-running event loop (e.g. a
# notebook); in a plain script wrap this in an `async def` run via asyncio.run().
result = await parallel_system.run("Should we migrate our data warehouse to Microsoft Fabric?")

Pattern 3: Router-Based Dispatch

              ┌→ Agent A (SQL)
Input → Router┼→ Agent B (Python)
              └→ Agent C (Visualization)
from azure.ai.foundry.agents import Router, RoutingStrategy

class SemanticRouter:
    """Route each task to the registered agent an LLM judges the best fit.

    Attributes:
        agents: Mapping of routing key to agent. A ``"default"`` key, when
            present, is the fallback for replies that match no key.
        router_model: Model name used for the classification call.
        client: Optional chat client exposing
            ``await client.chat.complete(model=..., messages=[...])``. When it
            is omitted, a throwaway ``Agent`` on ``router_model`` performs the
            classification instead.
    """

    def __init__(
        self,
        agents: dict[str, Agent],
        router_model: str = "gpt-4o",
        client=None,
    ):
        self.agents = agents
        self.router_model = router_model
        # ``route`` used to read ``self.client`` although nothing ever assigned
        # it, so every call failed with AttributeError. It is now an explicit,
        # optional dependency.
        self.client = client

    async def route(self, task: str) -> str:
        """Classify ``task``, run the chosen agent on it and return its output.

        Args:
            task: The user request to dispatch.

        Returns:
            The ``output`` of the selected agent's run.

        Raises:
            KeyError: If the classifier's reply matches no agent and no
                ``"default"`` agent is registered.
        """
        # Use LLM to classify the task
        routing_prompt = (
            "Classify this task and select the best agent.\n"
            "\n"
            "Available agents:\n"
            f"{self._format_agent_descriptions()}\n"
            "\n"
            f"Task: {task}\n"
            "\n"
            "Respond with just the agent name."
        )

        reply = await self._classify(routing_prompt)
        agent_name = self._match_agent(reply)

        result = await self.agents[agent_name].run(task)
        # Return the text, as the ``-> str`` signature promises.
        return result.output

    async def _classify(self, routing_prompt: str) -> str:
        """Return the raw classifier reply for ``routing_prompt``."""
        if self.client is not None:
            classification = await self.client.chat.complete(
                model=self.router_model,
                messages=[{"role": "user", "content": routing_prompt}],
            )
            return classification.choices[0].message.content

        # No client supplied: classify with an agent on the router model.
        classifier = Agent(
            name="Router",
            model=self.router_model,
            instructions="Select the best agent for a task. Respond with just the agent name.",
        )
        return (await classifier.run(routing_prompt)).output

    def _match_agent(self, reply: str) -> str:
        """Map a classifier reply onto a key of ``self.agents``."""
        # Models often wrap the name in quotes/backticks or add a period.
        candidate = (reply or "").strip().strip("\"'`.").strip()
        if candidate in self.agents:
            return candidate

        folded = candidate.casefold()
        for name in self.agents:
            if name.casefold() == folded:
                return name

        if "default" in self.agents:
            return "default"
        raise KeyError(
            f"router reply {candidate!r} matches no agent and no 'default' agent is registered"
        )

    def _format_agent_descriptions(self) -> str:
        """List each routing key with up to 100 characters of its instructions."""
        lines = []
        for name, agent in self.agents.items():
            text = agent.instructions
            # Only mark the text as cut when it actually was.
            summary = text if len(text) <= 100 else f"{text[:100]}..."
            lines.append(f"- {name}: {summary}")
        return "\n".join(lines)

# Create specialized agents
# NOTE(review): created without `model=`; presumably Agent falls back to a
# default model — confirm.
sql_agent = Agent(
    name="SQLExpert",
    instructions="You write and optimize SQL queries for data warehouses."
)

python_agent = Agent(
    name="PythonExpert",
    instructions="You write Python code for data processing and analysis."
)

visualization_agent = Agent(
    name="VisualizationExpert",
    instructions="You create data visualizations and dashboards."
)

# The dict keys are the names listed in the routing prompt and the values the
# classifier is expected to answer with. "default" is the key route() falls
# back to when the reply matches no key; here it reuses the SQL agent.
router = SemanticRouter({
    "SQLExpert": sql_agent,
    "PythonExpert": python_agent,
    "VisualizationExpert": visualization_agent,
    "default": sql_agent
})

# NOTE: top-level `await` needs an already-running event loop (e.g. a
# notebook); in a plain script wrap this in an `async def` run via asyncio.run().
result = await router.route("Write a query to find duplicate customer records")

Pattern 4: Hierarchical Teams

        Supervisor
       /    |    \
   Lead A  Lead B  Lead C
   /   \     |      |
  A1   A2    B1    C1
class AgentTeam:
    """Hierarchical team: a supervisor plans, workers execute.

    Workers are usually agents, but a worker may itself be an ``AgentTeam``;
    it is then invoked one delegation level deeper, which is what
    ``max_delegation_depth`` bounds.
    """

    def __init__(
        self,
        supervisor: Agent,
        workers: dict[str, Agent],
        max_delegation_depth: int = 2
    ):
        self.supervisor = supervisor
        self.workers = workers
        self.max_depth = max_delegation_depth

    async def run(self, task: str, depth: int = 0):
        """Let the supervisor handle, delegate or split ``task``.

        Args:
            task: The request to fulfil.
            depth: Current delegation level. At ``max_delegation_depth`` or
                beyond, the supervisor answers directly without planning.

        Returns:
            The result object of the final agent run (supervisor or worker);
            its text is available as ``.output``.

        The supervisor also answers directly whenever its plan cannot be
        parsed as JSON or names no known worker, so a malformed plan degrades
        to a direct answer instead of raising.
        """
        # Local import: the snippet never imports json at module level.
        import json

        if depth >= self.max_depth:
            # Direct execution at max depth
            return await self.supervisor.run(task)

        # Supervisor decides how to handle
        planning_prompt = (
            "You are a team supervisor. Decide how to handle this task.\n"
            "\n"
            "Available team members:\n"
            f"{self._format_workers()}\n"
            "\n"
            f"Task: {task}\n"
            "\n"
            "Options:\n"
            "1. Handle it yourself\n"
            "2. Delegate to a team member\n"
            "3. Break into subtasks for multiple team members\n"
            "\n"
            "Respond in JSON:\n"
            '{"action": "self|delegate|split", "assignments": '
            '[{"worker": "name", "subtask": "description"}]}'
        )

        decision = await self.supervisor.run(planning_prompt)
        plan = self._parse_plan(decision.output)
        assignments = self._valid_assignments(plan)

        if plan.get("action") == "self" or not assignments:
            return await self.supervisor.run(task)

        if plan.get("action") == "delegate":
            return await self._dispatch(assignments[0], depth)

        # Any other action is treated as a split.
        results = []
        for assignment in assignments:
            result = await self._dispatch(assignment, depth)
            results.append({
                "worker": assignment["worker"],
                "task": assignment["subtask"],
                "result": result.output
            })

        # Supervisor synthesizes
        synthesis_prompt = (
            f"Original task: {task}\n"
            "\n"
            "Subtask results:\n"
            f"{json.dumps(results, indent=2)}\n"
            "\n"
            "Synthesize into a final response."
        )

        return await self.supervisor.run(synthesis_prompt)

    async def _dispatch(self, assignment: dict, depth: int):
        """Run one assignment on its worker, descending a level for sub-teams."""
        worker = self.workers[assignment["worker"]]
        if isinstance(worker, AgentTeam):
            return await worker.run(assignment["subtask"], depth + 1)
        return await worker.run(assignment["subtask"])

    def _format_workers(self) -> str:
        """Describe each worker as ``- name: <first 100 chars of instructions>``."""
        lines = []
        for name, worker in self.workers.items():
            # A nested team is described by its supervisor's instructions.
            described = worker.supervisor if isinstance(worker, AgentTeam) else worker
            summary = getattr(described, "instructions", "") or ""
            lines.append(f"- {name}: {summary[:100]}")
        return "\n".join(lines)

    @staticmethod
    def _parse_plan(raw) -> dict:
        """Extract the JSON plan from a reply; fall back to ``{"action": "self"}``."""
        import json

        fallback = {"action": "self", "assignments": []}
        text = raw if isinstance(raw, str) else ""
        # Models often wrap JSON in code fences or prose; keep the outermost object.
        start, end = text.find("{"), text.rfind("}")
        if start == -1 or end < start:
            return fallback
        try:
            plan = json.loads(text[start:end + 1])
        except ValueError:
            return fallback
        return plan if isinstance(plan, dict) else fallback

    def _valid_assignments(self, plan: dict) -> list:
        """Keep only assignments that name a known worker and carry a subtask."""
        raw = plan.get("assignments")
        if not isinstance(raw, list):
            return []
        return [
            item for item in raw
            if isinstance(item, dict)
            and isinstance(item.get("worker"), str)
            and item["worker"] in self.workers
            and isinstance(item.get("subtask"), str)
        ]

Building a Data Engineering Multi-Agent System

class DataEngineeringTeam:
    """Complete data engineering multi-agent system.

    A coordinator plans the work, the specialists named in the plan run on
    the request, and the coordinator then synthesizes their outputs.
    """

    def __init__(self):
        self.schema_agent = Agent(
            name="SchemaAnalyst",
            model="gpt-4o",
            instructions="Analyze database schemas and data models.",
            tools=[SchemaInspectorTool()]
        )

        self.query_agent = Agent(
            name="QueryWriter",
            model="gpt-4o",
            instructions="Write optimized SQL queries.",
            tools=[SQLExecutorTool()]
        )

        self.pipeline_agent = Agent(
            name="PipelineDesigner",
            model="gpt-4o",
            instructions="Design data pipelines and ETL processes."
        )

        self.quality_agent = Agent(
            name="QualityAnalyst",
            model="gpt-4o",
            instructions="Assess data quality and suggest improvements.",
            tools=[DataProfilingTool()]
        )

        self.coordinator = Agent(
            name="Coordinator",
            model="gpt-4o",
            instructions="""You coordinate a data engineering team.
            Delegate tasks appropriately and synthesize results."""
        )

    async def handle_request(self, request: str) -> str:
        """Plan, run the specialists the plan mentions, and synthesize.

        Args:
            request: The user's data engineering request.

        Returns:
            The coordinator's synthesized answer.

        A specialist runs when its keyword ("schema", "query", "pipeline",
        "quality") appears anywhere in the coordinator's plan,
        case-insensitively.
        """
        # Coordinator decides approach
        plan = await self.coordinator.run(f"Plan approach for: {request}")
        plan_text = plan.output.lower()

        specialists = (
            ("schema", self.schema_agent),
            ("query", self.query_agent),
            ("pipeline", self.pipeline_agent),
            ("quality", self.quality_agent),
        )

        # Execute based on plan, keeping every specialist's answer.
        findings = []
        for keyword, agent in specialists:
            if keyword in plan_text:
                result = await agent.run(request)
                findings.append(f"=== {agent.name} ===\n{result.output}")

        # Coordinator synthesizes. The specialist outputs have to be part of
        # the prompt: they used to be computed and then dropped, so the
        # coordinator was asked to synthesize results it never saw.
        synthesis_prompt = f"Synthesize team results for: {request}"
        if findings:
            synthesis_prompt += "\n\nTeam results:\n" + "\n\n".join(findings)

        synthesis = await self.coordinator.run(synthesis_prompt)
        return synthesis.output

# Usage
# The team builds its own agents in __init__, so no arguments are needed.
team = DataEngineeringTeam()
# NOTE: top-level `await` needs an already-running event loop (e.g. a
# notebook); in a plain script wrap this in an `async def` run via asyncio.run().
result = await team.handle_request(
    "Design a customer 360 data pipeline with quality checks"
)

Handling Inter-Agent Communication

from dataclasses import dataclass
from typing import Any
from collections import deque

@dataclass
class Message:
    """A single message passed between agents over the message bus.

    ``MessageBus.send`` routes on ``recipient`` only: the literal value
    ``"broadcast"`` fans the message out across the registered queues, and
    any other value is used as the key of one agent's queue. ``message_type``
    is never read by the bus itself.
    """

    sender: str  # name of the agent (or caller) that produced the message
    recipient: str  # a registered agent name, or "broadcast"
    content: Any  # payload; its type is left to the handlers
    message_type: str  # "request", "response", "broadcast"

class MessageBus:
    """Central communication hub for agents.

    Each registered agent owns a FIFO queue and one async handler.
    ``send`` only enqueues; handlers run when ``process`` drains a queue.
    """

    def __init__(self):
        self.queues: dict[str, deque] = {}
        self.handlers: dict[str, callable] = {}

    def register(self, agent_name: str, handler: callable):
        """Register (or replace) the handler for ``agent_name``.

        Re-registering keeps messages that are already queued; only the
        handler is swapped.
        """
        self.queues.setdefault(agent_name, deque())
        self.handlers[agent_name] = handler

    async def send(self, message: Message):
        """Enqueue ``message`` for its recipient, or for everyone on broadcast.

        Raises:
            KeyError: If the recipient is neither ``"broadcast"`` nor a
                registered agent.
        """
        if message.recipient == "broadcast":
            for name, queue in self.queues.items():
                # Skip the sender: echoing a broadcast back would make a
                # handler that replies by broadcasting reprocess its own
                # message.
                if name != message.sender:
                    queue.append(message)
            return

        try:
            queue = self.queues[message.recipient]
        except KeyError:
            raise KeyError(
                f"cannot deliver to unregistered agent {message.recipient!r}"
            ) from None
        queue.append(message)

    async def process(self, agent_name: str):
        """Drain ``agent_name``'s queue, sending on any response a handler returns.

        Raises:
            KeyError: If ``agent_name`` is not registered.
        """
        try:
            queue = self.queues[agent_name]
            handler = self.handlers[agent_name]
        except KeyError:
            raise KeyError(f"agent {agent_name!r} is not registered") from None

        # Re-check the queue each round: a response may land back in it.
        while queue:
            message = queue.popleft()
            response = await handler(message)
            if response:
                await self.send(response)

# Usage
bus = MessageBus()
# NOTE(review): query_agent_handler and schema_agent_handler are not defined
# in this snippet; presumably async callables taking a Message and returning
# an optional reply Message — confirm.
bus.register("QueryAgent", query_agent_handler)
bus.register("SchemaAgent", schema_agent_handler)

# send() only enqueues the message; the handler runs once
# `await bus.process("QueryAgent")` is called.
# NOTE: top-level `await` needs an already-running event loop (e.g. a
# notebook); in a plain script wrap this in an `async def` run via asyncio.run().
await bus.send(Message(
    sender="Coordinator",
    recipient="QueryAgent",
    content="Write a customer segmentation query",
    message_type="request"
))

Multi-agent systems unlock sophisticated automation capabilities. Start with the simplest pattern that solves your problem, and add complexity only as your workflows demand it.

Resources

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.