Documentation Index Fetch the complete documentation index at: https://docs.mcp-agent.com/llms.txt
Use this file to discover all available pages before exploring further.
Overview
mcp-agent provides two powerful ways to define agent logic:
Workflow Class: For complex, stateful agent workflows
Tool Decorators: For simple, stateless functions
Both approaches expose your agent logic as MCP tools that can be invoked by any MCP client.
The Workflow Class
The Workflow class is the foundation for building complex agent behaviors. It provides:
Type-safe input/output handling
Automatic MCP tool registration
Support for both asyncio and Temporal execution
Built-in error handling and retries
Workflow state management
Basic Workflow Definition
from typing import List

from mcp_agent.app import MCPApp
from mcp_agent.executor.workflow import Workflow, WorkflowResult
app = MCPApp(name="my_agent")


@app.workflow
class MyWorkflow(Workflow[str]):
    """A simple workflow that processes text."""

    @app.workflow_run
    async def run(self, input_text: str) -> WorkflowResult[str]:
        """Upper-case ``input_text`` and wrap it in a ``WorkflowResult``."""
        # Your agent logic here
        processed = await self.process_text(input_text)
        return WorkflowResult(value=processed)

    async def process_text(self, text: str) -> str:
        """Helper method: return ``text`` converted to upper case."""
        return text.upper()
Generic Type Parameters
Workflows use Python generics to specify return types:
# String output
class TextWorkflow(Workflow[str]):
    """Workflow whose result value is a string."""

    @app.workflow_run
    async def run(self, prompt: str) -> WorkflowResult[str]:
        """Return a fixed string result; ``prompt`` is not used in this example."""
        return WorkflowResult(value="response")
# Dictionary output
class DataWorkflow(Workflow[dict]):
    """Workflow whose result value is a dictionary."""

    @app.workflow_run
    async def run(self, query: dict) -> WorkflowResult[dict]:
        """Return a fixed dictionary result; ``query`` is not used in this example."""
        return WorkflowResult(value={"result": "data"})
# Custom type output
from pydantic import BaseModel
class AnalysisResult(BaseModel):
    """Pydantic model used as a custom workflow output type."""

    sentiment: str
    confidence: float
    entities: List[str]
class AnalysisWorkflow(Workflow[AnalysisResult]):
    """Workflow whose result value is an ``AnalysisResult`` instance."""

    @app.workflow_run
    async def run(self, text: str) -> WorkflowResult[AnalysisResult]:
        """Return a fixed ``AnalysisResult``; ``text`` is not used in this example."""
        result = AnalysisResult(
            sentiment="positive",
            confidence=0.95,
            entities=["Company A", "Product B"],
        )
        return WorkflowResult(value=result)
Workflow Properties
Every workflow has access to important properties:
@app.workflow
class StatefulWorkflow(Workflow[dict]):
    """Workflow demonstrating the properties available inside ``run``."""

    @app.workflow_run
    async def run(self, data: dict) -> WorkflowResult[dict]:
        """Log the workflow and run IDs, then return the workflow ID.

        ``data`` is accepted but not used in this example.
        """
        # Unique workflow instance ID
        workflow_id = self.id
        # Unique run ID (for this execution)
        run_id = self.run_id

        # Access app context
        logger = app.context.logger
        logger.info(f"Running workflow {workflow_id}, run {run_id}")

        # Access configuration (shown for illustration; not used further here)
        config = app.context.settings

        return WorkflowResult(value={"workflow_id": workflow_id})
Error Handling
Workflows provide structured error handling:
@app.workflow
class RobustWorkflow(Workflow[str]):
    """Workflow showing two error paths: report in the result, or re-raise."""

    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        """Run ``risky_operation`` on ``input``.

        A ``ValidationError`` is reported through the returned result's
        ``error`` and ``metadata`` fields. Any other exception is logged and
        re-raised.
        """
        try:
            result = await self.risky_operation(input)
            return WorkflowResult(value=result)
        except ValidationError as e:
            # Return error in result
            return WorkflowResult(
                value=None,
                error=f"Validation failed: {e}",
                metadata={"error_type": "validation"},
            )
        except Exception as e:
            # Log and re-raise for retry
            app.context.logger.error(f"Workflow failed: {e}")
            raise
