Orchestrator Workflow Pattern

Overview

The Orchestrator pattern handles complex, multi-step tasks through dynamic planning, parallel execution, and intelligent result synthesis. It breaks down objectives into manageable steps and coordinates specialized agents.
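
Conceptually, a planner LLM decomposes the objective into a plan of steps, where each step holds one or more tasks assigned to a worker agent, and tasks within a step can run in parallel. A purely illustrative sketch of that decomposition (the dictionaries below are for explanation only, not the library's internal types):
# Illustrative only: the rough shape of a plan the orchestrator might produce.
illustrative_plan = [
    {"step": "Load the short story",
     "tasks": [{"agent": "finder", "objective": "Read short_story.md"}]},
    {"step": "Review the story",  # these tasks can run in parallel
     "tasks": [{"agent": "proofreader", "objective": "Check grammar and spelling"},
               {"agent": "fact_checker", "objective": "Check logical consistency"},
               {"agent": "style_enforcer", "objective": "Check APA style adherence"}]},
    {"step": "Produce the graded report",
     "tasks": [{"agent": "writer", "objective": "Write graded_report.md"}]},
]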

Complete Implementation

The Orchestrator workflow handles complex multi-step tasks through dynamic planning and coordination. Here’s a comprehensive implementation:

Basic Orchestrator Setup

import asyncio
import os
from mcp_agent.app import MCPApp
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM
from mcp_agent.workflows.orchestrator.orchestrator import Orchestrator
from mcp_agent.workflows.llm.augmented_llm import RequestParams

app = MCPApp(name="assignment_grader_orchestrator")

async def run_orchestrator_example():
    async with app.run() as orchestrator_app:
        context = orchestrator_app.context

        # Add the current directory to the filesystem server's arguments
        context.config.mcp.servers["filesystem"].args.extend([os.getcwd()])

        # Create specialized worker agents
        finder_agent = Agent(
            name="finder",
            instruction="""You are an agent with access to the filesystem and web fetching capabilities. 
            Your job is to identify the closest match to a user's request, make the appropriate tool calls, 
            and return the URI and CONTENTS of the closest match.""",
            server_names=["fetch", "filesystem"]
        )

        writer_agent = Agent(
            name="writer",
            instruction="""You are an agent that can write to the filesystem.
            You are tasked with taking the user's input, addressing it, and 
            writing the result to disk in the appropriate location.""",
            server_names=["filesystem"]
        )

        proofreader = Agent(
            name="proofreader",
            instruction="""Review the short story for grammar, spelling, and punctuation errors.
            Identify any awkward phrasing or structural issues that could improve clarity. 
            Provide detailed feedback on corrections.""",
            server_names=["fetch"]
        )

        fact_checker = Agent(
            name="fact_checker",
            instruction="""Verify the factual consistency within the story. Identify any contradictions,
            logical inconsistencies, or inaccuracies in the plot, character actions, or setting. 
            Highlight potential issues with reasoning or coherence.""",
            server_names=["fetch"]
        )

        style_enforcer = Agent(
            name="style_enforcer",
            instruction="""Analyze the story for adherence to style guidelines.
            Evaluate the narrative flow, clarity of expression, and tone. Suggest improvements to 
            enhance storytelling, readability, and engagement.""",
            server_names=["fetch"]
        )

        # Define the complex multi-step task
        task = """Load the student's short story from short_story.md, 
        and generate a report with feedback across proofreading, 
        factuality/logical consistency and style adherence. Use the style rules from 
        https://owl.purdue.edu/owl/research_and_citation/apa_style/apa_formatting_and_style_guide/general_format.html.
        Write the graded report to graded_report.md in the same directory as short_story.md"""

        # Create orchestrator with different planning strategies
        orchestrator = Orchestrator(
            llm_factory=OpenAIAugmentedLLM,
            available_agents=[
                finder_agent,
                writer_agent,
                proofreader,
                fact_checker,
                style_enforcer,
            ],
            plan_type="full",  # Generate complete plan upfront
            name="assignment_grader",
        )

        # Execute with custom parameters
        result = await orchestrator.generate_str(
            message=task,
            request_params=RequestParams(model="gpt-4o")
        )
        
        print("Orchestrator Result:")
        print(result)

        # Display token usage analysis
        node = await orchestrator.get_token_node()
        if node:
            display_token_usage(node, context)

        return result

def display_token_usage(node, context, indent="", is_last=True):
    """Display hierarchical token usage from orchestrator execution"""
    connector = "└── " if is_last else "├── "
    usage = node.get_usage()
    cost = node.get_cost() if hasattr(node, "get_cost") else 0.0
    
    if usage.total_tokens > 0:
        cost_str = f" (${cost:.4f})" if cost > 0 else ""
        print(f"{indent}{connector}{node.name} [{node.node_type}]")
        print(f"{indent}{'    ' if is_last else '│   '}├─ Total: {usage.total_tokens:,} tokens{cost_str}")
        print(f"{indent}{'    ' if is_last else '│   '}├─ Input: {usage.input_tokens:,}")
        print(f"{indent}{'    ' if is_last else '│   '}└─ Output: {usage.output_tokens:,}")

        if node.children:
            child_indent = indent + ("    " if is_last else "│   ")
            for i, child in enumerate(node.children):
                display_token_usage(child, context, child_indent, i == len(node.children) - 1)

if __name__ == "__main__":
    asyncio.run(run_orchestrator_example())

Advanced Planning Strategies

The orchestrator supports different planning approaches:

Full Planning

Generate complete execution plan upfront:
# Create orchestrator with full planning
orchestrator = Orchestrator(
    llm_factory=OpenAIAugmentedLLM,
    available_agents=agents,
    plan_type="full",  # Generate complete plan upfront
    max_planning_steps=10,
    name="full_planner",
)

# The orchestrator will create a plan like this:
# Step 1: Load the short story from short_story.md (finder_agent)
# Step 2: Generate feedback in parallel:
#   - Review for grammar/spelling (proofreader)
#   - Check factual consistency (fact_checker) 
#   - Evaluate style adherence (style_enforcer)
# Step 3: Compile feedback into report (writer_agent)
# Step 4: Write graded report to disk (writer_agent)

Iterative Planning

Plan one step at a time, adapting based on results:
# Create orchestrator with iterative planning
orchestrator = Orchestrator(
    llm_factory=OpenAIAugmentedLLM,
    available_agents=agents,
    plan_type="iterative",  # Plan step by step
    max_iterations=20,
    name="iterative_planner",
)

# The orchestrator will:
# 1. Plan first step based on objective
# 2. Execute the step
# 3. Analyze results and plan next step
# 4. Repeat until objective is complete
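
Which mode to prefer depends mostly on how predictable the task is: full planning suits tasks whose steps are known upfront, while iterative planning suits exploratory tasks whose next step depends on earlier results. A small sketch of that choice (the make_orchestrator helper is illustrative, not part of the library):
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM
from mcp_agent.workflows.orchestrator.orchestrator import Orchestrator

# Illustrative helper: pick a planning mode based on how predictable the task is.
def make_orchestrator(agents, steps_known_upfront: bool) -> Orchestrator:
    return Orchestrator(
        llm_factory=OpenAIAugmentedLLM,
        available_agents=agents,
        # "full" commits to a plan upfront; "iterative" re-plans after each step
        plan_type="full" if steps_known_upfront else "iterative",
        name="grader_full" if steps_known_upfront else "grader_iterative",
    )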

Configuration and Monitoring

from mcp_agent.tracing.token_counter import TokenNode

# Advanced orchestrator configuration
orchestrator = Orchestrator(
    llm_factory=OpenAIAugmentedLLM,
    available_agents=agents,
    plan_type="full",
    
    # Execution parameters
    max_iterations=25,
    max_retries_per_task=3,
    parallel_execution=True,
    max_parallel_tasks=3,
    
    # Planning parameters  
    max_planning_steps=15,
    planning_temperature=0.7,
    planning_model="gpt-4o",
    
    # Monitoring
    enable_detailed_logging=True,
    name="production_orchestrator",
)

# Execute with monitoring (pass the multi-step objective in as complex_task)
async def monitored_execution(complex_task: str):
    result = await orchestrator.generate_str(
        message=complex_task,
        request_params=RequestParams(
            model="gpt-4o",
            temperature=0.3,
            max_tokens=4000
        )
    )
    
    # Get execution summary
    summary = await orchestrator.get_execution_summary()
    print(f"Total steps executed: {summary.steps_completed}")
    print(f"Parallel tasks run: {summary.parallel_tasks}")
    print(f"Total cost: ${summary.total_cost:.4f}")
    print(f"Execution time: {summary.execution_time:.2f}s")
    
    return result
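
A minimal way to drive monitored_execution, assuming the app and orchestrator objects defined above are in scope (the try/except is a suggested pattern, not something the library requires):
import asyncio

async def main():
    async with app.run():
        try:
            await monitored_execution(
                "Grade the short story in short_story.md and write graded_report.md"
            )
        except Exception as exc:
            # Surface planner/executor failures rather than failing silently
            print(f"Orchestration failed: {exc}")

if __name__ == "__main__":
    asyncio.run(main())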

Key Features

  • Dynamic Planning: Breaks down complex objectives into manageable steps
  • Parallel Execution: Tasks within each step run simultaneously (see the conceptual sketch below)
  • Iterative vs Full Planning: Choose between adaptive or upfront planning
  • Context Preservation: Previous results inform subsequent steps
  • Intelligent Synthesis: Combines results from multiple specialized agents
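
The parallel execution and synthesis described above happen inside the orchestrator, but conceptually the behaviour resembles fanning independent tasks out and then merging their results. A rough, simplified sketch of the idea (not the library's actual implementation; reviewers and synthesizer stand in for augmented LLMs attached to the specialist agents):
import asyncio

# Conceptual sketch only: fan out independent review tasks, then synthesize.
async def run_review_step(story_text: str, reviewers: list, synthesizer):
    # Tasks within a step are independent, so they can run concurrently
    reviews = await asyncio.gather(
        *(reviewer.generate_str(message=story_text) for reviewer in reviewers)
    )
    # A final synthesis call combines the specialists' outputs into one result
    combined = "\n\n".join(reviews)
    return await synthesizer.generate_str(
        message=f"Combine this feedback into a single graded report:\n\n{combined}"
    )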

Use Cases

Complex Development Projects

Handle multi-step software development tasks with dependencies (a refactoring sketch follows this list):
  • Code Refactoring: Analyze codebase, identify issues, implement fixes across multiple files
  • Feature Implementation: Requirements analysis, design, implementation, testing, documentation
  • Bug Resolution: Reproduce issue, analyze root cause, implement fix, verify solution
  • CI/CD Pipeline Setup: Configure build scripts, set up testing, deploy infrastructure
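
For the refactoring case, a sketch of the agents such an orchestration might use (the names and instructions here are illustrative, not taken from the repository):
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM
from mcp_agent.workflows.orchestrator.orchestrator import Orchestrator

# Illustrative specialist agents for a code-refactoring orchestration
analyzer = Agent(
    name="code_analyzer",
    instruction="Read the target source files and identify refactoring candidates.",
    server_names=["filesystem"],
)
refactorer = Agent(
    name="refactorer",
    instruction="Apply the agreed refactorings and write the updated files back to disk.",
    server_names=["filesystem"],
)

refactor_orchestrator = Orchestrator(
    llm_factory=OpenAIAugmentedLLM,
    available_agents=[analyzer, refactorer],
    plan_type="iterative",  # analysis results shape the next steps
    name="refactor_orchestrator",
)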

Research and Analysis

Coordinate comprehensive research workflows:
  • Literature Reviews: Search databases, analyze papers, synthesize findings, write summaries
  • Market Research: Gather competitor data, analyze trends, survey customers, compile reports
  • Financial Analysis: Collect financial data, perform calculations, generate insights, create presentations
  • Due Diligence: Legal review, technical assessment, financial audit, risk analysis

Content Production

Multi-stage content creation with quality assurance:
  • Technical Documentation: Research topic, write content, review accuracy, format for publication
  • Marketing Campaigns: Market analysis, content creation, design assets, campaign testing
  • Academic Papers: Literature review, data analysis, writing, peer review, revision
  • Product Launches: Requirements gathering, specification writing, testing, documentation

Operations and Automation

Complex operational workflows requiring coordination:
  • Incident Response: Alert analysis, impact assessment, resolution planning, implementation
  • Compliance Audits: Data collection, policy review, gap analysis, remediation planning
  • System Migrations: Current state analysis, migration planning, execution, validation
  • Performance Optimization: Monitoring analysis, bottleneck identification, optimization implementation

Setup and Installation

Clone the repository and navigate to the orchestrator workflow example:
git clone https://github.com/lastmile-ai/mcp-agent.git
cd mcp-agent/examples/workflows/workflow_orchestrator_worker
Install uv (if you don't already have it) and the example dependencies:
pip install uv
uv sync
uv pip install -r requirements.txt
Configure your environment:
cp mcp_agent.secrets.yaml.example mcp_agent.secrets.yaml
Add your API keys to mcp_agent.secrets.yaml:
openai:
  api_key: "your-openai-api-key"
anthropic:
  api_key: "your-anthropic-api-key"  # optional
Create a sample story file for the grading example (the errors in the text are intentional, so the graders have something to find):
echo "The Battle of Glimmerwood

In the heart of Glimmerwood, a mystical forest knowed for its radiant trees, a small village thrived. 
The villagers, who were live peacefully, shared their home with the forest's magical creatures, 
especially the Glimmerfoxes whose fur shimmer like moonlight." > short_story.md
Enable optional tracing in mcp_agent.config.yaml:
otel:
  enabled: true  # Enable OpenTelemetry tracing
Run the example:
uv run main.py
The orchestrator will:
  1. Load the story from short_story.md
  2. Run parallel analysis (grammar, style, factual consistency)
  3. Compile feedback into a comprehensive report
  4. Write the graded report to graded_report.md

Expected Output

The orchestrator generates a detailed execution plan and produces output similar to:
=== ORCHESTRATOR EXECUTION ===
Planning steps:
  1. Load short story content from file
  2. Parallel analysis (grammar, style, facts)
  3. Compile comprehensive feedback report
  4. Write graded report to disk

Execution Summary:
  Total steps executed: 4
  Parallel tasks run: 3
  Total cost: $0.0234
  Execution time: 45.67s
The final graded report includes:
  • Grammar and spelling corrections
  • Style adherence feedback based on APA guidelines
  • Factual consistency analysis
  • Overall assessment and recommendations

Full Implementation

See the complete student assignment grading workflow in the mcp-agent repository under examples/workflows/workflow_orchestrator_worker.