Temporal provides durable execution for your agent workflows, enabling automatic retries, pause/resume capabilities, and time-travel debugging, which makes it well suited to production deployments.

Why Temporal?

mcp-agent supports both asyncio and temporal execution engines. While asyncio works great for development and simple workflows, Temporal is recommended for production deployments because it provides:

Durable Execution

Workflows survive failures, restarts, and infrastructure issues

Automatic Retries

Failed activities are automatically retried with configurable policies

Pause & Resume

Workflows can be paused indefinitely and resumed with new data

Observability

Complete workflow history and time-travel debugging via Temporal UI

Scalability

Distribute workflow execution across multiple workers

Long-Running Workflows

Support for workflows that run for days, weeks, or months

Quick Start

1

Install Temporal CLI

Install the Temporal CLI for local development:
# macOS
brew install temporal

# Linux/WSL
curl -sSf https://temporal.download/cli.sh | sh

# Windows
# Download from https://github.com/temporalio/cli/releases
2

Start Temporal Server

Run a local Temporal server for development:
temporal server start-dev
This starts:
  • Temporal Server on localhost:7233
  • Web UI on http://localhost:8233
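Before moving on, it can help to confirm that both ports accept connections. The following is a minimal sketch using only the Python standard library; the helper name `is_port_open` is an illustrative assumption and is not part of the Temporal CLI or mcp-agent.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts
        return False


if __name__ == "__main__":
    # Ports listed above for `temporal server start-dev`
    for name, port in [("Temporal Server", 7233), ("Web UI", 8233)]:
        state = "reachable" if is_port_open("localhost", port) else "not reachable"
        print(f"{name} on localhost:{port} is {state}")
```

If either port is reported as not reachable, check that `temporal server start-dev` is still running in another terminal.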
3

Configure mcp-agent

Update your mcp_agent.config.yaml:
execution_engine: temporal

temporal:
  host: localhost
  port: 7233
  namespace: default
  task_queue: mcp-agent
  max_concurrent_activities: 10
4

Create Worker

Create a worker to process workflows:
worker.py
import asyncio
from mcp_agent.app import MCPApp
from mcp_agent.executor.workflow import Workflow, WorkflowResult
from mcp_agent.executor.temporal import create_temporal_worker_for_app

app = MCPApp(name="my_agent")

# Define your workflows here
@app.workflow
class MyWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        return WorkflowResult(value=f"Processed: {input}")

async def main():
    async with create_temporal_worker_for_app(app) as worker:
        await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
5

Run Workflow

Execute your workflow:
main.py
import asyncio
from mcp_agent.app import MCPApp

app = MCPApp(name="my_agent")

async def main():
    async with app.run() as agent_app:
        executor = agent_app.executor
        
        # Start workflow
        handle = await executor.start_workflow(
            "MyWorkflow",
            "Hello Temporal!"
        )
        
        # Wait for result
        result = await handle.result()
        print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())

Temporal Architecture

Core Components

Temporal’s architecture provides robust workflow orchestration through several key components:

Temporal Server

Manages workflow state, persists event history, and coordinates execution

Workers

Execute workflow and activity code, poll for tasks from the server

Event Store

Immutable log of all workflow events, enabling replay and fault tolerance

Task Queues

Distribute work between server and workers, enabling load balancing

Benefits of Temporal Architecture

Durability & Fault Tolerance: Temporal’s event sourcing model ensures that every workflow step is persisted. If a worker crashes, another worker can pick up where it left off by replaying the event history.
# After a worker crash or restart, this workflow resumes from its recorded history
@app.workflow
class ResilientWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, data: dict) -> WorkflowResult[dict]:
        # Step 1: Process data (checkpointed)
        result1 = await self.process_step_1(data)
        
        # Step 2: Validate results (checkpointed)
        result2 = await self.validate_step_2(result1)
        
        # Step 3: Finalize (checkpointed)
        # If worker crashes here, it will resume from this point
        result3 = await self.finalize_step_3(result2)
        
        return WorkflowResult(value=result3)
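The replay idea behind this durability can be illustrated without Temporal at all. The sketch below is a simplified stand-in, not Temporal's implementation: `ReplayContext`, `step`, and `pipeline` are hypothetical names, and the "history" is just a Python list. Results of completed steps are read back from the history, and only the missing steps are executed.

```python
from typing import Any, Callable, List


class ReplayContext:
    """Runs each step once and records its result; on replay, reuses the record."""

    def __init__(self, history: List[Any]) -> None:
        self.history = history          # results persisted by earlier executions
        self._cursor = 0                # index of the next step in this execution
        self.executed: List[str] = []   # steps actually run this time

    def step(self, name: str, fn: Callable[[], Any]) -> Any:
        if self._cursor < len(self.history):
            result = self.history[self._cursor]   # replay: skip the real work
        else:
            result = fn()                         # first execution: do the work
            self.history.append(result)
            self.executed.append(name)
        self._cursor += 1
        return result


def pipeline(ctx: ReplayContext, data: int) -> int:
    a = ctx.step("process", lambda: data + 1)
    b = ctx.step("validate", lambda: a * 2)
    return ctx.step("finalize", lambda: b - 3)


# The first worker completes two steps and then "crashes"
history: List[Any] = []
first = ReplayContext(history)
a = first.step("process", lambda: 10 + 1)
first.step("validate", lambda: a * 2)

# A second worker replays the same history and runs only the missing step
second = ReplayContext(history)
result = pipeline(second, 10)
print(result, second.executed)  # 19 ['finalize']
```

The sketch also shows why workflow code has to be deterministic: replay only works if the code asks for the same steps in the same order every time.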
Automatic Retries & Exponential Backoff: Temporal handles activity failures with configurable retry policies:
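from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM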

@app.workflow
class RetryWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        # Configure retry policy for this activity
        retry_policy = RetryPolicy(
            initial_interval=timedelta(seconds=1),
            maximum_interval=timedelta(minutes=5),
            backoff_coefficient=2.0,
            maximum_attempts=10,
            non_retryable_error_types=["ValidationError"]
        )
        
        # This will automatically retry on failure
        result = await workflow.execute_activity(
            self.unreliable_activity,
            input,
            start_to_close_timeout=timedelta(minutes=5),
            retry_policy=retry_policy
        )
        
        return WorkflowResult(value=result)
    
    async def unreliable_activity(self, data: str) -> str:
        """Activity that might fail and needs retries."""
        # Simulate unreliable external API call
        agent = Agent(name="api_caller", server_names=["http"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            return await llm.generate_str(f"Process via API: {data}")
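To see what the policy above means in practice, the nominal wait before each retry can be computed by hand. This is a small sketch of the arithmetic only; `retry_intervals` is a hypothetical helper, and it assumes the usual rule that each wait is the initial interval multiplied by the coefficient raised to the retry index, capped at the maximum interval.

```python
from datetime import timedelta
from typing import List


def retry_intervals(
    initial_interval: timedelta,
    backoff_coefficient: float,
    maximum_interval: timedelta,
    maximum_attempts: int,
) -> List[timedelta]:
    """Nominal wait before each retry: initial * coefficient**n, capped at the maximum."""
    intervals = []
    # maximum_attempts counts the first try, so there are at most N - 1 retries
    for n in range(maximum_attempts - 1):
        wait = initial_interval * (backoff_coefficient ** n)
        intervals.append(min(wait, maximum_interval))
    return intervals


schedule = retry_intervals(
    initial_interval=timedelta(seconds=1),
    backoff_coefficient=2.0,
    maximum_interval=timedelta(minutes=5),
    maximum_attempts=10,
)
print([int(w.total_seconds()) for w in schedule])
# [1, 2, 4, 8, 16, 32, 64, 128, 256]
print(sum(schedule, timedelta()))  # 0:08:31
```

With these settings the 5-minute cap is never reached; the activity gives up after roughly eight and a half minutes of accumulated waiting, plus the time spent in the attempts themselves.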

Activity vs Workflow Distinction

Workflows are orchestration logic that must be deterministic:
  • No direct I/O operations
  • No random number generation without seeds
  • No current time checks (use workflow.now())
  • Pure coordination and decision making
Activities handle non-deterministic operations:
  • External API calls
  • Database operations
  • File I/O
  • Any side effects
@app.workflow
class ProperWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, input: dict) -> WorkflowResult[dict]:
        # ✅ Workflow: Pure orchestration logic
        if input.get("requires_validation"):
            # ✅ Call activity for external operations
            validated = await workflow.execute_activity(
                self.validate_data_activity,
                input,
                start_to_close_timeout=timedelta(minutes=2)
            )
            
            # ✅ Workflow: Decision making based on results
            if validated.get("is_valid"):
                return await self.process_valid_data(validated)
            else:
                return await self.handle_invalid_data(validated)
        
        return WorkflowResult(value=input)
    
    async def validate_data_activity(self, data: dict) -> dict:
        """✅ Activity: Non-deterministic operations allowed here."""
        agent = Agent(name="validator", server_names=["database", "api"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            
            # ✅ External I/O operations in activities
            validation_result = await llm.generate_str(
                f"Validate this data against external service: {data}"
            )
            
            return {"is_valid": "valid" in validation_result.lower(), "result": validation_result}
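The determinism rule can be tried out with plain Python. In this sketch, `plan_commands` is a hypothetical stand-in for workflow logic: it only decides which activities to schedule, and its randomness comes from an injected, seeded generator so that a replay makes the same decisions. (The Temporal Python SDK offers `workflow.random()` for the same purpose.)

```python
import random
from typing import Dict, List


def plan_commands(order: Dict, rng: random.Random) -> List[str]:
    """Orchestration logic: decide which activities to schedule, nothing else."""
    commands = ["validate"]
    # Randomness comes from an injected, seeded generator, so replay repeats it
    if rng.random() < 0.5:
        commands.append("audit")
    commands.append("process" if order["requires_validation"] else "skip")
    return commands


order = {"requires_validation": True}
first_run = plan_commands(order, random.Random(42))
replay = plan_commands(order, random.Random(42))
print(first_run == replay)  # True: same seed, same decisions
```

Had the function called the module-level `random.random()` instead, the two runs could schedule different activities, and a replay would no longer match the recorded history.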

Advanced Workflow Features

Signal and Query Handlers

Signals allow external systems to communicate with running workflows:
from temporalio import workflow
from typing import Optional

@app.workflow
class ApprovalWorkflow(Workflow[dict]):
    def __init__(self):
        self.approval_status: Optional[str] = None
        self.approval_comments: Optional[str] = None
    
    @workflow.signal
    async def approve_signal(self, comments: str):
        """Signal handler for approval."""
        self.approval_status = "approved"
        self.approval_comments = comments
    
    @workflow.signal
    async def reject_signal(self, reason: str):
        """Signal handler for rejection."""
        self.approval_status = "rejected"
        self.approval_comments = reason
    
    @workflow.query
    def get_status(self) -> dict:
        """Query handler to check current status."""
        return {
            "status": self.approval_status,
            "comments": self.approval_comments
        }
    
    @app.workflow_run
    async def run(self, document: dict) -> WorkflowResult[dict]:
        # Process initial document
        agent = Agent(name="processor", server_names=["filesystem"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            processed = await llm.generate_str(f"Process document: {document}")
        
        # Wait for approval signal (can wait indefinitely)
        await workflow.wait_condition(lambda: self.approval_status is not None)
        
        if self.approval_status == "approved":
            # Continue with approved workflow
            async with agent:
                finalized = await llm.generate_str(
                    f"Finalize approved document: {processed}. Comments: {self.approval_comments}"
                )
            
            return WorkflowResult(value={
                "status": "completed",
                "document": finalized,
                "approval_comments": self.approval_comments
            })
        else:
            # Handle rejection
            return WorkflowResult(
                value=None,
                error=f"Document rejected: {self.approval_comments}"
            )

# Send signals from external code
async def send_approval():
    async with app.run() as agent_app:
        executor = agent_app.executor
        
        # Send approval signal to running workflow
        await executor.signal_workflow(
            "ApprovalWorkflow",
            "workflow-123",
            "approve_signal",
            "Document looks good after review!"
        )
        
        # Query workflow status
        status = await executor.query_workflow(
            "ApprovalWorkflow",
            "workflow-123",
            "get_status"
        )
        print(f"Workflow status: {status}")
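The waiting behavior can be explored locally with plain asyncio. The sketch below is an in-memory analogy rather than Temporal code: `ApprovalState` and its methods are hypothetical, and an `asyncio.Event` plays the role of signal delivery.

```python
import asyncio
from typing import Callable, Optional


class ApprovalState:
    """In-memory stand-in for workflow state that a signal handler mutates."""

    def __init__(self) -> None:
        self.status: Optional[str] = None
        self._changed = asyncio.Event()

    def signal(self, status: str) -> None:
        self.status = status
        self._changed.set()

    async def wait_condition(self, predicate: Callable[[], bool]) -> None:
        # Re-check the predicate each time a signal changes the state
        while not predicate():
            self._changed.clear()
            await self._changed.wait()


async def demo() -> Optional[str]:
    state = ApprovalState()
    # Deliver the "signal" shortly after the workflow starts waiting
    asyncio.get_running_loop().call_later(0.01, state.signal, "approved")
    await state.wait_condition(lambda: state.status is not None)
    return state.status


print(asyncio.run(demo()))  # approved
```

The difference in a real Temporal workflow is that the wait survives worker restarts, because the signal is recorded in the workflow history instead of living in process memory.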

Workflow Versioning

Handle workflow updates without breaking running instances:
from temporalio import workflow

@app.workflow
class VersionedWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, data: dict) -> WorkflowResult[dict]:
        # The Temporal Python SDK versions workflow code with patch markers.
        # workflow.patched() returns True for executions that reach this code
        # after the patch is deployed, and False when replaying older histories.
        if workflow.patched("data_processing_v3"):
            # Latest version with advanced features
            validated = await self.validate_data_v2(data)
            enriched = await self.enrich_data(validated)
            result = await self.process_v3(enriched)
        elif workflow.patched("data_processing_v2"):
            # Enhanced processing with validation
            validated = await self.validate_data(data)
            result = await self.process_v2(validated)
        else:
            # Original processing logic
            result = await self.process_v1(data)
        
        # Common post-processing (no versioning needed)
        final_result = await self.post_process(result)
        
        return WorkflowResult(value=final_result)
    
    async def process_v1(self, data: dict) -> str:
        """Original processing logic."""
        agent = Agent(name="processor_v1", server_names=["filesystem"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            return await llm.generate_str(f"Process v1: {data}")
    
    async def process_v2(self, data: dict) -> str:
        """Enhanced processing with validation."""
        agent = Agent(name="processor_v2", server_names=["filesystem", "validation"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            return await llm.generate_str(f"Process v2 with validation: {data}")
    
    async def process_v3(self, data: dict) -> str:
        """Latest version with advanced features."""
        agent = Agent(name="processor_v3", server_names=["filesystem", "validation", "ml"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            return await llm.generate_str(f"Process v3 with ML enhancement: {data}")
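How a version marker keeps old and new executions on separate code paths can be sketched in a few lines. This is a simplified model under stated assumptions, not SDK code: `patched` and `choose_path` are hypothetical functions, and the history is reduced to a list of marker names.

```python
from typing import List


def patched(history_markers: List[str], patch_id: str, replaying: bool) -> bool:
    """Return True if the new code path should run for this execution."""
    if replaying:
        # Old executions have no marker, so they keep following the old path
        return patch_id in history_markers
    # New executions record the marker and take the new path
    if patch_id not in history_markers:
        history_markers.append(patch_id)
    return True


def choose_path(history_markers: List[str], replaying: bool) -> str:
    if patched(history_markers, "processing-v2", replaying):
        return "process_v2"
    return "process_v1"


old_history: List[str] = []   # started before the patch was deployed
new_history: List[str] = []   # started after the patch was deployed

print(choose_path(old_history, replaying=True))    # process_v1
print(choose_path(new_history, replaying=False))   # process_v2
print(choose_path(new_history, replaying=True))    # process_v2
```

Once no executions without the marker remain, the old branch can be deleted.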

Workflow Timeouts and Cancellation

Configure comprehensive timeout policies:
import asyncio
from datetime import timedelta

from temporalio import activity, workflow

@app.workflow
class TimeoutWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, data: dict) -> WorkflowResult[dict]:
        try:
            # Bound all three steps to 2 hours in total
            result = await asyncio.wait_for(
                self.run_steps(data),
                timeout=timedelta(hours=2).total_seconds()
            )
            return WorkflowResult(value=result)

        except asyncio.TimeoutError:
            # Handle timeout gracefully
            return WorkflowResult(
                value=None,
                error="Workflow timed out after 2 hours"
            )
        except asyncio.CancelledError:
            # Handle cancellation
            return WorkflowResult(
                value=None,
                error="Workflow was cancelled"
            )

    async def run_steps(self, data: dict) -> dict:
        # Step 1: Quick processing (30 seconds max)
        result1 = await workflow.execute_activity(
            self.quick_process,
            data,
            start_to_close_timeout=timedelta(seconds=30)
        )

        # Step 2: Medium processing (5 minutes max)
        result2 = await workflow.execute_activity(
            self.medium_process,
            result1,
            start_to_close_timeout=timedelta(minutes=5),
            heartbeat_timeout=timedelta(seconds=30)  # For long-running activities
        )

        # Step 3: Long processing (1 hour max per attempt, 1.5 hours including retries)
        return await workflow.execute_activity(
            self.long_process,
            result2,
            start_to_close_timeout=timedelta(hours=1),
            schedule_to_close_timeout=timedelta(hours=1, minutes=30),
            heartbeat_timeout=timedelta(minutes=2)  # long_process heartbeats every minute
        )

    async def long_process(self, data: dict) -> dict:
        """Long-running activity with heartbeat."""
        agent = Agent(name="long_processor", server_names=["ml", "database"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)

            # Send heartbeats for long operations
            for i in range(60):  # Simulate 1-hour process
                # Heartbeats are sent from activity code, not from the workflow
                activity.heartbeat(f"Processing step {i+1}/60")

                # Do some processing
                partial_result = await llm.generate_str(
                    f"Process chunk {i}: {data}"
                )

                # Sleep for 1 minute (simulated)
                await asyncio.sleep(60)

            return {"processed": True, "data": data}
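The relationship between how long a step takes and the timeout placed on it can be tried out with plain asyncio. This is a minimal local sketch; `step` and `run_with_timeout` are hypothetical helpers, and the durations are shortened so the example finishes quickly.

```python
import asyncio


async def step(duration: float) -> str:
    """Stand-in for an activity that takes `duration` seconds."""
    await asyncio.sleep(duration)
    return "done"


async def run_with_timeout(duration: float, timeout: float) -> str:
    try:
        # wait_for cancels the step if it does not finish within the timeout
        return await asyncio.wait_for(step(duration), timeout=timeout)
    except asyncio.TimeoutError:
        return "timed out"


print(asyncio.run(run_with_timeout(duration=0.01, timeout=1.0)))  # done
print(asyncio.run(run_with_timeout(duration=1.0, timeout=0.01)))  # timed out
```

In Temporal, the timeouts are enforced by the server and not by the worker process, so they still apply if the worker running the step disappears.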

Core Concepts

Workflow Definition

Temporal workflows are defined the same way as asyncio workflows:
from mcp_agent.app import MCPApp
from mcp_agent.executor.workflow import Workflow, WorkflowResult
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM

app = MCPApp(name="temporal_agent")

@app.workflow
class DurableWorkflow(Workflow[dict]):
    """A durable workflow that can survive failures."""
    
    @app.workflow_run
    async def run(self, request: dict) -> WorkflowResult[dict]:
        # This workflow is durable - it will resume from
        # where it left off if the worker crashes
        
        agent = Agent(
            name="analyst",
            instruction="Analyze the provided data thoroughly.",
            server_names=["fetch", "filesystem"]
        )
        
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            
            # Each step is automatically checkpointed
            step1 = await llm.generate_str(f"Analyze: {request['data']}")
            step2 = await llm.generate_str(f"Summarize findings: {step1}")
            step3 = await llm.generate_str(f"Generate report: {step2}")
            
            return WorkflowResult(value={
                "analysis": step1,
                "summary": step2,
                "report": step3
            })

Signals for Human-in-the-Loop

Implement workflows that wait for human input:
from mcp_agent.executor.temporal import Signal

@app.workflow
class ApprovalWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, document: str) -> WorkflowResult[str]:
        # Process document with AI
        agent = Agent(
            name="processor",
            instruction="Process and improve the document.",
            server_names=["filesystem"]
        )
        
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            processed = await llm.generate_str(f"Improve this document: {document}")
        
        # Wait for human approval
        print(f"Waiting for approval. Workflow ID: {self.id}, Run ID: {self.run_id}")
        
        await app.context.executor.signal_bus.wait_for_signal(
            Signal(name="approve", workflow_id=self.id, run_id=self.run_id)
        )
        
        # Continue after approval
        print("Approval received! Finalizing document...")
        
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            finalized = await llm.generate_str(f"Finalize approved document: {processed}")
        
        return WorkflowResult(value=finalized)
Send signals from external code:
# Send approval signal
await app.context.executor.signal_bus.send_signal(
    Signal(
        name="approve",
        workflow_id="ApprovalWorkflow",
        run_id="run_abc123",
        payload={"approved_by": "john.doe", "comments": "Looks good!"}
    )
)

Long-Running Workflows

Handle workflows that run for extended periods:
import asyncio
from datetime import timedelta

@app.workflow
class MonitoringWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, config: dict) -> WorkflowResult[dict]:
        monitoring_results = []
        
        # Run for 30 days, checking every hour
        for day in range(30):
            for hour in range(24):
                # Durable sleep - survives restarts
                await asyncio.sleep(3600)  # 1 hour
                
                # Check system status
                agent = Agent(
                    name="monitor",
                    instruction="Check system health and report issues.",
                    server_names=["fetch"]
                )
                
                async with agent:
                    llm = await agent.attach_llm(OpenAIAugmentedLLM)
                    status = await llm.generate_str(f"Check status of: {config['systems']}")
                    
                    monitoring_results.append({
                        "day": day,
                        "hour": hour,
                        "status": status
                    })
                    
                    # Alert if issues found
                    if "critical" in status.lower():
                        await self.send_alert(status)
        
        return WorkflowResult(value={"monitoring_complete": monitoring_results})
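A loop like this adds to the workflow's event history on every iteration. One common way to keep the history bounded is to split the work into shorter runs that hand their state to a fresh run, which is the idea behind continue-as-new. The sketch below models only the bookkeeping; `run_chunk` is a hypothetical function, and each pass of the `while` loop stands in for one fresh run.

```python
from typing import Dict, List


def run_chunk(state: Dict, iterations_per_run: int, total: int) -> Dict:
    """Process one run's worth of iterations and return the state to carry over."""
    start = state["next"]
    end = min(start + iterations_per_run, total)
    results: List[int] = state["results"] + list(range(start, end))
    return {"next": end, "results": results}


state = {"next": 0, "results": []}
runs = 0
while state["next"] < 720:      # 30 days * 24 hourly checks
    state = run_chunk(state, iterations_per_run=24, total=720)
    runs += 1                   # each loop pass stands in for one fresh run

print(runs, len(state["results"]))  # 30 720
```

In a real workflow, the carried-over state should stay small, for example a counter and a short summary in place of every raw result.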

Advanced Patterns

Parallel Agent Execution

Run multiple agents in parallel with Temporal:
import asyncio

@app.workflow
class ParallelAnalysisWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, document: str) -> WorkflowResult[dict]:
        # Define parallel tasks
        async def analyze_sentiment():
            agent = Agent(name="sentiment", instruction="Analyze sentiment.")
            async with agent:
                llm = await agent.attach_llm(OpenAIAugmentedLLM)
                return await llm.generate_str(f"Analyze sentiment: {document}")
        
        async def extract_entities():
            agent = Agent(name="entities", instruction="Extract entities.")
            async with agent:
                llm = await agent.attach_llm(OpenAIAugmentedLLM)
                return await llm.generate_str(f"Extract entities: {document}")
        
        async def summarize():
            agent = Agent(name="summarizer", instruction="Summarize content.")
            async with agent:
                llm = await agent.attach_llm(OpenAIAugmentedLLM)
                return await llm.generate_str(f"Summarize: {document}")
        
        # Execute in parallel - Temporal handles orchestration
        sentiment, entities, summary = await asyncio.gather(
            analyze_sentiment(),
            extract_entities(),
            summarize()
        )
        
        return WorkflowResult(value={
            "sentiment": sentiment,
            "entities": entities,
            "summary": summary
        })
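When several branches run in parallel, one failure would normally abort the whole `gather`. If partial results are acceptable, `return_exceptions=True` collects failures alongside successes. The sketch below uses plain asyncio; `analyze` and `run_parallel` are hypothetical helpers, and the `asyncio.sleep(0)` call stands in for an LLM request.

```python
import asyncio
from typing import Dict


async def analyze(kind: str, document: str) -> str:
    if not document:
        raise ValueError(f"{kind}: empty document")
    await asyncio.sleep(0)  # stand-in for an LLM call
    return f"{kind} of {len(document)} chars"


async def run_parallel(document: str) -> Dict[str, str]:
    kinds = ["sentiment", "entities", "summary"]
    results = await asyncio.gather(
        *(analyze(kind, document) for kind in kinds),
        return_exceptions=True,  # collect failures instead of raising the first one
    )
    return {
        kind: f"failed: {res}" if isinstance(res, Exception) else res
        for kind, res in zip(kinds, results)
    }


print(asyncio.run(run_parallel("hello")))
```

Whether to tolerate partial failure is a design choice: for a report that needs all three sections, letting the first exception propagate may be the better behavior.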

Workflow Composition

Compose complex workflows from simpler ones:
@app.workflow
class DataPipelineWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, source: str) -> WorkflowResult[dict]:
        # Step 1: Data extraction workflow
        extraction = DataExtractionWorkflow()
        data = await extraction.run(source)
        
        # Step 2: Data validation workflow
        validation = DataValidationWorkflow()
        validated = await validation.run(data.value)
        
        # Step 3: Data processing workflow
        processing = DataProcessingWorkflow()
        processed = await processing.run(validated.value)
        
        # Step 4: Report generation workflow
        reporting = ReportGenerationWorkflow()
        report = await reporting.run(processed.value)
        
        return WorkflowResult(value={
            "data": data.value,
            "validation": validated.value,
            "processed": processed.value,
            "report": report.value
        })

Error Handling with Compensations

Implement saga pattern for distributed transactions:
@app.workflow
class OrderProcessingWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, order: dict) -> WorkflowResult[dict]:
        compensations = []
        
        try:
            # Step 1: Reserve inventory
            inventory_agent = Agent(name="inventory", server_names=["database"])
            async with inventory_agent:
                llm = await inventory_agent.attach_llm(OpenAIAugmentedLLM)
                reservation = await llm.generate_str(f"Reserve items: {order['items']}")
                compensations.append(("inventory", reservation))
            
            # Step 2: Process payment
            payment_agent = Agent(name="payment", server_names=["payment_api"])
            async with payment_agent:
                llm = await payment_agent.attach_llm(OpenAIAugmentedLLM)
                payment = await llm.generate_str(f"Process payment: {order['total']}")
                compensations.append(("payment", payment))
            
            # Step 3: Ship order
            shipping_agent = Agent(name="shipping", server_names=["shipping_api"])
            async with shipping_agent:
                llm = await shipping_agent.attach_llm(OpenAIAugmentedLLM)
                shipment = await llm.generate_str(f"Ship to: {order['address']}")
            
            return WorkflowResult(value={
                "success": True,
                "reservation": reservation,
                "payment": payment,
                "shipment": shipment
            })
            
        except Exception as e:
            # Run compensations in reverse order
            for service, data in reversed(compensations):
                await self.compensate(service, data)
            
            return WorkflowResult(
                value=None,
                error=f"Order failed: {e}. Compensations executed."
            )
    
    async def compensate(self, service: str, data: str):
        """Execute compensation for failed step."""
        agent = Agent(name=f"{service}_compensation")
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            await llm.generate_str(f"Compensate {service}: {data}")
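The control flow of the saga pattern is easier to verify when it is separated from the agents. The following is a minimal, framework-free sketch; `run_saga`, `noop`, and `fail_shipping` are hypothetical names, and each step is a plain function paired with its compensation.

```python
from typing import Callable, List, Tuple

# (name, action, compensation)
Step = Tuple[str, Callable[[], None], Callable[[], None]]


def run_saga(steps: List[Step], log: List[str]) -> bool:
    """Run steps in order; on failure, undo completed steps in reverse order."""
    completed: List[Step] = []
    for step in steps:
        name, action, _ = step
        try:
            action()
            log.append(f"done:{name}")
            completed.append(step)
        except Exception as exc:
            log.append(f"failed:{name}:{exc}")
            for done_name, _, compensate in reversed(completed):
                compensate()
                log.append(f"undone:{done_name}")
            return False
    return True


def noop() -> None:
    """Placeholder for a real action or compensation."""


def fail_shipping() -> None:
    raise RuntimeError("carrier unavailable")


log: List[str] = []
ok = run_saga(
    [("inventory", noop, noop), ("payment", noop, noop), ("shipping", fail_shipping, noop)],
    log,
)
print(ok)
print(log)
```

Only steps that completed are compensated, and the most recent one is undone first, which matches the reversed loop in the workflow above.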

Production Deployment

Infrastructure Requirements

Minimum Production Setup:
  • Temporal Server cluster (3+ nodes for HA)
  • PostgreSQL/MySQL database with replication
  • Elasticsearch for visibility (optional but recommended)
  • Load balancer for Temporal frontend
  • Monitoring stack (Prometheus, Grafana)
Resource Planning:
# Example Kubernetes deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: temporal-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: temporal-server
  template:
    metadata:
      labels:
        app: temporal-server
    spec:
      containers:
      - name: temporal
        image: temporalio/auto-setup:latest  # pin a specific version tag for production
        resources:
          requests:
            memory: "1Gi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "1000m"
        env:
        - name: DB
          value: postgresql
        - name: POSTGRES_SEEDS
          value: postgres-primary:5432
        - name: DYNAMIC_CONFIG_FILE_PATH
          value: /etc/temporal/config/dynamicconfig.yaml

High Availability Configuration

Configure Temporal for production resilience:
# temporal-server-config.yaml
persistence:
  defaultStore: default
  visibilityStore: visibility
  numHistoryShards: 512
  datastores:
    default:
      sql:
        pluginName: postgres
        databaseName: temporal
        connectAddr: postgres-cluster:5432
        connectProtocol: tcp
        maxConns: 20
        maxIdleConns: 20
        maxConnLifetime: 1h
    visibility:
      sql:
        pluginName: postgres
        databaseName: temporal_visibility
        connectAddr: postgres-cluster:5432

global:
  membership:
    maxJoinDuration: 30s
    broadcastAddress: 0.0.0.0
  pprof:
    port: 7936

services:
  frontend:
    rpc:
      grpcPort: 7233
      membershipPort: 6933
      bindOnLocalHost: false
  history:
    rpc:
      grpcPort: 7234
      membershipPort: 6934
      bindOnLocalHost: false
  matching:
    rpc:
      grpcPort: 7235
      membershipPort: 6935
      bindOnLocalHost: false
  worker:
    rpc:
      grpcPort: 7236
      membershipPort: 6936
      bindOnLocalHost: false

clusterMetadata:
  enableGlobalNamespace: true
  failoverVersionIncrement: 10
  masterClusterName: primary
  currentClusterName: primary
  clusterInformation:
    primary:
      enabled: true
      initialFailoverVersion: 0
      rpcName: frontend
      rpcAddress: 0.0.0.0:7233

Temporal Cloud

For production, use Temporal Cloud:
mcp_agent.config.yaml
execution_engine: temporal

temporal:
  host: your-namespace.tmprl.cloud
  port: 7233
  namespace: your-namespace
  task_queue: mcp-agent-production
  tls:
    client_cert_path: /path/to/client.crt
    client_key_path: /path/to/client.key
    ca_cert_path: /path/to/ca.crt
    server_name: your-namespace.tmprl.cloud
  data_converter:
    encryption_key: ${TEMPORAL_ENCRYPTION_KEY}
    codec: aes256gcm
  retry_policy:
    initial_interval: 1
    maximum_interval: 100
    backoff_coefficient: 2
    maximum_attempts: 50
  auth:
    api_key: ${TEMPORAL_API_KEY}
    namespace: your-namespace

Security Best Practices

Data Encryption:
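import dataclasses
from typing import List, Sequence

import temporalio.converter
from cryptography.fernet import Fernet
from temporalio.api.common.v1 import Payload
from temporalio.client import Client
from temporalio.converter import PayloadCodec

class FernetCodec(PayloadCodec):
    """Encrypts each payload with Fernet before it is sent to Temporal."""

    def __init__(self, key: bytes) -> None:
        self._fernet = Fernet(key)

    async def encode(self, payloads: Sequence[Payload]) -> List[Payload]:
        return [
            Payload(
                metadata={"encoding": b"binary/encrypted"},
                data=self._fernet.encrypt(p.SerializeToString()),
            )
            for p in payloads
        ]

    async def decode(self, payloads: Sequence[Payload]) -> List[Payload]:
        return [Payload.FromString(self._fernet.decrypt(p.data)) for p in payloads]

# Generate the key once, then load it from a secret store (workers need the same key)
encryption_key = Fernet.generate_key()

# Create a client whose payloads are encrypted by the codec
client = await Client.connect(
    "your-namespace.tmprl.cloud:7233",
    namespace="your-namespace",
    data_converter=dataclasses.replace(
        temporalio.converter.default(), payload_codec=FernetCodec(encryption_key)
    ),
    tls=True,
)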
Access Control:
# RBAC configuration for Temporal namespaces
namespaces:
  production:
    retention: "30d"
    archival:
      history:
        state: "enabled"
        uri: "s3://temporal-history-archive"
      visibility:
        state: "enabled"
        uri: "s3://temporal-visibility-archive"
    authorization:
      default_role: "worker"
      roles:
        admin:
          permissions:
            - "namespace:*"
            - "workflow:*"
            - "activity:*"
        worker:
          permissions:
            - "workflow:execute"
            - "activity:execute"
        monitor:
          permissions:
            - "workflow:read"
            - "namespace:read"
Network Security:
# Network policies for Kubernetes
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: temporal-network-policy
spec:
  podSelector:
    matchLabels:
      app: temporal
  policyTypes:
  - Ingress
  - Egress
  ingress:
  - from:
    - podSelector:
        matchLabels:
          app: mcp-agent-worker
    ports:
    - protocol: TCP
      port: 7233
  egress:
  - to:
    - podSelector:
        matchLabels:
          app: postgres
    ports:
    - protocol: TCP
      port: 5432

Worker Scaling

Scale workers for production workloads:
# worker.py for production
import asyncio
from concurrent.futures import ThreadPoolExecutor
from mcp_agent.executor.temporal import create_temporal_worker_for_app

async def main():
    # Create worker with production settings
    # (`app` is the MCPApp instance your workflows are registered on)
    async with create_temporal_worker_for_app(
        app,
        task_queue="mcp-agent-production",
        max_concurrent_activities=50,
        max_concurrent_workflows=20,
        max_cached_workflows=100,
        activity_executor=ThreadPoolExecutor(max_workers=100),
    ) as worker:
        # Run worker
        await worker.run()

if __name__ == "__main__":
    # Run multiple worker instances for scaling
    asyncio.run(main())

Monitoring and Observability

Monitor workflows with Temporal UI and custom metrics:
# Add custom metrics
from temporalio import workflow
from temporalio.runtime import Runtime, TelemetryConfig, PrometheusConfig

# Configure Prometheus metrics
runtime = Runtime(
    telemetry=TelemetryConfig(
        metrics=PrometheusConfig(bind_address="0.0.0.0:9090")
    )
)

# Track custom metrics in workflows
@app.workflow
class MetricWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        # Use workflow.now() instead of time.time() so replay stays deterministic
        start_time = workflow.now()

        # Your workflow logic
        result = await self.process(input)

        # Record metrics
        duration = (workflow.now() - start_time).total_seconds()
        app.context.metrics.record("workflow_duration", duration, {
            "workflow": "MetricWorkflow",
            "status": "success"
        })

        return WorkflowResult(value=result)

Debugging

Temporal Web UI

Access the Temporal Web UI at http://localhost:8233 to:
  • View all workflow executions
  • Inspect workflow history step-by-step
  • See pending activities and their retry attempts
  • Send signals and queries to running workflows
  • Download workflow history for offline debugging
  • Monitor worker health and task queues

Workflow Replay

Debug production issues by replaying workflow history:
from temporalio.client import WorkflowHistory
from temporalio.worker import Replayer
import json

async def debug_workflow():
    # Download history from Temporal UI or API
    with open("workflow_history.json") as f:
        history_json = json.load(f)

    # Wrap the raw JSON in a WorkflowHistory ("workflow-123" is a placeholder ID)
    history = WorkflowHistory.from_json("workflow-123", history_json)

    # Create replayer with your workflow definitions
    replayer = Replayer(workflows=[MyWorkflow])

    # Replay workflow to debug
    try:
        await replayer.replay_workflow(history)
        print("Replay successful - workflow code matches the recorded history")
    except Exception as e:
        print(f"Replay failed - likely a non-determinism error: {e}")

Testing with Time Skipping

Test long-running workflows efficiently:
import pytest
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

@pytest.mark.asyncio
async def test_long_running_workflow():
    # Start test environment with time skipping
    async with await WorkflowEnvironment.start_time_skipping() as env:
        # Create worker
        worker = Worker(
            env.client,
            task_queue="test-queue",
            workflows=[MonitoringWorkflow],
        )
        
        async with worker:
            # Start workflow
            handle = await env.client.start_workflow(
                MonitoringWorkflow.run,
                {"systems": ["api", "database"]},
                id="test-monitoring",
                task_queue="test-queue",
            )
            
            # Time automatically advances during sleep
            # 30 days completes instantly in tests
            result = await handle.result()
            
            assert len(result["monitoring_complete"]) == 720  # 30 days * 24 hours

Migration Guide

From Asyncio to Temporal

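Your workflow code remains largely the same. What changes is the configuration (set execution_engine to temporal and add a temporal block, as in the Quick Start) and how workflows are started (through a worker and the executor instead of calling run() directly). The asyncio versions are shown here for comparison: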
execution_engine: asyncio
logger:
  transports: [console]
  level: info

Running Workflows

async with app.run():
    workflow = MyWorkflow()
    result = await workflow.run("input")
    print(result.value)

Best Practices

Common Patterns

Polling External Systems

@app.workflow
class PollingWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, job_id: str) -> WorkflowResult[dict]:
        max_attempts = 100
        
        for attempt in range(max_attempts):
            # Check job status
            agent = Agent(name="checker", server_names=["api"])
            async with agent:
                llm = await agent.attach_llm(OpenAIAugmentedLLM)
                status = await llm.generate_str(f"Check job status: {job_id}")
            
            if "completed" in status:
                return WorkflowResult(value={"status": "completed", "result": status})
            
            if "failed" in status:
                return WorkflowResult(value=None, error=f"Job failed: {status}")
            
            # Wait before next poll (durable)
            await asyncio.sleep(60)  # 1 minute
        
        return WorkflowResult(value=None, error="Job timed out")
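The polling loop itself can be factored into a small helper and tested without waiting in real time. This is a sketch under stated assumptions: `poll_until` is a hypothetical helper, the status check is faked with an iterator, and the sleep function is injectable so the demo skips the 60-second waits.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def poll_until(
    check: Callable[[], Awaitable[Optional[str]]],
    interval: float,
    max_attempts: int,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> Optional[str]:
    """Call check() until it returns a result or the attempts run out."""
    for attempt in range(max_attempts):
        result = await check()
        if result is not None:
            return result
        if attempt < max_attempts - 1:
            await sleep(interval)  # no need to wait after the final attempt
    return None


async def demo() -> Optional[str]:
    statuses = iter([None, None, "completed"])

    async def check() -> Optional[str]:
        return next(statuses)

    async def no_sleep(_: float) -> None:  # skip real waiting in the demo
        return None

    return await poll_until(check, interval=60, max_attempts=100, sleep=no_sleep)


print(asyncio.run(demo()))  # completed
```

Injecting the sleep function plays the same role locally that time skipping plays in Temporal's test environment.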

Scheduled Workflows

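import asyncio

from temporalio import workflow

@app.workflow
class ScheduledWorkflow(Workflow[None]):
    @app.workflow_run
    async def run(self, schedule: dict) -> WorkflowResult[None]:
        """Run daily at specified time."""
        # Wait until the next scheduled time; sleeping inside a workflow is durable.
        # calculate_next_run is assumed to return a timezone-aware UTC datetime.
        next_run = self.calculate_next_run(schedule)
        delay = (next_run - workflow.now()).total_seconds()
        await asyncio.sleep(max(delay, 0))

        # Execute scheduled task
        await self.execute_scheduled_task()

        # Continue as new to prevent history growth. This ends the current run
        # and starts a fresh one with the same schedule, so no loop is needed.
        workflow.continue_as_new(schedule)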
