Overview

mcp-agent provides two powerful ways to define agent logic:
  1. Workflow Class: For complex, stateful agent workflows
  2. Tool Decorators: For simple, stateless functions
Both approaches expose your agent logic as MCP tools that can be invoked by any MCP client.

The Workflow Class

The Workflow class is the foundation for building complex agent behaviors. It provides:
  • Type-safe input/output handling
  • Automatic MCP tool registration
  • Support for both asyncio and Temporal execution
  • Built-in error handling and retries
  • Workflow state management

Basic Workflow Definition

from mcp_agent.app import MCPApp
from mcp_agent.workflows import Workflow, WorkflowResult

app = MCPApp(name="my_agent")

@app.workflow
class MyWorkflow(Workflow[str]):
    """A simple workflow that processes text."""
    
    @app.workflow_run
    async def run(self, input_text: str) -> WorkflowResult[str]:
        # Your agent logic here
        processed = await self.process_text(input_text)
        return WorkflowResult(value=processed)
    
    async def process_text(self, text: str) -> str:
        # Helper method
        return text.upper()

Generic Type Parameters

Workflows use Python generics to specify return types:
# String output
class TextWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, prompt: str) -> WorkflowResult[str]:
        return WorkflowResult(value="response")

# Dictionary output
class DataWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, query: dict) -> WorkflowResult[dict]:
        return WorkflowResult(value={"result": "data"})

# Custom type output
from typing import List

from pydantic import BaseModel

class AnalysisResult(BaseModel):
    sentiment: str
    confidence: float
    entities: List[str]

class AnalysisWorkflow(Workflow[AnalysisResult]):
    @app.workflow_run
    async def run(self, text: str) -> WorkflowResult[AnalysisResult]:
        result = AnalysisResult(
            sentiment="positive",
            confidence=0.95,
            entities=["Company A", "Product B"]
        )
        return WorkflowResult(value=result)

Workflow Properties

Every workflow instance has access to several useful properties:
@app.workflow
class StatefulWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, data: dict) -> WorkflowResult[dict]:
        # Unique workflow instance ID
        workflow_id = self.id
        
        # Unique run ID (for this execution)
        run_id = self.run_id
        
        # Access app context
        logger = app.context.logger
        logger.info(f"Running workflow {workflow_id}, run {run_id}")
        
        # Access configuration
        config = app.context.settings
        
        return WorkflowResult(value={"workflow_id": workflow_id})

Error Handling

Workflows provide structured error handling:
from pydantic import ValidationError  # or whichever validation error your code raises

@app.workflow
class RobustWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        try:
            result = await self.risky_operation(input)
            return WorkflowResult(value=result)
        except ValidationError as e:
            # Return error in result
            return WorkflowResult(
                value=None,
                error=f"Validation failed: {e}",
                metadata={"error_type": "validation"}
            )
        except Exception as e:
            # Log and re-raise for retry
            app.context.logger.error(f"Workflow failed: {e}")
            raise

Tool Decorators

For simpler use cases, mcp-agent provides decorator-based tool definition:

@app.tool - Synchronous Tools

The @app.tool decorator creates tools that return results immediately:
from typing import List, Optional

from mcp_agent.app import MCPApp
from mcp_agent.core.context import Context  # type of the optional app_ctx parameter

app = MCPApp(name="utility_agent")

@app.tool
async def calculate_sum(numbers: List[float]) -> float:
    """Calculate the sum of a list of numbers."""
    return sum(numbers)

@app.tool(name="get-weather")
async def get_weather(
    city: str, 
    units: str = "celsius",
    app_ctx: Optional[Context] = None
) -> dict:
    """
    Get weather for a city.
    
    Args:
        city: City name
        units: Temperature units (celsius or fahrenheit)
    """
    # Access app context if needed
    if app_ctx:
        logger = app_ctx.logger
        logger.info(f"Getting weather for {city}")
    
    # Your logic here
    weather = await fetch_weather_api(city, units)
    return weather
Key features:
  • Returns final result directly (see the client-side sketch below)
  • No workflow ID or polling needed
  • Best for quick operations
  • Supports optional app_ctx parameter for context access
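Because a synchronous tool returns its final value in one call, invoking it from an MCP client is a single round trip. A minimal sketch using the MCP Python SDK, assuming an already-initialized ClientSession connected to the server above; the example arguments are arbitrary:
from mcp import ClientSession

async def get_weather_via_mcp(session: ClientSession) -> str:
    # Single round trip: the tool runs and its final value comes back directly.
    result = await session.call_tool(
        "get-weather",
        arguments={"city": "Berlin", "units": "celsius"},
    )
    return result.content[0].text  # no workflow ID, no polling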

@app.async_tool - Asynchronous Tools

The @app.async_tool decorator creates tools that start workflows asynchronously:
@app.async_tool(name="analyze-document")
async def analyze_document_async(
    document_url: str,
    analysis_type: str = "summary",
    app_ctx: Optional[Context] = None
) -> dict:
    """
    Start document analysis asynchronously.
    
    Returns workflow_id and run_id for status polling.
    """
    # Start the long-running analysis
    assert app_ctx is not None  # the framework supplies app_ctx when the tool is invoked
    workflow = DocumentAnalysisWorkflow()
    handle = await app_ctx.executor.start_workflow(
        workflow,
        {"url": document_url, "type": analysis_type}
    )
    
    # Return IDs for polling
    return {
        "workflow_id": workflow.id,
        "run_id": handle.id,
        "message": "Analysis started. Use workflows-get_status to check progress."
    }
Key features:
  • Returns workflow/run IDs immediately
  • Client polls for results using workflows-get_status (see the polling sketch below)
  • Best for long-running operations
  • Enables progress tracking
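A client-side polling loop built on those IDs might look like the sketch below. It makes several assumptions: session is an initialized MCP ClientSession, the async tool's JSON reply arrives as text content, and workflows-get_status accepts run_id and reports a status field; check the generated tool schemas for the exact parameter and field names:
import asyncio
import json

from mcp import ClientSession

async def poll_analysis(session: ClientSession) -> dict:
    # Start the long-running analysis via the async tool defined above.
    start = await session.call_tool(
        "analyze-document",
        arguments={"document_url": "https://example.com/report.pdf"},
    )
    ids = json.loads(start.content[0].text)  # {"workflow_id": ..., "run_id": ..., "message": ...}

    # Poll the generated status tool until the run reports a terminal state.
    while True:
        status = await session.call_tool(
            "workflows-get_status",
            arguments={"run_id": ids["run_id"]},  # parameter name is an assumption
        )
        payload = json.loads(status.content[0].text)
        if payload.get("status") in ("completed", "error"):  # status values are assumptions
            return payload
        await asyncio.sleep(2)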

Tool Naming and Description

Control how your tools appear to MCP clients:
@app.tool(
    name="search-knowledge-base",
    description="Search the knowledge base for relevant information"
)
async def search(
    query: str,
    limit: int = 10,
    filters: Optional[dict] = None
) -> List[dict]:
    """
    Detailed search implementation.
    
    Args:
        query: Search query
        limit: Maximum results
        filters: Optional filters
    """
    # The description parameter becomes the tool description
    # The docstring provides implementation details
    return await perform_search(query, limit, filters)

Advanced Workflow Patterns

Workflow Composition

Compose complex workflows from simpler ones:
@app.workflow
class CompositeWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, request: dict) -> WorkflowResult[dict]:
        # Run sub-workflows
        step1 = DataFetchWorkflow()
        data = await step1.run(request["source"])
        
        step2 = DataProcessWorkflow()
        processed = await step2.run(data.value)
        
        step3 = ReportGenerationWorkflow()
        report = await step3.run(processed.value)
        
        return WorkflowResult(value={
            "data": data.value,
            "processed": processed.value,
            "report": report.value
        })

Workflow with Agents

Integrate agents into workflows:
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM

@app.workflow
class AgentWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, task: str) -> WorkflowResult[str]:
        # Create specialized agent
        agent = Agent(
            name="researcher",
            instruction="Research thoroughly and provide detailed analysis.",
            server_names=["fetch", "filesystem"]
        )
        
        async with agent:
            # Attach LLM
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            
            # Execute task
            result = await llm.generate_str(task)
            
            return WorkflowResult(value=result)

Parallel Workflow Execution

Execute multiple workflows in parallel:
import asyncio
from typing import List

@app.workflow
class ParallelWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, tasks: List[str]) -> WorkflowResult[dict]:
        # Create workflow instances
        workflows = [
            TaskWorkflow() for _ in tasks
        ]
        
        # Run in parallel
        results = await asyncio.gather(*[
            w.run(task) for w, task in zip(workflows, tasks)
        ])
        
        # Combine results
        combined = {
            f"task_{i}": r.value 
            for i, r in enumerate(results)
        }
        
        return WorkflowResult(value=combined)

Stateful Workflows

Maintain state across multiple runs of the same workflow instance:
@app.workflow
class StatefulWorkflow(Workflow[dict]):
    def __init__(self):
        super().__init__()
        self.state = {}
    
    @app.workflow_run
    async def run(self, action: dict) -> WorkflowResult[dict]:
        action_type = action.get("type")
        
        if action_type == "set":
            self.state[action["key"]] = action["value"]
            return WorkflowResult(value={"status": "set"})
        
        elif action_type == "get":
            value = self.state.get(action["key"])
            return WorkflowResult(value={"value": value})
        
        elif action_type == "clear":
            self.state.clear()
            return WorkflowResult(value={"status": "cleared"})
        
        return WorkflowResult(value=self.state)

Temporal Integration

Workflows seamlessly support Temporal for durable execution:
from mcp_agent.config import Settings, TemporalSettings

# Configure for Temporal
app = MCPApp(
    name="temporal_agent",
    settings=Settings(
        execution_engine="temporal",
        temporal=TemporalSettings(
            host="localhost",
            port=7233,
            namespace="default",
            task_queue="mcp-agent"
        )
    )
)

from mcp_agent.executor.workflow_signal import Signal

@app.workflow
class DurableWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, task: str) -> WorkflowResult[str]:
        # This workflow is now durable
        # It can be paused, resumed, and retried
        
        # Wait for signal (human-in-the-loop)
        await app.context.executor.signal_bus.wait_for_signal(
            Signal(name="approve", workflow_id=self.id)
        )
        
        # Continue after approval
        result = await self.process_with_approval(task)
        return WorkflowResult(value=result)

MCP Server Integration

Exposing Workflows as MCP Tools

Workflows and tools are automatically exposed when creating an MCP server:
import asyncio

from mcp_agent.mcp.server import create_mcp_server_for_app

# Define workflows and tools
@app.workflow
class MyWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        return WorkflowResult(value=f"Processed: {input}")

@app.tool
async def my_tool(param: str) -> str:
    return f"Tool result: {param}"

# Create MCP server
async def main():
    async with app.run():
        mcp_server = create_mcp_server_for_app(app)
        
        # Available tools:
        # - workflows-list
        # - workflows-MyWorkflow-run
        # - workflows-get_status
        # - my_tool
        
        await mcp_server.run_stdio_async()

if __name__ == "__main__":
    asyncio.run(main())

Tool Discovery

MCP clients can discover available tools:
# From the MCP client's perspective (assumes an initialized ClientSession named `session`)
tools_result = await session.list_tools()
for tool in tools_result.tools:
    print(f"Tool: {tool.name}")
    print(f"Description: {tool.description}")
    print(f"Parameters: {tool.inputSchema}")

Best Practices

Testing Workflows

Test your workflows locally:
import pytest

from mcp_agent.app import MCPApp
from mcp_agent.workflows import Workflow, WorkflowResult

@pytest.mark.asyncio
async def test_workflow():
    app = MCPApp(name="test_app")
    
    @app.workflow
    class TestWorkflow(Workflow[str]):
        @app.workflow_run
        async def run(self, input: str) -> WorkflowResult[str]:
            return WorkflowResult(value=input.upper())
    
    async with app.run():
        workflow = TestWorkflow()
        result = await workflow.run("hello")
        assert result.value == "HELLO"

Migration Guide

From Functions to Tools

# Before: Plain function
async def calculate(x: int, y: int) -> int:
    return x + y

# After: MCP tool
@app.tool
async def calculate(x: int, y: int) -> int:
    """Calculate sum of two numbers."""
    return x + y

From Scripts to Workflows

# Before: Script
async def main():
    data = await fetch_data()
    processed = await process_data(data)
    await save_results(processed)

# After: Workflow
@app.workflow
class DataPipeline(Workflow[dict]):
    @app.workflow_run
    async def run(self, source: str) -> WorkflowResult[dict]:
        data = await self.fetch_data(source)
        processed = await self.process_data(data)
        await self.save_results(processed)
        return WorkflowResult(value=processed)

Next Steps