Tool Decorators
For simpler use cases, mcp-agent provides decorator-based tool definitions:
The @app.tool decorator creates tools that return results immediately:
from mcp_agent.app import MCPApp
from typing import Optional
app = MCPApp(name="utility_agent")


@app.tool
async def calculate_sum(numbers: List[float]) -> float:
    """Calculate the sum of a list of numbers."""
    return sum(numbers)
@app.tool(name="get-weather")
async def get_weather(
    city: str,
    units: str = "celsius",
    app_ctx: Optional[Context] = None,
) -> dict:
    """
    Get weather for a city.

    Args:
        city: City name
        units: Temperature units (celsius or fahrenheit)
        app_ctx: Optional app context; when provided, its logger is used.
    """
    # Access app context if needed
    if app_ctx:
        logger = app_ctx.logger
        logger.info(f"Getting weather for {city}")

    # Your logic here
    weather = await fetch_weather_api(city, units)
    return weather
Key features:
Returns final result directly
No workflow ID or polling needed
Best for quick operations
Supports optional app_ctx parameter for context access
The @app.async_tool decorator creates tools that start workflows asynchronously:
@app.async_tool(name="analyze-document")
async def analyze_document_async(
    document_url: str,
    analysis_type: str = "summary",
    app_ctx: Optional[Context] = None,
) -> dict:
    """
    Start document analysis asynchronously.

    Returns workflow_id and run_id for status polling.

    Raises:
        RuntimeError: If no app context is supplied; the executor used to
            start the workflow is reached through ``app_ctx``.
    """
    # app_ctx defaults to None, so fail with a clear message rather than an
    # AttributeError on ``None.executor``.
    if app_ctx is None:
        raise RuntimeError(
            "analyze-document requires an app context to start the workflow"
        )

    # Start long-running analysis
    workflow = DocumentAnalysisWorkflow()
    handle = await app_ctx.executor.start_workflow(
        workflow,
        {"url": document_url, "type": analysis_type},
    )

    # Return IDs for polling
    return {
        "workflow_id": workflow.id,
        "run_id": handle.id,
        "message": "Analysis started. Use workflows-get_status to check progress.",
    }
Key features:
Returns workflow/run IDs immediately
Client polls for results using workflows-get_status
Best for long-running operations
Enables progress tracking
Control how your tools appear to MCP clients:
@app.tool(
    name="search-knowledge-base",
    description="Search the knowledge base for relevant information",
)
async def search(
    query: str,
    limit: int = 10,
    filters: Optional[dict] = None,
) -> List[dict]:
    """
    Detailed search implementation.

    Args:
        query: Search query
        limit: Maximum results
        filters: Optional filters
    """
    # The description parameter becomes the tool description
    # The docstring provides implementation details
    return await perform_search(query, limit, filters)
Advanced Workflow Patterns
Workflow Composition
Compose complex workflows from simpler ones:
@app.workflow
class CompositeWorkflow(Workflow[dict]):
    """Workflow that chains three sub-workflows, feeding each result onward."""

    @app.workflow_run
    async def run(self, request: dict) -> WorkflowResult[dict]:
        """Fetch, process, and report on ``request["source"]``.

        Returns a result whose value holds the output of every stage under
        the keys ``data``, ``processed`` and ``report``.
        """
        # Run sub-workflows
        step1 = DataFetchWorkflow()
        data = await step1.run(request["source"])

        step2 = DataProcessWorkflow()
        processed = await step2.run(data.value)

        step3 = ReportGenerationWorkflow()
        report = await step3.run(processed.value)

        return WorkflowResult(value={
            "data": data.value,
            "processed": processed.value,
            "report": report.value,
        })
Workflow with Agents
Integrate agents into workflows:
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM
@app.workflow
class AgentWorkflow(Workflow[str]):
    """Workflow that delegates its task to an LLM-backed agent."""

    @app.workflow_run
    async def run(self, task: str) -> WorkflowResult[str]:
        """Create a "researcher" agent, attach an LLM, and run ``task`` on it."""
        # Create specialized agent
        agent = Agent(
            name="researcher",
            instruction="Research thoroughly and provide detailed analysis.",
            server_names=["fetch", "filesystem"],
        )

        async with agent:
            # Attach LLM
            llm = await agent.attach_llm(OpenAIAugmentedLLM)

            # Execute task
            result = await llm.generate_str(task)

            return WorkflowResult(value=result)
