Skip to main content

Overview

mcp-agent provides two execution engines that determine how agent workflows are executed and managed. Each engine offers different capabilities for reliability, persistence, and deployment scenarios.

Execution Engines

asyncio Engine

The asyncio engine runs workflows in-memory using Python’s native async/await capabilities. Characteristics:
  • In-memory execution
  • No external dependencies
  • Fast startup and iteration
  • Best for development and simple deployments
  • State lost on process restart
Configuration:
execution_engine: asyncio
Use cases:
  • Local development
  • Quick prototyping
  • Stateless operations
  • Single-node deployments

Temporal Engine

The Temporal engine provides durable workflow execution with automatic state persistence. Characteristics:
  • Durable execution across restarts
  • Automatic retry with exponential backoff
  • Workflow history and replay
  • Distributed execution support
  • Requires Temporal server
Configuration:
execution_engine: temporal
temporal:
  host: "localhost:7233"
  namespace: "default"
  task_queue: "mcp-agent"
Use cases:
  • Production deployments
  • Long-running workflows
  • Critical operations requiring reliability
  • Multi-node deployments
  • Workflows requiring pause/resume
📌 Example: The Temporal workflow gallery showcases orchestrator, router, and evaluator/optimizer patterns running on this engine.

Executors

Executors are the runtime components that actually execute workflows within an engine.

AsyncioExecutor

Handles workflow execution for the asyncio engine:
from mcp_agent.executor.executor import AsyncioExecutor

async def greet(name: str) -> str:
    return f"Hi {name}"

executor = AsyncioExecutor()
result = await executor.execute(greet, "Ada")
print(result)  # "Hi Ada"
Features:
  • Direct Python function execution
  • Native async/await support
  • Minimal overhead
See it in action in the basic workflows examples where tasks run entirely in-process.

TemporalExecutor

Manages workflow execution for the Temporal engine:
from mcp_agent.executor.temporal import TemporalExecutor
from mcp_agent.config import TemporalSettings

executor = TemporalExecutor(config=TemporalSettings(
    host="localhost:7233",
    namespace="default",
    task_queue="mcp-agent",
))
handle = await executor.start_workflow("ResearchWorkflow", {"topic": "LLMs"})
result = await handle.result()
Features:
  • Workflow versioning
  • Activity retries
  • Distributed execution
  • Workflow queries and signals
The examples/oauth/pre_authorize project combines this executor with OAuth-aware workflows.

Choosing an Execution Engine

Development Phase

Use asyncio engine during development:
  • Fast iteration cycles
  • No infrastructure requirements
  • Immediate feedback
  • Simple debugging

Production Phase

Consider Temporal engine for production:
  • Workflow reliability
  • Automatic failure handling
  • Audit trail via workflow history
  • Horizontal scaling

Execution Context

Both engines provide an execution context to workflows:
@app.workflow
async def my_workflow(ctx: WorkflowContext, params: dict):
    # Access execution context
    workflow_id = ctx.workflow_id
    run_id = ctx.run_id
    
    # Engine-specific features
    if ctx.engine == "temporal":
        # Temporal-specific operations
        await ctx.sleep(timedelta(hours=1))
    
    return result

Engine-Specific Features

asyncio Features

  • Direct execution: Workflows run as standard Python functions
  • Memory state: State maintained in process memory
  • Simple cancellation: Standard asyncio cancellation

Temporal Features

  • Workflow replay: Deterministic replay from history
  • Signals: Send data to running workflows
  • Queries: Query workflow state without affecting execution
  • Child workflows: Spawn and manage child workflow instances
  • Timers: Durable sleep and timeouts
  • Activities: Retryable units of work

Migration Between Engines

Workflows written for mcp-agent can run on either engine without modification:
# This workflow runs on both engines
@app.workflow
async def portable_workflow(ctx: WorkflowContext, input: dict):
    agent = Agent(
        name="researcher",
        instruction="Research the topic",
        server_names=["fetch"]
    )
    
    async with agent:
        llm = await agent.attach_llm(OpenAIAugmentedLLM)
        result = await llm.generate_str(input["query"])
    
    return result

Performance Considerations

asyncio Engine

  • Latency: Microseconds for workflow start
  • Throughput: Limited by single process
  • Memory: All state in RAM
  • Reliability: No persistence

Temporal Engine

  • Latency: Milliseconds for workflow start
  • Throughput: Horizontally scalable
  • Memory: State persisted to database
  • Reliability: Survives crashes and restarts

Configuration Examples

Basic asyncio Setup

execution_engine: asyncio
logger:
  level: info

Production Temporal Setup

execution_engine: temporal
temporal:
  host: "temporal.production.internal:7233"
  namespace: "production"
  task_queue: "agent-workflows"
  worker_count: 4
  max_concurrent_activities: 20

Accessing the executor in an application

MCPApp exposes the active executor and engine selection:
async with app.run() as running_app:
    executor = running_app.executor
    print(executor.execution_engine)  # "asyncio" or "temporal"
You typically call high-level helpers (workflow.execute(), executor.start_workflow) rather than invoking executor methods directly, but the property is available when you need advanced control or diagnostics.

Next Steps

I