Set up production-grade observability for your agent workflows with OpenTelemetry, metrics collection, distributed tracing, and intelligent alerting systems.

Observability Overview

Production agent workflows need comprehensive monitoring so that you can keep them reliable, measure their performance, and troubleshoot failures quickly. Observability rests on four building blocks:

Metrics Collection

Track performance, throughput, and system health metrics

Distributed Tracing

Follow requests across agents, workflows, and external services

Structured Logging

Centralized, searchable logs with contextual information

Alerting

Proactive notifications for issues and anomalies

OpenTelemetry Configuration

Core Setup

Configure OpenTelemetry for comprehensive observability:
# observability/telemetry.py
import asyncio
from opentelemetry import trace, metrics
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import Resource
from prometheus_client import start_http_server

class ObservabilityManager:
    """Owns the tracing, metrics and logging setup for one MCP Agent service.

    After ``initialize`` has run, ``tracer`` and ``meter`` hold the instances
    obtained from the globally registered OpenTelemetry providers.
    """

    def __init__(self, service_name: str, service_version: str = "1.0.0"):
        resource_attributes = {
            "service.name": service_name,
            "service.version": service_version,
            "telemetry.sdk.name": "opentelemetry",
            "telemetry.sdk.language": "python",
        }

        self.service_name = service_name
        self.service_version = service_version
        self.resource = Resource.create(resource_attributes)

        # Populated by setup_tracing() / setup_metrics().
        self.tracer_provider = None
        self.meter_provider = None
        self.tracer = None
        self.meter = None

    async def initialize(self, config: dict):
        """Run every setup stage in order: tracing, metrics, logging, libraries.

        Each stage receives its own sub-dictionary of ``config`` (an empty
        dict when the section is absent).
        """
        configured_stages = (
            (self.setup_tracing, "tracing"),
            (self.setup_metrics, "metrics"),
            (self.setup_logging, "logging"),
        )
        for run_stage, section in configured_stages:
            await run_stage(config.get(section, {}))

        await self.instrument_libraries()

        print(f" Observability initialized for {self.service_name}")

    async def setup_tracing(self, tracing_config: dict):
        """Register a tracer provider that exports batched spans to Jaeger."""
        provider = TracerProvider(resource=self.resource)
        self.tracer_provider = provider
        trace.set_tracer_provider(provider)

        # Exporter options, with local-agent defaults for host and port.
        exporter_options = {
            "agent_host_name": tracing_config.get("jaeger_host", "localhost"),
            "agent_port": tracing_config.get("jaeger_port", 6831),
            "collector_endpoint": tracing_config.get("jaeger_endpoint"),
        }
        exporter = JaegerExporter(**exporter_options)
        provider.add_span_processor(BatchSpanProcessor(exporter))

        self.tracer = trace.get_tracer(self.service_name)

    async def setup_metrics(self, metrics_config: dict):
        """Start the Prometheus HTTP endpoint and register a meter provider."""
        start_http_server(metrics_config.get("prometheus_port", 8000))

        reader = PrometheusMetricReader()
        provider = MeterProvider(resource=self.resource, metric_readers=[reader])
        self.meter_provider = provider
        metrics.set_meter_provider(provider)

        self.meter = metrics.get_meter(self.service_name)

    async def setup_logging(self, logging_config: dict):
        """Instrument the logging module (``logging_config`` is not read here)."""
        instrumentation_flags = {"set_logging_format": True, "log_correlation": True}
        LoggingInstrumentor().instrument(**instrumentation_flags)

    async def instrument_libraries(self):
        """Instrument commonly used libraries; currently asyncio only."""
        AsyncioInstrumentor().instrument()

        # Further instrumentations can be enabled here as needed, e.g.:
        # HTTPXInstrumentor().instrument()
        # SQLAlchemyInstrumentor().instrument()

# Process-wide manager; stays None until initialize_observability() has run.
observability_manager = None


async def initialize_observability(config: dict):
    """Create the global ObservabilityManager from ``config`` and initialize it.

    ``service_name`` defaults to "mcp-agent" and ``service_version`` to
    "1.0.0" when absent. The global is assigned before initialization starts.
    """
    global observability_manager

    name = config.get("service_name", "mcp-agent")
    version = config.get("service_version", "1.0.0")

    manager = ObservabilityManager(service_name=name, service_version=version)
    observability_manager = manager
    await manager.initialize(config)

def get_tracer():
    """Return the global manager's tracer, or None when no manager is set."""
    manager = observability_manager
    if not manager:
        return None
    return manager.tracer

def get_meter():
    """Return the global manager's meter, or None when no manager is set."""
    manager = observability_manager
    if not manager:
        return None
    return manager.meter

MCP Agent Integration

Integrate observability into your MCP Agent workflows:
# workflows/observable_workflow.py
from mcp_agent.app import MCPApp
from mcp_agent.workflows import Workflow, WorkflowResult
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM
from opentelemetry import trace, metrics
from observability.telemetry import get_tracer, get_meter
import time
import logging

# Module-level logger, named after this module; used by the workflows below
# for their completion and failure log records.
logger = logging.getLogger(__name__)

# Application object whose `workflow` / `workflow_run` decorators are applied
# to the workflow classes defined below.
app = MCPApp(name="observable_agent")

class ObservableWorkflow(Workflow[dict]):
    """Base workflow class with built-in observability.

    Exposes a tracer, a meter and four metric instruments to subclasses:

    * ``workflow_duration`` -- histogram, seconds per workflow execution
    * ``workflow_counter``  -- counter of workflow executions
    * ``agent_calls``       -- counter of agent calls
    * ``llm_tokens``        -- histogram of LLM tokens used per call
    """

    def __init__(self):
        super().__init__()

        # get_tracer()/get_meter() return None until initialize_observability()
        # has run. Fall back to the OpenTelemetry API's global accessors so the
        # attributes below always exist and subclasses never hit
        # `None.start_as_current_span` or a missing instrument attribute.
        # NOTE(review): per the OpenTelemetry API docs these fallbacks are
        # proxy/no-op objects when no SDK provider is registered -- confirm
        # against the installed opentelemetry-api version.
        configured_tracer = get_tracer()
        self.tracer = (
            configured_tracer
            if configured_tracer is not None
            else trace.get_tracer(__name__)
        )

        configured_meter = get_meter()
        self.meter = (
            configured_meter
            if configured_meter is not None
            else metrics.get_meter(__name__)
        )

        # Create metrics
        self.workflow_duration = self.meter.create_histogram(
            "workflow_duration_seconds",
            description="Duration of workflow execution",
            unit="s"
        )

        self.workflow_counter = self.meter.create_counter(
            "workflow_executions_total",
            description="Total number of workflow executions"
        )

        self.agent_calls = self.meter.create_counter(
            "agent_calls_total",
            description="Total number of agent calls"
        )

        self.llm_tokens = self.meter.create_histogram(
            "llm_tokens_used",
            description="Number of LLM tokens used",
            unit="tokens"
        )