Parallel Workflow Execution
Execute multiple workflows in parallel:
import asyncio
@app.workflow
class ParallelWorkflow(Workflow[dict]):
    """Workflow that runs one ``TaskWorkflow`` per task concurrently."""

    @app.workflow_run
    async def run(self, tasks: List[str]) -> WorkflowResult[dict]:
        """Run every task in parallel and collect the results.

        The result value maps ``task_<index>`` to that task's result value,
        in the order the tasks were given.
        """
        # Create workflow instances
        workflows = [
            TaskWorkflow() for _ in tasks
        ]

        # Run in parallel
        results = await asyncio.gather(*[
            w.run(task) for w, task in zip(workflows, tasks)
        ])

        # Combine results
        combined = {
            f"task_{i}": r.value
            for i, r in enumerate(results)
        }

        return WorkflowResult(value=combined)
Stateful Workflows
Maintain state across workflow executions:
@app.workflow
class StatefulWorkflow(Workflow[dict]):
    """Workflow that keeps a key/value store on the instance."""

    def __init__(self):
        """Initialise the base workflow and start with an empty state dict."""
        super().__init__()
        self.state = {}

    @app.workflow_run
    async def run(self, action: dict) -> WorkflowResult[dict]:
        """Apply ``action`` to the stored state.

        ``action["type"]`` selects the operation: ``"set"`` stores
        ``action["value"]`` under ``action["key"]``, ``"get"`` looks up
        ``action["key"]``, and ``"clear"`` empties the state. Any other type
        returns the whole state dict.
        """
        action_type = action.get("type")

        if action_type == "set":
            self.state[action["key"]] = action["value"]
            return WorkflowResult(value={"status": "set"})

        elif action_type == "get":
            value = self.state.get(action["key"])
            return WorkflowResult(value={"value": value})

        elif action_type == "clear":
            self.state.clear()
            return WorkflowResult(value={"status": "cleared"})

        return WorkflowResult(value=self.state)
Temporal Integration
Workflows seamlessly support Temporal for durable execution:
# Configure for Temporal
app = MCPApp(
    name="temporal_agent",
    settings=Settings(
        execution_engine="temporal",
        temporal=TemporalSettings(
            host="localhost",
            port=7233,
            namespace="default",
            task_queue="mcp-agent",
        ),
    ),
)
@app.workflow
class DurableWorkflow(Workflow[str]):
    """Workflow that waits for an "approve" signal before processing."""

    @app.workflow_run
    async def run(self, task: str) -> WorkflowResult[str]:
        """Wait for the approval signal, then process ``task``."""
        # This workflow is now durable
        # It can be paused, resumed, and retried

        # Wait for signal (human-in-the-loop)
        await app.context.executor.signal_bus.wait_for_signal(
            Signal(name="approve", workflow_id=self.id)
        )

        # Continue after approval
        result = await self.process_with_approval(task)
        return WorkflowResult(value=result)
MCP Server Integration
Workflows and tools are automatically exposed when creating an MCP server:
from mcp_agent.mcp.server import create_mcp_server_for_app
# Define workflows and tools
@app.workflow
class MyWorkflow(Workflow[str]):
    """Example workflow exposed through the MCP server."""

    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        """Return ``input`` prefixed with "Processed: "."""
        return WorkflowResult(value=f"Processed: {input}")
@app.tool
async def my_tool(param: str) -> str:
    """Return ``param`` prefixed with "Tool result: "."""
    return f"Tool result: {param}"
# Create MCP server
async def main():
    """Run the app and serve its workflows and tools over stdio."""
    async with app.run():
        mcp_server = create_mcp_server_for_app(app)

        # Available tools:
        # - workflows-list
        # - workflows-MyWorkflow-run
        # - workflows-get_status
        # - my_tool

        await mcp_server.run_stdio_async()
