Temporal provides durable execution for your agent workflows, enabling automatic retries, pause/resume capabilities, and time-travel debugging. Perfect for production deployments.
## Why Temporal?
mcp-agent supports both `asyncio` and `temporal` execution engines. While asyncio works great for development and simple workflows, Temporal is recommended for production deployments because it provides:
- **Durable Execution**: Workflows survive failures, restarts, and infrastructure issues
- **Automatic Retries**: Failed activities are automatically retried with configurable policies
- **Pause & Resume**: Workflows can be paused indefinitely and resumed with new data
- **Observability**: Complete workflow history and time-travel debugging via the Temporal UI
- **Scalability**: Distribute workflow execution across multiple workers
- **Long-Running Workflows**: Support for workflows that run for days, weeks, or months
## Quick Start
### Install Temporal CLI

Install the Temporal CLI for local development:

```bash
# macOS
brew install temporal

# Linux/WSL
curl -sSf https://temporal.download/cli.sh | sh

# Windows
# Download from https://github.com/temporalio/cli/releases
```
### Start Temporal Server

Run a local Temporal server for development:

```bash
temporal server start-dev
```

This starts:

- Temporal Server on `localhost:7233`
- Web UI on `http://localhost:8233`
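To verify the server is reachable, you can ask the CLI for a health check (optional; exact output varies by CLI version):

```bash
temporal operator cluster health
# Expect a SERVING status once the dev server is up
```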
### Configure mcp-agent

Update your `mcp_agent.config.yaml`:

```yaml
execution_engine: temporal
temporal:
  host: localhost
  port: 7233
  namespace: default
  task_queue: mcp-agent
  max_concurrent_activities: 10
```
### Create Worker

Create a worker to process workflows:

```python
import asyncio

from mcp_agent.app import MCPApp
from mcp_agent.workflows import Workflow, WorkflowResult
from mcp_agent.executor.temporal import create_temporal_worker_for_app

app = MCPApp(name="my_agent")

# Define your workflows here
@app.workflow
class MyWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        return WorkflowResult(value=f"Processed: {input}")

async def main():
    async with create_temporal_worker_for_app(app) as worker:
        await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
```
### Run Workflow

Execute your workflow:

```python
import asyncio

from mcp_agent.app import MCPApp

app = MCPApp(name="my_agent")

async def main():
    async with app.run() as agent_app:
        executor = agent_app.executor

        # Start workflow
        handle = await executor.start_workflow(
            "MyWorkflow",
            "Hello Temporal!"
        )

        # Wait for result
        result = await handle.result()
        print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())
```
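While it runs, you can inspect executions from the Temporal CLI (the workflow ID below is a placeholder):

```bash
# List recent workflow executions
temporal workflow list

# Show the event history for a specific run
temporal workflow show --workflow-id <your-workflow-id>
```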
## Temporal Architecture

### Core Components

Temporal’s architecture provides robust workflow orchestration through several key components (a minimal client/worker sketch follows the list):

- **Temporal Server**: Manages workflow state, persists event history, and coordinates execution
- **Workers**: Execute workflow and activity code and poll the server for tasks
- **Event Store**: Immutable log of all workflow events, enabling replay and fault tolerance
- **Task Queues**: Distribute work between server and workers, enabling load balancing
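To make the split concrete, here is a minimal sketch using the raw `temporalio` SDK directly (mcp-agent wraps this plumbing for you; the workflow, queue, and ID names are illustrative):

```python
import asyncio

from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker

@workflow.defn
class HelloWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return f"Hello, {name}!"

async def main():
    # The client talks to the Temporal Server; state lives in the event store
    client = await Client.connect("localhost:7233")

    # The worker polls the task queue and executes workflow code
    async with Worker(client, task_queue="demo-queue", workflows=[HelloWorkflow]):
        result = await client.execute_workflow(
            HelloWorkflow.run, "Temporal", id="hello-1", task_queue="demo-queue"
        )
        print(result)  # -> Hello, Temporal!

asyncio.run(main())
```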
### Benefits of Temporal Architecture

**Durability & Fault Tolerance:**

Temporal’s event sourcing model ensures that every workflow step is persisted. If a worker crashes, another worker can pick up where it left off by replaying the event history.
```python
# This workflow will survive any infrastructure failure
@app.workflow
class ResilientWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, data: dict) -> WorkflowResult[dict]:
        # Step 1: Process data (checkpointed)
        result1 = await self.process_step_1(data)

        # Step 2: Validate results (checkpointed)
        result2 = await self.validate_step_2(result1)

        # Step 3: Finalize (checkpointed)
        # If the worker crashes here, another worker resumes from this point
        result3 = await self.finalize_step_3(result2)

        return WorkflowResult(value=result3)
```
**Automatic Retries & Exponential Backoff:**

Temporal handles activity failures with configurable retry policies:

```python
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM

@app.workflow
class RetryWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        # Configure a retry policy for this activity
        retry_policy = RetryPolicy(
            initial_interval=timedelta(seconds=1),
            maximum_interval=timedelta(minutes=5),
            backoff_coefficient=2.0,
            maximum_attempts=10,
            non_retryable_error_types=["ValidationError"]
        )

        # This will automatically retry on failure
        result = await workflow.execute_activity(
            self.unreliable_activity,
            input,
            start_to_close_timeout=timedelta(minutes=5),
            retry_policy=retry_policy
        )
        return WorkflowResult(value=result)

    async def unreliable_activity(self, data: str) -> str:
        """Activity that might fail and needs retries."""
        # Simulate an unreliable external API call
        agent = Agent(name="api_caller", server_names=["http"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            return await llm.generate_str(f"Process via API: {data}")
```
### Activity vs Workflow Distinction

**Workflows** are orchestration logic that must be deterministic:

- No direct I/O operations
- No random number generation without seeds
- No current-time checks (use `workflow.now()`)
- Pure coordination and decision making

**Activities** handle non-deterministic operations:

- External API calls
- Database operations
- File I/O
- Any side effects
```python
@app.workflow
class ProperWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, input: dict) -> WorkflowResult[dict]:
        # ✅ Workflow: Pure orchestration logic
        if input.get("requires_validation"):
            # ✅ Call an activity for external operations
            validated = await workflow.execute_activity(
                self.validate_data_activity,
                input,
                start_to_close_timeout=timedelta(minutes=2)
            )

            # ✅ Workflow: Decision making based on results
            if validated.get("is_valid"):
                return await self.process_valid_data(validated)
            else:
                return await self.handle_invalid_data(validated)

        return WorkflowResult(value=input)

    async def validate_data_activity(self, data: dict) -> dict:
        """Activity: Non-deterministic operations are allowed here."""
        agent = Agent(name="validator", server_names=["database", "api"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            # ✅ External I/O operations belong in activities
            validation_result = await llm.generate_str(
                f"Validate this data against external service: {data}"
            )
            return {"is_valid": "valid" in validation_result.lower(), "result": validation_result}
```
## Advanced Workflow Features

### Signal and Query Handlers

Signals allow external systems to communicate with running workflows:
```python
from typing import Optional

from temporalio import workflow

@app.workflow
class ApprovalWorkflow(Workflow[dict]):
    def __init__(self):
        self.approval_status: Optional[str] = None
        self.approval_comments: Optional[str] = None

    @workflow.signal
    async def approve_signal(self, comments: str):
        """Signal handler for approval."""
        self.approval_status = "approved"
        self.approval_comments = comments

    @workflow.signal
    async def reject_signal(self, reason: str):
        """Signal handler for rejection."""
        self.approval_status = "rejected"
        self.approval_comments = reason

    @workflow.query
    def get_status(self) -> dict:
        """Query handler to check current status."""
        return {
            "status": self.approval_status,
            "comments": self.approval_comments
        }

    @app.workflow_run
    async def run(self, document: dict) -> WorkflowResult[dict]:
        # Process the initial document
        agent = Agent(name="processor", server_names=["filesystem"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            processed = await llm.generate_str(f"Process document: {document}")

        # Wait for an approval signal (can wait indefinitely)
        await workflow.wait_condition(lambda: self.approval_status is not None)

        if self.approval_status == "approved":
            # Continue with the approved workflow
            async with agent:
                finalized = await llm.generate_str(
                    f"Finalize approved document: {processed}. Comments: {self.approval_comments}"
                )
            return WorkflowResult(value={
                "status": "completed",
                "document": finalized,
                "approval_comments": self.approval_comments
            })
        else:
            # Handle rejection
            return WorkflowResult(
                value=None,
                error=f"Document rejected: {self.approval_comments}"
            )

# Send signals from external code
async def send_approval():
    async with app.run() as agent_app:
        executor = agent_app.executor

        # Send an approval signal to the running workflow
        await executor.signal_workflow(
            "ApprovalWorkflow",
            "workflow-123",
            "approve_signal",
            "Document looks good after review!"
        )

        # Query workflow status
        status = await executor.query_workflow(
            "ApprovalWorkflow",
            "workflow-123",
            "get_status"
        )
        print(f"Workflow status: {status}")
```
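The same signal and query can also be sent from the Temporal CLI, which is handy for manual testing (the workflow ID below is illustrative; `--input` takes JSON):

```bash
# Send the approval signal
temporal workflow signal --workflow-id workflow-123 --name approve_signal \
  --input '"Document looks good after review!"'

# Query the current status
temporal workflow query --workflow-id workflow-123 --type get_status
```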
### Workflow Versioning

Handle workflow updates without breaking running instances:
```python
@app.workflow
class VersionedWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, data: dict) -> WorkflowResult[dict]:
        # Use versioning for backward compatibility
        version = workflow.get_version("data_processing_logic", 1, 3)

        if version == 1:
            # Original processing logic
            result = await self.process_v1(data)
        elif version == 2:
            # Enhanced processing with validation
            validated = await self.validate_data(data)
            result = await self.process_v2(validated)
        else:  # version == 3
            # Latest version with advanced features
            validated = await self.validate_data_v2(data)
            enriched = await self.enrich_data(validated)
            result = await self.process_v3(enriched)

        # Common post-processing (no versioning needed)
        final_result = await self.post_process(result)
        return WorkflowResult(value=final_result)

    async def process_v1(self, data: dict) -> str:
        """Original processing logic."""
        agent = Agent(name="processor_v1", server_names=["filesystem"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            return await llm.generate_str(f"Process v1: {data}")

    async def process_v2(self, data: dict) -> str:
        """Enhanced processing with validation."""
        agent = Agent(name="processor_v2", server_names=["filesystem", "validation"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            return await llm.generate_str(f"Process v2 with validation: {data}")

    async def process_v3(self, data: dict) -> str:
        """Latest version with advanced features."""
        agent = Agent(name="processor_v3", server_names=["filesystem", "validation", "ml"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            return await llm.generate_str(f"Process v3 with ML enhancement: {data}")
```
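Note that `get_version` mirrors the Temporal Go/Java SDKs; the `temporalio` Python SDK expresses the same idea with patch markers. A minimal sketch of the Python-native equivalent (patch ID and logic are illustrative):

```python
from temporalio import workflow

@workflow.defn
class PatchedWorkflow:
    @workflow.run
    async def run(self, data: dict) -> dict:
        if workflow.patched("enhanced-validation"):
            # New code path: taken by new executions, skipped when
            # replaying histories recorded before the patch existed
            data = {**data, "validated": True}
        return data
```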
### Workflow Timeouts and Cancellation

Configure comprehensive timeout policies. The sketch below uses `asyncio.timeout` (Python 3.11+, replay-safe inside Temporal workflows) for the workflow-level timeout and `activity.heartbeat` from the `temporalio` SDK for activity heartbeats:
```python
import asyncio
from datetime import timedelta

from temporalio import activity, workflow

@app.workflow
class TimeoutWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, data: dict) -> WorkflowResult[dict]:
        try:
            # Set a workflow-level timeout (asyncio timers are durable here)
            async with asyncio.timeout(timedelta(hours=2).total_seconds()):
                # Step 1: Quick processing (30 seconds max)
                result1 = await workflow.execute_activity(
                    self.quick_process,
                    data,
                    start_to_close_timeout=timedelta(seconds=30)
                )

                # Step 2: Medium processing (5 minutes max)
                result2 = await workflow.execute_activity(
                    self.medium_process,
                    result1,
                    start_to_close_timeout=timedelta(minutes=5),
                    heartbeat_timeout=timedelta(seconds=30)  # For long-running activities
                )

                # Step 3: Long processing (1 hour max)
                result3 = await workflow.execute_activity(
                    self.long_process,
                    result2,
                    start_to_close_timeout=timedelta(hours=1),
                    schedule_to_close_timeout=timedelta(hours=1, minutes=30)
                )

                return WorkflowResult(value=result3)
        except TimeoutError:
            # Handle the timeout gracefully
            return WorkflowResult(
                value=None,
                error="Workflow timed out after 2 hours"
            )
        except asyncio.CancelledError:
            # Handle cancellation
            return WorkflowResult(
                value=None,
                error="Workflow was cancelled"
            )

    async def long_process(self, data: dict) -> dict:
        """Long-running activity with heartbeats."""
        agent = Agent(name="long_processor", server_names=["ml", "database"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)

            # Send heartbeats during long operations
            for i in range(60):  # Simulate a 1-hour process
                # Heartbeat every minute so Temporal knows the activity is alive
                activity.heartbeat(f"Processing step {i + 1}/60")

                # Do some processing
                partial_result = await llm.generate_str(
                    f"Process chunk {i}: {data}"
                )

                # Sleep for 1 minute (simulated)
                await asyncio.sleep(60)

        return {"processed": True, "data": data}
```
## Core Concepts

### Workflow Definition

Temporal workflows are defined the same way as asyncio workflows:
```python
from mcp_agent.app import MCPApp
from mcp_agent.workflows import Workflow, WorkflowResult
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM

app = MCPApp(name="temporal_agent")

@app.workflow
class DurableWorkflow(Workflow[dict]):
    """A durable workflow that can survive failures."""

    @app.workflow_run
    async def run(self, request: dict) -> WorkflowResult[dict]:
        # This workflow is durable - it will resume from
        # where it left off if the worker crashes
        agent = Agent(
            name="analyst",
            instruction="Analyze the provided data thoroughly.",
            server_names=["fetch", "filesystem"]
        )

        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)

            # Each step is automatically checkpointed
            step1 = await llm.generate_str(f"Analyze: {request['data']}")
            step2 = await llm.generate_str(f"Summarize findings: {step1}")
            step3 = await llm.generate_str(f"Generate report: {step2}")

        return WorkflowResult(value={
            "analysis": step1,
            "summary": step2,
            "report": step3
        })
```
### Signals for Human-in-the-Loop

Implement workflows that wait for human input:
```python
from mcp_agent.executor.temporal import Signal

@app.workflow
class ApprovalWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, document: str) -> WorkflowResult[str]:
        # Process the document with AI
        agent = Agent(
            name="processor",
            instruction="Process and improve the document.",
            server_names=["filesystem"]
        )

        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            processed = await llm.generate_str(f"Improve this document: {document}")

        # Wait for human approval
        print(f"Waiting for approval. Workflow ID: {self.id}, Run ID: {self.run_id}")
        await app.context.executor.signal_bus.wait_for_signal(
            Signal(name="approve", workflow_id=self.id, run_id=self.run_id)
        )

        # Continue after approval
        print("Approval received! Finalizing document...")
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            finalized = await llm.generate_str(f"Finalize approved document: {processed}")

        return WorkflowResult(value=finalized)
```
Send signals from external code:
```python
# Send approval signal
await app.context.executor.signal_bus.send_signal(
    Signal(
        name="approve",
        workflow_id="ApprovalWorkflow",
        run_id="run_abc123",
        payload={"approved_by": "john.doe", "comments": "Looks good!"}
    )
)
```
### Long-Running Workflows

Handle workflows that run for extended periods:
```python
import asyncio

@app.workflow
class MonitoringWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, config: dict) -> WorkflowResult[dict]:
        monitoring_results = []

        # Run for 30 days, checking every hour
        for day in range(30):
            for hour in range(24):
                # Durable sleep - survives restarts
                await asyncio.sleep(3600)  # 1 hour

                # Check system status
                agent = Agent(
                    name="monitor",
                    instruction="Check system health and report issues.",
                    server_names=["fetch"]
                )
                async with agent:
                    llm = await agent.attach_llm(OpenAIAugmentedLLM)
                    status = await llm.generate_str(f"Check status of: {config['systems']}")

                monitoring_results.append({
                    "day": day,
                    "hour": hour,
                    "status": status
                })

                # Alert if issues found
                if "critical" in status.lower():
                    await self.send_alert(status)

        return WorkflowResult(value={"monitoring_complete": monitoring_results})
```
## Advanced Patterns

### Parallel Agent Execution

Run multiple agents in parallel with Temporal:
```python
import asyncio

@app.workflow
class ParallelAnalysisWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, document: str) -> WorkflowResult[dict]:
        # Define parallel tasks
        async def analyze_sentiment():
            agent = Agent(name="sentiment", instruction="Analyze sentiment.")
            async with agent:
                llm = await agent.attach_llm(OpenAIAugmentedLLM)
                return await llm.generate_str(f"Analyze sentiment: {document}")

        async def extract_entities():
            agent = Agent(name="entities", instruction="Extract entities.")
            async with agent:
                llm = await agent.attach_llm(OpenAIAugmentedLLM)
                return await llm.generate_str(f"Extract entities: {document}")

        async def summarize():
            agent = Agent(name="summarizer", instruction="Summarize content.")
            async with agent:
                llm = await agent.attach_llm(OpenAIAugmentedLLM)
                return await llm.generate_str(f"Summarize: {document}")

        # Execute in parallel - Temporal handles orchestration
        sentiment, entities, summary = await asyncio.gather(
            analyze_sentiment(),
            extract_entities(),
            summarize()
        )

        return WorkflowResult(value={
            "sentiment": sentiment,
            "entities": entities,
            "summary": summary
        })
```
### Workflow Composition

Compose complex workflows from simpler ones:
```python
@app.workflow
class DataPipelineWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, source: str) -> WorkflowResult[dict]:
        # Step 1: Data extraction workflow
        extraction = DataExtractionWorkflow()
        data = await extraction.run(source)

        # Step 2: Data validation workflow
        validation = DataValidationWorkflow()
        validated = await validation.run(data.value)

        # Step 3: Data processing workflow
        processing = DataProcessingWorkflow()
        processed = await processing.run(validated.value)

        # Step 4: Report generation workflow
        reporting = ReportGenerationWorkflow()
        report = await reporting.run(processed.value)

        return WorkflowResult(value={
            "data": data.value,
            "validation": validated.value,
            "processed": processed.value,
            "report": report.value
        })
```
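Direct composition like this runs the sub-workflows inline. In the raw `temporalio` SDK, composition is more commonly expressed with child workflows, which get their own event history, timeouts, and retries; a minimal sketch (workflow names illustrative):

```python
from temporalio import workflow

@workflow.defn
class ExtractWorkflow:
    @workflow.run
    async def run(self, source: str) -> str:
        return f"extracted:{source}"

@workflow.defn
class PipelineWorkflow:
    @workflow.run
    async def run(self, source: str) -> dict:
        # The child workflow runs with its own history and retry policy
        extracted = await workflow.execute_child_workflow(
            ExtractWorkflow.run, source, id=f"extract-{source}"
        )
        return {"data": extracted}
```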
### Error Handling with Compensations

Implement the saga pattern for distributed transactions:
```python
@app.workflow
class OrderProcessingWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, order: dict) -> WorkflowResult[dict]:
        compensations = []

        try:
            # Step 1: Reserve inventory
            inventory_agent = Agent(name="inventory", server_names=["database"])
            async with inventory_agent:
                llm = await inventory_agent.attach_llm(OpenAIAugmentedLLM)
                reservation = await llm.generate_str(f"Reserve items: {order['items']}")
                compensations.append(("inventory", reservation))

            # Step 2: Process payment
            payment_agent = Agent(name="payment", server_names=["payment_api"])
            async with payment_agent:
                llm = await payment_agent.attach_llm(OpenAIAugmentedLLM)
                payment = await llm.generate_str(f"Process payment: {order['total']}")
                compensations.append(("payment", payment))

            # Step 3: Ship order
            shipping_agent = Agent(name="shipping", server_names=["shipping_api"])
            async with shipping_agent:
                llm = await shipping_agent.attach_llm(OpenAIAugmentedLLM)
                shipment = await llm.generate_str(f"Ship to: {order['address']}")

            return WorkflowResult(value={
                "success": True,
                "reservation": reservation,
                "payment": payment,
                "shipment": shipment
            })
        except Exception as e:
            # Run compensations in reverse order
            for service, data in reversed(compensations):
                await self.compensate(service, data)

            return WorkflowResult(
                value=None,
                error=f"Order failed: {e}. Compensations executed."
            )

    async def compensate(self, service: str, data: str):
        """Execute compensation for a failed step."""
        agent = Agent(name=f"{service}_compensation")
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            await llm.generate_str(f"Compensate {service}: {data}")
```
## Production Deployment

### Infrastructure Requirements

**Minimum Production Setup:**

- Temporal Server cluster (3+ nodes for HA)
- PostgreSQL/MySQL database with replication
- Elasticsearch for visibility (optional but recommended)
- Load balancer for the Temporal frontend
- Monitoring stack (Prometheus, Grafana)

**Resource Planning:**
```yaml
# Example Kubernetes deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: temporal-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: temporal-server
  template:
    metadata:
      labels:
        app: temporal-server
    spec:
      containers:
        - name: temporal
          image: temporalio/auto-setup:latest
          resources:
            requests:
              memory: "1Gi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
          env:
            - name: DB
              value: postgresql
            - name: POSTGRES_SEEDS
              value: postgres-primary:5432
            - name: DYNAMIC_CONFIG_FILE_PATH
              value: /etc/temporal/config/dynamicconfig.yaml
```
### High Availability Configuration

Configure Temporal for production resilience:
```yaml
# temporal-server-config.yaml
persistence:
  defaultStore: default
  visibilityStore: visibility
  numHistoryShards: 512
  datastores:
    default:
      sql:
        pluginName: postgres
        databaseName: temporal
        connectAddr: postgres-cluster:5432
        connectProtocol: tcp
        maxConns: 20
        maxIdleConns: 20
        maxConnLifetime: 1h
    visibility:
      sql:
        pluginName: postgres
        databaseName: temporal_visibility
        connectAddr: postgres-cluster:5432

global:
  membership:
    maxJoinDuration: 30s
    broadcastAddress: 0.0.0.0
  pprof:
    port: 7936

services:
  frontend:
    rpc:
      grpcPort: 7233
      membershipPort: 6933
      bindOnLocalHost: false
  history:
    rpc:
      grpcPort: 7234
      membershipPort: 6934
      bindOnLocalHost: false
  matching:
    rpc:
      grpcPort: 7235
      membershipPort: 6935
      bindOnLocalHost: false
  worker:
    rpc:
      grpcPort: 7236
      membershipPort: 6936
      bindOnLocalHost: false

clusterMetadata:
  enableGlobalNamespace: true
  failoverVersionIncrement: 10
  masterClusterName: primary
  currentClusterName: primary
  clusterInformation:
    primary:
      enabled: true
      initialFailoverVersion: 0
      rpcName: frontend
      rpcAddress: 0.0.0.0:7233
```
### Temporal Cloud

For production, use Temporal Cloud:
```yaml
execution_engine: temporal
temporal:
  host: your-namespace.tmprl.cloud
  port: 7233
  namespace: your-namespace
  task_queue: mcp-agent-production
  tls:
    client_cert_path: /path/to/client.crt
    client_key_path: /path/to/client.key
    ca_cert_path: /path/to/ca.crt
    server_name: your-namespace.tmprl.cloud
  data_converter:
    encryption_key: ${TEMPORAL_ENCRYPTION_KEY}
    codec: aes256gcm
  retry_policy:
    initial_interval: 1
    maximum_interval: 100
    backoff_coefficient: 2
    maximum_attempts: 50
  auth:
    api_key: ${TEMPORAL_API_KEY}
    namespace: your-namespace
```
### Security Best Practices

**Data Encryption:** Encrypt all workflow payloads with a custom payload codec. The following is a sketch using the `temporalio` SDK's `PayloadCodec` with Fernet symmetric encryption; in practice, store the key in a secrets manager:
```python
import dataclasses
from typing import Iterable

import temporalio.converter
from cryptography.fernet import Fernet
from temporalio.api.common.v1 import Payload
from temporalio.client import Client
from temporalio.converter import PayloadCodec

class EncryptionCodec(PayloadCodec):
    """Encrypt and decrypt every payload with Fernet symmetric encryption."""

    def __init__(self, key: bytes):
        self.fernet = Fernet(key)

    async def encode(self, payloads: Iterable[Payload]) -> list[Payload]:
        return [
            Payload(
                metadata={"encoding": b"binary/encrypted"},
                data=self.fernet.encrypt(p.SerializeToString()),
            )
            for p in payloads
        ]

    async def decode(self, payloads: Iterable[Payload]) -> list[Payload]:
        return [Payload.FromString(self.fernet.decrypt(p.data)) for p in payloads]

# Generate encryption key (store securely)
encryption_key = Fernet.generate_key()

# Create a client whose payloads are encrypted end to end
client = await Client.connect(
    "your-namespace.tmprl.cloud:7233",
    namespace="your-namespace",
    data_converter=dataclasses.replace(
        temporalio.converter.default(), payload_codec=EncryptionCodec(encryption_key)
    ),
    tls=True,
)
```
**Access Control:**

```yaml
# RBAC configuration for Temporal namespaces
namespaces:
  production:
    retention: "30d"
    archival:
      history:
        state: "enabled"
        uri: "s3://temporal-history-archive"
      visibility:
        state: "enabled"
        uri: "s3://temporal-visibility-archive"
    authorization:
      default_role: "worker"
      roles:
        admin:
          permissions:
            - "namespace:*"
            - "workflow:*"
            - "activity:*"
        worker:
          permissions:
            - "workflow:execute"
            - "activity:execute"
        monitor:
          permissions:
            - "workflow:read"
            - "namespace:read"
```
**Network Security:**

```yaml
# Network policies for Kubernetes
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: temporal-network-policy
spec:
  podSelector:
    matchLabels:
      app: temporal
  policyTypes:
    - Ingress
    - Egress
  ingress:
    - from:
        - podSelector:
            matchLabels:
              app: mcp-agent-worker
      ports:
        - protocol: TCP
          port: 7233
  egress:
    - to:
        - podSelector:
            matchLabels:
              app: postgres
      ports:
        - protocol: TCP
          port: 5432
```
### Worker Scaling

Scale workers for production workloads:

```python
# worker.py for production
import asyncio
from concurrent.futures import ThreadPoolExecutor

from mcp_agent.executor.temporal import create_temporal_worker_for_app

# `app` is the MCPApp instance defined in your application module

async def main():
    # Create worker with production settings
    worker = await create_temporal_worker_for_app(
        app,
        task_queue="mcp-agent-production",
        max_concurrent_activities=50,
        max_concurrent_workflows=20,
        max_cached_workflows=100,
        activity_executor=ThreadPoolExecutor(max_workers=100),
    )

    # Run worker
    await worker.run()

if __name__ == "__main__":
    # Run multiple worker instances for scaling
    asyncio.run(main())
```
### Monitoring and Observability

Monitor workflows with the Temporal UI and custom metrics:

```python
# Add custom metrics
from temporalio import workflow
from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig

# Configure Prometheus metrics
runtime = Runtime(
    telemetry=TelemetryConfig(
        metrics=PrometheusConfig(bind_address="0.0.0.0:9090")
    )
)

# Track custom metrics in workflows
@app.workflow
class MetricWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        # Use workflow.now() rather than time.time() for replay safety
        start_time = workflow.now()

        # Your workflow logic
        result = await self.process(input)

        # Record metrics
        duration = (workflow.now() - start_time).total_seconds()
        app.context.metrics.record("workflow_duration", duration, {
            "workflow": "MetricWorkflow",
            "status": "success"
        })

        return WorkflowResult(value=result)
```
## Debugging

### Temporal Web UI

Access the Temporal Web UI at `http://localhost:8233` to:

- View all workflow executions
- Inspect workflow history step by step
- See pending activities and their retry attempts
- Send signals and queries to running workflows
- Download workflow history for offline debugging (see the CLI snippet below)
- Monitor worker health and task queues
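History can also be exported from the CLI, which pairs with the replay technique in the next section (the workflow ID is a placeholder):

```bash
# Export a run's event history as JSON for offline replay
temporal workflow show --workflow-id <your-workflow-id> --output json > workflow_history.json
```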
### Workflow Replay

Debug production issues by replaying workflow history:

```python
from temporalio.client import WorkflowHistory
from temporalio.worker import Replayer

async def debug_workflow():
    # Download history from the Temporal UI, CLI, or API
    with open("workflow_history.json") as f:
        history_json = f.read()

    # Create replayer with your workflow definitions
    replayer = Replayer(workflows=[MyWorkflow])

    # Replay workflow to debug
    try:
        await replayer.replay_workflow(
            WorkflowHistory.from_json("debug-workflow", history_json)
        )
        print("Replay successful - workflow logic is correct")
    except Exception as e:
        print(f"Replay failed - logic error: {e}")
```
### Testing with Time Skipping

Test long-running workflows efficiently:

```python
import pytest
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

@pytest.mark.asyncio
async def test_long_running_workflow():
    # Start a test environment with time skipping
    async with await WorkflowEnvironment.start_time_skipping() as env:
        # Create worker
        worker = Worker(
            env.client,
            task_queue="test-queue",
            workflows=[MonitoringWorkflow],
        )

        async with worker:
            # Start workflow
            handle = await env.client.start_workflow(
                MonitoringWorkflow.run,
                {"systems": ["api", "database"]},
                id="test-monitoring",
                task_queue="test-queue",
            )

            # Time automatically advances during sleeps,
            # so 30 days completes almost instantly in tests
            result = await handle.result()
            assert len(result["monitoring_complete"]) == 720  # 30 days * 24 hours
```
## Migration Guide

### From Asyncio to Temporal

Your workflow code remains largely the same. Here’s what changes:

**Before - `mcp_agent.config.yaml`:**

```yaml
execution_engine: asyncio
logger:
  transports: [console]
  level: info
```

**After - `mcp_agent.config.yaml`:**

```yaml
execution_engine: temporal
temporal:
  host: localhost
  port: 7233
  namespace: default
  task_queue: mcp-agent
logger:
  transports: [console]
  level: info
```

### Running Workflows

**Before - Asyncio:**

```python
async with app.run():
    workflow = MyWorkflow()
    result = await workflow.run("input")
    print(result.value)
```

**After - Temporal:**

```python
async with app.run() as agent_app:
    executor = agent_app.executor
    handle = await executor.start_workflow("MyWorkflow", "input")
    result = await handle.result()
    print(f"Result: {result}")
```
## Best Practices

**Workflows must be deterministic.** Avoid the following (replay-safe alternatives are sketched below):

- Random number generation without seeds
- Current-time checks (use `workflow.now()`)
- Direct I/O operations (use activities)
- Non-deterministic data structures
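The `temporalio` SDK ships replay-safe alternatives for most of these; a minimal sketch:

```python
from temporalio import workflow

@workflow.defn
class DeterministicWorkflow:
    @workflow.run
    async def run(self) -> str:
        now = workflow.now()           # replay-safe current time
        rng = workflow.random()        # deterministically seeded random.Random
        request_id = workflow.uuid4()  # replay-safe UUID
        return f"{now.isoformat()}-{rng.randint(0, 9)}-{request_id}"
```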
**Configure timeouts** for workflows and activities:

```python
handle = await executor.start_workflow(
    "MyWorkflow",
    input_data,
    execution_timeout=timedelta(hours=1),
    run_timeout=timedelta(minutes=30),
    task_timeout=timedelta(minutes=5),
)
```
**Set meaningful workflow IDs** for idempotency:

```python
workflow_id = f"process-document-{document_id}"
handle = await executor.start_workflow(
    "DocumentWorkflow",
    document,
    workflow_id=workflow_id,
    id_reuse_policy="allow_duplicate_failed_only",
)
```
**Version your workflows** for safe updates:

```python
@app.workflow
class VersionedWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        version = workflow.get_version("processing_logic", 1, 2)
        if version == 1:
            # Old logic
            result = await self.process_v1(input)
        else:
            # New logic
            result = await self.process_v2(input)
        return WorkflowResult(value=result)
```
## Common Patterns

### Polling External Systems
```python
@app.workflow
class PollingWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, job_id: str) -> WorkflowResult[dict]:
        max_attempts = 100

        for attempt in range(max_attempts):
            # Check job status
            agent = Agent(name="checker", server_names=["api"])
            async with agent:
                llm = await agent.attach_llm(OpenAIAugmentedLLM)
                status = await llm.generate_str(f"Check job status: {job_id}")

            if "completed" in status:
                return WorkflowResult(value={"status": "completed", "result": status})
            if "failed" in status:
                return WorkflowResult(value=None, error=f"Job failed: {status}")

            # Wait before the next poll (durable)
            await asyncio.sleep(60)  # 1 minute

        return WorkflowResult(value=None, error="Job timed out")
```
### Scheduled Workflows

Run on a recurring schedule inside a single workflow (note: the `temporalio` Python SDK has no `sleep_until`, so this sketch computes the delay from `workflow.now()`):

```python
import asyncio

from temporalio import workflow

@app.workflow
class ScheduledWorkflow(Workflow[None]):
    @app.workflow_run
    async def run(self, schedule: dict) -> WorkflowResult[None]:
        """Run daily at the specified time."""
        while True:
            # Wait until the next scheduled time (durable timer)
            next_run = self.calculate_next_run(schedule)
            await asyncio.sleep((next_run - workflow.now()).total_seconds())

            # Execute the scheduled task
            await self.execute_scheduled_task()

            # Continue as new to prevent unbounded history growth
            workflow.continue_as_new(schedule)
```
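For recurring runs, Temporal also offers first-class Schedules that you create from the client instead of looping inside a workflow. A hedged sketch using the raw `temporalio` SDK (the schedule ID, workflow name, and cron expression are illustrative):

```python
from temporalio.client import (
    Client,
    Schedule,
    ScheduleActionStartWorkflow,
    ScheduleSpec,
)

async def create_daily_schedule():
    client = await Client.connect("localhost:7233")

    # Start MyWorkflow every day at 09:00
    await client.create_schedule(
        "daily-report-schedule",
        Schedule(
            action=ScheduleActionStartWorkflow(
                "MyWorkflow",
                "scheduled input",
                id="daily-report",
                task_queue="mcp-agent",
            ),
            spec=ScheduleSpec(cron_expressions=["0 9 * * *"]),
        ),
    )
```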
## Examples

Explore complete Temporal examples:

## Next Steps