@app.workflow
class DataProcessingWorkflow(ObservableWorkflow):
    """Observable data processing workflow with comprehensive tracking.

    Chains three agent-backed steps -- validation, processing and synthesis --
    and wraps each one in its own span. The tracer and the metric instruments
    come from ``ObservableWorkflow``.
    """

    @app.workflow_run
    async def run(self, input_data: dict) -> WorkflowResult[dict]:
        """Execute the three-step pipeline and report its outcome.

        Args:
            input_data: Raw payload handed to the validation step.

        Returns:
            A ``WorkflowResult`` carrying the synthesis output on success, or
            ``value=None`` plus the error text when any step raises. Failures
            are reported through the result; they are not re-raised.
        """
        # perf_counter is monotonic, so the measured duration cannot be skewed
        # by wall-clock adjustments while the workflow is running.
        workflow_start = time.perf_counter()
        input_size = len(str(input_data))

        with self.tracer.start_as_current_span("data_processing_workflow") as span:
            # Add workflow attributes to span
            span.set_attributes({
                "workflow.name": "DataProcessingWorkflow",
                "workflow.input_size": input_size,
                "workflow.version": "1.0.0"
            })

            try:
                # Track workflow execution
                self.workflow_counter.add(1, {"workflow": "data_processing"})

                # Step 1: Data validation with tracing
                validation_result = await self.trace_step(
                    "data_validation",
                    self.validate_data,
                    input_data
                )

                # Step 2: Data processing with tracing
                processing_result = await self.trace_step(
                    "data_processing",
                    self.process_data,
                    validation_result
                )

                # Step 3: Result synthesis with tracing
                final_result = await self.trace_step(
                    "result_synthesis",
                    self.synthesize_results,
                    processing_result
                )

                # Record successful completion
                workflow_duration = time.perf_counter() - workflow_start
                self.workflow_duration.record(workflow_duration, {
                    "workflow": "data_processing",
                    "status": "success"
                })

                span.set_attribute("workflow.status", "success")
                span.set_attribute("workflow.duration", workflow_duration)

                logger.info(
                    "Workflow completed successfully",
                    extra={
                        "workflow": "data_processing",
                        "duration": workflow_duration,
                        "input_size": input_size,
                        "output_size": len(str(final_result))
                    }
                )

                return WorkflowResult(value=final_result)

            except Exception as e:
                # Record failure
                workflow_duration = time.perf_counter() - workflow_start
                self.workflow_duration.record(workflow_duration, {
                    "workflow": "data_processing",
                    "status": "error"
                })

                span.set_attribute("workflow.status", "error")
                span.set_attribute("error.message", str(e))
                span.record_exception(e)
                # The exception is handled here and never leaves the `with`
                # block, so the span would otherwise keep its default status.
                span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))

                logger.error(
                    "Workflow failed",
                    extra={
                        "workflow": "data_processing",
                        "error": str(e),
                        "duration": workflow_duration
                    },
                    exc_info=True
                )

                return WorkflowResult(value=None, error=str(e))

    async def trace_step(self, step_name: str, step_function, data):
        """Execute a workflow step inside a ``step_<name>`` span.

        Args:
            step_name: Label used for the span name and ``step.name``.
            step_function: Async callable invoked as ``step_function(data)``.
            data: Input for the step.

        Returns:
            Whatever ``step_function`` returns.

        Raises:
            Exception: Anything raised by ``step_function`` is re-raised after
                the span has been annotated.
        """
        with self.tracer.start_as_current_span(f"step_{step_name}") as span:
            step_start = time.perf_counter()

            span.set_attributes({
                "step.name": step_name,
                "step.input_size": len(str(data))
            })

            try:
                result = await step_function(data)
            except Exception:
                span.set_attribute("step.status", "error")
                span.set_attribute("step.duration", time.perf_counter() - step_start)
                # No explicit record_exception(): the exception propagates out
                # of start_as_current_span, which by default records it and
                # sets the error status. Recording here as well would add a
                # duplicate exception event to the span.
                raise

            span.set_attribute("step.status", "success")
            span.set_attribute("step.duration", time.perf_counter() - step_start)
            span.set_attribute("step.output_size", len(str(result)))

            return result

    @staticmethod
    def _indicates_valid(verdict: str) -> bool:
        """Return True when the free-text verdict reads as a positive one.

        A plain ``"valid" in text`` test also matches "invalid", so negative
        phrasings are ruled out first. This is a text heuristic and fails
        closed: a verdict mentioning both "valid" and "invalid" is rejected.
        """
        lowered = verdict.lower()
        if "invalid" in lowered or "not valid" in lowered:
            return False
        return "valid" in lowered

    def _record_llm_usage(self, llm_span, operation: str, request_payload, response_text: str) -> int:
        """Record an approximate token count for one LLM call.

        The estimate is one token per four characters of the stringified
        request plus the response. It is written to the ``llm_tokens``
        histogram and to ``llm_span``, and returned.
        """
        estimated_tokens = len(str(request_payload)) // 4 + len(response_text) // 4
        self.llm_tokens.record(estimated_tokens, {
            "model": "openai",
            "operation": operation
        })
        llm_span.set_attributes({
            "llm.model": "openai",
            "llm.operation": operation,
            "llm.estimated_tokens": estimated_tokens
        })
        return estimated_tokens

    async def validate_data(self, data: dict) -> dict:
        """Data validation step with agent observability.

        Returns:
            A dict with ``original_data``, the LLM's ``validation_result``
            text, and the derived ``is_valid`` flag.
        """
        validator_agent = Agent(
            name="data_validator",
            instruction="Validate data quality and format.",
            server_names=["validation_service"]
        )

        with self.tracer.start_as_current_span("agent_validation") as span:
            # Track agent usage
            self.agent_calls.add(1, {"agent": "data_validator", "step": "validation"})

            span.set_attributes({
                "agent.name": "data_validator",
                "agent.instruction": validator_agent.instruction,
                "agent.servers": str(validator_agent.server_names)
            })

            async with validator_agent:
                llm = await validator_agent.attach_llm(OpenAIAugmentedLLM)

                # Track LLM usage
                with self.tracer.start_as_current_span("llm_validation") as llm_span:
                    validation_result = await llm.generate_str(
                        f"Validate this data for quality and format: {data}"
                    )
                    self._record_llm_usage(llm_span, "validation", data, validation_result)

                span.set_attribute("validation.result_size", len(validation_result))

                return {
                    "original_data": data,
                    "validation_result": validation_result,
                    "is_valid": self._indicates_valid(validation_result)
                }

    async def process_data(self, validation_data: dict) -> dict:
        """Data processing step with detailed tracing.

        Args:
            validation_data: Output of ``validate_data``.

        Returns:
            A dict with the incoming ``validation_data`` and the LLM's
            ``processed_result`` text.

        Raises:
            ValueError: If ``validation_data["is_valid"]`` is falsy.
        """
        if not validation_data["is_valid"]:
            raise ValueError("Data validation failed")

        processor_agent = Agent(
            name="data_processor",
            instruction="Process and enrich validated data.",
            server_names=["processing_service", "ml_service"]
        )

        with self.tracer.start_as_current_span("agent_processing") as span:
            self.agent_calls.add(1, {"agent": "data_processor", "step": "processing"})

            span.set_attributes({
                "agent.name": "data_processor",
                "processing.input_valid": validation_data["is_valid"]
            })

            async with processor_agent:
                llm = await processor_agent.attach_llm(OpenAIAugmentedLLM)

                with self.tracer.start_as_current_span("llm_processing") as llm_span:
                    processed_result = await llm.generate_str(
                        f"Process and enrich this validated data: {validation_data['original_data']}"
                    )
                    self._record_llm_usage(llm_span, "processing", validation_data, processed_result)

                return {
                    "validation_data": validation_data,
                    "processed_result": processed_result
                }

    async def synthesize_results(self, processing_data: dict) -> dict:
        """Final synthesis step.

        Returns:
            A dict with the incoming ``processing_data``, the LLM's
            ``final_synthesis`` text and a wall-clock ``completion_timestamp``.
        """
        synthesizer_agent = Agent(
            name="result_synthesizer",
            instruction="Synthesize final results from processed data.",
            server_names=["synthesis_service"]
        )

        with self.tracer.start_as_current_span("agent_synthesis"):
            self.agent_calls.add(1, {"agent": "result_synthesizer", "step": "synthesis"})

            async with synthesizer_agent:
                llm = await synthesizer_agent.attach_llm(OpenAIAugmentedLLM)

                with self.tracer.start_as_current_span("llm_synthesis") as llm_span:
                    synthesis = await llm.generate_str(
                        f"Synthesize final comprehensive results: {processing_data}"
                    )
                    # Also annotates the span, matching the other two steps.
                    self._record_llm_usage(llm_span, "synthesis", processing_data, synthesis)

                return {
                    "processing_data": processing_data,
                    "final_synthesis": synthesis,
                    # Wall-clock time on purpose: this is a timestamp, not a duration.
                    "completion_timestamp": time.time()
                }

Metrics Collection

Custom Metrics for Agent Workflows

Define domain-specific metrics for agent workflows:
# metrics/agent_metrics.py
from opentelemetry import metrics
from typing import Dict, Any
import time
from contextlib import asynccontextmanager