MCP clients can discover available tools:
# From MCP client perspective
tools = await server.list_tools()
for tool in tools:
print ( f "Tool: { tool.name } " )
print ( f "Description: { tool.description } " )
print ( f "Parameters: { tool.input_schema } " )
Best Practices
Choose the Right Abstraction
Use @app.tool for simple, stateless operations
Use @app.async_tool for long-running operations that need polling
Use Workflow class for complex, multi-step processes
Type Hints and Documentation
Always provide type hints and docstrings:
@app.tool
async def process_data(
    data: dict,
    options: Optional[dict] = None,
) -> dict:
    """
    Process data with optional transformations.

    Args:
        data: Input data to process
        options: Optional processing options

    Returns:
        Processed data dictionary
    """
    # Implementation
# Implementation
Handle errors gracefully:
@app.workflow
class SafeWorkflow(Workflow[str]):
    """Workflow that reports failures in its result instead of raising."""

    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        """Process ``input``; on any exception, log it and return an error result."""
        try:
            result = await self.process(input)
            return WorkflowResult(value=result)
        except Exception as e:
            # Use the app's logger; no bare ``logger`` name is defined here.
            app.context.logger.error(f"Processing failed: {e}")
            return WorkflowResult(
                value=None,
                error=str(e),
            )
Use context managers for resources:
@app.workflow
class ResourceWorkflow(Workflow[str]):
    """Workflow that acquires its database handle through a context manager."""

    @app.workflow_run
    async def run(self, query: str) -> WorkflowResult[str]:
        """Run ``query`` against the database and return the result."""
        async with self.get_database() as db:
            result = await db.query(query)
            return WorkflowResult(value=result)
Logging and Observability
Use structured logging:
@app.tool
async def monitored_tool(input: str, app_ctx: Optional[Context] = None) -> str:
    """Run ``process`` on ``input`` with structured log entries around the call.

    Logging is skipped when no app context is supplied.

    Raises:
        Exception: Anything raised while processing is logged (when a logger
            is available) and re-raised.
    """
    # Bind logger unconditionally so the calls below never hit an unbound
    # name when app_ctx is None.
    logger = app_ctx.logger if app_ctx else None

    if logger is not None:
        logger.info("Tool started", data={"input": input})

    try:
        result = await process(input)
        if logger is not None:
            logger.info("Tool completed", data={"result_length": len(result)})
        return result
    except Exception as e:
        if logger is not None:
            logger.error("Tool failed", data={"error": str(e)})
        raise
Testing Workflows
Test your workflows locally:
import asyncio
import pytest
@pytest.mark.asyncio
async def test_workflow():
    """Define a workflow on a fresh app, run it, and check the result value."""
    app = MCPApp(name="test_app")

    @app.workflow
    class TestWorkflow(Workflow[str]):
        @app.workflow_run
        async def run(self, input: str) -> WorkflowResult[str]:
            return WorkflowResult(value=input.upper())

    async with app.run():
        workflow = TestWorkflow()
        result = await workflow.run("hello")
        assert result.value == "HELLO"
Migration Guide
# Before: Plain function
async def calculate(x: int, y: int) -> int:
    """Return the sum of ``x`` and ``y``."""
    return x + y
# After: MCP tool
@app.tool
async def calculate(x: int, y: int) -> int:
    """Calculate sum of two numbers."""
    return x + y
From Scripts to Workflows
# Before: Script
async def main():
    """Fetch data, process it, and save the processed results."""
    data = await fetch_data()
    processed = await process_data(data)
    await save_results(processed)
# After: Workflow
@app.workflow
class DataPipeline(Workflow[dict]):
    """Workflow form of the fetch / process / save script."""

    @app.workflow_run
    async def run(self, source: str) -> WorkflowResult[dict]:
        """Fetch data from ``source``, process and save it, and return the processed data."""
        data = await self.fetch_data(source)
        processed = await self.process_data(data)
        await self.save_results(processed)
        return WorkflowResult(value=processed)
Next Steps
Workflow Patterns Explore pre-built workflow patterns
Agent Server Deploy workflows as MCP servers
Temporal Integration Add durability with Temporal
Examples See workflows in action