Swarm Workflow Pattern

Overview

The Swarm pattern implements OpenAI’s Swarm framework for multi-agent handoffs, enabling seamless context transfer between specialized agents based on conversation flow and requirements.

Complete Implementation

The Swarm pattern implements OpenAI’s Swarm framework for seamless multi-agent handoffs with context preservation. Here’s a comprehensive airline customer service implementation:

Basic Swarm Setup

import asyncio
import os
from mcp_agent.app import MCPApp
from mcp_agent.workflows.swarm.swarm import DoneAgent, SwarmAgent
from mcp_agent.workflows.swarm.swarm_anthropic import AnthropicSwarm
from mcp_agent.human_input.handler import console_input_callback

app = MCPApp(
    name="airline_customer_service", 
    human_input_callback=console_input_callback
)

# Define transfer functions between agents
def transfer_to_flight_modification():
    """Transfer to agent that handles flight modifications"""
    return flight_modification

def transfer_to_lost_baggage():
    """Transfer to agent that handles lost baggage"""
    return lost_baggage

def transfer_to_flight_cancel():
    """Transfer to agent that handles flight cancellations"""
    return flight_cancel

def transfer_to_flight_change():
    """Transfer to agent that handles flight changes"""
    return flight_change

def case_resolved():
    """Resolve the case and end the conversation"""
    return DoneAgent()

# Utility functions
def escalate_to_agent(reason=None):
    """Escalate to a human agent"""
    return f"Escalating to agent: {reason}" if reason else "Escalating to agent"

def change_flight():
    """Change the customer's flight"""
    return "Flight was successfully changed!"

def initiate_refund():
    """Process a refund for the customer"""
    return "Refund initiated successfully"

# Create specialized swarm agents
def create_triage_agent():
    """Creates the initial triage agent"""
    return SwarmAgent(
        name="Triage Agent",
        instruction=lambda context_variables: f"""
        You are to triage a user's request, and call a tool to transfer to the right intent.
        Once you are ready to transfer to the right intent, call the tool to transfer to the right intent.
        You don't need to know specifics, just the topic of the request.
        When you need more information to triage the request to an agent, ask a direct question.
        Do not share your thought process with the user!
        
        Customer context: {context_variables.get("customer_context", "None")}
        Flight context: {context_variables.get("flight_context", "None")}
        """,
        functions=[transfer_to_flight_modification, transfer_to_lost_baggage],
        human_input_callback=console_input_callback,
    )

def create_flight_modification_agent():
    """Creates the flight modification routing agent"""
    return SwarmAgent(
        name="Flight Modification Agent",
        instruction=lambda context_variables: f"""
        You are a Flight Modification Agent for a customer service airlines company.
        You are an expert customer service agent deciding which sub intent the user should be referred to.
        You already know the intent is for flight modification related questions.
        
        First, look at message history and see if you can determine if the user wants to 
        cancel or change their flight.
        
        Ask user clarifying questions until you know whether it is a cancel request 
        or change flight request. Once you know, call the appropriate transfer function.
        Either ask clarifying questions, or call one of your functions, every time.
        
        Customer context: {context_variables.get("customer_context", "None")}
        Flight context: {context_variables.get("flight_context", "None")}
        """,
        functions=[transfer_to_flight_cancel, transfer_to_flight_change],
        server_names=["fetch", "filesystem"],
        human_input_callback=console_input_callback,
    )

async def run_airline_swarm():
    async with app.run() as context:
        # Add current directory to filesystem server
        context.config.mcp.servers["filesystem"].args.extend([os.getcwd()])

        # Set up customer context
        context_variables = {
            "customer_context": """Customer details:
1. CUSTOMER_ID: customer_12345
2. NAME: John Doe
3. PHONE_NUMBER: (123) 456-7890
4. EMAIL: johndoe@example.com
5. STATUS: Premium
6. ACCOUNT_STATUS: Active
7. BALANCE: $0.00
8. LOCATION: 1234 Main St, San Francisco, CA 94123, USA
""",
            "flight_context": """Flight Information:
Flight from LGA (LaGuardia) NYC to LAX Los Angeles
Flight #: 1919
Departure: 3pm ET, 5/21/2024
"""
        }

        # Create and initialize the triage agent
        triage_agent = create_triage_agent()
        triage_agent.instruction = triage_agent.instruction(context_variables)
        
        # Initialize the swarm with triage agent
        swarm = AnthropicSwarm(
            agent=triage_agent, 
            context_variables=context_variables
        )

        # Test different customer inquiries
        test_inquiries = [
            "My bag was not delivered!",  # Should route to lost baggage
            "I want to cancel my flight please",  # Should route to flight modification
            "I want to change my flight to one day earlier!",  # Should route to flight change
        ]

        for inquiry in test_inquiries:
            print(f"\n=== Customer Inquiry: {inquiry} ===")
            result = await swarm.generate_str(inquiry)
            print(f"Swarm Response: {result}")
            
            # Reset to triage agent for next test
            await swarm.set_agent(triage_agent)

        await triage_agent.shutdown()