class AgentMetrics:
    """Custom metrics collection for agent workflows.

    Creates every instrument once from the supplied meter and offers helpers
    that update them consistently. ``meter`` only needs to provide
    ``create_counter``, ``create_histogram`` and ``create_up_down_counter``.
    """

    def __init__(self, meter):
        """Create all instruments on ``meter``."""
        self.meter = meter

        # Last absolute memory reading per attribute set; lets
        # track_system_metrics() feed deltas to the up/down counter.
        self._last_memory_bytes = {}

        # Workflow metrics
        self.workflow_executions = meter.create_counter(
            "agent_workflow_executions_total",
            description="Total number of workflow executions"
        )

        self.workflow_duration = meter.create_histogram(
            "agent_workflow_duration_seconds",
            description="Duration of workflow executions"
        )

        self.workflow_success_rate = meter.create_up_down_counter(
            "agent_workflow_success_rate",
            description="Success rate of workflow executions"
        )

        # Agent metrics
        self.agent_creations = meter.create_counter(
            "agent_creations_total",
            description="Total number of agent creations"
        )

        self.agent_active_count = meter.create_up_down_counter(
            "agent_active_count",
            description="Number of currently active agents"
        )

        self.agent_execution_duration = meter.create_histogram(
            "agent_execution_duration_seconds",
            description="Duration of agent executions"
        )

        # LLM metrics
        self.llm_requests = meter.create_counter(
            "llm_requests_total",
            description="Total number of LLM requests"
        )

        self.llm_tokens_consumed = meter.create_counter(
            "llm_tokens_consumed_total",
            description="Total number of LLM tokens consumed"
        )

        self.llm_cost = meter.create_counter(
            "llm_cost_total",
            description="Total LLM usage cost",
            unit="USD"
        )

        # System metrics
        self.memory_usage = meter.create_up_down_counter(
            "agent_memory_usage_bytes",
            description="Memory usage by agent processes"
        )

        self.error_count = meter.create_counter(
            "agent_errors_total",
            description="Total number of agent errors"
        )

        # Business metrics
        self.tasks_completed = meter.create_counter(
            "business_tasks_completed_total",
            description="Total number of business tasks completed"
        )

        self.data_processed = meter.create_counter(
            "business_data_processed_bytes",
            description="Total amount of data processed"
        )

    @asynccontextmanager
    async def track_workflow_execution(self, workflow_name: str, attributes: Dict[str, Any] = None):
        """Context manager to track workflow execution metrics.

        Counts the execution on entry. On exit it records the duration with a
        ``status`` attribute and adjusts the success counter (+1 on success,
        -1 on error). Errors are also counted by exception type and re-raised.

        Args:
            workflow_name: Value of the ``workflow`` attribute.
            attributes: Extra attributes merged over the default ones.
        """
        # Monotonic clock: durations are immune to wall-clock adjustments.
        start_time = time.perf_counter()
        attrs = {"workflow": workflow_name}
        if attributes:
            attrs.update(attributes)

        # Increment execution counter
        self.workflow_executions.add(1, attrs)

        try:
            yield
        except Exception as e:
            # Failure
            duration = time.perf_counter() - start_time
            self.workflow_duration.record(duration, {**attrs, "status": "error"})
            self.workflow_success_rate.add(-1, {**attrs, "status": "error"})
            self.error_count.add(1, {**attrs, "error_type": type(e).__name__})
            raise
        else:
            # Success. Kept outside the `try` so a failure while recording is
            # not additionally reported as a workflow error.
            duration = time.perf_counter() - start_time
            self.workflow_duration.record(duration, {**attrs, "status": "success"})
            self.workflow_success_rate.add(1, {**attrs, "status": "success"})

    @asynccontextmanager
    async def track_agent_execution(self, agent_name: str, attributes: Dict[str, Any] = None):
        """Context manager to track agent execution metrics.

        Counts the creation and raises the active-agent count on entry. On
        exit, whether or not the body raised, it records the duration and
        lowers the active count again.

        Args:
            agent_name: Value of the ``agent`` attribute.
            attributes: Extra attributes merged over the default ones.
        """
        start_time = time.perf_counter()
        attrs = {"agent": agent_name}
        if attributes:
            attrs.update(attributes)

        # Increment active agent count
        self.agent_creations.add(1, attrs)
        self.agent_active_count.add(1, attrs)

        try:
            yield
        finally:
            # Always decrement active count and record duration
            duration = time.perf_counter() - start_time
            self.agent_execution_duration.record(duration, attrs)
            self.agent_active_count.add(-1, attrs)

    def track_llm_usage(self, model: str, tokens: int, cost: float = 0, operation: str = "generate"):
        """Track LLM usage metrics.

        Counts one request and ``tokens`` consumed tokens; ``cost`` is added
        only when it is positive.
        """
        attrs = {"model": model, "operation": operation}

        self.llm_requests.add(1, attrs)
        self.llm_tokens_consumed.add(tokens, attrs)

        if cost > 0:
            self.llm_cost.add(cost, attrs)

    def track_business_metrics(self, metric_type: str, value: float, attributes: Dict[str, Any] = None):
        """Track business-specific metrics.

        Args:
            metric_type: ``"tasks_completed"`` or ``"data_processed"``. Any
                other value is ignored after a warning is logged, so a typo
                no longer disappears silently.
            value: Amount added to the selected counter.
            attributes: Attributes attached to the measurement.
        """
        attrs = attributes or {}

        if metric_type == "tasks_completed":
            self.tasks_completed.add(value, attrs)
        elif metric_type == "data_processed":
            self.data_processed.add(value, attrs)
        else:
            # Function-scope import keeps this block independent of the
            # module's import list.
            import logging

            logging.getLogger(__name__).warning(
                "Ignoring unknown business metric type: %r", metric_type
            )

    def track_system_metrics(self, memory_bytes: int, attributes: Dict[str, Any] = None):
        """Track system resource metrics.

        ``memory_bytes`` is treated as the current absolute usage. An up/down
        counter accumulates whatever is added to it, so adding each reading
        directly would export the sum of all readings. Only the change since
        the previous reading for the same attribute set is added, which keeps
        the exported value equal to the latest reading.
        """
        attrs = attributes or {}

        # repr() keeps the key hashable even for list-valued attributes.
        series_key = frozenset((name, repr(value)) for name, value in attrs.items())
        previous_bytes = self._last_memory_bytes.get(series_key, 0)
        self._last_memory_bytes[series_key] = memory_bytes

        self.memory_usage.add(memory_bytes - previous_bytes, attrs)

# Usage example in workflow
@app.workflow
class MetricsEnabledWorkflow(Workflow[dict]):
    """Example workflow that reports its activity through ``AgentMetrics``."""

    def __init__(self):
        super().__init__()
        self.metrics = AgentMetrics(get_meter())

    @app.workflow_run
    async def run(self, data: dict) -> WorkflowResult[dict]:
        """Process ``data`` with one agent while recording metrics.

        Returns a ``WorkflowResult`` whose value is ``{"processed": text}``,
        where ``text`` is the LLM's reply.
        """
        recorder = self.metrics

        workflow_scope = recorder.track_workflow_execution(
            "data_processing", {"version": "1.0"}
        )
        async with workflow_scope:
            # Agent lifetime is measured separately from the workflow.
            agent_scope = recorder.track_agent_execution(
                "processor", {"type": "data_processor"}
            )
            async with agent_scope:
                processor = Agent(name="processor", server_names=["api"])
                async with processor:
                    llm = await processor.attach_llm(OpenAIAugmentedLLM)
                    reply = await llm.generate_str(f"Process: {data}")

                    # LLM usage (fixed example figures for tokens and cost).
                    recorder.track_llm_usage("openai-gpt-4", 150, 0.003)

                    # Business metrics: one task done, payload size processed.
                    recorder.track_business_metrics("tasks_completed", 1)
                    payload_size = len(str(data))
                    recorder.track_business_metrics("data_processed", payload_size)

                    outcome = {"processed": reply}
                    return WorkflowResult(value=outcome)

Distributed Tracing

Advanced Tracing Patterns

Implement sophisticated tracing for complex workflows:
# tracing/advanced_tracing.py
from opentelemetry import trace, baggage
from opentelemetry.trace import Status, StatusCode
from typing import Dict, Any, Optional
import json
import asyncio
from contextlib import asynccontextmanager

