
Overview

mcp-agent gives you two complementary ways to expose agent behaviour:
  1. Decorator-based tools – mark a plain Python function with @app.tool or @app.async_tool to expose it as an MCP tool. This is the quickest way to add synchronous or long-running behaviour to your app.
  2. Workflow classes – build stateful, structured flows by subclassing Workflow[T]. Workflows give you fine-grained control over orchestration, retries, and Temporal integration.
Both options register MCP tools automatically, so any MCP client can invoke them. The high-level “workflow patterns” in examples/workflows (parallel, router, orchestrator, etc.) are built using these same primitives—they are patterns, not the Workflow base class itself. The rest of this page walks through the decorators first (because most apps start there) and then dives into the Workflow class.

Decorator-based tools

@app.tool – synchronous tools

Use @app.tool when the work can complete within a single MCP call. The return value is sent straight back to the client—no polling required.
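from typing import List, Optional

from mcp_agent.app import MCPApp
from mcp_agent.core.context import Context

# Assume `fetch_weather_api` is a helper function you provide.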

app = MCPApp(name="utility_agent")

@app.tool
async def calculate_sum(numbers: List[float]) -> float:
    """Calculate the sum of a list of numbers."""
    return sum(numbers)

@app.tool(name="get-weather")
async def get_weather(
    city: str,
    units: str = "celsius",
    app_ctx: Optional[Context] = None,
) -> dict:
    if app_ctx:
        app_ctx.logger.info("Fetching weather", data={"city": city})
    return await fetch_weather_api(city, units)
Key points:
  • Works great for quick operations or simple glue code.
  • You can accept an optional app_ctx: Context parameter to access logging, server registry, etc.
  • The tool result is serialised and returned to the caller immediately.
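To make the registration idea concrete, here is a minimal, library-free sketch of a decorator that supports both the bare form and the name=... form used above. ToolRegistry and its tools attribute are hypothetical stand-ins for illustration; this is not how mcp-agent implements @app.tool internally.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional

ToolFn = Callable[..., Awaitable[Any]]


class ToolRegistry:
    """Hypothetical stand-in for an app object that collects tools."""

    def __init__(self) -> None:
        self.tools: Dict[str, ToolFn] = {}

    def tool(self, fn: Optional[ToolFn] = None, *, name: Optional[str] = None):
        def register(func: ToolFn) -> ToolFn:
            # Fall back to the function name when no explicit name is given.
            self.tools[name or func.__name__] = func
            return func

        # Bare use (@registry.tool) passes the function directly;
        # parameterised use (@registry.tool(name=...)) returns the decorator.
        return register(fn) if fn is not None else register


registry = ToolRegistry()


@registry.tool
async def calculate_sum(numbers: List[float]) -> float:
    """Calculate the sum of a list of numbers."""
    return sum(numbers)


@registry.tool(name="get-units")
async def get_units(units: str = "celsius") -> dict:
    return {"units": units}


# Look a tool up by its registered name and call it.
total = asyncio.run(registry.tools["calculate_sum"]([1.0, 2.5]))
```

The explicit name matters when the tool name you want to expose (for example one containing a hyphen) is not a valid Python identifier.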

@app.async_tool – long-running tools

Agents often need to run tasks that take longer than an MCP request allows (multi-step research, human-in-the-loop flows, durable Temporal runs). Decorate those entry points with @app.async_tool:
@app.async_tool(name="analyze-document")
async def analyze_document_async(
    document_url: str,
    analysis_type: str = "summary",
    app_ctx: Optional[Context] = None,
) -> dict:
    workflow = DocumentAnalysisWorkflow()
    handle = await app_ctx.executor.start_workflow(
        workflow,
        {"url": document_url, "type": analysis_type},
    )
    return {"workflow_id": workflow.id, "run_id": handle.id}
@app.async_tool starts a workflow in the background and returns identifiers that clients can poll via the built-in workflows-get_status tool. This pattern keeps your agent responsive even when the underlying work takes minutes or requires human decisions.
Tip: Agent servers rely heavily on these decorators—see Agent Servers for end-to-end examples.
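The start-then-poll interaction described above can be sketched with plain asyncio. Everything here (RUNS, start_analysis, get_status) is a hypothetical in-memory stand-in for the server-side machinery, intended only to show the shape of the exchange a client sees.

```python
import asyncio
import uuid
from typing import Any, Dict

# Hypothetical in-memory run table; a real server tracks runs in its executor.
RUNS: Dict[str, Dict[str, Any]] = {}


async def _do_work(run_id: str, document_url: str) -> None:
    await asyncio.sleep(0.05)  # stands in for minutes of real work
    RUNS[run_id].update(status="completed", result=f"summary of {document_url}")


async def start_analysis(document_url: str) -> Dict[str, str]:
    """Return immediately with an identifier the client can poll."""
    run_id = uuid.uuid4().hex
    RUNS[run_id] = {"status": "running", "result": None}
    # Keep a reference so the task is not garbage-collected mid-flight.
    RUNS[run_id]["task"] = asyncio.create_task(_do_work(run_id, document_url))
    return {"run_id": run_id}


async def get_status(run_id: str) -> Dict[str, Any]:
    run = RUNS[run_id]
    return {"status": run["status"], "result": run["result"]}


async def client() -> Dict[str, Any]:
    started = await start_analysis("https://example.com/doc.pdf")
    first = await get_status(started["run_id"])
    status = first
    while status["status"] != "completed":
        await asyncio.sleep(0.01)  # poll interval
        status = await get_status(started["run_id"])
    return {"first": first["status"], "final": status}


outcome = asyncio.run(client())
```

The first status check reports "running" because the start call returns before the background task has done any work; later polls eventually observe the completed result.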

The Workflow Class

The Workflow[T] base class lets you model multi-step or stateful logic while still exposing an MCP tool. Workflows are most useful when you need retries, shared state, or tight integration with the execution engine (asyncio or Temporal).

Basic workflow definition

from mcp_agent.executor.workflow import Workflow, WorkflowResult

# Assume `read_file` / `summarise` are helper functions you provide.

@app.workflow
class SummariseFile(Workflow[str]):
    @app.workflow_run
    async def run(self, path: str) -> WorkflowResult[str]:
        content = await read_file(path)
        summary = await summarise(content)
        return WorkflowResult(value=summary)
Decorate the class with @app.workflow and the entry point with @app.workflow_run. Whatever you return from the method becomes the MCP tool result.

Useful workflow features

  • Access self.context for logging, MCP connections, and configuration.
  • Store reusable helpers or caches on self inside __init__.
  • Raise exceptions to trigger retries (Temporal) or propagate errors to the caller.
  • Combine with @app.workflow_task / @app.workflow_signal when you need durable activities or signal handlers.
See the sections below for more elaborate compositions.
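The retry behaviour mentioned in the list above is handled by the execution engine when you run on Temporal. As a rough picture of what "raise to retry" means, here is a library-free sketch with exponential backoff. run_with_retries, flaky_step and the delay values are assumptions made for illustration, not part of mcp-agent.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def run_with_retries(
    step: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 0.01,
) -> T:
    """Retry `step` with exponential backoff; re-raise after the last attempt."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await step()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: propagate the error to the caller
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    # Only reached when max_attempts < 1, so no attempt was made.
    raise ValueError("max_attempts must be at least 1")


calls = {"count": 0}


async def flaky_step() -> str:
    # Fails twice with a transient error, then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


result = asyncio.run(run_with_retries(flaky_step))
```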

Workflow patterns (examples/workflows)

The repository has an examples/workflows directory that demonstrates higher-level agent patterns: router, parallel fan-out, orchestrator, evaluator/optimizer, and more. These samples compose agents and AugmentedLLMs with helpers from mcp_agent.workflows.factory. They do not correspond one-to-one with the Workflow base class above—they are ready-made orchestration patterns you can adopt or customise. Use the patterns when you want opinionated orchestration, and drop down to the Workflow class (or @app.async_tool) when you need bespoke control flow.

Advanced Workflow Patterns

Workflow Composition

Compose complex workflows from simpler ones:
@app.workflow
class CompositeWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, request: dict) -> WorkflowResult[dict]:
        # Run sub-workflows
        step1 = DataFetchWorkflow()
        data = await step1.run(request["source"])
        
        step2 = DataProcessWorkflow()
        processed = await step2.run(data.value)
        
        step3 = ReportGenerationWorkflow()
        report = await step3.run(processed.value)
        
        return WorkflowResult(value={
            "data": data.value,
            "processed": processed.value,
            "report": report.value
        })

Workflow with Agents

Integrate agents into workflows:
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM

@app.workflow
class AgentWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, task: str) -> WorkflowResult[str]:
        # Create specialized agent
        agent = Agent(
            name="researcher",
            instruction="Research thoroughly and provide detailed analysis.",
            server_names=["fetch", "filesystem"]
        )
        
        async with agent:
            # Attach LLM
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            
            # Execute task
            result = await llm.generate_str(task)
            
            return WorkflowResult(value=result)

Parallel Workflow Execution

Execute multiple workflows in parallel:
import asyncio
from typing import List

# Assume `TaskWorkflow` is a workflow class defined elsewhere in your app.

@app.workflow
class ParallelWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, tasks: List[str]) -> WorkflowResult[dict]:
        # Create workflow instances
        workflows = [
            TaskWorkflow() for _ in tasks
        ]
        
        # Run in parallel
        results = await asyncio.gather(*[
            w.run(task) for w, task in zip(workflows, tasks)
        ])
        
        # Combine results
        combined = {
            f"task_{i}": r.value 
            for i, r in enumerate(results)
        }
        
        return WorkflowResult(value=combined)
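With a plain asyncio.gather call, the first exception propagates to the caller and the results of the other tasks are not returned. If you would rather collect partial results, one option is return_exceptions=True. The sketch below uses a hypothetical run_task coroutine in place of a workflow's run method.

```python
import asyncio
from typing import Dict, List


async def run_task(task: str) -> str:
    # Hypothetical unit of work; fails for one input to show error isolation.
    await asyncio.sleep(0.01)
    if task == "bad":
        raise ValueError(f"cannot process {task!r}")
    return task.upper()


async def run_all(tasks: List[str]) -> Dict[str, dict]:
    # return_exceptions=True returns exceptions as values instead of
    # propagating the first failure out of gather().
    results = await asyncio.gather(
        *(run_task(t) for t in tasks), return_exceptions=True
    )
    combined: Dict[str, dict] = {}
    for i, r in enumerate(results):
        if isinstance(r, BaseException):
            combined[f"task_{i}"] = {"ok": False, "error": str(r)}
        else:
            combined[f"task_{i}"] = {"ok": True, "value": r}
    return combined


combined = asyncio.run(run_all(["alpha", "bad", "gamma"]))
```

gather preserves input order, so task_1 in the combined result always corresponds to the second input regardless of which task finished first.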

Stateful Workflows

Keep state on the workflow instance. It persists across calls to run on the same instance, but a new instance starts empty:
@app.workflow
class StatefulWorkflow(Workflow[dict]):
    def __init__(self):
        super().__init__()
        self.state = {}
    
    @app.workflow_run
    async def run(self, action: dict) -> WorkflowResult[dict]:
        action_type = action.get("type")
        
        if action_type == "set":
            self.state[action["key"]] = action["value"]
            return WorkflowResult(value={"status": "set"})
        
        elif action_type == "get":
            value = self.state.get(action["key"])
            return WorkflowResult(value={"value": value})
        
        elif action_type == "clear":
            self.state.clear()
            return WorkflowResult(value={"status": "cleared"})
        
        return WorkflowResult(value=self.state)
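The scoping of that state is easy to miss, so here is a library-free sketch that mirrors the set/get/clear actions with a plain class. KeyValueState is a hypothetical stand-in, not a Workflow subclass.

```python
from typing import Any, Dict


class KeyValueState:
    """Hypothetical stand-in mirroring the set/get/clear actions above."""

    def __init__(self) -> None:
        self.state: Dict[str, Any] = {}

    def run(self, action: Dict[str, Any]) -> Dict[str, Any]:
        kind = action.get("type")
        if kind == "set":
            self.state[action["key"]] = action["value"]
            return {"status": "set"}
        if kind == "get":
            return {"value": self.state.get(action["key"])}
        if kind == "clear":
            self.state.clear()
            return {"status": "cleared"}
        return dict(self.state)  # copy so callers cannot mutate internals


first = KeyValueState()
first.run({"type": "set", "key": "city", "value": "Oslo"})

# The same instance remembers the value; a fresh instance does not.
same_instance = first.run({"type": "get", "key": "city"})
fresh_instance = KeyValueState().run({"type": "get", "key": "city"})
```

If state has to outlive a single instance or process, it needs to live in an external store rather than on self.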

Temporal Integration

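Workflows can run on Temporal for durable execution. Set the execution engine to temporal when you construct the app:
from mcp_agent.config import Settings, TemporalSettings
from mcp_agent.executor.workflow_signal import Signal

# Configure for Temporal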
app = MCPApp(
    name="temporal_agent",
    settings=Settings(
        execution_engine="temporal",
        temporal=TemporalSettings(
            host="localhost",
            port=7233,
            namespace="default",
            task_queue="mcp-agent"
        )
    )
)

@app.workflow
class DurableWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, task: str) -> WorkflowResult[str]:
        # This workflow is now durable
        # It can be paused, resumed, and retried
        
        # Wait for signal (human-in-the-loop)
        await app.context.executor.signal_bus.wait_for_signal(
            Signal(name="approve", workflow_id=self.id)
        )
        
        # Continue after approval (`process_with_approval` is a method you define)
        result = await self.process_with_approval(task)
        return WorkflowResult(value=result)
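A human-in-the-loop wait usually benefits from a timeout so that a run does not block indefinitely. The following sketch shows the idea with asyncio.Event standing in for the signal bus. wait_for_approval and the timeout values are assumptions for illustration and do not reflect the mcp-agent signal API.

```python
import asyncio


async def wait_for_approval(approved: asyncio.Event, timeout: float) -> str:
    """Block until the event is set, or give up after `timeout` seconds."""
    try:
        await asyncio.wait_for(approved.wait(), timeout=timeout)
        return "approved"
    except asyncio.TimeoutError:
        return "timed_out"


async def demo() -> dict:
    approved = asyncio.Event()
    # Simulate a human approving shortly after the workflow starts waiting.
    loop = asyncio.get_running_loop()
    loop.call_later(0.01, approved.set)
    with_signal = await wait_for_approval(approved, timeout=1.0)

    # Nobody ever sets this event, so the wait times out.
    never_set = asyncio.Event()
    without_signal = await wait_for_approval(never_set, timeout=0.02)
    return {"with_signal": with_signal, "without_signal": without_signal}


outcomes = asyncio.run(demo())
```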

MCP Server Integration

Exposing Workflows as MCP Tools

Workflows and tools are automatically exposed when creating an MCP server:
from mcp_agent.mcp.server import create_mcp_server_for_app

# Define workflows and tools
@app.workflow
class MyWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        return WorkflowResult(value=f"Processed: {input}")

@app.tool
async def my_tool(param: str) -> str:
    return f"Tool result: {param}"

# Create MCP server
async def main():
    async with app.run():
        mcp_server = create_mcp_server_for_app(app)
        
        # Available tools:
        # - workflows-list
        # - workflows-MyWorkflow-run
        # - workflows-get_status
        # - my_tool
        
        await mcp_server.run_stdio_async()

Tool Discovery

MCP clients can discover available tools:
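# From MCP client perspective.
# Assumes `server` is a connected MCP client session from the MCP Python SDK,
# where list_tools() returns a result object with a `tools` list.
result = await server.list_tools()
for tool in result.tools:
    print(f"Tool: {tool.name}")
    print(f"Description: {tool.description}")
    print(f"Parameters: {tool.inputSchema}")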

Best Practices

  • Use @app.tool for simple, stateless operations
  • Use @app.async_tool for long-running operations that need polling
  • Use Workflow class for complex, multi-step processes
Always provide type hints and docstrings:
@app.tool
async def process_data(
    data: dict,
    options: Optional[dict] = None
) -> dict:
    """
    Process data with optional transformations.
    
    Args:
        data: Input data to process
        options: Optional processing options
        
    Returns:
        Processed data dictionary
    """
    # Implementation goes here; this placeholder returns the input unchanged.
    return data
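Type hints and docstrings matter because tool descriptions are typically derived from them. The sketch below shows, using only the standard library, how a function's signature and docstring can be turned into a simple description. describe and the dictionary layout it returns are hypothetical and do not reproduce the schema that mcp-agent generates.

```python
import inspect
from typing import Optional, get_type_hints


async def process_data(data: dict, options: Optional[dict] = None) -> dict:
    """Process data with optional transformations."""
    return {**data, **(options or {})}


def describe(fn) -> dict:
    """Build a simple, hypothetical tool description from a function."""
    hints = get_type_hints(fn)
    sig = inspect.signature(fn)
    params = {}
    for name, p in sig.parameters.items():
        params[name] = {
            "type": hints.get(name, "untyped"),
            # A parameter without a default must be supplied by the caller.
            "required": p.default is inspect.Parameter.empty,
        }
    return {
        "name": fn.__name__,
        "description": inspect.getdoc(fn) or "",
        "parameters": params,
    }


description = describe(process_data)
```

A function without hints or a docstring would yield "untyped" parameters and an empty description, which gives an MCP client (or an LLM choosing between tools) very little to work with.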
Handle errors gracefully:
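import logging

logger = logging.getLogger(__name__)

# Assume `process` is a method you define on the workflow.
@app.workflow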
class SafeWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        try:
            result = await self.process(input)
            return WorkflowResult(value=result)
        except Exception as e:
            logger.error(f"Processing failed: {e}")
            return WorkflowResult(
                value=None,
                error=str(e)
            )
Use context managers for resources:
@app.workflow
class ResourceWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, query: str) -> WorkflowResult[str]:
        async with self.get_database() as db:
            result = await db.query(query)
            return WorkflowResult(value=result)
Use structured logging:
@app.tool
async def monitored_tool(input: str, app_ctx: Optional[Context] = None) -> str:
    # Assume `process` is a helper function you provide.
    # Without a context there is no logger, but the work must still run.
    if app_ctx is None:
        return await process(input)

    logger = app_ctx.logger
    logger.info("Tool started", data={"input": input})

    try:
        result = await process(input)
        logger.info("Tool completed", data={"result_length": len(result)})
        return result
    except Exception as e:
        logger.error("Tool failed", data={"error": str(e)})
        raise

Testing Workflows

Test your workflows locally:
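import pytest

from mcp_agent.app import MCPApp
from mcp_agent.executor.workflow import Workflow, WorkflowResult

# Requires the pytest-asyncio plugin for @pytest.mark.asyncio.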

@pytest.mark.asyncio
async def test_workflow():
    app = MCPApp(name="test_app")
    
    @app.workflow
    class TestWorkflow(Workflow[str]):
        @app.workflow_run
        async def run(self, input: str) -> WorkflowResult[str]:
            return WorkflowResult(value=input.upper())
    
    async with app.run():
        workflow = TestWorkflow()
        result = await workflow.run("hello")
        assert result.value == "HELLO"

Migration Guide

From Functions to Tools

# Before: Plain function
async def calculate(x: int, y: int) -> int:
    return x + y

# After: MCP tool
@app.tool
async def calculate(x: int, y: int) -> int:
    """Calculate sum of two numbers."""
    return x + y

From Scripts to Workflows

# Before: Script
async def main():
    data = await fetch_data()
    processed = await process_data(data)
    await save_results(processed)

# After: Workflow
@app.workflow
class DataPipeline(Workflow[dict]):
    @app.workflow_run
    async def run(self, source: str) -> WorkflowResult[dict]:
        data = await self.fetch_data(source)
        processed = await self.process_data(data)
        await self.save_results(processed)
        return WorkflowResult(value=processed)

Next Steps

I