if __name__ == "__main__":
    asyncio.run(run_airline_swarm())

Advanced Swarm Configuration

Multi-Provider Support

# Use different providers for different agents
from mcp_agent.workflows.swarm.swarm_openai import OpenAISwarm

# OpenAI-powered swarm for complex reasoning
openai_swarm = OpenAISwarm(
    agent=triage_agent,
    context_variables=context_variables,
    model="gpt-4o",
    temperature=0.3
)

# Anthropic-powered swarm for detailed analysis
anthropic_swarm = AnthropicSwarm(
    agent=analysis_agent,
    context_variables=context_variables,
    model="claude-3-5-sonnet-20241022"
)

Complex Agent Hierarchies

# Create a comprehensive customer service swarm
def create_comprehensive_swarm():
    # Specialized domain agents
    flight_cancel_agent = SwarmAgent(
        name="Flight Cancellation Specialist",
        instruction="""Handle flight cancellation requests following company policy.
        Check eligibility, process refunds or credits, and resolve the case.""",
        functions=[
            escalate_to_agent,
            initiate_refund,
            initiate_flight_credits,
            case_resolved,
        ],
        server_names=["fetch", "filesystem"],
    )

    flight_change_agent = SwarmAgent(
        name="Flight Change Specialist", 
        instruction="""Handle flight change requests following company policy.
        Validate eligibility, process changes, and confirm new booking.""",
        functions=[
            escalate_to_agent,
            change_flight,
            valid_to_change_flight,
            case_resolved,
        ],
        server_names=["fetch", "filesystem"],
    )

    baggage_agent = SwarmAgent(
        name="Baggage Specialist",
        instruction="""Handle lost baggage inquiries following company policy.
        Initiate searches, provide updates, and resolve cases.""",
        functions=[
            escalate_to_agent,
            initiate_baggage_search,
            case_resolved,
        ],
        server_names=["fetch", "filesystem"],
    )

    # Update transfer functions to use these agents
    def transfer_to_flight_cancel():
        return flight_cancel_agent
    
    def transfer_to_flight_change():
        return flight_change_agent
    
    def transfer_to_lost_baggage():
        return baggage_agent

    return {
        "triage": create_triage_agent(),
        "flight_modification": create_flight_modification_agent(), 
        "flight_cancel": flight_cancel_agent,
        "flight_change": flight_change_agent,
        "baggage": baggage_agent,
    }

Context Management

# Advanced context management for swarm agents
async def run_contextual_swarm():
    context_variables = {
        "customer_context": get_customer_details(),
        "flight_context": get_flight_details(),
        "conversation_history": [],
        "escalation_count": 0,
        "resolution_attempts": 0,
    }

    swarm = AnthropicSwarm(
        agent=triage_agent,
        context_variables=context_variables
    )

    # Context is preserved across agent handoffs
    result = await swarm.generate_str(
        "I need to cancel my flight and get a refund"
    )
    
    # Access updated context after processing
    updated_context = swarm.context_variables
    print(f"Escalations: {updated_context['escalation_count']}")
    print(f"Resolution attempts: {updated_context['resolution_attempts']}")

Key Features

  • Automatic Handoffs: Context-aware agent switching based on conversation flow
  • Context Preservation: Full conversation history maintained across handoffs
  • Trigger-Based Routing: Configurable keywords and confidence thresholds
  • Bidirectional Communication: Agents can hand back to previous agents
  • State Management: Maintains conversation state and agent history

Use Cases

Customer Service Operations