class AdvancedTracer:
    """Advanced tracing utilities for agent workflows.

    Wraps a tracer and offers context managers that open consistently named
    and attributed spans for workflows, agent interactions and external
    calls, plus a helper that runs several awaitables concurrently with one
    span each.
    """

    def __init__(self, tracer):
        """Store the tracer used to open every span."""
        self.tracer = tracer

    @asynccontextmanager
    async def trace_workflow_execution(
        self,
        workflow_name: str,
        workflow_id: str,
        input_data: Any,
        attributes: Dict[str, Any] = None
    ):
        """Comprehensive workflow tracing with correlation.

        Opens a SERVER span named ``workflow.<workflow_name>`` and makes the
        workflow id and name available as baggage for the duration of the
        block.

        Args:
            workflow_name: Used in the span name and ``workflow.name``.
            workflow_id: Correlation id stored on the span and in baggage.
            input_data: Only its stringified length is recorded.
            attributes: Extra span attributes; entries whose value is None
                are skipped.

        Yields:
            A dict with ``workflow_id``, ``span_context`` and the hex
            ``trace_id``.

        Raises:
            Exception: Anything raised inside the block is re-raised after
                the span has been marked as failed.
        """
        # Function-scope import keeps this block independent of the module's
        # import list.
        from opentelemetry import context as otel_context

        with self.tracer.start_as_current_span(
            f"workflow.{workflow_name}",
            kind=trace.SpanKind.SERVER
        ) as span:
            # Set standard workflow attributes
            span.set_attributes({
                "workflow.name": workflow_name,
                "workflow.id": workflow_id,
                "workflow.input.size": len(str(input_data)),
                "workflow.version": "1.0.0"
            })

            # Add custom attributes. None is not a valid attribute value
            # (callers commonly pass `data.get(...)` results), so skip those.
            if attributes:
                span.set_attributes({
                    key: value for key, value in attributes.items()
                    if value is not None
                })

            # Set baggage for cross-service correlation. set_baggage() only
            # returns a new context; it has no effect until that context is
            # attached, so attach it here and detach it on exit.
            ctx = baggage.set_baggage("workflow.id", workflow_id)
            ctx = baggage.set_baggage("workflow.name", workflow_name, ctx)
            baggage_token = otel_context.attach(ctx)

            try:
                # Create workflow context
                span_context = span.get_span_context()
                workflow_context = {
                    "workflow_id": workflow_id,
                    "span_context": span_context,
                    "trace_id": format(span_context.trace_id, '032x')
                }

                yield workflow_context

                # Mark successful completion
                span.set_status(Status(StatusCode.OK))
                span.set_attribute("workflow.status", "completed")

            except Exception as e:
                # Mark error and add exception details
                span.record_exception(e)
                span.set_status(Status(StatusCode.ERROR, str(e)))
                span.set_attribute("workflow.status", "failed")
                span.set_attribute("workflow.error.type", type(e).__name__)
                raise

            finally:
                # Restore the previous context so the baggage does not leak
                # past this workflow.
                otel_context.detach(baggage_token)

    @asynccontextmanager
    async def trace_agent_interaction(
        self,
        agent_name: str,
        operation: str,
        parent_context: Optional[Dict] = None
    ):
        """Trace agent interactions with detailed context.

        Opens an INTERNAL span named ``agent.<agent_name>.<operation>``.

        Args:
            agent_name: Agent label for the span name and attributes.
            operation: Operation label for the span name and attributes.
            parent_context: Optional dict as yielded by
                ``trace_workflow_execution``; its ``workflow_id`` is copied
                onto the span when present.

        Yields:
            A dict with ``agent_name``, ``operation``, ``span_context`` and
            ``parent_context``.

        Raises:
            Exception: Anything raised inside the block is re-raised after
                the span has been marked as failed.
        """
        span_name = f"agent.{agent_name}.{operation}"

        with self.tracer.start_as_current_span(
            span_name,
            kind=trace.SpanKind.INTERNAL
        ) as span:
            span.set_attributes({
                "agent.name": agent_name,
                "agent.operation": operation,
                "agent.type": "mcp_agent"
            })

            # Link to parent workflow if provided; a missing id is skipped
            # rather than set as None.
            if parent_context:
                parent_workflow_id = parent_context.get("workflow_id")
                if parent_workflow_id is not None:
                    span.set_attribute("workflow.id", parent_workflow_id)

            try:
                agent_context = {
                    "agent_name": agent_name,
                    "operation": operation,
                    "span_context": span.get_span_context(),
                    "parent_context": parent_context
                }

                yield agent_context

                span.set_status(Status(StatusCode.OK))

            except Exception as e:
                span.record_exception(e)
                span.set_status(Status(StatusCode.ERROR, str(e)))
                span.set_attribute("agent.error.type", type(e).__name__)
                raise

    @asynccontextmanager
    async def trace_external_call(
        self,
        service_name: str,
        operation: str,
        endpoint: str = None,
        request_data: Any = None
    ):
        """Trace external service calls.

        Opens a CLIENT span named ``external.<service_name>.<operation>``.

        Args:
            service_name: Name of the called service.
            operation: Name of the operation.
            endpoint: Optional URL recorded as ``http.url``.
            request_data: Optional payload; only its stringified length is
                recorded as ``request.size``.

        Yields:
            The open span, so callers can add their own attributes.

        Raises:
            Exception: Anything raised inside the block is re-raised after
                the span has been marked as failed.
        """
        with self.tracer.start_as_current_span(
            f"external.{service_name}.{operation}",
            kind=trace.SpanKind.CLIENT
        ) as span:
            span.set_attributes({
                "service.name": service_name,
                "service.operation": operation,
                "http.method": "POST",  # Assuming most agent calls are POST
                "external.call": True
            })

            if endpoint:
                span.set_attribute("http.url", endpoint)

            # `is not None` so empty payloads ("" or {}) still get a size.
            if request_data is not None:
                span.set_attribute("request.size", len(str(request_data)))

            try:
                yield span
                span.set_status(Status(StatusCode.OK))

            except Exception as e:
                span.record_exception(e)
                span.set_status(Status(StatusCode.ERROR, str(e)))
                raise

    async def trace_parallel_execution(
        self,
        tasks: Dict[str, asyncio.Task],
        operation_name: str = "parallel_execution"
    ):
        """Run the given awaitables concurrently, one child span per entry.

        Entries may be tasks or plain coroutines. Each is awaited inside its
        own wrapper scheduled through ``asyncio.gather``, so coroutines run
        concurrently instead of one after another. Each child span ends when
        its own awaitable finishes.

        Args:
            tasks: Mapping of task name to awaitable.
            operation_name: Label for the parent span.

        Returns:
            A dict in the order of ``tasks`` mapping each name to its result,
            or to ``{"error": str(exc)}`` when that awaitable raised.
        """
        with self.tracer.start_as_current_span(f"parallel.{operation_name}") as parent_span:
            parent_span.set_attributes({
                "parallel.task_count": len(tasks),
                "parallel.operation": operation_name
            })

            # Failures are tracked by name. Inspecting the results with
            # `"error" not in result` breaks on non-container results and
            # misfires on strings that contain "error".
            failed_names = []

            async def run_in_span(task_name, awaitable):
                # The `with` block ends the span on every exit path,
                # including cancellation.
                with self.tracer.start_as_current_span(
                    f"parallel.task.{task_name}",
                    kind=trace.SpanKind.INTERNAL
                ) as span:
                    span.set_attributes({
                        "task.name": task_name,
                        "task.parallel": True
                    })
                    try:
                        result = await awaitable
                    except Exception as e:
                        span.record_exception(e)
                        span.set_status(Status(StatusCode.ERROR, str(e)))
                        span.set_attribute("task.status", "failed")
                        failed_names.append(task_name)
                        return {"error": str(e)}

                    span.set_status(Status(StatusCode.OK))
                    span.set_attribute("task.status", "completed")
                    return result

            task_names = list(tasks)
            outcomes = await asyncio.gather(
                *(run_in_span(name, tasks[name]) for name in task_names)
            )
            results = dict(zip(task_names, outcomes))

            # Update parent span
            parent_span.set_attributes({
                "parallel.successful_tasks": len(results) - len(failed_names),
                "parallel.failed_tasks": len(failed_names)
            })

            return results

# Usage in workflow
@app.workflow
class TracedWorkflow(Workflow[dict]):
    """Example workflow that runs every processing stage inside tracing spans."""

    def __init__(self):
        super().__init__()
        # Every span helper used below is reached through this wrapper.
        self.tracer = AdvancedTracer(get_tracer())

    @app.workflow_run
    async def run(self, data: dict) -> WorkflowResult[dict]:
        """Validate, process, analyse and synthesize ``data`` under one workflow span.

        The workflow id embeds the current Unix time truncated to whole
        seconds. The span metadata carries ``user_id`` and ``priority``
        (defaulting to ``"normal"``) read from ``data``.
        """
        run_id = f"traced_workflow_{int(time.time())}"
        span_metadata = {
            "user_id": data.get("user_id"),
            "priority": data.get("priority", "normal"),
        }

        async with self.tracer.trace_workflow_execution(
            "traced_data_processing", run_id, data, span_metadata
        ) as workflow_ctx:
            # The first two stages run strictly one after the other.
            validated = await self.trace_validation_step(data, workflow_ctx)
            processed = await self.trace_processing_step(validated, workflow_ctx)

            # Analysis calls are created here and handed to the tracer keyed
            # by task name; the tracer is responsible for awaiting them.
            pending_analyses = {
                "sentiment": self.analyze_sentiment(processed, workflow_ctx),
                "entities": self.extract_entities(processed, workflow_ctx),
                "summary": self.generate_summary(processed, workflow_ctx),
            }
            analysis_results = await self.tracer.trace_parallel_execution(
                pending_analyses, "data_analysis"
            )

            # Combine the processed data with the per-task analysis results.
            synthesized = await self.trace_synthesis_step(
                processed, analysis_results, workflow_ctx
            )
            return WorkflowResult(value=synthesized)

    async def trace_validation_step(self, data: dict, workflow_ctx: dict):
        """Run the validator agent inside an agent span and an external-call span.

        Returns a dict holding the untouched input under ``"validated_data"``
        and the LLM's text under ``"validation_result"``.
        """
        agent_span = self.tracer.trace_agent_interaction(
            "validator", "validate_data", workflow_ctx
        )
        async with agent_span:
            validator = Agent(name="validator", server_names=["validation"])
            async with validator:
                llm = await validator.attach_llm(OpenAIAugmentedLLM)

                # The LLM request gets its own span so its size is recorded.
                llm_call = self.tracer.trace_external_call(
                    "openai",
                    "generate",
                    "https://api.openai.com/v1/chat/completions",
                    {"data": data},
                )
                async with llm_call as llm_span:
                    verdict = await llm.generate_str(f"Validate: {data}")
                    llm_span.set_attribute("llm.response_length", len(verdict))

                return {"validated_data": data, "validation_result": verdict}

Log Aggregation

Structured Logging Setup

Configure structured logging for comprehensive log aggregation:
# logging/structured_logging.py
import logging
import json
from datetime import datetime
from typing import Any, Dict
from opentelemetry.trace import get_current_span
from opentelemetry import baggage

class StructuredFormatter(logging.Formatter):
    """Custom formatter for structured JSON logs.

    Each record becomes one JSON object containing the standard record
    fields, the active OpenTelemetry trace/span ids (when a recording span
    is current), the ``workflow.id`` / ``workflow.name`` baggage entries,
    any ``custom_fields`` dict attached to the record, and exception
    details when the record carries them.
    """

    def format(self, record: logging.LogRecord) -> str:
        """Serialize ``record`` to a single-line JSON string."""
        # Local import: the module header only imports ``datetime`` itself.
        from datetime import timezone

        # Use the record's own creation time so the timestamp reflects when
        # the event was logged, not when a handler got around to formatting it.
        created_utc = datetime.fromtimestamp(record.created, tz=timezone.utc)

        log_entry = {
            # The offset is dropped before formatting to keep the "<iso>Z" shape.
            "timestamp": created_utc.replace(tzinfo=None).isoformat() + "Z",
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }

        # Add trace context if available
        span = get_current_span()
        if span and span.is_recording():
            span_context = span.get_span_context()
            log_entry["trace_id"] = format(span_context.trace_id, "032x")
            log_entry["span_id"] = format(span_context.span_id, "016x")

        # Add baggage context
        for baggage_key, field_name in (
            ("workflow.id", "workflow_id"),
            ("workflow.name", "workflow_name"),
        ):
            value = baggage.get_baggage(baggage_key)
            if value:
                log_entry[field_name] = value

        # Add custom fields from record (these may override the keys above)
        custom_fields = getattr(record, "custom_fields", None)
        if custom_fields:
            log_entry.update(custom_fields)

        # Add exception information. ``exc_info=True`` outside an ``except``
        # block produces ``(None, None, None)``, which is truthy but carries
        # no exception, so the type must be checked before it is used.
        if record.exc_info and record.exc_info[0] is not None:
            exc_type, exc_value = record.exc_info[0], record.exc_info[1]
            log_entry["exception"] = {
                "type": exc_type.__name__,
                "message": str(exc_value),
                "traceback": self.formatException(record.exc_info),
            }

        # ``default=str`` is deliberate: a custom field that is not JSON
        # serializable is stringified instead of making the log call fail.
        return json.dumps(log_entry, ensure_ascii=False, default=str)

