Set up production-grade observability for your agent workflows with OpenTelemetry, metrics collection, distributed tracing, and intelligent alerting systems.
Observability Overview
Production agent workflows require comprehensive monitoring to ensure reliability and performance and to enable rapid troubleshooting:
Metrics Collection: Track performance, throughput, and system health metrics
Distributed Tracing: Follow requests across agents, workflows, and external services
Structured Logging: Centralized, searchable logs with contextual information
Alerting: Proactive notifications for issues and anomalies
OpenTelemetry Configuration
Core Setup
Configure OpenTelemetry for comprehensive observability:
# observability/telemetry.py
import asyncio
from opentelemetry import trace, metrics
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.instrumentation.asyncio import AsyncioInstrumentor
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.resources import Resource
from prometheus_client import start_http_server
class ObservabilityManager:
"""Manages observability configuration for MCP Agent workflows."""
def __init__(self, service_name: str, service_version: str = "1.0.0"):
self.service_name = service_name
self.service_version = service_version
self.resource = Resource.create({
"service.name": service_name,
"service.version": service_version,
"telemetry.sdk.name": "opentelemetry",
"telemetry.sdk.language": "python",
})
self.tracer_provider = None
self.meter_provider = None
self.tracer = None
self.meter = None
async def initialize(self, config: dict):
"""Initialize all observability components."""
await self.setup_tracing(config.get("tracing", {}))
await self.setup_metrics(config.get("metrics", {}))
await self.setup_logging(config.get("logging", {}))
await self.instrument_libraries()
print(f" Observability initialized for {self.service_name}")
async def setup_tracing(self, tracing_config: dict):
"""Configure distributed tracing."""
# Create tracer provider
self.tracer_provider = TracerProvider(resource=self.resource)
trace.set_tracer_provider(self.tracer_provider)
# Configure Jaeger exporter
jaeger_exporter = JaegerExporter(
agent_host_name=tracing_config.get("jaeger_host", "localhost"),
agent_port=tracing_config.get("jaeger_port", 6831),
collector_endpoint=tracing_config.get("jaeger_endpoint")
)
# Add span processor
span_processor = BatchSpanProcessor(jaeger_exporter)
self.tracer_provider.add_span_processor(span_processor)
# Get tracer instance
self.tracer = trace.get_tracer(self.service_name)
async def setup_metrics(self, metrics_config: dict):
"""Configure metrics collection."""
# Start Prometheus metrics server
prometheus_port = metrics_config.get("prometheus_port", 8000)
start_http_server(prometheus_port)
# Create metric reader
metric_reader = PrometheusMetricReader()
# Create meter provider
self.meter_provider = MeterProvider(
resource=self.resource,
metric_readers=[metric_reader]
)
metrics.set_meter_provider(self.meter_provider)
# Get meter instance
self.meter = metrics.get_meter(self.service_name)
async def setup_logging(self, logging_config: dict):
"""Configure structured logging."""
LoggingInstrumentor().instrument(
set_logging_format=True,
log_correlation=True
)
async def instrument_libraries(self):
"""Instrument common libraries."""
AsyncioInstrumentor().instrument()
# Add more instrumentations as needed
# HTTPXInstrumentor().instrument()
# SQLAlchemyInstrumentor().instrument()
# Global observability manager
observability_manager = None
async def initialize_observability(config: dict):
"""Initialize global observability."""
global observability_manager
observability_manager = ObservabilityManager(
service_name=config.get("service_name", "mcp-agent"),
service_version=config.get("service_version", "1.0.0")
)
await observability_manager.initialize(config)
def get_tracer():
"""Get the global tracer instance."""
return observability_manager.tracer if observability_manager else None
def get_meter():
"""Get the global meter instance."""
return observability_manager.meter if observability_manager else None
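At application startup, initialize the manager once before any workflows run. A minimal sketch (the config keys mirror those read by the setup methods above; hostnames and ports are placeholders):
# main.py (illustrative)
import asyncio

from observability.telemetry import initialize_observability

async def main():
    await initialize_observability({
        "service_name": "mcp-agent",
        "service_version": "1.0.0",
        "tracing": {"jaeger_host": "jaeger.internal", "jaeger_port": 6831},
        "metrics": {"prometheus_port": 8000},
        "logging": {},
    })
    # ... start your MCP Agent application here ...

if __name__ == "__main__":
    asyncio.run(main())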
MCP Agent Integration
Integrate observability into your MCP Agent workflows:
# workflows/observable_workflow.py
from mcp_agent.app import MCPApp
from mcp_agent.workflows import Workflow, WorkflowResult
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM
from opentelemetry import trace, metrics
from observability.telemetry import get_tracer, get_meter
import time
import logging
logger = logging.getLogger(__name__)
app = MCPApp(name="observable_agent")
class ObservableWorkflow(Workflow[dict]):
"""Base workflow class with built-in observability."""
def __init__(self):
super().__init__()
self.tracer = get_tracer()
self.meter = get_meter()
# Create metrics
if self.meter:
self.workflow_duration = self.meter.create_histogram(
"workflow_duration_seconds",
description="Duration of workflow execution",
unit="s"
)
self.workflow_counter = self.meter.create_counter(
"workflow_executions_total",
description="Total number of workflow executions"
)
self.agent_calls = self.meter.create_counter(
"agent_calls_total",
description="Total number of agent calls"
)
self.llm_tokens = self.meter.create_histogram(
"llm_tokens_used",
description="Number of LLM tokens used",
unit="tokens"
)
@app.workflow
class DataProcessingWorkflow(ObservableWorkflow):
"""Observable data processing workflow with comprehensive tracking."""
@app.workflow_run
async def run(self, input_data: dict) -> WorkflowResult[dict]:
workflow_start = time.time()
with self.tracer.start_as_current_span("data_processing_workflow") as span:
# Add workflow attributes to span
span.set_attributes({
"workflow.name": "DataProcessingWorkflow",
"workflow.input_size": len(str(input_data)),
"workflow.version": "1.0.0"
})
try:
# Track workflow execution
self.workflow_counter.add(1, {"workflow": "data_processing"})
# Step 1: Data validation with tracing
validation_result = await self.trace_step(
"data_validation",
self.validate_data,
input_data
)
# Step 2: Data processing with tracing
processing_result = await self.trace_step(
"data_processing",
self.process_data,
validation_result
)
# Step 3: Result synthesis with tracing
final_result = await self.trace_step(
"result_synthesis",
self.synthesize_results,
processing_result
)
# Record successful completion
workflow_duration = time.time() - workflow_start
self.workflow_duration.record(workflow_duration, {
"workflow": "data_processing",
"status": "success"
})
span.set_attribute("workflow.status", "success")
span.set_attribute("workflow.duration", workflow_duration)
logger.info(
"Workflow completed successfully",
extra={
"workflow": "data_processing",
"duration": workflow_duration,
"input_size": len(str(input_data)),
"output_size": len(str(final_result))
}
)
return WorkflowResult(value=final_result)
except Exception as e:
# Record failure
workflow_duration = time.time() - workflow_start
self.workflow_duration.record(workflow_duration, {
"workflow": "data_processing",
"status": "error"
})
span.set_attribute("workflow.status", "error")
span.set_attribute("error.message", str(e))
span.record_exception(e)
logger.error(
"Workflow failed",
extra={
"workflow": "data_processing",
"error": str(e),
"duration": workflow_duration
},
exc_info=True
)
return WorkflowResult(value=None, error=str(e))
async def trace_step(self, step_name: str, step_function, data):
"""Execute a workflow step with tracing."""
with self.tracer.start_as_current_span(f"step_{step_name}") as span:
step_start = time.time()
span.set_attributes({
"step.name": step_name,
"step.input_size": len(str(data))
})
try:
result = await step_function(data)
step_duration = time.time() - step_start
span.set_attribute("step.status", "success")
span.set_attribute("step.duration", step_duration)
span.set_attribute("step.output_size", len(str(result)))
return result
except Exception as e:
span.set_attribute("step.status", "error")
span.record_exception(e)
raise
async def validate_data(self, data: dict) -> dict:
"""Data validation step with agent observability."""
validator_agent = Agent(
name="data_validator",
instruction="Validate data quality and format.",
server_names=["validation_service"]
)
with self.tracer.start_as_current_span("agent_validation") as span:
# Track agent usage
self.agent_calls.add(1, {"agent": "data_validator", "step": "validation"})
span.set_attributes({
"agent.name": "data_validator",
"agent.instruction": validator_agent.instruction,
"agent.servers": str(validator_agent.server_names)
})
async with validator_agent:
llm = await validator_agent.attach_llm(OpenAIAugmentedLLM)
# Track LLM usage
with self.tracer.start_as_current_span("llm_validation") as llm_span:
validation_result = await llm.generate_str(
f"Validate this data for quality and format: {data}"
)
# Record LLM token usage (approximate)
estimated_tokens = len(str(data)) // 4 + len(validation_result) // 4
self.llm_tokens.record(estimated_tokens, {
"model": "openai",
"operation": "validation"
})
llm_span.set_attributes({
"llm.model": "openai",
"llm.operation": "validation",
"llm.estimated_tokens": estimated_tokens
})
span.set_attribute("validation.result_size", len(validation_result))
return {
"original_data": data,
"validation_result": validation_result,
"is_valid": "valid" in validation_result.lower()
}
async def process_data(self, validation_data: dict) -> dict:
"""Data processing step with detailed tracing."""
if not validation_data["is_valid"]:
raise ValueError("Data validation failed")
processor_agent = Agent(
name="data_processor",
instruction="Process and enrich validated data.",
server_names=["processing_service", "ml_service"]
)
with self.tracer.start_as_current_span("agent_processing") as span:
self.agent_calls.add(1, {"agent": "data_processor", "step": "processing"})
span.set_attributes({
"agent.name": "data_processor",
"processing.input_valid": validation_data["is_valid"]
})
async with processor_agent:
llm = await processor_agent.attach_llm(OpenAIAugmentedLLM)
with self.tracer.start_as_current_span("llm_processing") as llm_span:
processed_result = await llm.generate_str(
f"Process and enrich this validated data: {validation_data['original_data']}"
)
# Track LLM usage
estimated_tokens = len(str(validation_data)) // 4 + len(processed_result) // 4
self.llm_tokens.record(estimated_tokens, {
"model": "openai",
"operation": "processing"
})
llm_span.set_attributes({
"llm.model": "openai",
"llm.operation": "processing",
"llm.estimated_tokens": estimated_tokens
})
return {
"validation_data": validation_data,
"processed_result": processed_result
}
async def synthesize_results(self, processing_data: dict) -> dict:
"""Final synthesis step."""
synthesizer_agent = Agent(
name="result_synthesizer",
instruction="Synthesize final results from processed data.",
server_names=["synthesis_service"]
)
with self.tracer.start_as_current_span("agent_synthesis") as span:
self.agent_calls.add(1, {"agent": "result_synthesizer", "step": "synthesis"})
async with synthesizer_agent:
llm = await synthesizer_agent.attach_llm(OpenAIAugmentedLLM)
with self.tracer.start_as_current_span("llm_synthesis"):
synthesis = await llm.generate_str(
f"Synthesize final comprehensive results: {processing_data}"
)
# Track final LLM usage
estimated_tokens = len(str(processing_data)) // 4 + len(synthesis) // 4
self.llm_tokens.record(estimated_tokens, {
"model": "openai",
"operation": "synthesis"
})
return {
"processing_data": processing_data,
"final_synthesis": synthesis,
"completion_timestamp": time.time()
}
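To exercise the workflow end to end, initialize observability first and then execute it. A minimal sketch, calling run() directly for illustration only; in a real deployment the MCPApp runtime executes registered workflows:
import asyncio

from observability.telemetry import initialize_observability

async def demo():
    await initialize_observability({"service_name": "observable_agent"})
    workflow = DataProcessingWorkflow()
    result = await workflow.run({"records": [1, 2, 3], "user_id": "demo"})
    print(result.value)

asyncio.run(demo())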
Metrics Collection
Custom Metrics for Agent Workflows
Define domain-specific metrics for agent workflows:
# metrics/agent_metrics.py
from opentelemetry import metrics
from typing import Dict, Any
import time
from contextlib import asynccontextmanager
class AgentMetrics:
"""Custom metrics collection for agent workflows."""
def __init__(self, meter):
self.meter = meter
# Workflow metrics
self.workflow_executions = meter.create_counter(
"agent_workflow_executions_total",
description="Total number of workflow executions"
)
self.workflow_duration = meter.create_histogram(
"agent_workflow_duration_seconds",
description="Duration of workflow executions"
)
self.workflow_success_rate = meter.create_up_down_counter(
"agent_workflow_success_rate",
description="Success rate of workflow executions"
)
# Agent metrics
self.agent_creations = meter.create_counter(
"agent_creations_total",
description="Total number of agent creations"
)
self.agent_active_count = meter.create_up_down_counter(
"agent_active_count",
description="Number of currently active agents"
)
self.agent_execution_duration = meter.create_histogram(
"agent_execution_duration_seconds",
description="Duration of agent executions"
)
# LLM metrics
self.llm_requests = meter.create_counter(
"llm_requests_total",
description="Total number of LLM requests"
)
self.llm_tokens_consumed = meter.create_counter(
"llm_tokens_consumed_total",
description="Total number of LLM tokens consumed"
)
self.llm_cost = meter.create_counter(
"llm_cost_total",
description="Total LLM usage cost",
unit="USD"
)
# System metrics
self.memory_usage = meter.create_up_down_counter(
"agent_memory_usage_bytes",
description="Memory usage by agent processes"
)
self.error_count = meter.create_counter(
"agent_errors_total",
description="Total number of agent errors"
)
# Business metrics
self.tasks_completed = meter.create_counter(
"business_tasks_completed_total",
description="Total number of business tasks completed"
)
self.data_processed = meter.create_counter(
"business_data_processed_bytes",
description="Total amount of data processed"
)
@asynccontextmanager
async def track_workflow_execution(self, workflow_name: str, attributes: Dict[str, Any] = None):
"""Context manager to track workflow execution metrics."""
start_time = time.time()
attrs = {"workflow": workflow_name}
if attributes:
attrs.update(attributes)
        # Count executions in the success/error branches so the counter carries a
        # status label, matching the status-based queries in the dashboards and alerts
        try:
            yield
            # Success
            duration = time.time() - start_time
            self.workflow_executions.add(1, {**attrs, "status": "success"})
            self.workflow_duration.record(duration, {**attrs, "status": "success"})
            self.workflow_success_rate.add(1, {**attrs, "status": "success"})
        except Exception as e:
            # Failure
            duration = time.time() - start_time
            self.workflow_executions.add(1, {**attrs, "status": "error"})
            self.workflow_duration.record(duration, {**attrs, "status": "error"})
            self.workflow_success_rate.add(-1, {**attrs, "status": "error"})
            self.error_count.add(1, {**attrs, "error_type": type(e).__name__})
            raise
@asynccontextmanager
async def track_agent_execution(self, agent_name: str, attributes: Dict[str, Any] = None):
"""Context manager to track agent execution metrics."""
start_time = time.time()
attrs = {"agent": agent_name}
if attributes:
attrs.update(attributes)
# Increment active agent count
self.agent_creations.add(1, attrs)
self.agent_active_count.add(1, attrs)
try:
yield
finally:
# Always decrement active count and record duration
duration = time.time() - start_time
self.agent_execution_duration.record(duration, attrs)
self.agent_active_count.add(-1, attrs)
def track_llm_usage(self, model: str, tokens: int, cost: float = 0, operation: str = "generate"):
"""Track LLM usage metrics."""
attrs = {"model": model, "operation": operation}
self.llm_requests.add(1, attrs)
self.llm_tokens_consumed.add(tokens, attrs)
if cost > 0:
self.llm_cost.add(cost, attrs)
def track_business_metrics(self, metric_type: str, value: float, attributes: Dict[str, Any] = None):
"""Track business-specific metrics."""
attrs = attributes or {}
if metric_type == "tasks_completed":
self.tasks_completed.add(value, attrs)
elif metric_type == "data_processed":
self.data_processed.add(value, attrs)
def track_system_metrics(self, memory_bytes: int, attributes: Dict[str, Any] = None):
"""Track system resource metrics."""
attrs = attributes or {}
self.memory_usage.add(memory_bytes, attrs)
# Usage example in a workflow (assumes the imports and app from observable_workflow.py above)
@app.workflow
class MetricsEnabledWorkflow(Workflow[dict]):
def __init__(self):
super().__init__()
self.metrics = AgentMetrics(get_meter())
@app.workflow_run
async def run(self, data: dict) -> WorkflowResult[dict]:
async with self.metrics.track_workflow_execution("data_processing", {"version": "1.0"}):
# Agent execution with metrics
async with self.metrics.track_agent_execution("processor", {"type": "data_processor"}):
agent = Agent(name="processor", server_names=["api"])
async with agent:
llm = await agent.attach_llm(OpenAIAugmentedLLM)
result = await llm.generate_str(f"Process: {data}")
# Track LLM usage
self.metrics.track_llm_usage("openai-gpt-4", 150, 0.003)
# Track business metrics
self.metrics.track_business_metrics("tasks_completed", 1)
self.metrics.track_business_metrics("data_processed", len(str(data)))
return WorkflowResult(value={"processed": result})
Distributed Tracing
Advanced Tracing Patterns
Implement sophisticated tracing for complex workflows:
# tracing/advanced_tracing.py
from opentelemetry import baggage, context, trace
from opentelemetry.trace import Status, StatusCode
from typing import Dict, Any, Optional
import asyncio
import time
from contextlib import asynccontextmanager
class AdvancedTracer:
"""Advanced tracing utilities for agent workflows."""
def __init__(self, tracer):
self.tracer = tracer
@asynccontextmanager
async def trace_workflow_execution(
self,
workflow_name: str,
workflow_id: str,
input_data: Any,
attributes: Dict[str, Any] = None
):
"""Comprehensive workflow tracing with correlation."""
with self.tracer.start_as_current_span(
f"workflow.{workflow_name}",
kind=trace.SpanKind.SERVER
) as span:
# Set standard workflow attributes
span.set_attributes({
"workflow.name": workflow_name,
"workflow.id": workflow_id,
"workflow.input.size": len(str(input_data)),
"workflow.version": "1.0.0"
})
# Add custom attributes
if attributes:
span.set_attributes(attributes)
            # Set baggage for cross-service correlation and attach it so it
            # propagates with the current context
            ctx = baggage.set_baggage("workflow.id", workflow_id)
            ctx = baggage.set_baggage("workflow.name", workflow_name, ctx)
            token = context.attach(ctx)
try:
# Create workflow context
workflow_context = {
"workflow_id": workflow_id,
"span_context": span.get_span_context(),
"trace_id": format(span.get_span_context().trace_id, '032x')
}
yield workflow_context
# Mark successful completion
span.set_status(Status(StatusCode.OK))
span.set_attribute("workflow.status", "completed")
            except Exception as e:
                # Mark error and add exception details
                span.record_exception(e)
                span.set_status(Status(StatusCode.ERROR, str(e)))
                span.set_attribute("workflow.status", "failed")
                span.set_attribute("workflow.error.type", type(e).__name__)
                raise
            finally:
                # Detach the baggage context attached above
                context.detach(token)
@asynccontextmanager
async def trace_agent_interaction(
self,
agent_name: str,
operation: str,
parent_context: Optional[Dict] = None
):
"""Trace agent interactions with detailed context."""
span_name = f"agent.{agent_name}.{operation}"
with self.tracer.start_as_current_span(
span_name,
kind=trace.SpanKind.INTERNAL
) as span:
span.set_attributes({
"agent.name": agent_name,
"agent.operation": operation,
"agent.type": "mcp_agent"
})
# Link to parent workflow if provided
if parent_context:
span.set_attribute("workflow.id", parent_context.get("workflow_id"))
try:
agent_context = {
"agent_name": agent_name,
"operation": operation,
"span_context": span.get_span_context(),
"parent_context": parent_context
}
yield agent_context
span.set_status(Status(StatusCode.OK))
except Exception as e:
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
span.set_attribute("agent.error.type", type(e).__name__)
raise
@asynccontextmanager
async def trace_external_call(
self,
service_name: str,
operation: str,
endpoint: str = None,
request_data: Any = None
):
"""Trace external service calls."""
with self.tracer.start_as_current_span(
f"external.{service_name}.{operation}",
kind=trace.SpanKind.CLIENT
) as span:
span.set_attributes({
"service.name": service_name,
"service.operation": operation,
"http.method": "POST", # Assuming most agent calls are POST
"external.call": True
})
if endpoint:
span.set_attribute("http.url", endpoint)
if request_data:
span.set_attribute("request.size", len(str(request_data)))
try:
yield span
span.set_status(Status(StatusCode.OK))
except Exception as e:
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
raise
async def trace_parallel_execution(
self,
tasks: Dict[str, asyncio.Task],
operation_name: str = "parallel_execution"
):
"""Trace parallel task execution with individual spans."""
with self.tracer.start_as_current_span(f"parallel.{operation_name}") as parent_span:
parent_span.set_attributes({
"parallel.task_count": len(tasks),
"parallel.operation": operation_name
})
# Create child spans for each task
task_spans = {}
for task_name, task in tasks.items():
child_span = self.tracer.start_span(
f"parallel.task.{task_name}",
kind=trace.SpanKind.INTERNAL
)
child_span.set_attributes({
"task.name": task_name,
"task.parallel": True
})
task_spans[task_name] = child_span
try:
# Wait for all tasks to complete
results = {}
for task_name, task in tasks.items():
span = task_spans[task_name]
try:
result = await task
results[task_name] = result
span.set_status(Status(StatusCode.OK))
span.set_attribute("task.status", "completed")
except Exception as e:
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
span.set_attribute("task.status", "failed")
results[task_name] = {"error": str(e)}
finally:
span.end()
# Update parent span
                successful_tasks = sum(1 for r in results.values() if not (isinstance(r, dict) and "error" in r))
parent_span.set_attributes({
"parallel.successful_tasks": successful_tasks,
"parallel.failed_tasks": len(tasks) - successful_tasks
})
return results
except Exception as e:
parent_span.record_exception(e)
parent_span.set_status(Status(StatusCode.ERROR, str(e)))
# Close any remaining spans
for span in task_spans.values():
if not span.is_recording():
continue
span.set_status(Status(StatusCode.ERROR, "Parent operation failed"))
span.end()
raise
# Usage in a workflow (assumes the imports and app from observable_workflow.py above)
@app.workflow
class TracedWorkflow(Workflow[dict]):
def __init__(self):
super().__init__()
self.tracer = AdvancedTracer(get_tracer())
@app.workflow_run
async def run(self, data: dict) -> WorkflowResult[dict]:
workflow_id = f"traced_workflow_{int(time.time())}"
async with self.tracer.trace_workflow_execution(
"traced_data_processing",
workflow_id,
data,
{"user_id": data.get("user_id"), "priority": data.get("priority", "normal")}
) as workflow_ctx:
# Sequential processing with tracing
validation_result = await self.trace_validation_step(data, workflow_ctx)
processing_result = await self.trace_processing_step(validation_result, workflow_ctx)
# Parallel analysis with tracing
analysis_tasks = {
"sentiment": self.analyze_sentiment(processing_result, workflow_ctx),
"entities": self.extract_entities(processing_result, workflow_ctx),
"summary": self.generate_summary(processing_result, workflow_ctx)
}
parallel_results = await self.tracer.trace_parallel_execution(
analysis_tasks,
"data_analysis"
)
# Final synthesis
final_result = await self.trace_synthesis_step(
processing_result,
parallel_results,
workflow_ctx
)
return WorkflowResult(value=final_result)
async def trace_validation_step(self, data: dict, workflow_ctx: dict):
"""Validation step with detailed tracing."""
async with self.tracer.trace_agent_interaction(
"validator",
"validate_data",
workflow_ctx
) as agent_ctx:
agent = Agent(name="validator", server_names=["validation"])
async with agent:
llm = await agent.attach_llm(OpenAIAugmentedLLM)
# Trace the LLM call
async with self.tracer.trace_external_call(
"openai",
"generate",
"https://api.openai.com/v1/chat/completions",
{"data": data}
) as llm_span:
result = await llm.generate_str(f"Validate: {data}")
llm_span.set_attribute("llm.response_length", len(result))
return {"validated_data": data, "validation_result": result}
Log Aggregation
Structured Logging Setup
Configure structured logging for comprehensive log aggregation:
# logging/structured_logging.py
import logging
import json
import time
from datetime import datetime
from typing import Any, Dict
from opentelemetry.trace import get_current_span
from opentelemetry import baggage
class StructuredFormatter(logging.Formatter):
"""Custom formatter for structured JSON logs."""
def format(self, record: logging.LogRecord) -> str:
# Base log structure
log_entry = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno
}
# Add trace context if available
span = get_current_span()
if span and span.is_recording():
span_context = span.get_span_context()
log_entry.update({
"trace_id": format(span_context.trace_id, '032x'),
"span_id": format(span_context.span_id, '016x')
})
# Add baggage context
workflow_id = baggage.get_baggage("workflow.id")
if workflow_id:
log_entry["workflow_id"] = workflow_id
workflow_name = baggage.get_baggage("workflow.name")
if workflow_name:
log_entry["workflow_name"] = workflow_name
# Add custom fields from record
if hasattr(record, "custom_fields"):
log_entry.update(record.custom_fields)
# Add exception information
if record.exc_info:
log_entry["exception"] = {
"type": record.exc_info[0].__name__,
"message": str(record.exc_info[1]),
"traceback": self.formatException(record.exc_info)
}
return json.dumps(log_entry, ensure_ascii=False)
class AgentLogger:
"""Enhanced logger for agent workflows with context."""
def __init__(self, name: str):
self.logger = logging.getLogger(name)
self.setup_handler()
def setup_handler(self):
"""Setup structured logging handler."""
if not self.logger.handlers:
handler = logging.StreamHandler()
formatter = StructuredFormatter()
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
def info(self, message: str, **kwargs):
"""Log info message with custom fields."""
extra = {"custom_fields": kwargs} if kwargs else {}
self.logger.info(message, extra=extra)
    def error(self, message: str, **kwargs):
        """Log error message with custom fields."""
        exc_info = kwargs.pop("exc_info", False)
        extra = {"custom_fields": kwargs} if kwargs else {}
        self.logger.error(message, extra=extra, exc_info=exc_info)
def warning(self, message: str, **kwargs):
"""Log warning message with custom fields."""
extra = {"custom_fields": kwargs} if kwargs else {}
self.logger.warning(message, extra=extra)
def debug(self, message: str, **kwargs):
"""Log debug message with custom fields."""
extra = {"custom_fields": kwargs} if kwargs else {}
self.logger.debug(message, extra=extra)
def workflow_start(self, workflow_name: str, workflow_id: str, input_data: Any):
"""Log workflow start."""
self.info(
"Workflow started",
workflow_name=workflow_name,
workflow_id=workflow_id,
input_size=len(str(input_data)),
event_type="workflow_start"
)
def workflow_complete(self, workflow_name: str, workflow_id: str, duration: float, output_data: Any):
"""Log workflow completion."""
self.info(
"Workflow completed successfully",
workflow_name=workflow_name,
workflow_id=workflow_id,
duration_seconds=duration,
output_size=len(str(output_data)),
event_type="workflow_complete"
)
def workflow_error(self, workflow_name: str, workflow_id: str, error: Exception, duration: float):
"""Log workflow error."""
self.error(
"Workflow failed",
workflow_name=workflow_name,
workflow_id=workflow_id,
error_type=type(error).__name__,
error_message=str(error),
duration_seconds=duration,
event_type="workflow_error",
exc_info=True
)
def agent_interaction(self, agent_name: str, operation: str, duration: float, success: bool, **kwargs):
"""Log agent interaction."""
level = self.info if success else self.error
level(
f"Agent {operation} {'completed' if success else 'failed'}",
agent_name=agent_name,
operation=operation,
duration_seconds=duration,
success=success,
event_type="agent_interaction",
**kwargs
)
def llm_usage(self, model: str, operation: str, tokens: int, cost: float, duration: float):
"""Log LLM usage."""
self.info(
"LLM request completed",
model=model,
operation=operation,
tokens_used=tokens,
cost_usd=cost,
duration_seconds=duration,
event_type="llm_usage"
)
def external_service_call(self, service: str, endpoint: str, method: str, status_code: int, duration: float):
"""Log external service calls."""
level = self.info if 200 <= status_code < 400 else self.error
level(
f"External service call to {service}",
service=service,
endpoint=endpoint,
method=method,
status_code=status_code,
duration_seconds=duration,
event_type="external_service_call"
)
# Usage in workflows (assumes the imports and app from the earlier examples)
@app.workflow
class LoggedWorkflow(Workflow[dict]):
def __init__(self):
super().__init__()
self.logger = AgentLogger(f"workflow.{self.__class__.__name__}")
@app.workflow_run
async def run(self, data: dict) -> WorkflowResult[dict]:
workflow_id = f"logged_workflow_{int(time.time())}"
start_time = time.time()
self.logger.workflow_start("LoggedWorkflow", workflow_id, data)
try:
# Your workflow logic here
result = await self.process_data(data)
duration = time.time() - start_time
self.logger.workflow_complete("LoggedWorkflow", workflow_id, duration, result)
return WorkflowResult(value=result)
except Exception as e:
duration = time.time() - start_time
self.logger.workflow_error("LoggedWorkflow", workflow_id, e, duration)
raise
async def process_data(self, data: dict) -> dict:
"""Process data with logging."""
agent_start = time.time()
try:
agent = Agent(name="processor", server_names=["api"])
async with agent:
llm = await agent.attach_llm(OpenAIAugmentedLLM)
llm_start = time.time()
result = await llm.generate_str(f"Process: {data}")
llm_duration = time.time() - llm_start
# Log LLM usage
self.logger.llm_usage(
model="openai-gpt-4",
operation="process_data",
tokens=150, # Estimated
cost=0.003,
duration=llm_duration
)
agent_duration = time.time() - agent_start
self.logger.agent_interaction(
agent_name="processor",
operation="process_data",
duration=agent_duration,
success=True,
input_size=len(str(data)),
output_size=len(result)
)
return {"processed": result}
except Exception as e:
agent_duration = time.time() - agent_start
self.logger.agent_interaction(
agent_name="processor",
operation="process_data",
duration=agent_duration,
success=False,
error=str(e)
)
raise
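With the formatter in place, every record becomes a single JSON line that log aggregators can index directly. For example (field values illustrative):
logger = AgentLogger("workflow.demo")
logger.info("Workflow started", workflow_id="wf-123", event_type="workflow_start")
# Emits one JSON line similar to:
# {"timestamp": "2024-01-01T00:00:00Z", "level": "INFO", "logger": "workflow.demo",
#  "message": "Workflow started", "module": "demo", "function": "<module>",
#  "line": 2, "workflow_id": "wf-123", "event_type": "workflow_start"}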
Dashboard Setup
Grafana Dashboard Configuration
Create comprehensive Grafana dashboards for monitoring:
{
"dashboard": {
"title": "MCP Agent Workflows Dashboard",
"tags": ["mcp-agent", "workflows", "observability"],
"time": {
"from": "now-1h",
"to": "now"
},
"panels": [
{
"title": "Workflow Execution Rate",
"type": "stat",
"targets": [
{
"expr": "rate(agent_workflow_executions_total[5m])",
"legendFormat": "{{workflow}}"
}
],
"fieldConfig": {
"defaults": {
"unit": "ops",
"thresholds": {
"steps": [
{"color": "green", "value": 0},
{"color": "yellow", "value": 10},
{"color": "red", "value": 50}
]
}
}
}
},
{
"title": "Workflow Success Rate",
"type": "stat",
"targets": [
{
"expr": "rate(agent_workflow_executions_total{status=\"success\"}[5m]) / rate(agent_workflow_executions_total[5m]) * 100",
"legendFormat": "Success Rate"
}
],
"fieldConfig": {
"defaults": {
"unit": "percent",
"min": 0,
"max": 100,
"thresholds": {
"steps": [
{"color": "red", "value": 0},
{"color": "yellow", "value": 90},
{"color": "green", "value": 95}
]
}
}
}
},
{
"title": "Workflow Duration",
"type": "graph",
"targets": [
{
"expr": "histogram_quantile(0.95, rate(agent_workflow_duration_seconds_bucket[5m]))",
"legendFormat": "95th percentile"
},
{
"expr": "histogram_quantile(0.50, rate(agent_workflow_duration_seconds_bucket[5m]))",
"legendFormat": "50th percentile"
}
],
"yAxes": [
{
"unit": "s",
"min": 0
}
]
},
{
"title": "Active Agents",
"type": "graph",
"targets": [
{
"expr": "agent_active_count",
"legendFormat": "{{agent}}"
}
],
"yAxes": [
{
"unit": "short",
"min": 0
}
]
},
{
"title": "LLM Token Usage",
"type": "graph",
"targets": [
{
"expr": "rate(llm_tokens_consumed_total[5m])",
"legendFormat": "{{model}} - {{operation}}"
}
],
"yAxes": [
{
"unit": "short",
"min": 0
}
]
},
{
"title": "LLM Costs",
"type": "stat",
"targets": [
{
"expr": "increase(llm_cost_total[1h])",
"legendFormat": "Hourly Cost"
}
],
"fieldConfig": {
"defaults": {
"unit": "currencyUSD",
"thresholds": {
"steps": [
{"color": "green", "value": 0},
{"color": "yellow", "value": 10},
{"color": "red", "value": 50}
]
}
}
}
},
{
"title": "Error Rate by Type",
"type": "graph",
"targets": [
{
"expr": "rate(agent_errors_total[5m])",
"legendFormat": "{{error_type}}"
}
],
"yAxes": [
{
"unit": "ops",
"min": 0
}
]
},
{
"title": "Memory Usage",
"type": "graph",
"targets": [
{
"expr": "agent_memory_usage_bytes",
"legendFormat": "{{instance}}"
}
],
"yAxes": [
{
"unit": "bytes",
"min": 0
}
]
}
]
}
}
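The dashboard JSON can be provisioned through Grafana's HTTP API instead of the UI. A minimal sketch (the Grafana URL and token are placeholders, and it assumes the JSON above is saved as mcp_agent_dashboard.json):
# import_dashboard.py (illustrative)
import json
from urllib.request import Request, urlopen

with open("mcp_agent_dashboard.json") as f:
    dashboard = json.load(f)["dashboard"]

payload = json.dumps({"dashboard": dashboard, "overwrite": True}).encode()
req = Request(
    "http://grafana.internal:3000/api/dashboards/db",
    data=payload,
    headers={
        "Content-Type": "application/json",
        "Authorization": "Bearer YOUR_GRAFANA_TOKEN",  # placeholder
    },
)
print(urlopen(req).status)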
Kubernetes Monitoring Setup
Deploy the monitoring stack in Kubernetes:
# monitoring/prometheus-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: prometheus-config
data:
prometheus.yml: |
global:
scrape_interval: 15s
evaluation_interval: 15s
rule_files:
- "agent_alerts.yml"
scrape_configs:
- job_name: 'mcp-agent-workflows'
static_configs:
- targets: ['mcp-agent-service:8000']
metrics_path: /metrics
scrape_interval: 5s
- job_name: 'mcp-agent-workers'
kubernetes_sd_configs:
- role: pod
relabel_configs:
- source_labels: [__meta_kubernetes_pod_label_app]
action: keep
regex: mcp-agent-worker
- source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
action: keep
regex: true
          - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
            action: replace
            target_label: __address__
            regex: ([^:]+)(?::\d+)?;(\d+)
            replacement: $1:$2
alerting:
alertmanagers:
- static_configs:
- targets:
- alertmanager:9093
---
apiVersion: v1
kind: ConfigMap
metadata:
name: prometheus-alerts
data:
agent_alerts.yml: |
groups:
- name: mcp_agent_alerts
rules:
- alert: WorkflowHighErrorRate
expr: rate(agent_workflow_executions_total{status="error"}[5m]) / rate(agent_workflow_executions_total[5m]) > 0.1
for: 2m
labels:
severity: warning
annotations:
summary: "High workflow error rate detected"
description: "Error rate is {{ $value | humanizePercentage }} for workflow {{ $labels.workflow }}"
- alert: WorkflowHighLatency
expr: histogram_quantile(0.95, rate(agent_workflow_duration_seconds_bucket[5m])) > 60
for: 5m
labels:
severity: warning
annotations:
summary: "High workflow latency detected"
description: "95th percentile latency is {{ $value }}s for workflow {{ $labels.workflow }}"
- alert: LLMCostSpike
expr: increase(llm_cost_total[1h]) > 100
for: 1m
labels:
severity: critical
annotations:
summary: "LLM cost spike detected"
description: "LLM costs have increased by ${{ $value }} in the last hour"
- alert: AgentMemoryUsageHigh
expr: agent_memory_usage_bytes > 1000000000 # 1GB
for: 5m
labels:
severity: warning
annotations:
summary: "High memory usage by agent"
description: "Agent {{ $labels.agent }} is using {{ $value | humanizeBytes }} of memory"
Alert Configuration
Intelligent Alerting Rules
Set up smart alerts that reduce noise and focus on actionable issues:
# alerting/alertmanager-config.yml
global:
smtp_smarthost: 'smtp.gmail.com:587'
smtp_from: 'alerts@yourcompany.com'
slack_api_url: 'YOUR_SLACK_WEBHOOK_URL'
route:
group_by: ['alertname', 'severity']
group_wait: 30s
group_interval: 5m
repeat_interval: 12h
receiver: 'default'
routes:
- match:
severity: critical
receiver: 'critical-alerts'
group_wait: 10s
group_interval: 1m
repeat_interval: 1h
- match:
alertname: 'WorkflowHighErrorRate'
receiver: 'workflow-alerts'
- match:
alertname: 'LLMCostSpike'
receiver: 'cost-alerts'
receivers:
- name: 'default'
slack_configs:
- channel: '#alerts'
title: 'MCP Agent Alert'
text: '{{ range .Alerts }}{{ .Annotations.summary }}{{ end }}'
- name: 'critical-alerts'
slack_configs:
- channel: '#critical-alerts'
        title: 'CRITICAL: MCP Agent Alert'
text: |
{{ range .Alerts }}
*Alert:* {{ .Annotations.summary }}
*Description:* {{ .Annotations.description }}
*Severity:* {{ .Labels.severity }}
*Time:* {{ .StartsAt.Format "2006-01-02 15:04:05" }}
{{ end }}
email_configs:
- to: 'oncall@yourcompany.com'
subject: 'CRITICAL: MCP Agent Alert'
body: |
{{ range .Alerts }}
Alert: {{ .Annotations.summary }}
Description: {{ .Annotations.description }}
Severity: {{ .Labels.severity }}
Time: {{ .StartsAt.Format "2006-01-02 15:04:05" }}
{{ end }}
- name: 'workflow-alerts'
slack_configs:
- channel: '#workflow-monitoring'
title: 'Workflow Alert'
text: |
{{ range .Alerts }}
Workflow {{ .Labels.workflow }} is experiencing issues:
{{ .Annotations.description }}
{{ end }}
- name: 'cost-alerts'
slack_configs:
- channel: '#cost-monitoring'
        title: 'LLM Cost Alert'
text: |
{{ range .Alerts }}
{{ .Annotations.summary }}
Current hourly cost trend: {{ .Annotations.description }}
{{ end }}
inhibit_rules:
- source_match:
severity: 'critical'
target_match:
severity: 'warning'
equal: ['alertname', 'workflow']
Custom Alert Rules
Define domain-specific alert rules:
# alerting/custom_alerts.py
from typing import Dict, List, Callable
import asyncio
import time
from dataclasses import dataclass
from enum import Enum
class AlertSeverity(Enum):
INFO = "info"
WARNING = "warning"
CRITICAL = "critical"
@dataclass
class Alert:
name: str
severity: AlertSeverity
message: str
labels: Dict[str, str]
timestamp: float
resolved: bool = False
class AlertManager:
"""Custom alert manager for agent workflows."""
def __init__(self):
self.active_alerts: Dict[str, Alert] = {}
self.alert_handlers: Dict[str, Callable] = {}
self.metrics_cache: Dict[str, float] = {}
def register_handler(self, alert_name: str, handler: Callable):
"""Register custom handler for specific alerts."""
self.alert_handlers[alert_name] = handler
async def check_workflow_health(self, metrics: Dict[str, float]):
"""Check workflow health metrics and trigger alerts."""
self.metrics_cache.update(metrics)
        # Check error rate (test the critical threshold first so it takes precedence)
        error_rate = metrics.get("workflow_error_rate", 0)
        if error_rate > 0.25:  # 25% error rate
            await self.trigger_alert(
                "workflow_critical_error_rate",
                AlertSeverity.CRITICAL,
                f"Critical workflow error rate: {error_rate:.2%}",
                {"error_rate": str(error_rate)}
            )
        elif error_rate > 0.1:  # 10% error rate
            await self.trigger_alert(
                "workflow_high_error_rate",
                AlertSeverity.WARNING,
                f"Workflow error rate is {error_rate:.2%}",
                {"error_rate": str(error_rate)}
            )
else:
await self.resolve_alert("workflow_high_error_rate")
await self.resolve_alert("workflow_critical_error_rate")
# Check latency
p95_latency = metrics.get("workflow_p95_latency", 0)
if p95_latency > 300: # 5 minutes
await self.trigger_alert(
"workflow_high_latency",
AlertSeverity.WARNING,
f"High workflow latency: {p95_latency}s (95th percentile)",
{"latency": str(p95_latency)}
)
else:
await self.resolve_alert("workflow_high_latency")
# Check LLM costs
hourly_cost = metrics.get("llm_hourly_cost", 0)
if hourly_cost > 50: # $50/hour
await self.trigger_alert(
"llm_cost_spike",
AlertSeverity.CRITICAL,
f"LLM costs spiking: ${hourly_cost:.2f}/hour",
{"cost": str(hourly_cost)}
)
elif hourly_cost > 20: # $20/hour
await self.trigger_alert(
"llm_cost_high",
AlertSeverity.WARNING,
f"Elevated LLM costs: ${hourly_cost:.2f}/hour",
{"cost": str(hourly_cost)}
)
else:
await self.resolve_alert("llm_cost_high")
await self.resolve_alert("llm_cost_spike")
# Check memory usage
memory_usage = metrics.get("memory_usage_gb", 0)
if memory_usage > 8: # 8GB
await self.trigger_alert(
"high_memory_usage",
AlertSeverity.WARNING,
f"High memory usage: {memory_usage:.1f}GB",
{"memory_gb": str(memory_usage)}
)
else:
await self.resolve_alert("high_memory_usage")
async def trigger_alert(self, name: str, severity: AlertSeverity, message: str, labels: Dict[str, str]):
"""Trigger an alert."""
alert_key = f"{name}_{hash(str(labels))}"
if alert_key not in self.active_alerts:
alert = Alert(
name=name,
severity=severity,
message=message,
labels=labels,
timestamp=time.time()
)
self.active_alerts[alert_key] = alert
# Execute custom handler if registered
handler = self.alert_handlers.get(name)
if handler:
await handler(alert)
print(f"=� ALERT: {alert.severity.value.upper()} - {alert.message}")
async def resolve_alert(self, name: str):
"""Resolve alerts by name."""
resolved_alerts = []
for key, alert in self.active_alerts.items():
if alert.name == name and not alert.resolved:
alert.resolved = True
alert.timestamp = time.time()
resolved_alerts.append(key)
print(f" RESOLVED: {alert.message}")
# Remove resolved alerts
for key in resolved_alerts:
del self.active_alerts[key]
def get_active_alerts(self) -> List[Alert]:
"""Get all active alerts."""
return [alert for alert in self.active_alerts.values() if not alert.resolved]
# Usage example
async def setup_custom_alerting():
alert_manager = AlertManager()
# Register custom handlers
async def handle_cost_spike(alert: Alert):
# Custom logic for cost spike alerts
cost = float(alert.labels.get("cost", 0))
if cost > 100: # $100/hour
# Emergency actions
await emergency_cost_controls()
# Send to cost monitoring channel
await send_slack_alert("#cost-monitoring", alert)
async def handle_critical_errors(alert: Alert):
# Auto-restart failed workflows
await restart_failed_workflows()
# Page on-call engineer
await page_oncall_engineer(alert)
alert_manager.register_handler("llm_cost_spike", handle_cost_spike)
alert_manager.register_handler("workflow_critical_error_rate", handle_critical_errors)
# Run monitoring loop
while True:
# Collect metrics from your monitoring system
metrics = await collect_workflow_metrics()
await alert_manager.check_workflow_health(metrics)
# Check every 30 seconds
await asyncio.sleep(30)
async def collect_workflow_metrics() -> Dict[str, float]:
"""Collect metrics from Prometheus or other monitoring system."""
# This would typically query your metrics store
return {
"workflow_error_rate": 0.05, # 5%
"workflow_p95_latency": 45, # 45 seconds
"llm_hourly_cost": 25.50, # $25.50/hour
"memory_usage_gb": 6.2 # 6.2GB
}
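The handlers above call helpers such as send_slack_alert, emergency_cost_controls, restart_failed_workflows, and page_oncall_engineer that are left unimplemented. A minimal sketch of the Slack notifier, posting to an incoming webhook (the webhook URL is a placeholder):
# alerting/notifiers.py (illustrative)
import asyncio
import json
from urllib.request import Request, urlopen

from alerting.custom_alerts import Alert

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"  # placeholder

async def send_slack_alert(channel: str, alert: Alert):
    """Post an alert summary to a Slack incoming webhook."""
    payload = json.dumps({
        "channel": channel,
        "text": f"[{alert.severity.value.upper()}] {alert.name}: {alert.message}",
    }).encode()
    req = Request(SLACK_WEBHOOK_URL, data=payload,
                  headers={"Content-Type": "application/json"})
    # urlopen is blocking; run it off the event loop
    await asyncio.to_thread(urlopen, req)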
Performance Monitoring
Comprehensive Performance Tracking
Monitor performance across all workflow components:
# monitoring/performance_monitor.py
import asyncio
import time
import psutil
import platform
from typing import Dict, Any, List
from dataclasses import dataclass, asdict
from contextlib import asynccontextmanager
@dataclass
class PerformanceMetrics:
timestamp: float
cpu_percent: float
memory_mb: float
memory_percent: float
disk_io_read_mb: float
disk_io_write_mb: float
network_io_sent_mb: float
network_io_recv_mb: float
active_threads: int
open_files: int
workflow_queue_size: int
agent_pool_size: int
avg_response_time_ms: float
p95_response_time_ms: float
requests_per_second: float
error_rate: float
class PerformanceMonitor:
"""Comprehensive performance monitoring for agent workflows."""
def __init__(self, collection_interval: float = 10.0):
self.collection_interval = collection_interval
self.metrics_history: List[PerformanceMetrics] = []
self.max_history_size = 1000
self.response_times: List[float] = []
self.request_count = 0
self.error_count = 0
self.start_time = time.time()
self.running = False
# System baseline
self.baseline_metrics = None
async def start_monitoring(self):
"""Start continuous performance monitoring."""
self.running = True
self.baseline_metrics = await self.collect_system_metrics()
while self.running:
try:
metrics = await self.collect_comprehensive_metrics()
self.metrics_history.append(metrics)
# Trim history if needed
if len(self.metrics_history) > self.max_history_size:
self.metrics_history = self.metrics_history[-self.max_history_size:]
# Check for performance anomalies
await self.check_performance_anomalies(metrics)
await asyncio.sleep(self.collection_interval)
except Exception as e:
print(f"Error in performance monitoring: {e}")
await asyncio.sleep(self.collection_interval)
async def stop_monitoring(self):
"""Stop performance monitoring."""
self.running = False
async def collect_comprehensive_metrics(self) -> PerformanceMetrics:
"""Collect comprehensive performance metrics."""
# System metrics
        cpu_percent = psutil.cpu_percent(interval=None)  # non-blocking; measures since the previous call
memory = psutil.virtual_memory()
disk_io = psutil.disk_io_counters()
network_io = psutil.net_io_counters()
# Process metrics
process = psutil.Process()
process_memory = process.memory_info().rss / 1024 / 1024 # MB
open_files = len(process.open_files())
# Application metrics
current_time = time.time()
uptime = current_time - self.start_time
# Calculate RPS
requests_per_second = self.request_count / uptime if uptime > 0 else 0
# Calculate error rate
error_rate = self.error_count / max(self.request_count, 1)
# Response time percentiles
avg_response_time = sum(self.response_times[-100:]) / len(self.response_times[-100:]) if self.response_times else 0
p95_response_time = self.calculate_percentile(self.response_times[-100:], 95) if self.response_times else 0
return PerformanceMetrics(
timestamp=current_time,
cpu_percent=cpu_percent,
memory_mb=memory.used / 1024 / 1024,
memory_percent=memory.percent,
disk_io_read_mb=disk_io.read_bytes / 1024 / 1024 if disk_io else 0,
disk_io_write_mb=disk_io.write_bytes / 1024 / 1024 if disk_io else 0,
network_io_sent_mb=network_io.bytes_sent / 1024 / 1024 if network_io else 0,
network_io_recv_mb=network_io.bytes_recv / 1024 / 1024 if network_io else 0,
active_threads=process.num_threads(),
open_files=open_files,
workflow_queue_size=await self.get_workflow_queue_size(),
agent_pool_size=await self.get_agent_pool_size(),
avg_response_time_ms=avg_response_time * 1000,
p95_response_time_ms=p95_response_time * 1000,
requests_per_second=requests_per_second,
error_rate=error_rate
)
async def collect_system_metrics(self) -> Dict[str, Any]:
"""Collect baseline system metrics."""
return {
"cpu_count": psutil.cpu_count(),
"memory_total_gb": psutil.virtual_memory().total / 1024 / 1024 / 1024,
"disk_total_gb": psutil.disk_usage('/').total / 1024 / 1024 / 1024,
"platform": psutil.platform
}
@asynccontextmanager
async def track_request(self):
"""Context manager to track request performance."""
start_time = time.time()
        try:
            yield
        except Exception:
            self.error_count += 1
            raise
finally:
duration = time.time() - start_time
self.response_times.append(duration)
self.request_count += 1
# Trim response times history
if len(self.response_times) > 1000:
self.response_times = self.response_times[-1000:]
async def check_performance_anomalies(self, metrics: PerformanceMetrics):
"""Check for performance anomalies and alert if necessary."""
# CPU usage anomaly
if metrics.cpu_percent > 80:
await self.trigger_performance_alert(
"high_cpu_usage",
f"High CPU usage: {metrics.cpu_percent:.1f}%",
metrics
)
# Memory usage anomaly
if metrics.memory_percent > 85:
await self.trigger_performance_alert(
"high_memory_usage",
f"High memory usage: {metrics.memory_percent:.1f}%",
metrics
)
# Response time anomaly
if metrics.p95_response_time_ms > 5000: # 5 seconds
await self.trigger_performance_alert(
"high_response_time",
f"High response time: {metrics.p95_response_time_ms:.0f}ms (95th percentile)",
metrics
)
# Error rate anomaly
if metrics.error_rate > 0.05: # 5% error rate
await self.trigger_performance_alert(
"high_error_rate",
f"High error rate: {metrics.error_rate:.2%}",
metrics
)
# Queue backup anomaly
if metrics.workflow_queue_size > 100:
await self.trigger_performance_alert(
"workflow_queue_backup",
f"Workflow queue backup: {metrics.workflow_queue_size} pending workflows",
metrics
)
async def trigger_performance_alert(self, alert_type: str, message: str, metrics: PerformanceMetrics):
"""Trigger performance alert."""
print(f"=% PERFORMANCE ALERT [{alert_type}]: {message}")
# Here you would integrate with your alerting system
# await send_slack_alert(f"#performance-alerts", {
# "alert_type": alert_type,
# "message": message,
# "metrics": asdict(metrics)
# })
def calculate_percentile(self, values: List[float], percentile: float) -> float:
"""Calculate percentile from list of values."""
if not values:
return 0
sorted_values = sorted(values)
index = int((percentile / 100.0) * len(sorted_values))
return sorted_values[min(index, len(sorted_values) - 1)]
async def get_workflow_queue_size(self) -> int:
"""Get current workflow queue size."""
# This would integrate with your workflow queue system
return 0
async def get_agent_pool_size(self) -> int:
"""Get current agent pool size."""
# This would integrate with your agent pool system
return 0
def get_performance_summary(self, duration_minutes: int = 60) -> Dict[str, Any]:
"""Get performance summary for the last N minutes."""
cutoff_time = time.time() - (duration_minutes * 60)
recent_metrics = [m for m in self.metrics_history if m.timestamp > cutoff_time]
if not recent_metrics:
return {"error": "No metrics available for the specified duration"}
return {
"duration_minutes": duration_minutes,
"metrics_count": len(recent_metrics),
"avg_cpu_percent": sum(m.cpu_percent for m in recent_metrics) / len(recent_metrics),
"max_cpu_percent": max(m.cpu_percent for m in recent_metrics),
"avg_memory_percent": sum(m.memory_percent for m in recent_metrics) / len(recent_metrics),
"max_memory_percent": max(m.memory_percent for m in recent_metrics),
"avg_response_time_ms": sum(m.avg_response_time_ms for m in recent_metrics) / len(recent_metrics),
"max_response_time_ms": max(m.p95_response_time_ms for m in recent_metrics),
"total_requests": sum(m.requests_per_second for m in recent_metrics) * duration_minutes * 60,
"avg_error_rate": sum(m.error_rate for m in recent_metrics) / len(recent_metrics),
"max_queue_size": max(m.workflow_queue_size for m in recent_metrics)
}
# Usage in a workflow (assumes the imports and app from the earlier examples)
@app.workflow
class PerformanceMonitoredWorkflow(Workflow[dict]):
def __init__(self):
super().__init__()
self.perf_monitor = PerformanceMonitor()
@app.workflow_run
async def run(self, data: dict) -> WorkflowResult[dict]:
# Start performance monitoring
monitor_task = asyncio.create_task(self.perf_monitor.start_monitoring())
try:
# Track this workflow execution
async with self.perf_monitor.track_request():
result = await self.process_with_performance_tracking(data)
return WorkflowResult(value=result)
finally:
await self.perf_monitor.stop_monitoring()
monitor_task.cancel()
async def process_with_performance_tracking(self, data: dict) -> dict:
"""Process data with performance tracking."""
# Your workflow logic here with performance monitoring
agent = Agent(name="processor", server_names=["api"])
async with agent:
llm = await agent.attach_llm(OpenAIAugmentedLLM)
result = await llm.generate_str(f"Process: {data}")
return {"processed": result}
Next Steps
Temporal Integration
Integrate observability with Temporal workflows for production durability
Pattern Composition
Apply monitoring to composed workflow patterns
Production Deployment
Deploy monitored workflows to production environments
Workflow Examples
See monitoring in action with complete workflow examples