Perfect for complex customer service scenarios requiring specialized expertise:
  • Airline Support: Triage → Flight modifications → Cancellations/Changes → Resolution
  • Tech Support: L1 Support → L2 Technical → L3 Engineering → Management escalation
  • E-commerce: General inquiry → Product specialist → Payment issues → Fulfillment
  • Banking: Customer service → Account specialist → Fraud team → Branch manager

Multi-Domain Consultation

Handle requests requiring different areas of expertise:
  • Legal Services: Intake → Paralegal → Attorney → Specialist counsel
  • Healthcare: Nurse triage → General practitioner → Specialist → Care coordinator
  • Real Estate: Initial inquiry → Agent → Mortgage specialist → Closing coordinator
  • Education: Admissions → Academic advisor → Financial aid → Student services

Progressive Problem Solving

Start broad and become increasingly specialized:
  • Software Development: Help desk → Developer → Architect → Product manager
  • Research Projects: Research assistant → Subject expert → Principal investigator
  • Content Creation: Writer → Editor → SEO specialist → Publication manager
  • Sales Process: Lead qualification → Sales rep → Technical sales → Account manager

Workflow Processing Pipelines

Pass tasks through specialized processing stages:
  • Document Processing: OCR → Data extraction → Validation → Archive
  • Content Moderation: Auto-filter → Human review → Policy expert → Appeals
  • Quality Assurance: Automated testing → Manual QA → Security review → Release
  • Hiring Process: Resume screening → Phone screen → Technical interview → Final decision

Setup and Installation

Clone the repository and navigate to the swarm workflow example:
git clone https://github.com/lastmile-ai/mcp-agent.git
cd mcp-agent/examples/workflows/workflow_swarm
Install dependencies:
pip install uv
uv sync
uv pip install -r requirements.txt
Configure your environment:
cp mcp_agent.secrets.yaml.example mcp_agent.secrets.yaml
Add your API keys to mcp_agent.secrets.yaml:
openai_api_key: "your-openai-api-key"
anthropic_api_key: "your-anthropic-api-key"  # recommended for swarm
Run the airline customer service example:
uv run main.py

Configuration Examples

Human Input Integration

from mcp_agent.human_input.handler import console_input_callback

# Enable human input for agent interactions
app = MCPApp(
    name="customer_service_swarm",
    human_input_callback=console_input_callback
)

# Agents can request human input during conversations
agent = SwarmAgent(
    name="Customer Service Rep",
    instruction="Ask clarifying questions when needed",
    human_input_callback=console_input_callback
)

Policy-Driven Agents

Create agents that follow specific company policies:
# Create agent that follows documented policies
policy_agent = SwarmAgent(
    name="Policy Agent",
    instruction="""Follow the company policy strictly. 
    Read the policy file and execute each step in order.
    Policy file: policies/refund_policy.md""",
    functions=[process_refund, escalate_to_supervisor, case_resolved],
    server_names=["filesystem"]  # Access to policy files
)

Dynamic Context Variables

# Context variables that update during conversation
context_variables = {
    "customer_tier": "premium",
    "case_priority": "normal", 
    "escalation_count": 0,
    "policies_consulted": [],
    "resolution_attempts": 0,
    "customer_satisfaction": None
}

# Agents can update context during processing
def update_customer_tier(new_tier):
    context_variables["customer_tier"] = new_tier
    return f"Customer tier updated to {new_tier}"

Expected Output

The swarm will intelligently route customer inquiries and provide contextual responses:
=== Customer Inquiry: "My bag was not delivered!" ===
[Triage Agent] I understand you're having an issue with your baggage. 
Let me transfer you to our baggage specialist who can help locate your bag.

[Transferring to: Lost Baggage Agent]

[Baggage Specialist] I'm sorry to hear about your missing bag. Let me initiate 
a search using your flight information. I've started a baggage search for 
flight 1919 from LGA to LAX on 5/21/2024.

Search Result: Baggage was found!

[Baggage Specialist] Great news! We've located your bag. It will be delivered 
to your address within 24 hours. Is there anything else I can help you with?

=== Case Resolved ===
The swarm maintains conversation context, automatically hands off between appropriate specialists, and follows company policies throughout the interaction.

Full Implementation

See the complete swarm pattern implementation with OpenAI Swarm compatibility.