class AgentLogger:
    """Enhanced logger for agent workflows with context.

    Keyword arguments given to the level methods are attached to the log
    record as ``custom_fields``. The keyword ``exc_info`` is reserved: it is
    forwarded to the underlying ``logging`` call (same meaning as in the
    standard library) instead of being stored as a custom field, so that
    exception details actually reach the formatter.
    """

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.setup_handler()

    def setup_handler(self):
        """Setup structured logging handler."""
        # Only configure a logger that has no handlers yet, so creating
        # several AgentLogger objects for one name does not duplicate output.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = StructuredFormatter()
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
            self.logger.setLevel(logging.INFO)

    @staticmethod
    def _log_kwargs(fields: Dict[str, Any]) -> Dict[str, Any]:
        """Split caller keywords into ``exc_info`` and the ``extra`` payload."""
        exc_info = fields.pop("exc_info", None)
        extra = {"custom_fields": fields} if fields else {}
        return {"exc_info": exc_info, "extra": extra}

    def info(self, message: str, **kwargs):
        """Log info message with custom fields."""
        self.logger.info(message, **self._log_kwargs(kwargs))

    def error(self, message: str, **kwargs):
        """Log error message with custom fields."""
        self.logger.error(message, **self._log_kwargs(kwargs))

    def warning(self, message: str, **kwargs):
        """Log warning message with custom fields."""
        self.logger.warning(message, **self._log_kwargs(kwargs))

    def debug(self, message: str, **kwargs):
        """Log debug message with custom fields."""
        self.logger.debug(message, **self._log_kwargs(kwargs))

    def workflow_start(self, workflow_name: str, workflow_id: str, input_data: Any):
        """Log workflow start, recording the length of ``str(input_data)``."""
        self.info(
            "Workflow started",
            workflow_name=workflow_name,
            workflow_id=workflow_id,
            input_size=len(str(input_data)),
            event_type="workflow_start"
        )

    def workflow_complete(self, workflow_name: str, workflow_id: str, duration: float, output_data: Any):
        """Log workflow completion, recording the length of ``str(output_data)``."""
        self.info(
            "Workflow completed successfully",
            workflow_name=workflow_name,
            workflow_id=workflow_id,
            duration_seconds=duration,
            output_size=len(str(output_data)),
            event_type="workflow_complete"
        )

    def workflow_error(self, workflow_name: str, workflow_id: str, error: Exception, duration: float):
        """Log workflow error together with the exception's traceback."""
        self.error(
            "Workflow failed",
            workflow_name=workflow_name,
            workflow_id=workflow_id,
            error_type=type(error).__name__,
            error_message=str(error),
            duration_seconds=duration,
            event_type="workflow_error",
            # Pass the instance rather than True so the traceback is attached
            # even when this is called outside the ``except`` block.
            exc_info=error
        )

    def agent_interaction(self, agent_name: str, operation: str, duration: float, success: bool, **kwargs):
        """Log agent interaction at info level on success, error level otherwise."""
        level = self.info if success else self.error
        level(
            f"Agent {operation} {'completed' if success else 'failed'}",
            agent_name=agent_name,
            operation=operation,
            duration_seconds=duration,
            success=success,
            event_type="agent_interaction",
            **kwargs
        )

    def llm_usage(self, model: str, operation: str, tokens: int, cost: float, duration: float):
        """Log LLM usage."""
        self.info(
            "LLM request completed",
            model=model,
            operation=operation,
            tokens_used=tokens,
            cost_usd=cost,
            duration_seconds=duration,
            event_type="llm_usage"
        )

    def external_service_call(self, service: str, endpoint: str, method: str, status_code: int, duration: float):
        """Log external service calls; status codes outside 200-399 log as errors."""
        level = self.info if 200 <= status_code < 400 else self.error
        level(
            f"External service call to {service}",
            service=service,
            endpoint=endpoint,
            method=method,
            status_code=status_code,
            duration_seconds=duration,
            event_type="external_service_call"
        )

# Usage in workflows
@app.workflow
class LoggedWorkflow(Workflow[dict]):
    """Example workflow that reports its lifecycle through ``AgentLogger``."""

    def __init__(self):
        super().__init__()
        logger_name = f"workflow.{self.__class__.__name__}"
        self.logger = AgentLogger(logger_name)

    @app.workflow_run
    async def run(self, data: dict) -> WorkflowResult[dict]:
        """Process ``data`` and log start, completion or failure with timing.

        Any exception is logged through ``workflow_error`` and then re-raised.
        """
        run_id = f"logged_workflow_{int(time.time())}"
        began_at = time.time()

        self.logger.workflow_start("LoggedWorkflow", run_id, data)

        try:
            # Your workflow logic here
            outcome = await self.process_data(data)

            elapsed = time.time() - began_at
            self.logger.workflow_complete("LoggedWorkflow", run_id, elapsed, outcome)

            return WorkflowResult(value=outcome)
        except Exception as exc:
            elapsed = time.time() - began_at
            self.logger.workflow_error("LoggedWorkflow", run_id, exc, elapsed)
            raise

    async def process_data(self, data: dict) -> dict:
        """Send ``data`` to the processor agent's LLM and log usage and outcome.

        Returns ``{"processed": <LLM text>}``. On failure the agent
        interaction is logged as unsuccessful and the exception propagates.
        """
        agent_began_at = time.time()

        try:
            processor = Agent(name="processor", server_names=["api"])
            async with processor:
                llm = await processor.attach_llm(OpenAIAugmentedLLM)

                # Time only the generation call for the LLM usage entry.
                llm_began_at = time.time()
                generated = await llm.generate_str(f"Process: {data}")
                llm_elapsed = time.time() - llm_began_at

                # Token count and cost are fixed placeholder figures here.
                self.logger.llm_usage(
                    model="openai-gpt-4",
                    operation="process_data",
                    tokens=150,  # Estimated
                    cost=0.003,
                    duration=llm_elapsed,
                )

                # The agent duration covers setup plus the LLM call.
                agent_elapsed = time.time() - agent_began_at
                self.logger.agent_interaction(
                    agent_name="processor",
                    operation="process_data",
                    duration=agent_elapsed,
                    success=True,
                    input_size=len(str(data)),
                    output_size=len(generated),
                )

                return {"processed": generated}
        except Exception as exc:
            agent_elapsed = time.time() - agent_began_at
            self.logger.agent_interaction(
                agent_name="processor",
                operation="process_data",
                duration=agent_elapsed,
                success=False,
                error=str(exc),
            )
            raise

Dashboard Setup

Grafana Dashboard Configuration

Create comprehensive Grafana dashboards for monitoring:
{
  "dashboard": {
    "title": "MCP Agent Workflows Dashboard",
    "tags": ["mcp-agent", "workflows", "observability"],
    "time": {
      "from": "now-1h",
      "to": "now"
    },
    "panels": [
      {
        "title": "Workflow Execution Rate",
        "type": "stat",
        "targets": [
          {
            "expr": "rate(agent_workflow_executions_total[5m])",
            "legendFormat": "{{workflow}}"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "unit": "ops",
            "thresholds": {
              "steps": [
                {"color": "green", "value": 0},
                {"color": "yellow", "value": 10},
                {"color": "red", "value": 50}
              ]
            }
          }
        }
      },
      {
        "title": "Workflow Success Rate",
        "type": "stat",
        "targets": [
          {
            "expr": "sum(rate(agent_workflow_executions_total{status=\"success\"}[5m])) / sum(rate(agent_workflow_executions_total[5m])) * 100",
            "legendFormat": "Success Rate"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "unit": "percent",
            "min": 0,
            "max": 100,
            "thresholds": {
              "steps": [
                {"color": "red", "value": 0},
                {"color": "yellow", "value": 90},
                {"color": "green", "value": 95}
              ]
            }
          }
        }
      },
      {
        "title": "Workflow Duration",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(agent_workflow_duration_seconds_bucket[5m]))",
            "legendFormat": "95th percentile"
          },
          {
            "expr": "histogram_quantile(0.50, rate(agent_workflow_duration_seconds_bucket[5m]))", 
            "legendFormat": "50th percentile"
          }
        ],
        "yAxes": [
          {
            "unit": "s",
            "min": 0
          }
        ]
      },
      {
        "title": "Active Agents",
        "type": "graph",
        "targets": [
          {
            "expr": "agent_active_count",
            "legendFormat": "{{agent}}"
          }
        ],
        "yAxes": [
          {
            "unit": "short",
            "min": 0
          }
        ]
      },
      {
        "title": "LLM Token Usage",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(llm_tokens_consumed_total[5m])",
            "legendFormat": "{{model}} - {{operation}}"
          }
        ],
        "yAxes": [
          {
            "unit": "short",
            "min": 0
          }
        ]
      },
      {
        "title": "LLM Costs",
        "type": "stat",
        "targets": [
          {
            "expr": "increase(llm_cost_total[1h])",
            "legendFormat": "Hourly Cost"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "unit": "currencyUSD",
            "thresholds": {
              "steps": [
                {"color": "green", "value": 0},
                {"color": "yellow", "value": 10},
                {"color": "red", "value": 50}
              ]
            }
          }
        }
      },
      {
        "title": "Error Rate by Type",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(agent_errors_total[5m])",
            "legendFormat": "{{error_type}}"
          }
        ],
        "yAxes": [
          {
            "unit": "ops",
            "min": 0
          }
        ]
      },
      {
        "title": "Memory Usage",
        "type": "graph", 
        "targets": [
          {
            "expr": "agent_memory_usage_bytes",
            "legendFormat": "{{instance}}"
          }
        ],
        "yAxes": [
          {
            "unit": "bytes",
            "min": 0
          }
        ]
      }
    ]
  }
}

