mcp-agent supports durable workflow execution using Temporal, providing reliability, scalability, and observability for complex AI workflows.

Why Use Temporal?

Durability

Workflows survive failures, restarts, and infrastructure changes

Scalability

Distribute execution across multiple workers and machines

Observability

Built-in monitoring, logging, and debugging through Temporal UI

Reliability

Automatic retries, timeouts, and error handling

Configuration

Configure Temporal in your mcp_agent.config.yaml:
execution_engine: temporal
temporal:
  host: "localhost"
  port: 7233
  namespace: "default"
  task_queue: "mcp-agent"
  max_concurrent_activities: 100
  max_concurrent_workflows: 1000
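
The host and port above combine into a single host:port address, which is the form the Temporal Python SDK's Client.connect accepts as its target. The sketch below mirrors the temporal block as a small dataclass with basic sanity checks. TemporalSettings and its target property are hypothetical names used for illustration only; they are not part of mcp-agent's configuration API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TemporalSettings:
    """Hypothetical mirror of the `temporal:` block in mcp_agent.config.yaml."""

    host: str = "localhost"
    port: int = 7233
    namespace: str = "default"
    task_queue: str = "mcp-agent"
    max_concurrent_activities: int = 100
    max_concurrent_workflows: int = 1000

    def __post_init__(self) -> None:
        # Reject values that could never produce a working connection.
        if not 0 < self.port < 65536:
            raise ValueError(f"port out of range: {self.port}")
        if self.max_concurrent_activities < 1 or self.max_concurrent_workflows < 1:
            raise ValueError("concurrency limits must be positive")

    @property
    def target(self) -> str:
        """Server address in host:port form."""
        return f"{self.host}:{self.port}"


settings = TemporalSettings()
print(settings.target)
```

Validating these values at startup surfaces a mistyped port or a zero concurrency limit before any worker tries to connect.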

Quick Start

1. Start Temporal Server

# Install Temporal CLI
brew install temporal

# Start development server
temporal server start-dev
This starts:
  • Temporal server on localhost:7233
  • Temporal Web UI on http://localhost:8233
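
Before starting a worker, it can help to confirm that something is actually listening on the server port. The following is a minimal sketch using only the standard library; is_port_open is a hypothetical helper, and it checks TCP reachability only, not whether the Temporal service behind the port is healthy.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    if is_port_open("localhost", 7233):
        print("Temporal dev server is reachable on localhost:7233")
    else:
        print("Nothing is listening on localhost:7233")
```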
2. Create a Workflow

Create workflow.py:
from mcp_agent.app import MCPApp
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.workflow import Workflow, WorkflowResult

app = MCPApp(name="temporal_example")

@app.workflow
class DataProcessingWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        # Create an agent with filesystem access
        agent = Agent(
            name="processor",
            instruction="Process files and return results",
            server_names=["filesystem"]
        )
        
        async with agent:
            # Agent operations are durable
            result = await agent.generate_str(f"Process this data: {input}")
            return WorkflowResult(value=result)
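
The "durable" comment in the workflow above rests on Temporal's general model: each completed step is recorded in the workflow's event history, and after a restart the recorded results are replayed instead of running the steps again. The toy sketch below illustrates that idea in plain Python. DurableRunner and its step method are hypothetical names; this is not how Temporal or mcp-agent store history, only a picture of the replay behavior.

```python
from typing import Any, Callable, Dict, List, Optional


class DurableRunner:
    """Toy event-sourced runner: results of completed steps live in `history`
    and are reused on replay instead of calling the step function again."""

    def __init__(self, history: Optional[Dict[str, Any]] = None) -> None:
        self.history: Dict[str, Any] = history if history is not None else {}
        self.executed: List[str] = []  # steps actually run by this instance

    def step(self, name: str, fn: Callable[[], Any]) -> Any:
        if name in self.history:
            return self.history[name]  # replay: skip re-execution
        result = fn()
        self.history[name] = result
        self.executed.append(name)
        return result


def workflow(runner: DurableRunner, data: str) -> str:
    cleaned = runner.step("clean", lambda: data.strip().lower())
    return runner.step("summarize", lambda: f"{len(cleaned)} chars: {cleaned}")


# First attempt: "clean" completes, then the process is assumed to crash.
first = DurableRunner()
first.step("clean", lambda: "  Sample Data ".strip().lower())
saved_history = dict(first.history)  # stands in for persisted event history

# Second attempt: replay from the saved history; only "summarize" runs.
second = DurableRunner(saved_history)
result = workflow(second, "  Sample Data ")
print(result, second.executed)
```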
3. Start Worker

Create worker.py:
import asyncio
from workflow import app
from mcp_agent.executor.temporal.worker import create_temporal_worker_for_app

async def main():
    async with create_temporal_worker_for_app(app) as worker:
        print("Worker started. Ctrl+C to exit.")
        await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
Run the worker:
python worker.py
4. Execute Workflow

Create execute.py:
import asyncio
from workflow import app

async def execute_workflow():
    async with app.run() as agent_app:
        executor = agent_app.executor
        
        # Start workflow
        handle = await executor.start_workflow(
            "DataProcessingWorkflow",
            "sample data to process"
        )
        
        # Get result
        result = await handle.result()
        print(f"Result: {result.value}")

if __name__ == "__main__":
    asyncio.run(execute_workflow())
Run the workflow:
python execute.py

Workflow Patterns

Basic Agent Workflow

Simple single-agent task execution:
@app.workflow
class SimpleAgentWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, query: str) -> WorkflowResult[str]:
        agent = Agent(
            name="researcher",
            instruction="Research and provide detailed answers",
            server_names=["fetch", "filesystem"]
        )
        
        async with agent:
            result = await agent.generate_str(query)
            return WorkflowResult(value=result)

Multi-Agent Orchestration

Coordinate multiple agents for complex tasks:
@app.workflow
class OrchestratorWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, task: str) -> WorkflowResult[str]:
        # Planning agent
        planner = Agent(
            name="planner",
            instruction="Break down tasks into steps",
            server_names=["fetch"]
        )
        
        # Execution agent
        executor = Agent(
            name="executor",
            instruction="Execute specific steps",
            server_names=["filesystem"]
        )
        
        async with planner, executor:
            # Step 1: Plan
            plan = await planner.generate_str(f"Create plan for: {task}")
            
            # Step 2: Execute
            result = await executor.generate_str(f"Execute plan: {plan}")
            
            return WorkflowResult(value=result)

Parallel Processing

Fan-out/fan-in pattern for concurrent execution:
import asyncio
from typing import List

@app.workflow
class ParallelWorkflow(Workflow[List[str]]):
    @app.workflow_run
    async def run(self, items: List[str]) -> WorkflowResult[List[str]]:
        async def process_item(item: str) -> str:
            agent = Agent(
                name=f"processor_{item}",
                instruction="Process individual items",
                server_names=["fetch"]
            )
            
            async with agent:
                return await agent.generate_str(f"Process: {item}")
        
        # Process items in parallel
        results = await asyncio.gather(*[
            process_item(item) for item in items
        ])
        
        return WorkflowResult(value=results)
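
With an unbounded gather, every item starts its agent call at once. A common refinement is to cap how many run concurrently. The sketch below shows that pattern in plain asyncio: process_item is a stand-in that sleeps instead of calling an agent, and max_concurrency is a hypothetical parameter. Whether asyncio.Semaphore is appropriate inside workflow code should be checked against the Temporal Python SDK's determinism guidance.

```python
import asyncio
from typing import List


async def process_item(item: str) -> str:
    """Stand-in for an agent call; sleeps briefly to simulate work."""
    await asyncio.sleep(0.01)
    return f"processed:{item}"


async def process_all(items: List[str], max_concurrency: int = 3) -> List[str]:
    """Fan out over items with at most max_concurrency running at once,
    then fan in with results in the same order as the input."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(item: str) -> str:
        async with semaphore:
            return await process_item(item)

    return await asyncio.gather(*(bounded(item) for item in items))


results = asyncio.run(process_all(["a", "b", "c", "d", "e"]))
print(results)
```

Because asyncio.gather preserves input order, callers can pair each result with its original item by position.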

Interactive Workflows

Human-in-the-loop workflows with pause/resume:
from mcp_agent.executor.temporal.interactive_workflow import InteractiveWorkflow

@app.workflow
class InteractiveReviewWorkflow(InteractiveWorkflow[str]):
    @app.workflow_run
    async def run(self, content: str) -> WorkflowResult[str]:
        agent = Agent(
            name="reviewer",
            instruction="Review and improve content",
            server_names=["fetch"]
        )
        
        async with agent:
            # Initial processing
            draft = await agent.generate_str(f"Review: {content}")
            
            # Wait for human input
            feedback = await self.wait_for_human_input(
                f"Review this draft: {draft}",
                timeout_seconds=3600  # 1 hour timeout
            )
            
            # Incorporate feedback
            final = await agent.generate_str(
                f"Improve draft based on feedback: {draft}\nFeedback: {feedback}"
            )
            
            return WorkflowResult(value=final)
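
The example above does not show what happens when the one-hour timeout expires without feedback. One option is to fall back to the unrevised draft. The sketch below shows that wait-with-timeout pattern in plain asyncio; it does not use the mcp-agent API, and wait_for_feedback, review, and the queue-based inbox are hypothetical stand-ins.

```python
import asyncio
from typing import Optional, Tuple


async def wait_for_feedback(inbox: asyncio.Queue, timeout_seconds: float) -> Optional[str]:
    """Return the next feedback message, or None if none arrives in time."""
    try:
        return await asyncio.wait_for(inbox.get(), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        return None


async def review(draft: str, inbox: asyncio.Queue, timeout_seconds: float) -> str:
    feedback = await wait_for_feedback(inbox, timeout_seconds)
    if feedback is None:
        return draft  # timed out: keep the draft unchanged
    return f"{draft} [revised per: {feedback}]"


async def demo() -> Tuple[str, str]:
    inbox: asyncio.Queue = asyncio.Queue()
    # No feedback queued: the wait times out and the draft is returned as-is.
    timed_out = await review("draft v1", inbox, timeout_seconds=0.05)
    # Feedback queued before the wait: it is picked up immediately.
    await inbox.put("tighten the intro")
    revised = await review("draft v1", inbox, timeout_seconds=0.05)
    return timed_out, revised


timed_out, revised = asyncio.run(demo())
print(timed_out)
print(revised)
```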

Workflow Signals

Control running workflows with signals:
from temporalio import workflow

@app.workflow
class ControllableWorkflow(Workflow[str]):
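    def __init__(self, *args, **kwargs):
        # Forward constructor arguments so the base Workflow is initialized
        super().__init__(*args, **kwargs)
        self._paused = False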
        
    @workflow.signal
    async def pause(self):
        self._paused = True
        
    @workflow.signal
    async def resume(self):
        self._paused = False
        
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        agent = Agent(name="worker", server_names=["fetch"])
        
        async with agent:
            # Block here while paused; continue once a resume signal clears the flag
            await workflow.wait_condition(lambda: not self._paused)
            
            result = await agent.generate_str(input)
            return WorkflowResult(value=result)
Send signals to workflows:
# Get workflow handle (executor is agent_app.executor, as in execute.py)
handle = await executor.start_workflow("ControllableWorkflow", "task")

# Send pause signal
await handle.signal("pause")

# Send resume signal
await handle.signal("resume")
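
The workflow above checks the pause flag once, before its single agent call. For a multi-step workflow, the same check can sit between steps so that a pause takes effect at the next step boundary. The sketch below shows this in plain asyncio, with an asyncio.Event standing in for the signal-driven flag and workflow.wait_condition. PausableJob is a hypothetical class, not part of mcp-agent or Temporal.

```python
import asyncio
from typing import List, Tuple


class PausableJob:
    """Runs steps in order, waiting at each step boundary while paused."""

    def __init__(self, steps: List[str]) -> None:
        self.steps = steps
        self.done: List[str] = []
        self._running = asyncio.Event()
        self._running.set()  # start in the running (not paused) state

    def pause(self) -> None:
        self._running.clear()

    def resume(self) -> None:
        self._running.set()

    async def run(self) -> List[str]:
        for step in self.steps:
            await self._running.wait()  # blocks while paused
            await asyncio.sleep(0.01)   # stand-in for real work
            self.done.append(step)
        return self.done


async def demo() -> Tuple[List[str], List[str]]:
    job = PausableJob(["fetch", "analyze", "report"])
    job.pause()
    task = asyncio.create_task(job.run())
    await asyncio.sleep(0.05)
    progress_while_paused = list(job.done)  # nothing has run yet
    job.resume()
    final = await task
    return progress_while_paused, final


progress_while_paused, final = asyncio.run(demo())
print(progress_while_paused, final)
```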

Monitoring and Debugging

Temporal Web UI

Access the Temporal Web UI at http://localhost:8233 to:
  • View running and completed workflows
  • Inspect workflow history and state
  • Debug failed executions
  • Monitor performance metrics

Best Practices

Examples

Explore complete Temporal examples:

Troubleshooting

Next Steps