Learn how to combine multiple workflow patterns, create nested workflows, and implement advanced coordination patterns for sophisticated agent systems.
## Pattern Composition Overview
Workflow pattern composition allows you to build complex agent systems by combining simpler, well-tested patterns. This approach provides:

- **Modularity**: Build complex workflows from reusable components
- **Testability**: Test individual patterns in isolation
- **Maintainability**: Update and evolve patterns independently
- **Scalability**: Scale different patterns based on workload
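In practice, these benefits come from giving every pattern the same narrow contract: a dict in, a dict out. A minimal sketch of that convention (the type alias and helper here are illustrative, not an mcp-agent API):

```python
from typing import Awaitable, Callable

# Illustrative convention: every pattern is an async callable from dict to
# dict, so patterns can be chained, parallelized, or swapped freely.
Pattern = Callable[[dict], Awaitable[dict]]

async def compose_sequential(patterns: list[Pattern], data: dict) -> dict:
    """Feed each pattern's output into the next one."""
    for pattern in patterns:
        data = await pattern(data)
    return data
```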
## Combining Multiple Patterns

### Sequential Pattern Composition
Chain different workflow patterns together:

```python
import asyncio
from datetime import datetime

from mcp_agent.app import MCPApp
from mcp_agent.workflows import Workflow, WorkflowResult
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM

app = MCPApp(name="composed_agent")


@app.workflow
class DataPipelineWorkflow(Workflow[dict]):
    """Combines extraction, validation, processing, and reporting patterns."""

    @app.workflow_run
    async def run(self, source_config: dict) -> WorkflowResult[dict]:
        pipeline_results = {}

        # Step 1: Data Extraction Pattern
        extraction_result = await self.extract_data(source_config)
        pipeline_results["extraction"] = extraction_result

        # Step 2: Data Validation Pattern
        validation_result = await self.validate_data(extraction_result)
        pipeline_results["validation"] = validation_result

        # Step 3: Parallel Processing Pattern
        processing_result = await self.process_data_parallel(validation_result)
        pipeline_results["processing"] = processing_result

        # Step 4: Aggregation and Reporting Pattern
        report = await self.generate_report(processing_result)
        pipeline_results["report"] = report

        return WorkflowResult(value=pipeline_results)

    async def extract_data(self, config: dict) -> dict:
        """Data extraction workflow pattern."""
        extractor_agent = Agent(
            name="data_extractor",
            instruction="Extract data from various sources with high reliability.",
            server_names=["database", "api", "filesystem"],
        )
        async with extractor_agent:
            llm = await extractor_agent.attach_llm(OpenAIAugmentedLLM)

            # Extract from each configured source
            sources = config.get("sources", [])
            extracted_data = []
            for source in sources:
                extraction = await llm.generate_str(
                    f"Extract data from {source['type']}: {source['location']}"
                )
                extracted_data.append({
                    "source": source,
                    "data": extraction,
                    "timestamp": datetime.utcnow().isoformat(),
                })

            return {
                "extracted_items": extracted_data,
                "total_sources": len(sources),
            }

    async def validate_data(self, extracted_data: dict) -> dict:
        """Data validation workflow pattern."""
        validator_agent = Agent(
            name="data_validator",
            instruction="Validate data quality and consistency.",
            server_names=["validation_service"],
        )
        async with validator_agent:
            llm = await validator_agent.attach_llm(OpenAIAugmentedLLM)

            validated_items = []
            validation_errors = []
            for item in extracted_data["extracted_items"]:
                validation = await llm.generate_str(
                    f"Validate data quality and schema: {item['data']}"
                )
                if "valid" in validation.lower():
                    validated_items.append(item)
                else:
                    validation_errors.append({
                        "item": item,
                        "error": validation,
                    })

            return {
                "valid_items": validated_items,
                "errors": validation_errors,
                # Guard against division by zero when no sources were configured
                "validation_rate": (
                    len(validated_items) / extracted_data["total_sources"]
                    if extracted_data["total_sources"]
                    else 0.0
                ),
            }

    async def process_data_parallel(self, validated_data: dict) -> dict:
        """Parallel processing workflow pattern."""

        async def process_item(item):
            processor_agent = Agent(
                name=f"processor_{item['source']['type']}",
                instruction="Process and enrich data items.",
                server_names=["ml_service", "enrichment_api"],
            )
            async with processor_agent:
                llm = await processor_agent.attach_llm(OpenAIAugmentedLLM)
                processed = await llm.generate_str(
                    f"Process and enrich: {item['data']}"
                )
                return {
                    "original": item,
                    "processed": processed,
                    "processing_timestamp": datetime.utcnow().isoformat(),
                }

        # Process all valid items in parallel
        processing_tasks = [
            process_item(item) for item in validated_data["valid_items"]
        ]
        processed_results = await asyncio.gather(*processing_tasks)

        return {
            "processed_items": processed_results,
            "processing_count": len(processed_results),
        }

    async def generate_report(self, processed_data: dict) -> dict:
        """Report generation workflow pattern."""
        reporter_agent = Agent(
            name="report_generator",
            instruction="Generate comprehensive reports from processed data.",
            server_names=["reporting_service", "filesystem"],
        )
        async with reporter_agent:
            llm = await reporter_agent.attach_llm(OpenAIAugmentedLLM)

            summary = await llm.generate_str(
                f"Generate executive summary for "
                f"{len(processed_data['processed_items'])} processed items"
            )
            detailed_report = await llm.generate_str(
                f"Create detailed analysis report: {processed_data}"
            )

            return {
                "summary": summary,
                "detailed_report": detailed_report,
                "report_timestamp": datetime.utcnow().isoformat(),
                "items_processed": processed_data["processing_count"],
            }
```
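How you launch the composed workflow depends on your executor configuration. The snippet below is a minimal sketch assuming direct, in-process execution with asyncio; the `source_config` entries are hypothetical.

```python
import asyncio

# Minimal driver sketch. Assumes DataPipelineWorkflow needs no constructor
# arguments; under a Temporal executor you would start it through a worker.
async def main():
    workflow = DataPipelineWorkflow()
    result = await workflow.run({
        "sources": [
            # hypothetical source entries
            {"type": "database", "location": "postgres://analytics/events"},
            {"type": "api", "location": "https://example.com/v1/metrics"},
        ]
    })
    print(result.value["report"]["summary"])

if __name__ == "__main__":
    asyncio.run(main())
```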
### Parallel Pattern Composition
Run multiple patterns concurrently:

```python
@app.workflow
class MultiAnalysisWorkflow(Workflow[dict]):
    """Run multiple analysis patterns in parallel."""

    @app.workflow_run
    async def run(self, document: str) -> WorkflowResult[dict]:
        # Launch multiple analysis patterns concurrently
        analysis_tasks = await asyncio.gather(
            self.sentiment_analysis_pattern(document),
            self.entity_extraction_pattern(document),
            self.topic_modeling_pattern(document),
            self.quality_assessment_pattern(document),
            self.summarization_pattern(document),
        )

        # Combine results from all patterns
        combined_results = {
            "sentiment": analysis_tasks[0],
            "entities": analysis_tasks[1],
            "topics": analysis_tasks[2],
            "quality": analysis_tasks[3],
            "summary": analysis_tasks[4],
            "analysis_timestamp": datetime.utcnow().isoformat(),
        }

        # Generate meta-analysis
        meta_analysis = await self.meta_analysis_pattern(combined_results)
        combined_results["meta_analysis"] = meta_analysis

        return WorkflowResult(value=combined_results)

    async def sentiment_analysis_pattern(self, text: str) -> dict:
        """Sentiment analysis workflow pattern."""
        sentiment_agent = Agent(
            name="sentiment_analyzer",
            instruction="Analyze text sentiment with nuanced understanding.",
            server_names=["sentiment_api", "ml_service"],
        )
        async with sentiment_agent:
            llm = await sentiment_agent.attach_llm(OpenAIAugmentedLLM)

            # Primary sentiment analysis
            primary_sentiment = await llm.generate_str(
                f"Analyze overall sentiment of this text: {text[:500]}..."
            )
            # Aspect-based sentiment
            aspects_sentiment = await llm.generate_str(
                f"Analyze sentiment for key aspects/topics in: {text[:500]}..."
            )
            # Confidence scoring
            confidence = await llm.generate_str(
                f"Rate confidence in sentiment analysis (0-100): {primary_sentiment}"
            )

            return {
                "primary_sentiment": primary_sentiment,
                "aspects": aspects_sentiment,
                "confidence": confidence,
                "pattern": "sentiment_analysis",
            }

    async def entity_extraction_pattern(self, text: str) -> dict:
        """Named entity recognition workflow pattern."""
        entity_agent = Agent(
            name="entity_extractor",
            instruction="Extract and classify entities with high precision.",
            server_names=["ner_service", "knowledge_graph"],
        )
        async with entity_agent:
            llm = await entity_agent.attach_llm(OpenAIAugmentedLLM)

            # Extract entities
            entities = await llm.generate_str(
                f"Extract named entities (people, places, organizations, etc.): {text[:500]}..."
            )
            # Entity relationships
            relationships = await llm.generate_str(
                f"Identify relationships between entities: {entities}"
            )
            # Entity disambiguation
            disambiguated = await llm.generate_str(
                f"Disambiguate entities using context: {entities}"
            )

            return {
                "entities": entities,
                "relationships": relationships,
                "disambiguated": disambiguated,
                "pattern": "entity_extraction",
            }

    # topic_modeling_pattern, quality_assessment_pattern, and
    # summarization_pattern follow the same structure and are omitted here.

    async def meta_analysis_pattern(self, all_analyses: dict) -> dict:
        """Meta-analysis pattern to synthesize insights."""
        meta_agent = Agent(
            name="meta_analyzer",
            instruction="Synthesize insights from multiple analysis patterns.",
            server_names=["synthesis_engine"],
        )
        async with meta_agent:
            llm = await meta_agent.attach_llm(OpenAIAugmentedLLM)

            synthesis = await llm.generate_str(
                f"Synthesize key insights from multiple analyses: {all_analyses}"
            )
            confidence_assessment = await llm.generate_str(
                f"Assess overall confidence in these combined analysis results: {all_analyses}"
            )
            recommendations = await llm.generate_str(
                f"Generate actionable recommendations based on synthesis: {synthesis}"
            )

            return {
                "synthesis": synthesis,
                "confidence": confidence_assessment,
                "recommendations": recommendations,
                "pattern": "meta_analysis",
            }
```
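With plain `asyncio.gather`, a single failing pattern raises out of `run` and discards the surviving analyses. If you want partial results, a hedged variant is to pass `return_exceptions=True` and filter, as in this sketch (the helper name is illustrative):

```python
import asyncio

# Sketch: fault-isolated fan-out. Failed patterns are reported instead of
# aborting the whole analysis batch.
async def gather_with_isolation(named_tasks: dict):
    results = await asyncio.gather(*named_tasks.values(), return_exceptions=True)
    ok, failed = {}, {}
    for name, result in zip(named_tasks, results):
        if isinstance(result, Exception):
            failed[name] = str(result)  # keep the error for the final report
        else:
            ok[name] = result
    return ok, failed

# Usage inside run(), assuming the pattern methods from MultiAnalysisWorkflow:
# ok, failed = await gather_with_isolation({
#     "sentiment": self.sentiment_analysis_pattern(document),
#     "entities": self.entity_extraction_pattern(document),
# })
```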
## Nested Workflow Patterns

### Hierarchical Workflow Composition

Create workflows that spawn child workflows:
```python
@app.workflow
class ProjectManagementWorkflow(Workflow[dict]):
    """Master workflow that orchestrates project execution."""

    @app.workflow_run
    async def run(self, project_config: dict) -> WorkflowResult[dict]:
        project_results = {}

        # Phase 1: Project Planning (child workflow).
        # start_child_workflow is a thin helper over the executor's child
        # workflow API; a sketch of it appears after this example.
        planning_handle = await self.start_child_workflow(
            PlanningWorkflow,
            project_config,
            workflow_id=f"planning-{project_config['project_id']}",
        )
        project_results["planning"] = await planning_handle.result()

        # Phase 2: Resource Allocation (child workflow)
        resources_handle = await self.start_child_workflow(
            ResourceAllocationWorkflow,
            {
                "project_plan": project_results["planning"],
                "budget": project_config["budget"],
            },
            workflow_id=f"resources-{project_config['project_id']}",
        )
        project_results["resources"] = await resources_handle.result()

        # Phase 3: Parallel Task Execution (multiple child workflows).
        # Assumes the planning workflow returns structured task dicts;
        # see the note on task_breakdown in PlanningWorkflow below.
        task_handles = []
        tasks = project_results["planning"]["tasks"]
        for task in tasks:
            task_handle = await self.start_child_workflow(
                TaskExecutionWorkflow,
                {
                    "task": task,
                    "resources": project_results["resources"],
                    "project_context": project_config,
                },
                workflow_id=f"task-{project_config['project_id']}-{task['id']}",
            )
            task_handles.append(task_handle)

        # Wait for all tasks to complete
        task_results = []
        for handle in task_handles:
            result = await handle.result()
            task_results.append(result)
        project_results["tasks"] = task_results

        # Phase 4: Project Closure (child workflow)
        closure_handle = await self.start_child_workflow(
            ProjectClosureWorkflow,
            {
                "project_results": project_results,
                "original_config": project_config,
            },
            workflow_id=f"closure-{project_config['project_id']}",
        )
        project_results["closure"] = await closure_handle.result()

        return WorkflowResult(value=project_results)


@app.workflow
class PlanningWorkflow(Workflow[dict]):
    """Child workflow for project planning."""

    @app.workflow_run
    async def run(self, project_config: dict) -> WorkflowResult[dict]:
        planner_agent = Agent(
            name="project_planner",
            instruction="Create detailed project plans with task breakdown.",
            server_names=["project_mgmt", "resource_db"],
        )
        async with planner_agent:
            llm = await planner_agent.attach_llm(OpenAIAugmentedLLM)

            # Analyze project requirements
            requirements = await llm.generate_str(
                f"Analyze project requirements: {project_config}"
            )
            # Create task breakdown structure. In practice, request structured
            # output (e.g., JSON) and parse it so "tasks" is a list of dicts
            # with "id" and "type" keys, as the parent workflow expects.
            task_breakdown = await llm.generate_str(
                f"Create detailed task breakdown: {requirements}"
            )
            # Estimate timeline and dependencies
            timeline = await llm.generate_str(
                f"Create project timeline with dependencies: {task_breakdown}"
            )
            # Risk assessment
            risks = await llm.generate_str(
                f"Identify project risks and mitigation strategies: {project_config}"
            )

            return WorkflowResult(value={
                "requirements": requirements,
                "tasks": task_breakdown,
                "timeline": timeline,
                "risks": risks,
                "planning_completed": datetime.utcnow().isoformat(),
            })


# ResourceAllocationWorkflow and ProjectClosureWorkflow follow the same
# structure as PlanningWorkflow and are omitted for brevity.


@app.workflow
class TaskExecutionWorkflow(Workflow[dict]):
    """Child workflow for individual task execution."""

    @app.workflow_run
    async def run(self, task_data: dict) -> WorkflowResult[dict]:
        task = task_data["task"]

        # Task-specific agent
        executor_agent = Agent(
            name=f"task_executor_{task['type']}",
            instruction=f"Execute {task['type']} tasks efficiently and thoroughly.",
            server_names=task.get("required_services", ["general"]),
        )
        async with executor_agent:
            llm = await executor_agent.attach_llm(OpenAIAugmentedLLM)

            # Execute task with progress tracking
            execution_result = await llm.generate_str(
                f"Execute task: {task} with resources: {task_data['resources']}"
            )
            # Quality check
            quality_check = await llm.generate_str(
                f"Perform quality check on task execution: {execution_result}"
            )
            # Generate deliverable
            deliverable = await llm.generate_str(
                f"Create task deliverable: {execution_result}"
            )

            return WorkflowResult(value={
                "task_id": task["id"],
                "execution_result": execution_result,
                "quality_check": quality_check,
                "deliverable": deliverable,
                "completion_time": datetime.utcnow().isoformat(),
            })
```
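The parent workflow calls `self.start_child_workflow`, which is not defined in the example above. Under a Temporal executor, one hedged way to implement it is as a thin wrapper over the Temporal Python SDK's child workflow API; this sketch assumes `temporalio` is available and that each child class exposes its entry point as `run`:

```python
from temporalio import workflow

class _ChildHandle:
    """Wrapper so callers can `await handle.result()` as in the example above."""

    def __init__(self, handle):
        self._handle = handle

    async def result(self):
        # Temporal child workflow handles are awaitable and resolve to the result
        return await self._handle


async def start_child_workflow(self, workflow_class, arg: dict, workflow_id: str):
    # Start the child and return immediately with a handle, so the parent can
    # launch several children before awaiting any of them.
    handle = await workflow.start_child_workflow(
        workflow_class.run,
        arg,
        id=workflow_id,
    )
    return _ChildHandle(handle)
```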
## Dynamic Workflow Composition

### Runtime Pattern Selection

Choose workflow patterns based on runtime conditions:
```python
@app.workflow
class AdaptiveAnalysisWorkflow(Workflow[dict]):
    """Dynamically selects analysis patterns based on input characteristics."""

    @app.workflow_run
    async def run(self, content: dict) -> WorkflowResult[dict]:
        # Analyze input to determine optimal patterns
        content_analysis = await self.analyze_content_characteristics(content)

        # Select appropriate patterns based on characteristics
        selected_patterns = await self.select_patterns(content_analysis)

        # Execute selected patterns dynamically
        pattern_results = {}
        for pattern_name in selected_patterns:
            result = await self.execute_pattern(pattern_name, content)
            pattern_results[pattern_name] = result

        # Synthesize results
        final_result = await self.synthesize_results(pattern_results, content_analysis)

        return WorkflowResult(value=final_result)

    async def analyze_content_characteristics(self, content: dict) -> dict:
        """Analyze input to determine its characteristics."""
        analyzer_agent = Agent(
            name="content_analyzer",
            instruction="Analyze content characteristics to guide processing strategy.",
            server_names=["analysis_service"],
        )
        async with analyzer_agent:
            llm = await analyzer_agent.attach_llm(OpenAIAugmentedLLM)

            characteristics = await llm.generate_str(f"""
            Analyze these content characteristics:
            1. Content type and format
            2. Length and complexity
            3. Language and domain
            4. Required processing depth
            5. Time sensitivity

            Content: {content}
            """)

            return {"characteristics": characteristics, "content_type": content.get("type")}

    async def select_patterns(self, content_analysis: dict) -> list[str]:
        """Select optimal patterns based on content analysis."""
        selector_agent = Agent(
            name="pattern_selector",
            instruction="Select optimal processing patterns based on content analysis.",
            server_names=["decision_engine"],
        )
        async with selector_agent:
            llm = await selector_agent.attach_llm(OpenAIAugmentedLLM)

            pattern_selection = await llm.generate_str(f"""
            Based on this content analysis, select the most appropriate processing patterns:

            Available patterns:
            - detailed_analysis: Deep, comprehensive analysis (slow, thorough)
            - rapid_analysis: Quick insights extraction (fast, basic)
            - multilingual_analysis: Language-specific processing
            - technical_analysis: Domain-specific technical processing
            - sentiment_analysis: Emotion and opinion analysis
            - factual_analysis: Fact-checking and verification
            - comparative_analysis: Comparison with reference materials

            Analysis: {content_analysis}

            Return comma-separated list of selected patterns.
            """)

            # Parse the model's comma-separated reply
            selected = [p.strip() for p in pattern_selection.split(",")]
            return selected

    async def execute_pattern(self, pattern_name: str, content: dict) -> dict:
        """Execute a specific analysis pattern."""
        pattern_executors = {
            "detailed_analysis": self.detailed_analysis_pattern,
            "rapid_analysis": self.rapid_analysis_pattern,
            "multilingual_analysis": self.multilingual_analysis_pattern,
            "technical_analysis": self.technical_analysis_pattern,
            "sentiment_analysis": self.sentiment_analysis_pattern,
            "factual_analysis": self.factual_analysis_pattern,
            "comparative_analysis": self.comparative_analysis_pattern,
        }

        executor = pattern_executors.get(pattern_name)
        if executor:
            return await executor(content)
        else:
            return {"error": f"Unknown pattern: {pattern_name}"}

    async def detailed_analysis_pattern(self, content: dict) -> dict:
        """Comprehensive analysis pattern."""
        detailed_agent = Agent(
            name="detailed_analyzer",
            instruction="Perform thorough, comprehensive analysis with deep insights.",
            server_names=["deep_analysis", "knowledge_base", "ml_service"],
        )
        async with detailed_agent:
            llm = await detailed_agent.attach_llm(OpenAIAugmentedLLM)

            # Multi-stage deep analysis
            structural_analysis = await llm.generate_str(f"Deep structural analysis: {content}")
            contextual_analysis = await llm.generate_str(f"Contextual analysis: {structural_analysis}")
            implications = await llm.generate_str(f"Derive implications: {contextual_analysis}")

            return {
                "pattern": "detailed_analysis",
                "structural": structural_analysis,
                "contextual": contextual_analysis,
                "implications": implications,
                "depth": "comprehensive",
            }

    async def rapid_analysis_pattern(self, content: dict) -> dict:
        """Quick analysis pattern for time-sensitive processing."""
        rapid_agent = Agent(
            name="rapid_analyzer",
            instruction="Provide quick, essential insights with time efficiency.",
            server_names=["fast_analysis"],
        )
        async with rapid_agent:
            llm = await rapid_agent.attach_llm(OpenAIAugmentedLLM)

            quick_insights = await llm.generate_str(f"Quick key insights: {content}")

            return {
                "pattern": "rapid_analysis",
                "insights": quick_insights,
                "depth": "surface",
            }

    # The remaining pattern methods (multilingual_analysis_pattern,
    # technical_analysis_pattern, sentiment_analysis_pattern,
    # factual_analysis_pattern, comparative_analysis_pattern) and
    # synthesize_results follow the same structure and are omitted here.
```
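Because the selected patterns come back as free text, it is worth hardening `select_patterns` against malformed replies before dispatching. A minimal, hedged sketch that filters the parsed list against the known executors:

```python
# Sketch: defensive parsing of the model's pattern selection. KNOWN_PATTERNS
# mirrors the keys of pattern_executors in execute_pattern above.
KNOWN_PATTERNS = {
    "detailed_analysis", "rapid_analysis", "multilingual_analysis",
    "technical_analysis", "sentiment_analysis", "factual_analysis",
    "comparative_analysis",
}

def parse_pattern_selection(raw: str) -> list[str]:
    candidates = [p.strip().lower() for p in raw.split(",")]
    selected = [p for p in candidates if p in KNOWN_PATTERNS]
    # Fall back to a cheap default rather than failing the whole workflow
    return selected or ["rapid_analysis"]

# Example:
# parse_pattern_selection("Rapid_Analysis, sentiment_analysis, n/a")
# -> ["rapid_analysis", "sentiment_analysis"]
```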
## State Sharing Between Workflows

### Shared State Management

Implement state sharing across workflow patterns:
```python
import json  # useful for serializing state snapshots (see checkpoint sketch below)
from datetime import datetime
from typing import Any, Dict

from temporalio import workflow  # assumes the Temporal executor for run metadata


@app.workflow
class StatefulOrchestrator(Workflow[dict]):
    """Orchestrator that maintains shared state across patterns."""

    def __init__(self):
        self.shared_state: Dict[str, Any] = {
            "global_context": {},
            "pattern_results": {},
            "workflow_metadata": {},
            "communication_log": [],
        }

    @app.workflow_run
    async def run(self, initial_data: dict) -> WorkflowResult[dict]:
        # Initialize shared state
        self.shared_state["global_context"] = initial_data
        self.shared_state["workflow_metadata"] = {
            "start_time": datetime.utcnow().isoformat(),
            "workflow_id": workflow.info().workflow_id,
            "run_id": workflow.info().run_id,
        }

        # Execute patterns with shared state
        await self.execute_data_collection_pattern()
        await self.execute_processing_patterns()
        await self.execute_synthesis_pattern()

        return WorkflowResult(value={
            "final_state": self.shared_state,
            "execution_summary": await self.generate_execution_summary(),
        })

    async def execute_data_collection_pattern(self):
        """Data collection pattern that updates shared state."""
        collector_agent = Agent(
            name="data_collector",
            instruction="Collect data and update shared context.",
            server_names=["data_sources"],
        )
        async with collector_agent:
            llm = await collector_agent.attach_llm(OpenAIAugmentedLLM)

            # Collect data based on current context
            collected_data = await llm.generate_str(
                f"Collect relevant data based on context: {self.shared_state['global_context']}"
            )

            # Update shared state
            self.shared_state["pattern_results"]["data_collection"] = {
                "collected_data": collected_data,
                "timestamp": datetime.utcnow().isoformat(),
                "status": "completed",
            }

            # Update global context with new data
            self.shared_state["global_context"]["collected_data"] = collected_data

            # Log communication
            self.shared_state["communication_log"].append({
                "pattern": "data_collection",
                "action": "state_update",
                "timestamp": datetime.utcnow().isoformat(),
                "data_keys": list(self.shared_state["pattern_results"]["data_collection"].keys()),
            })

    async def execute_processing_patterns(self):
        """Execute multiple processing patterns that share state."""
        # Pattern 1: Analysis
        await self.execute_analysis_pattern()
        # Pattern 2: Validation (uses analysis results)
        await self.execute_validation_pattern()
        # Pattern 3: Enhancement (uses both previous results)
        await self.execute_enhancement_pattern()

    async def execute_analysis_pattern(self):
        """Analysis pattern that reads and updates shared state."""
        analysis_agent = Agent(
            name="analyzer",
            instruction="Analyze data using shared context and state.",
            server_names=["analysis_service"],
        )
        async with analysis_agent:
            llm = await analysis_agent.attach_llm(OpenAIAugmentedLLM)

            # Use shared state for analysis
            current_context = self.shared_state["global_context"]
            previous_results = self.shared_state.get("pattern_results", {})

            analysis_result = await llm.generate_str(f"""
            Perform analysis using shared context:
            Context: {current_context}
            Previous Results: {previous_results}
            """)

            # Update shared state with analysis
            self.shared_state["pattern_results"]["analysis"] = {
                "result": analysis_result,
                "timestamp": datetime.utcnow().isoformat(),
                "input_context": current_context,
            }

            # Update global context
            self.shared_state["global_context"]["analysis_insights"] = analysis_result

    async def execute_validation_pattern(self):
        """Validation pattern that uses analysis results from shared state."""
        validator_agent = Agent(
            name="validator",
            instruction="Validate analysis results using shared state.",
            server_names=["validation_service"],
        )
        async with validator_agent:
            llm = await validator_agent.attach_llm(OpenAIAugmentedLLM)

            # Access analysis results from shared state
            analysis_result = self.shared_state["pattern_results"]["analysis"]["result"]

            validation_result = await llm.generate_str(f"""
            Validate analysis result:
            Analysis to validate: {analysis_result}
            Global context: {self.shared_state['global_context']}
            """)

            # Update shared state
            self.shared_state["pattern_results"]["validation"] = {
                "validation_result": validation_result,
                "validated_analysis": analysis_result,
                "timestamp": datetime.utcnow().isoformat(),
            }

            # Update global context based on validation
            is_valid = "valid" in validation_result.lower()
            self.shared_state["global_context"]["validation_status"] = is_valid

    async def execute_enhancement_pattern(self):
        """Enhancement pattern that uses all previous results."""
        enhancer_agent = Agent(
            name="enhancer",
            instruction="Enhance results using all available shared state.",
            server_names=["enhancement_service"],
        )
        async with enhancer_agent:
            llm = await enhancer_agent.attach_llm(OpenAIAugmentedLLM)

            # Use all shared state for enhancement
            all_results = self.shared_state["pattern_results"]
            global_context = self.shared_state["global_context"]

            enhancement_result = await llm.generate_str(f"""
            Enhance results using all available information:
            All Pattern Results: {all_results}
            Global Context: {global_context}
            """)

            # Final state update
            self.shared_state["pattern_results"]["enhancement"] = {
                "enhanced_result": enhancement_result,
                "used_results": list(all_results.keys()),
                "timestamp": datetime.utcnow().isoformat(),
            }

    async def execute_synthesis_pattern(self):
        """Final synthesis pattern that creates comprehensive output."""
        synthesizer_agent = Agent(
            name="synthesizer",
            instruction="Synthesize all shared state into final comprehensive result.",
            server_names=["synthesis_engine"],
        )
        async with synthesizer_agent:
            llm = await synthesizer_agent.attach_llm(OpenAIAugmentedLLM)

            synthesis = await llm.generate_str(f"""
            Synthesize comprehensive final result from all shared state:
            Complete State: {self.shared_state}
            """)

            self.shared_state["pattern_results"]["synthesis"] = {
                "final_synthesis": synthesis,
                "synthesized_patterns": list(self.shared_state["pattern_results"].keys()),
                "timestamp": datetime.utcnow().isoformat(),
            }

    async def generate_execution_summary(self) -> dict:
        """Generate summary of workflow execution."""
        return {
            "executed_patterns": list(self.shared_state["pattern_results"].keys()),
            "start_time": self.shared_state["workflow_metadata"].get("start_time"),
            "end_time": datetime.utcnow().isoformat(),
            "state_updates": len(self.shared_state["communication_log"]),
            "final_context_keys": list(self.shared_state["global_context"].keys()),
        }
```
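Because the shared state lives on the workflow instance, a crash loses it unless your executor replays deterministically. One hedged safeguard is to checkpoint the state after each pattern; the sketch below writes JSON snapshots to disk, with the directory and file naming purely illustrative:

```python
import json
from pathlib import Path

# Sketch: checkpoint shared state after each pattern. The location and
# naming scheme here are illustrative, not part of mcp-agent.
CHECKPOINT_DIR = Path("./checkpoints")

def checkpoint_state(workflow_id: str, pattern: str, shared_state: dict) -> Path:
    CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)
    path = CHECKPOINT_DIR / f"{workflow_id}-{pattern}.json"
    # default=str handles values that are not JSON-native
    path.write_text(json.dumps(shared_state, default=str, indent=2))
    return path

# Usage after each pattern, e.g. at the end of execute_analysis_pattern():
# checkpoint_state(self.shared_state["workflow_metadata"]["workflow_id"],
#                  "analysis", self.shared_state)
```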
## Advanced Coordination Patterns

### Event-Driven Coordination

Implement event-driven coordination between patterns:
```python
import asyncio
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Callable, Dict, List


class EventType(Enum):
    PATTERN_STARTED = "pattern_started"
    PATTERN_COMPLETED = "pattern_completed"
    DATA_UPDATED = "data_updated"
    ERROR_OCCURRED = "error_occurred"
    THRESHOLD_REACHED = "threshold_reached"


@dataclass
class WorkflowEvent:
    event_type: EventType
    source_pattern: str
    data: dict
    timestamp: str


@app.workflow
class EventDrivenCoordinator(Workflow[dict]):
    """Event-driven coordination between workflow patterns."""

    def __init__(self):
        self.event_queue: List[WorkflowEvent] = []
        self.processed_event_count = 0
        self.pattern_states: Dict[str, str] = {}
        self.event_handlers: Dict[EventType, Callable] = {
            EventType.PATTERN_COMPLETED: self.handle_pattern_completion,
            EventType.DATA_UPDATED: self.handle_data_update,
            EventType.ERROR_OCCURRED: self.handle_error,
            EventType.THRESHOLD_REACHED: self.handle_threshold,
        }

    @app.workflow_run
    async def run(self, config: dict) -> WorkflowResult[dict]:
        # Initialize event-driven execution
        await self.initialize_patterns(config)

        # Event processing loop
        while not self.all_patterns_complete():
            # Process queued events
            await self.process_events()
            # Check for new triggers
            await self.check_triggers()
            # Wait a bit before the next iteration
            await asyncio.sleep(1)

        return WorkflowResult(value={
            "execution_results": self.pattern_states,
            "processed_events": self.processed_event_count,
            "completion_time": datetime.utcnow().isoformat(),
        })

    async def initialize_patterns(self, config: dict):
        """Initialize patterns based on configuration."""
        patterns_to_start = config.get("initial_patterns", ["data_ingestion"])
        for pattern_name in patterns_to_start:
            await self.start_pattern(pattern_name, config)

    async def start_pattern(self, pattern_name: str, config: dict):
        """Start a pattern and emit a start event."""
        self.pattern_states[pattern_name] = "running"

        # Emit pattern started event
        event = WorkflowEvent(
            event_type=EventType.PATTERN_STARTED,
            source_pattern=pattern_name,
            data={"config": config},
            timestamp=datetime.utcnow().isoformat(),
        )
        self.event_queue.append(event)

        # Execute the pattern asynchronously. Note: fire-and-forget tasks work
        # under the asyncio executor but are not deterministic under Temporal.
        asyncio.create_task(self.execute_pattern_async(pattern_name, config))

    async def execute_pattern_async(self, pattern_name: str, config: dict):
        """Execute pattern and emit completion event."""
        try:
            # Pattern execution logic
            pattern_agent = Agent(
                name=f"{pattern_name}_executor",
                instruction=f"Execute {pattern_name} pattern according to configuration.",
                server_names=config.get("required_services", ["general"]),
            )
            async with pattern_agent:
                llm = await pattern_agent.attach_llm(OpenAIAugmentedLLM)
                result = await llm.generate_str(f"Execute {pattern_name}: {config}")

            # Update pattern state
            self.pattern_states[pattern_name] = "completed"

            # Emit completion event
            completion_event = WorkflowEvent(
                event_type=EventType.PATTERN_COMPLETED,
                source_pattern=pattern_name,
                data={"result": result, "status": "success"},
                timestamp=datetime.utcnow().isoformat(),
            )
            self.event_queue.append(completion_event)

        except Exception as e:
            # Update state and emit error event
            self.pattern_states[pattern_name] = "failed"
            error_event = WorkflowEvent(
                event_type=EventType.ERROR_OCCURRED,
                source_pattern=pattern_name,
                data={"error": str(e), "status": "failed"},
                timestamp=datetime.utcnow().isoformat(),
            )
            self.event_queue.append(error_event)

    async def process_events(self):
        """Process all queued events."""
        events_to_process = self.event_queue.copy()
        self.event_queue.clear()

        for event in events_to_process:
            # Count processed events here, since the queue itself is cleared
            self.processed_event_count += 1
            handler = self.event_handlers.get(event.event_type)
            if handler:
                await handler(event)

    async def handle_pattern_completion(self, event: WorkflowEvent):
        """Handle pattern completion event."""
        completed_pattern = event.source_pattern

        # Determine next patterns to start based on completion
        next_patterns = self.get_next_patterns(completed_pattern)
        for next_pattern in next_patterns:
            if self.pattern_states.get(next_pattern) != "running":
                await self.start_pattern(next_pattern, event.data)

    async def handle_data_update(self, event: WorkflowEvent):
        """Handle data update event."""
        # Check if the update triggers new patterns or threshold events
        data_size = len(str(event.data))
        if data_size > 10000:  # Large data threshold
            threshold_event = WorkflowEvent(
                event_type=EventType.THRESHOLD_REACHED,
                source_pattern=event.source_pattern,
                data={"threshold": "large_data", "size": data_size},
                timestamp=datetime.utcnow().isoformat(),
            )
            self.event_queue.append(threshold_event)

    async def handle_error(self, event: WorkflowEvent):
        """Handle error event."""
        failed_pattern = event.source_pattern

        # Implement error recovery logic
        recovery_patterns = self.get_recovery_patterns(failed_pattern)
        for recovery_pattern in recovery_patterns:
            await self.start_pattern(recovery_pattern, {
                "recovery_mode": True,
                "failed_pattern": failed_pattern,
                "error_details": event.data,
            })

    async def handle_threshold(self, event: WorkflowEvent):
        """Handle threshold reached event."""
        threshold_type = event.data.get("threshold")
        if threshold_type == "large_data":
            # Start parallel processing pattern for large data
            await self.start_pattern("parallel_processing", event.data)

    def get_next_patterns(self, completed_pattern: str) -> List[str]:
        """Get patterns that should start after completion."""
        pattern_dependencies = {
            "data_ingestion": ["data_validation", "initial_analysis"],
            "data_validation": ["data_processing"],
            "initial_analysis": ["detailed_analysis"],
            "data_processing": ["result_synthesis"],
            "detailed_analysis": ["result_synthesis"],
            "parallel_processing": ["result_aggregation"],
            "result_synthesis": ["final_reporting"],
            "result_aggregation": ["final_reporting"],
        }
        return pattern_dependencies.get(completed_pattern, [])

    def get_recovery_patterns(self, failed_pattern: str) -> List[str]:
        """Get recovery patterns for failed patterns."""
        recovery_map = {
            "data_ingestion": ["data_ingestion_retry"],
            "data_processing": ["alternative_processing"],
            "detailed_analysis": ["fallback_analysis"],
        }
        return recovery_map.get(failed_pattern, [])

    def all_patterns_complete(self) -> bool:
        """Check if all patterns are complete."""
        active_states = ["running", "pending"]
        return not any(state in active_states for state in self.pattern_states.values())

    async def check_triggers(self):
        """Check for external triggers that might start new patterns."""
        # This could check external systems, databases, APIs, etc.
        # For now, it's a placeholder for trigger logic.
        pass
```
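The one-second polling loop is simple but adds latency between an event landing and being handled. Under the asyncio executor, a hedged refinement is to wake the loop with an `asyncio.Event` whenever something is enqueued; a minimal sketch of the change (member names are illustrative):

```python
import asyncio

# Sketch: wake-on-enqueue instead of fixed-interval polling. These members
# would be added to EventDrivenCoordinator.
class WakeableQueueMixin:
    def __init__(self):
        self._wakeup = asyncio.Event()

    def enqueue(self, event_queue: list, event) -> None:
        event_queue.append(event)
        self._wakeup.set()  # wake the processing loop immediately

    async def wait_for_events(self, timeout: float = 1.0) -> None:
        try:
            # Fall back to the timeout so completion checks still run
            await asyncio.wait_for(self._wakeup.wait(), timeout)
        except asyncio.TimeoutError:
            pass
        self._wakeup.clear()
```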
## Best Practices for Pattern Composition

### Design for Composability
- Keep patterns focused on single responsibilities
- Use well-defined interfaces between patterns
- Make patterns stateless when possible
- Document pattern dependencies clearly
```python
# Good: a single-responsibility pattern
@app.workflow
class DataValidationPattern(Workflow[dict]):
    """Focuses solely on data validation."""
    pass


# Avoid: a pattern that tries to do everything
@app.workflow
class DataEverythingPattern(Workflow[dict]):
    """Validates, processes, analyzes, and reports data."""
    pass
```
### Handle Pattern Failures
- Implement graceful degradation
- Use circuit breaker patterns
- Provide fallback mechanisms
- Log failures for debugging
```python
import logging

logger = logging.getLogger(__name__)


async def execute_with_fallback(self, primary_pattern, fallback_pattern, data):
    try:
        return await primary_pattern(data)
    except Exception as e:
        logger.warning(f"Primary pattern failed: {e}, using fallback")
        return await fallback_pattern(data)
```
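The list above also mentions circuit breakers, which the snippet does not show. A minimal, hedged sketch of one (thresholds and cooldowns are illustrative): after repeated failures, the breaker stops calling the flaky pattern for a cooldown period.

```python
import time

# Sketch: a minimal circuit breaker around a pattern callable.
class CircuitBreaker:
    def __init__(self, max_failures: int = 3, cooldown_seconds: float = 60.0):
        self.max_failures = max_failures
        self.cooldown_seconds = cooldown_seconds
        self.failures = 0
        self.opened_at = None  # monotonic time when the breaker opened

    async def call(self, pattern, data):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.cooldown_seconds:
                raise RuntimeError("circuit open: pattern temporarily disabled")
            # Cooldown elapsed: allow one trial call (half-open state)
            self.opened_at = None

        try:
            result = await pattern(data)
        except Exception:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.monotonic()
            raise
        else:
            self.failures = 0
            return result
```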
### Optimize Resource Usage
- Share resources between patterns when possible
- Use connection pooling for external services
- Implement proper cleanup in patterns
- Monitor resource consumption
```python
@app.workflow
class ResourceEfficientPattern(Workflow[dict]):
    def __init__(self):
        # AgentPool is illustrative, not an mcp-agent class; a sketch follows.
        self.shared_agent_pool = AgentPool(max_size=5)

    async def cleanup(self):
        await self.shared_agent_pool.close()
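```

The `AgentPool` above is hypothetical. A minimal sketch of what such a pool could look like, bounding concurrent agent use with a semaphore and closing everything on shutdown:

```python
import asyncio

# Hypothetical AgentPool: bounds concurrent agents and reuses entered agents
# instead of re-entering their context per request.
class AgentPool:
    def __init__(self, max_size: int = 5):
        self._semaphore = asyncio.Semaphore(max_size)
        self._agents: list = []

    async def acquire(self, agent):
        await self._semaphore.acquire()
        await agent.__aenter__()  # open the agent's server connections
        self._agents.append(agent)
        return agent

    async def release(self, agent):
        await agent.__aexit__(None, None, None)
        self._agents.remove(agent)
        self._semaphore.release()

    async def close(self):
        for agent in list(self._agents):
            await self.release(agent)
```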
### Test Pattern Compositions
- Test patterns in isolation
- Test pattern interactions
- Use mocks for external dependencies
- Validate error handling paths
```python
import pytest

# ComposedWorkflow stands in for any composed workflow under test; the
# result keys below are illustrative.
@pytest.mark.asyncio
async def test_pattern_composition():
    mock_config = {"test": True}
    workflow = ComposedWorkflow()

    result = await workflow.run(mock_config)

    assert result.value["pattern_1_complete"] is True
    assert result.value["pattern_2_complete"] is True
```
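To exercise pattern interactions without hitting real LLMs or MCP servers, one hedged option is to patch the pattern methods with `AsyncMock`. The sketch below targets the `MultiAnalysisWorkflow` from earlier and assumes its decorated `run` method can be invoked directly in tests:

```python
from unittest.mock import AsyncMock

import pytest


@pytest.mark.asyncio
async def test_meta_analysis_receives_all_pattern_results(monkeypatch):
    workflow = MultiAnalysisWorkflow()  # assumes no required constructor args

    # Replace every leaf pattern with a canned async result
    for name in ("sentiment_analysis_pattern", "entity_extraction_pattern",
                 "topic_modeling_pattern", "quality_assessment_pattern",
                 "summarization_pattern"):
        monkeypatch.setattr(workflow, name, AsyncMock(return_value={"pattern": name}))
    meta = AsyncMock(return_value={"pattern": "meta_analysis"})
    monkeypatch.setattr(workflow, "meta_analysis_pattern", meta)

    result = await workflow.run("some document")

    # The meta-analysis should have seen all five combined results
    combined = meta.call_args.args[0]
    assert {"sentiment", "entities", "topics", "quality", "summary"} <= combined.keys()
    assert result.value["meta_analysis"]["pattern"] == "meta_analysis"
```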