Kubernetes Monitoring Setup

Deploy monitoring stack in Kubernetes:
# monitoring/prometheus-config.yaml
# ConfigMap holding the Prometheus server configuration (prometheus.yml).
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-config
data:
  prometheus.yml: |
    global:
      scrape_interval: 15s
      evaluation_interval: 15s
    
    # Alert rules are loaded from this file; it must be mounted next to
    # prometheus.yml for the relative path to resolve.
    rule_files:
      - "agent_alerts.yml"
    
    scrape_configs:
      # Fixed service endpoint, scraped more often than the global default.
      - job_name: 'mcp-agent-workflows'
        static_configs:
          - targets: ['mcp-agent-service:8000']
        metrics_path: /metrics
        scrape_interval: 5s
        
      # Worker pods are discovered through the Kubernetes API.
      - job_name: 'mcp-agent-workers'
        kubernetes_sd_configs:
          - role: pod
        relabel_configs:
          # Keep only pods labelled app=mcp-agent-worker ...
          - source_labels: [__meta_kubernetes_pod_label_app]
            action: keep
            regex: mcp-agent-worker
          # ... that opted in with the prometheus.io/scrape annotation.
          - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
            action: keep
            regex: true
          # Rewrite the scrape address to use the annotated port. The regex
          # matches "<address>;<port>", so both labels must be listed here;
          # with the port alone the rule never matches.
          - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
            action: replace
            target_label: __address__
            regex: ([^:]+)(?::\d+)?;(\d+)
            replacement: $1:$2

    alerting:
      alertmanagers:
        - static_configs:
            - targets:
              - alertmanager:9093

---
# ConfigMap holding the Prometheus alerting rules for MCP agent workflows.
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-alerts
data:
  agent_alerts.yml: |
    groups:
    - name: mcp_agent_alerts
      rules:
      # Both sides are aggregated per workflow before dividing. Without the
      # aggregation the two vectors only match on identical label sets, so
      # error series would be divided by themselves and the ratio would be 1.
      - alert: WorkflowHighErrorRate
        expr: sum by (workflow) (rate(agent_workflow_executions_total{status="error"}[5m])) / sum by (workflow) (rate(agent_workflow_executions_total[5m])) > 0.1
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "High workflow error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }} for workflow {{ $labels.workflow }}"
      
      - alert: WorkflowHighLatency
        expr: histogram_quantile(0.95, rate(agent_workflow_duration_seconds_bucket[5m])) > 60
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High workflow latency detected"
          description: "95th percentile latency is {{ $value }}s for workflow {{ $labels.workflow }}"
      
      - alert: LLMCostSpike
        expr: increase(llm_cost_total[1h]) > 100
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "LLM cost spike detected"
          description: "LLM costs have increased by ${{ $value }} in the last hour"
      
      # humanize1024 is the Prometheus template function for base-1024
      # prefixes; there is no humanizeBytes function.
      - alert: AgentMemoryUsageHigh
        expr: agent_memory_usage_bytes > 1000000000  # 1GB
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High memory usage by agent"
          description: "Agent {{ $labels.agent }} is using {{ $value | humanize1024 }}B of memory"

Alert Configuration

Intelligent Alerting Rules

Set up smart alerts that reduce noise and focus on actionable issues:
# alerting/alertmanager-config.yml
global:
  smtp_smarthost: 'smtp.gmail.com:587'
  smtp_from: 'alerts@yourcompany.com'
  slack_api_url: 'YOUR_SLACK_WEBHOOK_URL'

route:
  group_by: ['alertname', 'severity']
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 12h
  receiver: 'default'
  routes:
  # Critical alerts are delivered faster and repeated more often.
  - match:
      severity: critical
    receiver: 'critical-alerts'
    group_wait: 10s
    group_interval: 1m
    repeat_interval: 1h
    # Keep evaluating the sibling routes below. Without this, a critical
    # LLMCostSpike stops here and never reaches 'cost-alerts'.
    continue: true
  - match:
      alertname: 'WorkflowHighErrorRate'
    receiver: 'workflow-alerts'
  - match:
      alertname: 'LLMCostSpike'
    receiver: 'cost-alerts'

receivers:
- name: 'default'
  slack_configs:
  - channel: '#alerts'
    title: 'MCP Agent Alert'
    text: '{{ range .Alerts }}{{ .Annotations.summary }}{{ end }}'

- name: 'critical-alerts'
  slack_configs:
  - channel: '#critical-alerts'
    title: '🚨 CRITICAL: MCP Agent Alert'
    text: |
      {{ range .Alerts }}
      *Alert:* {{ .Annotations.summary }}
      *Description:* {{ .Annotations.description }}
      *Severity:* {{ .Labels.severity }}
      *Time:* {{ .StartsAt.Format "2006-01-02 15:04:05" }}
      {{ end }}
  email_configs:
  - to: 'oncall@yourcompany.com'
    subject: 'CRITICAL: MCP Agent Alert'
    body: |
      {{ range .Alerts }}
      Alert: {{ .Annotations.summary }}
      Description: {{ .Annotations.description }}
      Severity: {{ .Labels.severity }}
      Time: {{ .StartsAt.Format "2006-01-02 15:04:05" }}
      {{ end }}

- name: 'workflow-alerts'
  slack_configs:
  - channel: '#workflow-monitoring'
    title: 'Workflow Alert'
    text: |
      {{ range .Alerts }}
      Workflow {{ .Labels.workflow }} is experiencing issues:
      {{ .Annotations.description }}
      {{ end }}

- name: 'cost-alerts'
  slack_configs:
  - channel: '#cost-monitoring'
    title: '💰 LLM Cost Alert'
    text: |
      {{ range .Alerts }}
      {{ .Annotations.summary }}
      Current hourly cost trend: {{ .Annotations.description }}
      {{ end }}

# A firing critical alert mutes warning alerts that share the same
# alertname and workflow labels.
inhibit_rules:
- source_match:
    severity: 'critical'
  target_match:
    severity: 'warning'
  equal: ['alertname', 'workflow']

Custom Alert Rules

Define domain-specific alert rules:
# alerting/custom_alerts.py
from typing import Dict, List, Callable
import asyncio
import time
from dataclasses import dataclass
from enum import Enum

class AlertSeverity(Enum):
    """Severity levels that can be assigned to an ``Alert``."""
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class Alert:
    """Record describing one alert tracked by ``AlertManager``."""
    # Alert identifier; AlertManager looks up handlers and resolves alerts by it.
    name: str
    severity: AlertSeverity
    # Human-readable text printed when the alert fires or resolves.
    message: str
    # Free-form string labels supplied by whoever triggers the alert.
    labels: Dict[str, str]
    # time.time() value set at creation and overwritten when the alert resolves.
    timestamp: float
    resolved: bool = False

class AlertManager:
    """Custom alert manager for agent workflows.

    Holds the currently active alerts, evaluates a metrics snapshot against
    fixed thresholds, and invokes an optional per-alert-name handler the
    first time an alert key becomes active.
    """

    def __init__(self):
        # Keyed by "<name>_<hash of labels>"; see trigger_alert.
        self.active_alerts: Dict[str, Alert] = {}
        # At most one handler per alert name; a later registration replaces
        # the earlier one.
        self.alert_handlers: Dict[str, Callable] = {}
        # Latest value seen for every metric passed to check_workflow_health.
        self.metrics_cache: Dict[str, float] = {}

    def register_handler(self, alert_name: str, handler: Callable):
        """Register custom handler for specific alerts.

        ``handler`` is awaited with the ``Alert`` instance when an alert of
        that name is newly triggered.
        """
        self.alert_handlers[alert_name] = handler

    async def check_workflow_health(self, metrics: Dict[str, float]):
        """Check workflow health metrics and trigger or resolve alerts.

        Missing metrics are treated as 0. For the two-tier checks (error
        rate, LLM cost) the higher threshold is tested first, and the
        higher-tier alert is resolved once the value drops below it.
        """
        self.metrics_cache.update(metrics)

        # Check error rate. The 25% tier must be tested before the 10% tier;
        # in the opposite order the critical branch can never be reached.
        error_rate = metrics.get("workflow_error_rate", 0)
        if error_rate > 0.25:  # 25% error rate
            await self.trigger_alert(
                "workflow_critical_error_rate",
                AlertSeverity.CRITICAL,
                f"Critical workflow error rate: {error_rate:.2%}",
                {"error_rate": str(error_rate)}
            )
        elif error_rate > 0.1:  # 10% error rate
            # No longer critical: clear the higher tier so it does not go stale.
            await self.resolve_alert("workflow_critical_error_rate")
            await self.trigger_alert(
                "workflow_high_error_rate",
                AlertSeverity.WARNING,
                f"Workflow error rate is {error_rate:.2%}",
                {"error_rate": str(error_rate)}
            )
        else:
            await self.resolve_alert("workflow_high_error_rate")
            await self.resolve_alert("workflow_critical_error_rate")

        # Check latency
        p95_latency = metrics.get("workflow_p95_latency", 0)
        if p95_latency > 300:  # 5 minutes
            await self.trigger_alert(
                "workflow_high_latency",
                AlertSeverity.WARNING,
                f"High workflow latency: {p95_latency}s (95th percentile)",
                {"latency": str(p95_latency)}
            )
        else:
            await self.resolve_alert("workflow_high_latency")

        # Check LLM costs
        hourly_cost = metrics.get("llm_hourly_cost", 0)
        if hourly_cost > 50:  # $50/hour
            await self.trigger_alert(
                "llm_cost_spike",
                AlertSeverity.CRITICAL,
                f"LLM costs spiking: ${hourly_cost:.2f}/hour",
                {"cost": str(hourly_cost)}
            )
        elif hourly_cost > 20:  # $20/hour
            # Below the spike threshold again: clear the critical alert.
            await self.resolve_alert("llm_cost_spike")
            await self.trigger_alert(
                "llm_cost_high",
                AlertSeverity.WARNING,
                f"Elevated LLM costs: ${hourly_cost:.2f}/hour",
                {"cost": str(hourly_cost)}
            )
        else:
            await self.resolve_alert("llm_cost_high")
            await self.resolve_alert("llm_cost_spike")

        # Check memory usage
        memory_usage = metrics.get("memory_usage_gb", 0)
        if memory_usage > 8:  # 8GB
            await self.trigger_alert(
                "high_memory_usage",
                AlertSeverity.WARNING,
                f"High memory usage: {memory_usage:.1f}GB",
                {"memory_gb": str(memory_usage)}
            )
        else:
            await self.resolve_alert("high_memory_usage")

    async def trigger_alert(self, name: str, severity: AlertSeverity, message: str, labels: Dict[str, str]):
        """Trigger an alert unless one with the same name and labels is active.

        A new alert is stored, passed to the handler registered for ``name``
        (if any), and printed.
        """
        # NOTE(review): the key includes the labels, and check_workflow_health
        # puts the measured value into the labels, so each distinct value
        # creates a separate active alert and calls the handler again.
        alert_key = f"{name}_{hash(str(labels))}"

        if alert_key not in self.active_alerts:
            alert = Alert(
                name=name,
                severity=severity,
                message=message,
                labels=labels,
                timestamp=time.time()
            )

            self.active_alerts[alert_key] = alert

            # Execute custom handler if registered
            handler = self.alert_handlers.get(name)
            if handler:
                await handler(alert)

            print(f"🚨 ALERT: {alert.severity.value.upper()} - {alert.message}")

    async def resolve_alert(self, name: str):
        """Resolve and remove every active alert called ``name``."""
        # Collect keys first; deleting while iterating the dict would raise.
        resolved_alerts = []
        for key, alert in self.active_alerts.items():
            if alert.name == name and not alert.resolved:
                alert.resolved = True
                alert.timestamp = time.time()
                resolved_alerts.append(key)
                print(f"✅ RESOLVED: {alert.message}")

        # Remove resolved alerts
        for key in resolved_alerts:
            del self.active_alerts[key]

    def get_active_alerts(self) -> List[Alert]:
        """Get all active (unresolved) alerts."""
        return [alert for alert in self.active_alerts.values() if not alert.resolved]

# Usage example
async def setup_custom_alerting():
    """Wire custom handlers into an ``AlertManager`` and poll metrics forever.

    Registers one handler for cost spikes and one for critical error rates,
    then evaluates freshly collected metrics every 30 seconds. This
    coroutine never returns on its own.
    """
    manager = AlertManager()

    async def on_cost_spike(alert: Alert):
        """Apply emergency controls above $100/hour, then notify Slack."""
        reported_cost = float(alert.labels.get("cost", 0))
        if reported_cost > 100:  # $100/hour
            # Emergency actions
            await emergency_cost_controls()

        # Send to cost monitoring channel
        await send_slack_alert("#cost-monitoring", alert)

    async def on_critical_errors(alert: Alert):
        """Restart failed workflows, then page the on-call engineer."""
        await restart_failed_workflows()
        await page_oncall_engineer(alert)

    # Alert name -> handler, registered in this order.
    handlers = {
        "llm_cost_spike": on_cost_spike,
        "workflow_critical_error_rate": on_critical_errors,
    }
    for alert_name, handler in handlers.items():
        manager.register_handler(alert_name, handler)

    # Run monitoring loop
    poll_seconds = 30
    while True:
        # Collect metrics from your monitoring system
        snapshot = await collect_workflow_metrics()
        await manager.check_workflow_health(snapshot)

        await asyncio.sleep(poll_seconds)

async def collect_workflow_metrics() -> Dict[str, float]:
    """Return the workflow health metrics consumed by the alert checks.

    Placeholder implementation: it returns fixed sample values where a real
    deployment would query Prometheus or another metrics store.
    """
    sample_metrics = {}
    sample_metrics["workflow_error_rate"] = 0.05   # 5%
    sample_metrics["workflow_p95_latency"] = 45    # 45 seconds
    sample_metrics["llm_hourly_cost"] = 25.50      # $25.50/hour
    sample_metrics["memory_usage_gb"] = 6.2        # 6.2GB
    return sample_metrics

Performance Monitoring

Comprehensive Performance Tracking

Monitor performance across all workflow components:
# monitoring/performance_monitor.py
import asyncio
import time
import psutil
import resource
from typing import Dict, Any, List
from dataclasses import dataclass, asdict
from contextlib import asynccontextmanager

@dataclass
class PerformanceMetrics:
    """One sample of performance values; PerformanceMonitor stores these in its history."""
    # Units are presumably those in the field-name suffixes (percent, mb, ms);
    # confirm against the collector that fills these in.
    timestamp: float
    # System resource readings
    cpu_percent: float
    memory_mb: float
    memory_percent: float
    disk_io_read_mb: float
    disk_io_write_mb: float
    network_io_sent_mb: float
    network_io_recv_mb: float
    # Process-level counts
    active_threads: int
    open_files: int
    # Workflow / agent figures
    workflow_queue_size: int
    agent_pool_size: int
    avg_response_time_ms: float
    p95_response_time_ms: float
    requests_per_second: float
    error_rate: float

class PerformanceMonitor:
    """Comprehensive performance monitoring for agent workflows.

    The monitor has two independent halves:

    * ``start_monitoring`` runs a sampling loop that periodically records a
      ``PerformanceMetrics`` snapshot in ``metrics_history`` and raises alerts
      when a snapshot crosses a threshold.
    * ``track_request`` is an async context manager that records the duration
      and outcome of individual requests; those numbers feed the response-time,
      throughput and error-rate fields of each snapshot.
    """

    def __init__(self, collection_interval: float = 10.0):
        """Create an idle monitor.

        Args:
            collection_interval: Seconds to wait between two samples.
        """
        self.collection_interval = collection_interval
        self.metrics_history: List["PerformanceMetrics"] = []
        self.max_history_size = 1000
        self.response_times: List[float] = []
        self.request_count = 0
        self.error_count = 0
        self.start_time = time.time()
        self.running = False

        # System baseline, filled in when monitoring starts.
        self.baseline_metrics = None

    async def start_monitoring(self):
        """Run the sampling loop until ``stop_monitoring`` is called.

        A failure while collecting or checking one sample is reported and the
        loop carries on with the next interval; it never terminates the loop.
        """
        self.running = True
        self.baseline_metrics = await self.collect_system_metrics()

        while self.running:
            try:
                metrics = await self.collect_comprehensive_metrics()
                self.metrics_history.append(metrics)

                # Keep only the newest max_history_size samples.
                if len(self.metrics_history) > self.max_history_size:
                    self.metrics_history = self.metrics_history[-self.max_history_size:]

                await self.check_performance_anomalies(metrics)
            except Exception as e:
                print(f"Error in performance monitoring: {e}")

            # Sleep outside the try block so the wait happens exactly once per
            # iteration, whether or not the sample succeeded.
            await asyncio.sleep(self.collection_interval)

    async def stop_monitoring(self):
        """Ask the sampling loop to exit after its current iteration."""
        self.running = False

    async def collect_comprehensive_metrics(self) -> "PerformanceMetrics":
        """Take one full sample of system, process and application metrics.

        Returns:
            A ``PerformanceMetrics`` snapshot stamped with the current time.
        """
        # cpu_percent(interval=1) blocks for a full second while it measures;
        # run it in the default executor so the event loop stays responsive.
        loop = asyncio.get_running_loop()
        cpu_percent = await loop.run_in_executor(
            None, lambda: psutil.cpu_percent(interval=1)
        )
        memory = psutil.virtual_memory()
        disk_io = psutil.disk_io_counters()
        network_io = psutil.net_io_counters()

        # Process metrics
        process = psutil.Process()
        open_files = len(process.open_files())

        # Application metrics
        current_time = time.time()
        uptime = current_time - self.start_time
        requests_per_second = self.request_count / uptime if uptime > 0 else 0
        error_rate = self.error_count / max(self.request_count, 1)

        # Response-time statistics are computed over the 100 newest requests.
        recent_times = self.response_times[-100:]
        avg_response_time = sum(recent_times) / len(recent_times) if recent_times else 0
        p95_response_time = self.calculate_percentile(recent_times, 95)

        bytes_per_mb = 1024 * 1024

        return PerformanceMetrics(
            timestamp=current_time,
            cpu_percent=cpu_percent,
            memory_mb=memory.used / bytes_per_mb,
            memory_percent=memory.percent,
            # The I/O counters are guarded because they may be unavailable.
            disk_io_read_mb=disk_io.read_bytes / bytes_per_mb if disk_io else 0,
            disk_io_write_mb=disk_io.write_bytes / bytes_per_mb if disk_io else 0,
            network_io_sent_mb=network_io.bytes_sent / bytes_per_mb if network_io else 0,
            network_io_recv_mb=network_io.bytes_recv / bytes_per_mb if network_io else 0,
            active_threads=process.num_threads(),
            open_files=open_files,
            workflow_queue_size=await self.get_workflow_queue_size(),
            agent_pool_size=await self.get_agent_pool_size(),
            avg_response_time_ms=avg_response_time * 1000,
            p95_response_time_ms=p95_response_time * 1000,
            requests_per_second=requests_per_second,
            error_rate=error_rate
        )

    async def collect_system_metrics(self) -> Dict[str, Any]:
        """Collect static baseline facts about the host.

        Returns:
            A dict with the CPU count, total memory and total root-disk size
            (both in GiB) and a platform description string.
        """
        bytes_per_gb = 1024 * 1024 * 1024
        return {
            "cpu_count": psutil.cpu_count(),
            "memory_total_gb": psutil.virtual_memory().total / bytes_per_gb,
            "disk_total_gb": psutil.disk_usage('/').total / bytes_per_gb,
            # psutil does not expose a ``platform`` attribute; the standard
            # library's platform module provides the description string.
            "platform": platform.platform()
        }

    @asynccontextmanager
    async def track_request(self):
        """Context manager to track request performance.

        Records the duration of the wrapped block and counts it as a request.
        If the block raises an ``Exception`` it is also counted as an error
        and the exception is re-raised unchanged.
        """
        # perf_counter is monotonic, so the measured duration cannot be
        # distorted by wall-clock adjustments.
        started = time.perf_counter()

        try:
            yield
        except Exception:
            self.error_count += 1
            raise
        finally:
            self.response_times.append(time.perf_counter() - started)
            self.request_count += 1

            # Keep only the newest 1000 durations.
            if len(self.response_times) > 1000:
                self.response_times = self.response_times[-1000:]

    async def check_performance_anomalies(self, metrics: "PerformanceMetrics"):
        """Compare one sample against fixed thresholds and alert on each breach.

        Args:
            metrics: The snapshot to evaluate.
        """
        # (breached?, alert type, human-readable message), checked in order.
        checks = [
            (
                metrics.cpu_percent > 80,
                "high_cpu_usage",
                f"High CPU usage: {metrics.cpu_percent:.1f}%",
            ),
            (
                metrics.memory_percent > 85,
                "high_memory_usage",
                f"High memory usage: {metrics.memory_percent:.1f}%",
            ),
            (
                metrics.p95_response_time_ms > 5000,  # 5 seconds
                "high_response_time",
                f"High response time: {metrics.p95_response_time_ms:.0f}ms (95th percentile)",
            ),
            (
                metrics.error_rate > 0.05,  # 5% error rate
                "high_error_rate",
                f"High error rate: {metrics.error_rate:.2%}",
            ),
            (
                metrics.workflow_queue_size > 100,
                "workflow_queue_backup",
                f"Workflow queue backup: {metrics.workflow_queue_size} pending workflows",
            ),
        ]

        for breached, alert_type, message in checks:
            if breached:
                await self.trigger_performance_alert(alert_type, message, metrics)

    async def trigger_performance_alert(self, alert_type: str, message: str, metrics: "PerformanceMetrics"):
        """Emit a performance alert.

        Args:
            alert_type: Short machine-readable alert identifier.
            message: Human-readable description of the breach.
            metrics: The snapshot that triggered the alert.
        """
        print(f"=% PERFORMANCE ALERT [{alert_type}]: {message}")

        # Here you would integrate with your alerting system
        # await send_slack_alert(f"#performance-alerts", {
        #     "alert_type": alert_type,
        #     "message": message,
        #     "metrics": asdict(metrics)
        # })

    def calculate_percentile(self, values: List[float], percentile: float) -> float:
        """Return the given percentile of ``values`` using the nearest-rank method.

        The result is the smallest element such that at least ``percentile``
        percent of the values are less than or equal to it.

        Args:
            values: Samples in any order; the list is not modified.
            percentile: Percentile to compute, 0-100. Values outside that range
                are clamped to the smallest / largest element.

        Returns:
            The selected element, or 0.0 when ``values`` is empty.
        """
        if not values:
            return 0.0

        ordered = sorted(values)
        # Nearest rank is 1-based: rank = ceil(p/100 * n). Multiply before
        # dividing to keep the product exact for whole-number percentiles.
        rank = math.ceil(percentile * len(ordered) / 100.0)
        index = min(max(rank - 1, 0), len(ordered) - 1)
        return ordered[index]

    async def get_workflow_queue_size(self) -> int:
        """Get current workflow queue size."""
        # This would integrate with your workflow queue system
        return 0

    async def get_agent_pool_size(self) -> int:
        """Get current agent pool size."""
        # This would integrate with your agent pool system
        return 0

    def get_performance_summary(self, duration_minutes: int = 60) -> Dict[str, Any]:
        """Summarise the samples recorded during the last N minutes.

        Args:
            duration_minutes: Length of the look-back window.

        Returns:
            A dict of aggregate statistics, or ``{"error": ...}`` when no
            sample falls inside the window. ``max_response_time_ms`` is the
            highest per-sample 95th-percentile value. ``total_requests`` is an
            estimate: the mean sampled request rate multiplied by the window
            length.
        """
        cutoff_time = time.time() - (duration_minutes * 60)
        recent_metrics = [m for m in self.metrics_history if m.timestamp > cutoff_time]

        if not recent_metrics:
            return {"error": "No metrics available for the specified duration"}

        sample_count = len(recent_metrics)
        # Average the rate over the samples first; summing it would scale the
        # estimate with how many samples happen to fall inside the window.
        avg_rps = sum(m.requests_per_second for m in recent_metrics) / sample_count

        return {
            "duration_minutes": duration_minutes,
            "metrics_count": sample_count,
            "avg_cpu_percent": sum(m.cpu_percent for m in recent_metrics) / sample_count,
            "max_cpu_percent": max(m.cpu_percent for m in recent_metrics),
            "avg_memory_percent": sum(m.memory_percent for m in recent_metrics) / sample_count,
            "max_memory_percent": max(m.memory_percent for m in recent_metrics),
            "avg_response_time_ms": sum(m.avg_response_time_ms for m in recent_metrics) / sample_count,
            "max_response_time_ms": max(m.p95_response_time_ms for m in recent_metrics),
            "total_requests": avg_rps * duration_minutes * 60,
            "avg_error_rate": sum(m.error_rate for m in recent_metrics) / sample_count,
            "max_queue_size": max(m.workflow_queue_size for m in recent_metrics)
        }

# Usage in workflow
@app.workflow
class PerformanceMonitoredWorkflow(Workflow[dict]):
    """Example workflow that samples performance metrics while it runs."""

    def __init__(self):
        super().__init__()
        self.perf_monitor = PerformanceMonitor()

    @app.workflow_run
    async def run(self, data: dict) -> WorkflowResult[dict]:
        """Process ``data`` while a background task samples performance.

        Args:
            data: Input payload handed to the processing step.

        Returns:
            A ``WorkflowResult`` wrapping the processed dict.
        """
        # Start performance monitoring in the background.
        monitor_task = asyncio.create_task(self.perf_monitor.start_monitoring())

        try:
            # Track this workflow execution as a single request.
            async with self.perf_monitor.track_request():
                result = await self.process_with_performance_tracking(data)
                return WorkflowResult(value=result)

        finally:
            await self.perf_monitor.stop_monitoring()
            monitor_task.cancel()
            # cancel() only requests cancellation. Wait for the task to finish
            # so the sampling loop has really stopped, and so its cancellation
            # (or any error it raised earlier) is retrieved here instead of
            # masking the workflow's own result or exception.
            await asyncio.gather(monitor_task, return_exceptions=True)

    async def process_with_performance_tracking(self, data: dict) -> dict:
        """Send ``data`` to an LLM-backed agent and return its answer.

        Args:
            data: Payload interpolated into the prompt.

        Returns:
            ``{"processed": <generated string>}``.
        """
        # Your workflow logic here with performance monitoring
        agent = Agent(name="processor", server_names=["api"])

        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            result = await llm.generate_str(f"Process: {data}")

            return {"processed": result}

Next Steps