Learn how to combine multiple workflow patterns, create nested workflows, and implement advanced coordination patterns for sophisticated agent systems.

Pattern Composition Overview

Workflow pattern composition allows you to build complex agent systems by combining simpler, well-tested patterns. This approach provides:

Modularity

Build complex workflows from reusable components

Testability

Test individual patterns in isolation

Maintainability

Update and evolve patterns independently

Scalability

Scale different patterns based on workload

Combining Multiple Patterns

Sequential Pattern Composition

Chain different workflow patterns together:
import asyncio
import json
from datetime import datetime

from mcp_agent.agents.agent import Agent
from mcp_agent.app import MCPApp
from mcp_agent.workflows import Workflow, WorkflowResult
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM

# Application object shared by every example below; the @app.workflow and
# @app.workflow_run decorators used on the workflow classes hang off it.
app = MCPApp(name="composed_agent")

@app.workflow
class DataPipelineWorkflow(Workflow[dict]):
    """Combines extraction, validation, processing, and reporting patterns.

    ``run`` chains four stage methods. Each stage opens its own ``Agent`` and
    receives the dict returned by the previous stage.
    """

    @app.workflow_run
    async def run(self, source_config: dict) -> WorkflowResult[dict]:
        """Execute the four stages in order and collect every stage's output.

        Args:
            source_config: Pipeline configuration; ``extract_data`` reads its
                optional ``"sources"`` list.

        Returns:
            A ``WorkflowResult`` whose value has the keys ``"extraction"``,
            ``"validation"``, ``"processing"`` and ``"report"``.
        """
        pipeline_results = {}

        # Step 1: Data Extraction Pattern
        extraction_result = await self.extract_data(source_config)
        pipeline_results["extraction"] = extraction_result

        # Step 2: Data Validation Pattern
        validation_result = await self.validate_data(extraction_result)
        pipeline_results["validation"] = validation_result

        # Step 3: Parallel Processing Pattern
        processing_result = await self.process_data_parallel(validation_result)
        pipeline_results["processing"] = processing_result

        # Step 4: Aggregation and Reporting Pattern
        report = await self.generate_report(processing_result)
        pipeline_results["report"] = report

        return WorkflowResult(value=pipeline_results)

    @staticmethod
    def _looks_valid(verdict: str) -> bool:
        """Heuristically decide whether a free-text verdict means "valid".

        A plain ``"valid" in text`` check also matches "invalid", so negative
        phrasings are rejected first.
        """
        lowered = verdict.lower()
        if "invalid" in lowered or "not valid" in lowered:
            return False
        return "valid" in lowered

    async def extract_data(self, config: dict) -> dict:
        """Data extraction workflow pattern.

        Args:
            config: Reads the optional ``"sources"`` list; every source dict
                must provide ``"type"`` and ``"location"``.

        Returns:
            ``{"extracted_items": [...], "total_sources": int}`` where each
            item carries the source, the LLM output and a timestamp.
        """
        extractor_agent = Agent(
            name="data_extractor",
            instruction="Extract data from various sources with high reliability.",
            server_names=["database", "api", "filesystem"]
        )

        async with extractor_agent:
            llm = await extractor_agent.attach_llm(OpenAIAugmentedLLM)

            # Extract from multiple sources, one LLM call per source
            sources = config.get("sources", [])
            extracted_data = []

            for source in sources:
                extraction = await llm.generate_str(
                    f"Extract data from {source['type']}: {source['location']}"
                )
                extracted_data.append({
                    "source": source,
                    "data": extraction,
                    "timestamp": datetime.utcnow().isoformat()
                })

            return {
                "extracted_items": extracted_data,
                "total_sources": len(sources)
            }

    async def validate_data(self, extracted_data: dict) -> dict:
        """Data validation workflow pattern.

        Args:
            extracted_data: Output of ``extract_data``.

        Returns:
            ``{"valid_items": [...], "errors": [...], "validation_rate": float}``.
            The rate is ``0.0`` when there were no sources.
        """
        validator_agent = Agent(
            name="data_validator",
            instruction="Validate data quality and consistency.",
            server_names=["validation_service"]
        )

        async with validator_agent:
            llm = await validator_agent.attach_llm(OpenAIAugmentedLLM)

            validated_items = []
            validation_errors = []

            for item in extracted_data["extracted_items"]:
                validation = await llm.generate_str(
                    f"Validate data quality and schema: {item['data']}"
                )

                if self._looks_valid(validation):
                    validated_items.append(item)
                else:
                    validation_errors.append({
                        "item": item,
                        "error": validation
                    })

            total_sources = extracted_data["total_sources"]
            return {
                "valid_items": validated_items,
                "errors": validation_errors,
                # Nothing extracted means nothing to validate: report 0.0
                # instead of raising ZeroDivisionError.
                "validation_rate": (
                    len(validated_items) / total_sources if total_sources else 0.0
                )
            }

    async def process_data_parallel(self, validated_data: dict) -> dict:
        """Parallel processing workflow pattern.

        Starts one coroutine (with its own agent) per valid item and awaits
        them together with ``asyncio.gather``.

        Args:
            validated_data: Output of ``validate_data``; reads ``"valid_items"``.

        Returns:
            ``{"processed_items": [...], "processing_count": int}``.
        """
        import asyncio

        async def process_item(item):
            processor_agent = Agent(
                name=f"processor_{item['source']['type']}",
                instruction="Process and enrich data items.",
                server_names=["ml_service", "enrichment_api"]
            )

            async with processor_agent:
                llm = await processor_agent.attach_llm(OpenAIAugmentedLLM)
                processed = await llm.generate_str(
                    f"Process and enrich: {item['data']}"
                )

                return {
                    "original": item,
                    "processed": processed,
                    "processing_timestamp": datetime.utcnow().isoformat()
                }

        # Process all valid items in parallel
        processed_results = await asyncio.gather(
            *(process_item(item) for item in validated_data["valid_items"])
        )

        return {
            "processed_items": processed_results,
            "processing_count": len(processed_results)
        }

    async def generate_report(self, processed_data: dict) -> dict:
        """Report generation workflow pattern.

        Args:
            processed_data: Output of ``process_data_parallel``.

        Returns:
            Dict with ``"summary"``, ``"detailed_report"``,
            ``"report_timestamp"`` and ``"items_processed"``.
        """
        reporter_agent = Agent(
            name="report_generator",
            instruction="Generate comprehensive reports from processed data.",
            server_names=["reporting_service", "filesystem"]
        )

        async with reporter_agent:
            llm = await reporter_agent.attach_llm(OpenAIAugmentedLLM)

            summary = await llm.generate_str(
                f"Generate executive summary for {len(processed_data['processed_items'])} processed items"
            )

            detailed_report = await llm.generate_str(
                f"Create detailed analysis report: {processed_data}"
            )

            return {
                "summary": summary,
                "detailed_report": detailed_report,
                "report_timestamp": datetime.utcnow().isoformat(),
                "items_processed": processed_data["processing_count"]
            }

Parallel Pattern Composition

Run multiple patterns concurrently:
@app.workflow
class MultiAnalysisWorkflow(Workflow[dict]):
    """Run multiple analysis patterns in parallel.

    Five independent analysis patterns run concurrently on the same document;
    a final meta-analysis pattern then synthesizes their combined output.
    """

    @app.workflow_run
    async def run(self, document: str) -> WorkflowResult[dict]:
        """Fan out to all analysis patterns, then run the meta-analysis.

        Args:
            document: Text to analyze.

        Returns:
            A ``WorkflowResult`` whose value has the keys ``"sentiment"``,
            ``"entities"``, ``"topics"``, ``"quality"``, ``"summary"``,
            ``"analysis_timestamp"`` and ``"meta_analysis"``.
        """
        # Pair each result key with its coroutine so results are matched by
        # name rather than by position in the gather() output.
        pattern_calls = {
            "sentiment": self.sentiment_analysis_pattern(document),
            "entities": self.entity_extraction_pattern(document),
            "topics": self.topic_modeling_pattern(document),
            "quality": self.quality_assessment_pattern(document),
            "summary": self.summarization_pattern(document),
        }

        # Launch multiple analysis patterns concurrently
        outputs = await asyncio.gather(*pattern_calls.values())

        # Combine results from all patterns
        combined_results = dict(zip(pattern_calls, outputs))
        combined_results["analysis_timestamp"] = datetime.utcnow().isoformat()

        # Generate meta-analysis
        meta_analysis = await self.meta_analysis_pattern(combined_results)
        combined_results["meta_analysis"] = meta_analysis

        return WorkflowResult(value=combined_results)

    async def sentiment_analysis_pattern(self, text: str) -> dict:
        """Sentiment analysis workflow pattern.

        Only the first 500 characters of ``text`` are sent to the LLM.
        """
        sentiment_agent = Agent(
            name="sentiment_analyzer",
            instruction="Analyze text sentiment with nuanced understanding.",
            server_names=["sentiment_api", "ml_service"]
        )

        async with sentiment_agent:
            llm = await sentiment_agent.attach_llm(OpenAIAugmentedLLM)

            # Primary sentiment analysis
            primary_sentiment = await llm.generate_str(
                f"Analyze overall sentiment of this text: {text[:500]}..."
            )

            # Aspect-based sentiment
            aspects_sentiment = await llm.generate_str(
                f"Analyze sentiment for key aspects/topics in: {text[:500]}..."
            )

            # Confidence scoring
            confidence = await llm.generate_str(
                f"Rate confidence in sentiment analysis (0-100): {primary_sentiment}"
            )

            return {
                "primary_sentiment": primary_sentiment,
                "aspects": aspects_sentiment,
                "confidence": confidence,
                "pattern": "sentiment_analysis"
            }

    async def entity_extraction_pattern(self, text: str) -> dict:
        """Named entity recognition workflow pattern.

        Only the first 500 characters of ``text`` are sent to the LLM.
        """
        entity_agent = Agent(
            name="entity_extractor",
            instruction="Extract and classify entities with high precision.",
            server_names=["ner_service", "knowledge_graph"]
        )

        async with entity_agent:
            llm = await entity_agent.attach_llm(OpenAIAugmentedLLM)

            # Extract entities
            entities = await llm.generate_str(
                f"Extract named entities (people, places, organizations, etc.): {text[:500]}..."
            )

            # Entity relationships
            relationships = await llm.generate_str(
                f"Identify relationships between entities: {entities}"
            )

            # Entity disambiguation
            disambiguated = await llm.generate_str(
                f"Disambiguate entities using context: {entities}"
            )

            return {
                "entities": entities,
                "relationships": relationships,
                "disambiguated": disambiguated,
                "pattern": "entity_extraction"
            }

    # NOTE(review): the three patterns below were called by run() but missing
    # from the class. They are minimal implementations in the style of the
    # patterns above; agent and server names are placeholders to adapt.

    async def topic_modeling_pattern(self, text: str) -> dict:
        """Topic modeling workflow pattern."""
        topic_agent = Agent(
            name="topic_modeler",
            instruction="Identify the main topics and themes of a text.",
            server_names=["ml_service"]
        )

        async with topic_agent:
            llm = await topic_agent.attach_llm(OpenAIAugmentedLLM)

            topics = await llm.generate_str(
                f"Identify the main topics discussed in: {text[:500]}..."
            )

            return {
                "topics": topics,
                "pattern": "topic_modeling"
            }

    async def quality_assessment_pattern(self, text: str) -> dict:
        """Content quality assessment workflow pattern."""
        quality_agent = Agent(
            name="quality_assessor",
            instruction="Assess the clarity, coherence and completeness of a text.",
            server_names=["analysis_service"]
        )

        async with quality_agent:
            llm = await quality_agent.attach_llm(OpenAIAugmentedLLM)

            assessment = await llm.generate_str(
                f"Assess the writing quality of this text: {text[:500]}..."
            )

            return {
                "assessment": assessment,
                "pattern": "quality_assessment"
            }

    async def summarization_pattern(self, text: str) -> dict:
        """Summarization workflow pattern."""
        summary_agent = Agent(
            name="summarizer",
            instruction="Produce concise, faithful summaries.",
            server_names=["ml_service"]
        )

        async with summary_agent:
            llm = await summary_agent.attach_llm(OpenAIAugmentedLLM)

            summary = await llm.generate_str(
                f"Summarize this text: {text[:500]}..."
            )

            return {
                "summary": summary,
                "pattern": "summarization"
            }

    async def meta_analysis_pattern(self, all_analyses: dict) -> dict:
        """Meta-analysis pattern to synthesize insights.

        Args:
            all_analyses: Combined outputs of the individual patterns.

        Returns:
            Dict with ``"synthesis"``, ``"confidence"``, ``"recommendations"``
            and ``"pattern"``.
        """
        meta_agent = Agent(
            name="meta_analyzer",
            instruction="Synthesize insights from multiple analysis patterns.",
            server_names=["synthesis_engine"]
        )

        async with meta_agent:
            llm = await meta_agent.attach_llm(OpenAIAugmentedLLM)

            synthesis = await llm.generate_str(
                f"Synthesize key insights from multiple analyses: {all_analyses}"
            )

            # Plain string: this prompt interpolates nothing.
            confidence_assessment = await llm.generate_str(
                "Assess overall confidence in combined analysis results"
            )

            recommendations = await llm.generate_str(
                f"Generate actionable recommendations based on synthesis: {synthesis}"
            )

            return {
                "synthesis": synthesis,
                "confidence": confidence_assessment,
                "recommendations": recommendations,
                "pattern": "meta_analysis"
            }

Nested Workflow Patterns

Hierarchical Workflow Composition

Create workflows that spawn child workflows:
@app.workflow
class ProjectManagementWorkflow(Workflow[dict]):
    """Parent workflow that drives a project through four child-workflow phases."""

    @app.workflow_run
    async def run(self, project_config: dict) -> WorkflowResult[dict]:
        """Run planning, resource allocation, task execution and closure.

        Args:
            project_config: Must provide ``"project_id"`` and ``"budget"``.

        Returns:
            A ``WorkflowResult`` whose value maps ``"planning"``,
            ``"resources"``, ``"tasks"`` and ``"closure"`` to the results of
            the corresponding child workflows.
        """
        project_id = project_config["project_id"]
        outcome = {}

        # Phase 1: planning child workflow.
        planning = await self.start_child_workflow(
            PlanningWorkflow,
            project_config,
            workflow_id=f"planning-{project_id}",
        )
        outcome["planning"] = await planning.result()

        # Phase 2: resource allocation, fed with the plan and the budget.
        allocation_input = {
            "project_plan": outcome["planning"],
            "budget": project_config["budget"],
        }
        allocation = await self.start_child_workflow(
            ResourceAllocationWorkflow,
            allocation_input,
            workflow_id=f"resources-{project_id}",
        )
        outcome["resources"] = await allocation.result()

        # Phase 3: start one child workflow per planned task before awaiting
        # any of them. Each task is indexed with ["id"] below.
        running = []
        for task in outcome["planning"]["tasks"]:
            task_input = {
                "task": task,
                "resources": outcome["resources"],
                "project_context": project_config,
            }
            running.append(
                await self.start_child_workflow(
                    TaskExecutionWorkflow,
                    task_input,
                    workflow_id=f"task-{project_id}-{task['id']}",
                )
            )

        # Collect results in the order the tasks were started.
        outcome["tasks"] = [await handle.result() for handle in running]

        # Phase 4: closure receives everything gathered so far.
        closure = await self.start_child_workflow(
            ProjectClosureWorkflow,
            {
                "project_results": outcome,
                "original_config": project_config,
            },
            workflow_id=f"closure-{project_id}",
        )
        outcome["closure"] = await closure.result()

        return WorkflowResult(value=outcome)

@app.workflow
class PlanningWorkflow(Workflow[dict]):
    """Child workflow for project planning.

    Produces requirements, a task list, a timeline and a risk assessment.
    ``"tasks"`` is returned as a list of dicts that each carry at least
    ``"id"`` and ``"type"``, because ``ProjectManagementWorkflow`` and
    ``TaskExecutionWorkflow`` index tasks by those keys.
    """

    @app.workflow_run
    async def run(self, project_config: dict) -> WorkflowResult[dict]:
        """Build the project plan.

        Args:
            project_config: Project description; interpolated into the prompts.

        Returns:
            A ``WorkflowResult`` whose value has ``"requirements"``,
            ``"tasks"`` (list of task dicts), ``"task_breakdown"`` (the raw
            LLM text the tasks were parsed from), ``"timeline"``, ``"risks"``
            and ``"planning_completed"``.
        """
        planner_agent = Agent(
            name="project_planner",
            instruction="Create detailed project plans with task breakdown.",
            server_names=["project_mgmt", "resource_db"]
        )

        async with planner_agent:
            llm = await planner_agent.attach_llm(OpenAIAugmentedLLM)

            # Analyze project requirements
            requirements = await llm.generate_str(
                f"Analyze project requirements: {project_config}"
            )

            # Create task breakdown structure. A machine-readable answer is
            # requested because the parent workflow iterates over the tasks.
            task_breakdown = await llm.generate_str(
                f"Create detailed task breakdown: {requirements}\n\n"
                "Respond with ONLY a JSON array of task objects, each with "
                '"id", "type" and "description" keys.'
            )

            # Estimate timeline and dependencies
            timeline = await llm.generate_str(
                f"Create project timeline with dependencies: {task_breakdown}"
            )

            # Risk assessment
            risks = await llm.generate_str(
                f"Identify project risks and mitigation strategies: {project_config}"
            )

            return WorkflowResult(value={
                "requirements": requirements,
                "tasks": self._parse_tasks(task_breakdown),
                "task_breakdown": task_breakdown,
                "timeline": timeline,
                "risks": risks,
                "planning_completed": datetime.utcnow().isoformat()
            })

    @staticmethod
    def _parse_tasks(raw: str) -> list[dict]:
        """Turn the LLM's task breakdown into a list of task dicts.

        Tolerates text around the JSON array (for example Markdown fences).
        Missing ``"id"`` / ``"type"`` keys are filled in. If nothing can be
        parsed, the whole text is wrapped in a single generic task so that
        callers always receive a non-empty list of dicts.
        """
        first, last = raw.find("["), raw.rfind("]")
        candidate = raw[first:last + 1] if 0 <= first < last else raw

        try:
            parsed = json.loads(candidate)
        except ValueError:
            parsed = None

        tasks = []
        if isinstance(parsed, list):
            for position, entry in enumerate(parsed, start=1):
                task = dict(entry) if isinstance(entry, dict) else {"description": str(entry)}
                task.setdefault("id", f"task-{position}")
                task.setdefault("type", "general")
                tasks.append(task)

        if not tasks:
            tasks = [{"id": "task-1", "type": "general", "description": raw}]
        return tasks

@app.workflow
class TaskExecutionWorkflow(Workflow[dict]):
    """Child workflow that executes a single project task."""

    @app.workflow_run
    async def run(self, task_data: dict) -> WorkflowResult[dict]:
        """Execute one task, quality-check it and produce a deliverable.

        Args:
            task_data: Must provide ``"task"`` (a dict with ``"type"`` and
                ``"id"``, optionally ``"required_services"``) and
                ``"resources"``.

        Returns:
            A ``WorkflowResult`` whose value has ``"task_id"``,
            ``"execution_result"``, ``"quality_check"``, ``"deliverable"``
            and ``"completion_time"``.
        """
        task = task_data["task"]

        # The agent is specialised per task type; its servers default to
        # ["general"] when the task names none.
        worker = Agent(
            name=f"task_executor_{task['type']}",
            instruction=f"Execute {task['type']} tasks efficiently and thoroughly.",
            server_names=task.get("required_services", ["general"]),
        )

        async with worker:
            llm = await worker.attach_llm(OpenAIAugmentedLLM)

            # Step 1: carry out the task.
            resources = task_data["resources"]
            outcome = await llm.generate_str(
                f"Execute task: {task} with resources: {resources}"
            )

            # Step 2: review what was produced.
            review = await llm.generate_str(
                f"Perform quality check on task execution: {outcome}"
            )

            # Step 3: package the outcome as a deliverable.
            artifact = await llm.generate_str(
                f"Create task deliverable: {outcome}"
            )

            payload = {
                "task_id": task["id"],
                "execution_result": outcome,
                "quality_check": review,
                "deliverable": artifact,
                "completion_time": datetime.utcnow().isoformat(),
            }
            return WorkflowResult(value=payload)

Dynamic Workflow Composition

Runtime Pattern Selection

Choose workflow patterns based on runtime conditions:
@app.workflow
class AdaptiveAnalysisWorkflow(Workflow[dict]):
    """Dynamically selects analysis patterns based on input characteristics.

    An LLM first characterises the content, a second LLM call picks pattern
    names from a fixed catalogue, and each picked pattern is then dispatched
    to the method named ``<pattern>_pattern``.
    """

    # Catalogue offered to the selector. Only some of these have an
    # implementation in this class; the rest are reported as not implemented.
    _KNOWN_PATTERNS = (
        "detailed_analysis",
        "rapid_analysis",
        "multilingual_analysis",
        "technical_analysis",
        "sentiment_analysis",
        "factual_analysis",
        "comparative_analysis",
    )

    @app.workflow_run
    async def run(self, content: dict) -> WorkflowResult[dict]:
        """Characterise the content, run the selected patterns, synthesize.

        Args:
            content: Input to analyze; its optional ``"type"`` key is recorded
                in the content analysis.

        Returns:
            A ``WorkflowResult`` wrapping whatever ``synthesize_results``
            returns.
        """
        # Analyze input to determine optimal patterns
        content_analysis = await self.analyze_content_characteristics(content)

        # Select appropriate patterns based on characteristics
        selected_patterns = await self.select_patterns(content_analysis)

        # Execute selected patterns dynamically (one after another)
        pattern_results = {}
        for pattern_name in selected_patterns:
            result = await self.execute_pattern(pattern_name, content)
            pattern_results[pattern_name] = result

        # Synthesize results
        # NOTE(review): synthesize_results is not defined in this class.
        final_result = await self.synthesize_results(pattern_results, content_analysis)

        return WorkflowResult(value=final_result)

    async def analyze_content_characteristics(self, content: dict) -> dict:
        """Analyze input to determine its characteristics.

        Returns:
            ``{"characteristics": <LLM text>, "content_type": content.get("type")}``.
        """
        analyzer_agent = Agent(
            name="content_analyzer",
            instruction="Analyze content characteristics to guide processing strategy.",
            server_names=["analysis_service"]
        )

        async with analyzer_agent:
            llm = await analyzer_agent.attach_llm(OpenAIAugmentedLLM)

            characteristics = await llm.generate_str(f"""
            Analyze these content characteristics:
            1. Content type and format
            2. Length and complexity
            3. Language and domain
            4. Required processing depth
            5. Time sensitivity
            
            Content: {content}
            """)

            return {"characteristics": characteristics, "content_type": content.get("type")}

    async def select_patterns(self, content_analysis: dict) -> list[str]:
        """Select optimal patterns based on content analysis.

        Returns:
            Normalised pattern names in the order the LLM listed them, without
            empty entries or duplicates. Names outside the catalogue are kept
            so that ``execute_pattern`` can report them.
        """
        selector_agent = Agent(
            name="pattern_selector",
            instruction="Select optimal processing patterns based on content analysis.",
            server_names=["decision_engine"]
        )

        async with selector_agent:
            llm = await selector_agent.attach_llm(OpenAIAugmentedLLM)

            pattern_selection = await llm.generate_str(f"""
            Based on this content analysis, select the most appropriate processing patterns:
            
            Available patterns:
            - detailed_analysis: Deep, comprehensive analysis (slow, thorough)
            - rapid_analysis: Quick insights extraction (fast, basic)
            - multilingual_analysis: Language-specific processing
            - technical_analysis: Domain-specific technical processing
            - sentiment_analysis: Emotion and opinion analysis
            - factual_analysis: Fact-checking and verification
            - comparative_analysis: Comparison with reference materials
            
            Analysis: {content_analysis}
            
            Return comma-separated list of selected patterns.
            """)

            return self._parse_pattern_list(pattern_selection)

    @staticmethod
    def _parse_pattern_list(raw: str) -> list[str]:
        """Split an LLM answer into clean, unique, lower-case pattern names."""
        names = []
        seen = set()
        # Accept one-per-line answers as well as comma-separated ones.
        for chunk in raw.replace("\n", ",").split(","):
            name = chunk.strip().strip("`'\"*-. ").lower()
            if name and name not in seen:
                seen.add(name)
                names.append(name)
        return names

    async def execute_pattern(self, pattern_name: str, content: dict) -> dict:
        """Execute a specific analysis pattern.

        The handler is looked up lazily by name. Building a dict of bound
        methods up front would raise ``AttributeError`` for every catalogue
        entry that has no implementation, even when an implemented pattern
        was requested.

        Returns:
            The pattern's result dict, or ``{"error": ...}`` when the name is
            unknown or the pattern has no implementation.
        """
        if pattern_name not in self._KNOWN_PATTERNS:
            return {"error": f"Unknown pattern: {pattern_name}"}

        executor = getattr(self, f"{pattern_name}_pattern", None)
        if executor is None:
            return {"error": f"Pattern not implemented: {pattern_name}"}

        return await executor(content)

    async def detailed_analysis_pattern(self, content: dict) -> dict:
        """Comprehensive analysis pattern (three chained LLM calls)."""
        detailed_agent = Agent(
            name="detailed_analyzer",
            instruction="Perform thorough, comprehensive analysis with deep insights.",
            server_names=["deep_analysis", "knowledge_base", "ml_service"]
        )

        async with detailed_agent:
            llm = await detailed_agent.attach_llm(OpenAIAugmentedLLM)

            # Multi-stage deep analysis: each stage consumes the previous one
            structural_analysis = await llm.generate_str(f"Deep structural analysis: {content}")
            contextual_analysis = await llm.generate_str(f"Contextual analysis: {structural_analysis}")
            implications = await llm.generate_str(f"Derive implications: {contextual_analysis}")

            return {
                "pattern": "detailed_analysis",
                "structural": structural_analysis,
                "contextual": contextual_analysis,
                "implications": implications,
                "depth": "comprehensive"
            }

    async def rapid_analysis_pattern(self, content: dict) -> dict:
        """Quick analysis pattern for time-sensitive processing (one LLM call)."""
        rapid_agent = Agent(
            name="rapid_analyzer",
            instruction="Provide quick, essential insights with time efficiency.",
            server_names=["fast_analysis"]
        )

        async with rapid_agent:
            llm = await rapid_agent.attach_llm(OpenAIAugmentedLLM)

            quick_insights = await llm.generate_str(f"Quick key insights: {content}")

            return {
                "pattern": "rapid_analysis",
                "insights": quick_insights,
                "depth": "surface"
            }

State Sharing Between Workflows

Shared State Management

Implement state sharing across workflow patterns:
from typing import Dict, Any
import json

@app.workflow
class StatefulOrchestrator(Workflow[dict]):
    """Orchestrator that maintains shared state across patterns.

    All patterns read from and write to ``self.shared_state``, a dict with
    four sections: ``"global_context"``, ``"pattern_results"``,
    ``"workflow_metadata"`` and ``"communication_log"``.
    """

    def __init__(self, *args, **kwargs):
        """Initialise the base workflow and an empty shared state.

        Arguments are forwarded unchanged to the ``Workflow`` base class so
        the framework can still pass its own constructor arguments.
        """
        super().__init__(*args, **kwargs)
        self.shared_state: Dict[str, Any] = {
            "global_context": {},
            "pattern_results": {},
            "workflow_metadata": {},
            "communication_log": []
        }

    @staticmethod
    def _looks_valid(verdict: str) -> bool:
        """Heuristically decide whether a free-text verdict means "valid".

        A plain ``"valid" in text`` check also matches "invalid", so negative
        phrasings are rejected first.
        """
        lowered = verdict.lower()
        if "invalid" in lowered or "not valid" in lowered:
            return False
        return "valid" in lowered

    @app.workflow_run
    async def run(self, initial_data: dict) -> WorkflowResult[dict]:
        """Seed the shared state, run all patterns, return the final state.

        Args:
            initial_data: Initial global context. It is shallow-copied, so the
                patterns' top-level writes do not leak into the caller's dict.

        Returns:
            A ``WorkflowResult`` whose value has ``"final_state"`` and
            ``"execution_summary"``.
        """
        # Initialize shared state
        self.shared_state["global_context"] = dict(initial_data)
        # NOTE(review): `workflow` is not imported anywhere in this file;
        # presumably Temporal's `workflow` module is intended - confirm and
        # import it, otherwise these two lines raise NameError.
        self.shared_state["workflow_metadata"] = {
            "start_time": datetime.utcnow().isoformat(),
            "workflow_id": workflow.info().workflow_id,
            "run_id": workflow.info().run_id
        }

        # Execute patterns with shared state
        await self.execute_data_collection_pattern()
        await self.execute_processing_patterns()
        await self.execute_synthesis_pattern()

        return WorkflowResult(value={
            "final_state": self.shared_state,
            "execution_summary": await self.generate_execution_summary()
        })

    async def execute_data_collection_pattern(self):
        """Data collection pattern that updates shared state.

        Writes ``pattern_results["data_collection"]``, adds
        ``"collected_data"`` to the global context and appends one entry to
        the communication log.
        """
        collector_agent = Agent(
            name="data_collector",
            instruction="Collect data and update shared context.",
            server_names=["data_sources"]
        )

        async with collector_agent:
            llm = await collector_agent.attach_llm(OpenAIAugmentedLLM)

            # Collect data based on current context
            collected_data = await llm.generate_str(
                f"Collect relevant data based on context: {self.shared_state['global_context']}"
            )

            # Update shared state
            self.shared_state["pattern_results"]["data_collection"] = {
                "collected_data": collected_data,
                "timestamp": datetime.utcnow().isoformat(),
                "status": "completed"
            }

            # Update global context with new data
            self.shared_state["global_context"]["collected_data"] = collected_data

            # Log communication
            self.shared_state["communication_log"].append({
                "pattern": "data_collection",
                "action": "state_update",
                "timestamp": datetime.utcnow().isoformat(),
                "data_keys": list(self.shared_state["pattern_results"]["data_collection"].keys())
            })

    async def execute_processing_patterns(self):
        """Execute multiple processing patterns that share state.

        Order matters: validation reads the analysis result, and enhancement
        reads everything produced before it.
        """
        # Pattern 1: Analysis
        await self.execute_analysis_pattern()

        # Pattern 2: Validation (uses analysis results)
        await self.execute_validation_pattern()

        # Pattern 3: Enhancement (uses both previous results)
        await self.execute_enhancement_pattern()

    async def execute_analysis_pattern(self):
        """Analysis pattern that reads and updates shared state.

        Writes ``pattern_results["analysis"]`` and adds
        ``"analysis_insights"`` to the global context.
        """
        analysis_agent = Agent(
            name="analyzer",
            instruction="Analyze data using shared context and state.",
            server_names=["analysis_service"]
        )

        async with analysis_agent:
            llm = await analysis_agent.attach_llm(OpenAIAugmentedLLM)

            # Use shared state for analysis
            current_context = self.shared_state["global_context"]
            previous_results = self.shared_state.get("pattern_results", {})

            analysis_result = await llm.generate_str(f"""
            Perform analysis using shared context:
            Context: {current_context}
            Previous Results: {previous_results}
            """)

            # Update shared state with analysis. The context is copied so the
            # stored input is a snapshot and does not pick up later writes.
            self.shared_state["pattern_results"]["analysis"] = {
                "result": analysis_result,
                "timestamp": datetime.utcnow().isoformat(),
                "input_context": dict(current_context)
            }

            # Update global context
            self.shared_state["global_context"]["analysis_insights"] = analysis_result

    async def execute_validation_pattern(self):
        """Validation pattern that uses analysis results from shared state.

        Requires ``pattern_results["analysis"]`` to exist. Writes
        ``pattern_results["validation"]`` and sets the boolean
        ``"validation_status"`` in the global context.
        """
        validator_agent = Agent(
            name="validator",
            instruction="Validate analysis results using shared state.",
            server_names=["validation_service"]
        )

        async with validator_agent:
            llm = await validator_agent.attach_llm(OpenAIAugmentedLLM)

            # Access analysis results from shared state
            analysis_result = self.shared_state["pattern_results"]["analysis"]["result"]

            validation_result = await llm.generate_str(f"""
            Validate analysis result:
            Analysis to validate: {analysis_result}
            Global context: {self.shared_state['global_context']}
            """)

            # Update shared state
            self.shared_state["pattern_results"]["validation"] = {
                "validation_result": validation_result,
                "validated_analysis": analysis_result,
                "timestamp": datetime.utcnow().isoformat()
            }

            # Update global context based on validation
            is_valid = self._looks_valid(validation_result)
            self.shared_state["global_context"]["validation_status"] = is_valid

    async def execute_enhancement_pattern(self):
        """Enhancement pattern that uses all previous results.

        Writes ``pattern_results["enhancement"]``; its ``"used_results"``
        lists the result keys that existed before this pattern ran.
        """
        enhancer_agent = Agent(
            name="enhancer",
            instruction="Enhance results using all available shared state.",
            server_names=["enhancement_service"]
        )

        async with enhancer_agent:
            llm = await enhancer_agent.attach_llm(OpenAIAugmentedLLM)

            # Use all shared state for enhancement
            all_results = self.shared_state["pattern_results"]
            global_context = self.shared_state["global_context"]

            enhancement_result = await llm.generate_str(f"""
            Enhance results using all available information:
            All Pattern Results: {all_results}
            Global Context: {global_context}
            """)

            # Final state update
            self.shared_state["pattern_results"]["enhancement"] = {
                "enhanced_result": enhancement_result,
                "used_results": list(all_results.keys()),
                "timestamp": datetime.utcnow().isoformat()
            }

    async def execute_synthesis_pattern(self):
        """Final synthesis pattern that creates comprehensive output.

        Writes ``pattern_results["synthesis"]``; its ``"synthesized_patterns"``
        lists the result keys that existed before this pattern ran.
        """
        synthesizer_agent = Agent(
            name="synthesizer",
            instruction="Synthesize all shared state into final comprehensive result.",
            server_names=["synthesis_engine"]
        )

        async with synthesizer_agent:
            llm = await synthesizer_agent.attach_llm(OpenAIAugmentedLLM)

            synthesis = await llm.generate_str(f"""
            Synthesize comprehensive final result from all shared state:
            Complete State: {self.shared_state}
            """)

            self.shared_state["pattern_results"]["synthesis"] = {
                "final_synthesis": synthesis,
                "synthesized_patterns": list(self.shared_state["pattern_results"].keys()),
                "timestamp": datetime.utcnow().isoformat()
            }

    async def generate_execution_summary(self) -> dict:
        """Generate summary of workflow execution.

        ``"execution_duration"`` is a literal placeholder string; no duration
        is computed here.
        """
        return {
            "executed_patterns": list(self.shared_state["pattern_results"].keys()),
            "execution_duration": "calculated_duration",
            "state_updates": len(self.shared_state["communication_log"]),
            "final_context_keys": list(self.shared_state["global_context"].keys())
        }

Advanced Coordination Patterns

Event-Driven Coordination

Implement event-driven coordination between patterns:
from dataclasses import dataclass
from typing import List
from enum import Enum

class EventType(Enum):
    """Kinds of events exchanged between workflow patterns and the coordinator."""
    # A pattern was marked as running and its execution was scheduled.
    PATTERN_STARTED = "pattern_started"
    # A pattern finished executing without raising.
    PATTERN_COMPLETED = "pattern_completed"
    # Data associated with a pattern was updated.
    DATA_UPDATED = "data_updated"
    # A pattern raised an exception while executing.
    ERROR_OCCURRED = "error_occurred"
    # A monitored limit (e.g. payload size) was exceeded.
    THRESHOLD_REACHED = "threshold_reached"

@dataclass
class WorkflowEvent:
    """A single event placed on the coordinator's queue by a workflow pattern."""
    # Kind of event; used to look up the handler that processes it.
    event_type: EventType
    # Name of the pattern the event originates from.
    source_pattern: str
    # Event-specific payload (for example a config, a result, or error details).
    data: dict
    # Creation time as an ISO-8601 string.
    timestamp: str

@app.workflow
class EventDrivenCoordinator(Workflow[dict]):
    """Event-driven coordination between workflow patterns.

    Patterns run as background asyncio tasks. Each one reports its lifecycle by
    appending ``WorkflowEvent`` objects to ``event_queue``; ``run`` polls that
    queue and dispatches every event to the handler registered for its type,
    which may in turn start follow-up or recovery patterns.
    """

    # Seconds ``run`` sleeps between polling iterations.
    POLL_INTERVAL_SECONDS = 1

    # ``handle_data_update`` emits THRESHOLD_REACHED when ``len(str(event.data))``
    # exceeds this value.
    LARGE_DATA_THRESHOLD = 10000

    # States in which a pattern still counts as in flight.
    ACTIVE_STATES = ("running", "pending")

    # Pattern name -> patterns to start once it has completed.
    PATTERN_DEPENDENCIES = {
        "data_ingestion": ["data_validation", "initial_analysis"],
        "data_validation": ["data_processing"],
        "initial_analysis": ["detailed_analysis"],
        "data_processing": ["result_synthesis"],
        "detailed_analysis": ["result_synthesis"],
        "parallel_processing": ["result_aggregation"],
        "result_synthesis": ["final_reporting"],
        "result_aggregation": ["final_reporting"],
    }

    # Failed pattern name -> patterns to start as recovery.
    RECOVERY_PATTERNS = {
        "data_ingestion": ["data_ingestion_retry"],
        "data_processing": ["alternative_processing"],
        "detailed_analysis": ["fallback_analysis"],
    }

    def __init__(self, *args, **kwargs):
        """Set up the event queue, pattern state table and handler dispatch map.

        Any constructor arguments are forwarded to the ``Workflow`` base class
        so that its own initialisation runs.
        """
        super().__init__(*args, **kwargs)
        self.event_queue: List[WorkflowEvent] = []
        self.pattern_states: Dict[str, str] = {}
        # Running total of events drained from the queue by process_events().
        self.processed_event_count = 0
        # Strong references to in-flight pattern tasks; the event loop itself
        # only keeps weak references, so unreferenced tasks may be collected.
        self._background_tasks: Set[asyncio.Task] = set()
        # PATTERN_STARTED intentionally has no handler; it is informational.
        self.event_handlers: Dict[
            EventType, Callable[[WorkflowEvent], Awaitable[None]]
        ] = {
            EventType.PATTERN_COMPLETED: self.handle_pattern_completion,
            EventType.DATA_UPDATED: self.handle_data_update,
            EventType.ERROR_OCCURRED: self.handle_error,
            EventType.THRESHOLD_REACHED: self.handle_threshold,
        }

    @app.workflow_run
    async def run(self, config: dict) -> WorkflowResult[dict]:
        """Start the initial patterns and pump events until everything settles.

        Args:
            config: Workflow configuration. ``initial_patterns`` selects the
                patterns to start first; the whole dict is passed on to them.

        Returns:
            A ``WorkflowResult`` holding the final state of every pattern, the
            number of events drained from the queue, and the completion time.
        """
        await self.initialize_patterns(config)

        while True:
            await self.process_events()
            await self.check_triggers()

            # Stop only when nothing is in flight AND nothing is queued. A
            # pattern flips to "completed"/"failed" before its event is
            # handled, so checking states alone would exit with the final
            # events unprocessed and follow-up patterns never started.
            if not self.event_queue and self.all_patterns_complete():
                break

            await asyncio.sleep(self.POLL_INTERVAL_SECONDS)

        return WorkflowResult(value={
            "execution_results": self.pattern_states,
            "processed_events": self.processed_event_count,
            "completion_time": datetime.utcnow().isoformat()
        })

    async def initialize_patterns(self, config: dict):
        """Start every pattern named in ``config["initial_patterns"]``.

        Defaults to ``["data_ingestion"]`` when the key is absent.
        """
        patterns_to_start = config.get("initial_patterns", ["data_ingestion"])

        for pattern_name in patterns_to_start:
            await self.start_pattern(pattern_name, config)

    async def start_pattern(self, pattern_name: str, config: dict):
        """Mark a pattern as running, emit PATTERN_STARTED and schedule it."""
        self.pattern_states[pattern_name] = "running"
        self._emit_event(
            EventType.PATTERN_STARTED, pattern_name, {"config": config}
        )

        # Keep a reference until the task finishes so it cannot be garbage
        # collected mid-execution; the callback drops it again afterwards.
        task = asyncio.create_task(
            self.execute_pattern_async(pattern_name, config)
        )
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def execute_pattern_async(self, pattern_name: str, config: dict):
        """Execute one pattern and report the outcome as an event.

        On success the pattern state becomes "completed" and a
        PATTERN_COMPLETED event carrying the LLM result is queued. On any
        exception the state becomes "failed" and an ERROR_OCCURRED event
        carrying the error text is queued.
        """
        try:
            pattern_agent = Agent(
                name=f"{pattern_name}_executor",
                instruction=f"Execute {pattern_name} pattern according to configuration.",
                server_names=config.get("required_services", ["general"])
            )

            async with pattern_agent:
                llm = await pattern_agent.attach_llm(OpenAIAugmentedLLM)
                result = await llm.generate_str(f"Execute {pattern_name}: {config}")
        except Exception as e:
            # Deliberately broad: this is the boundary of a background task,
            # so every failure is converted into an event instead of being
            # lost inside the task.
            self.pattern_states[pattern_name] = "failed"
            self._emit_event(
                EventType.ERROR_OCCURRED,
                pattern_name,
                {"error": str(e), "status": "failed"},
            )
        else:
            self.pattern_states[pattern_name] = "completed"
            self._emit_event(
                EventType.PATTERN_COMPLETED,
                pattern_name,
                {"result": result, "status": "success"},
            )

    async def process_events(self):
        """Drain the queue and dispatch each event to its registered handler.

        The queue is snapshotted and cleared first, so events emitted by the
        handlers themselves are processed on the next call. Every drained
        event is counted, including those with no registered handler.
        """
        events_to_process = self.event_queue.copy()
        self.event_queue.clear()

        for event in events_to_process:
            self.processed_event_count += 1
            handler = self.event_handlers.get(event.event_type)
            if handler:
                await handler(event)

    async def handle_pattern_completion(self, event: WorkflowEvent):
        """Start the patterns that depend on the one that just completed.

        The completion event's ``data`` is passed to them as their config.
        """
        for next_pattern in self.get_next_patterns(event.source_pattern):
            # Some patterns have two upstream dependencies (e.g.
            # "result_synthesis"); skip them when the other upstream already
            # triggered them so they are neither duplicated nor re-run.
            if self.pattern_states.get(next_pattern) in ("running", "completed"):
                continue
            await self.start_pattern(next_pattern, event.data)

    async def handle_data_update(self, event: WorkflowEvent):
        """Emit THRESHOLD_REACHED when the update's payload is large.

        Size is measured as the length of ``str(event.data)``.
        """
        data_size = len(str(event.data))

        if data_size > self.LARGE_DATA_THRESHOLD:
            self._emit_event(
                EventType.THRESHOLD_REACHED,
                event.source_pattern,
                {"threshold": "large_data", "size": data_size},
            )

    async def handle_error(self, event: WorkflowEvent):
        """Start the recovery patterns configured for the failed pattern."""
        failed_pattern = event.source_pattern

        for recovery_pattern in self.get_recovery_patterns(failed_pattern):
            await self.start_pattern(recovery_pattern, {
                "recovery_mode": True,
                "failed_pattern": failed_pattern,
                "error_details": event.data
            })

    async def handle_threshold(self, event: WorkflowEvent):
        """Start parallel processing when the large-data threshold was hit."""
        threshold_type = event.data.get("threshold")

        if threshold_type == "large_data":
            await self.start_pattern("parallel_processing", event.data)

    def get_next_patterns(self, completed_pattern: str) -> List[str]:
        """Return the patterns to start after ``completed_pattern`` finishes.

        Returns an empty list for patterns with no dependents.
        """
        # Copy so callers cannot mutate the shared class-level table.
        return list(self.PATTERN_DEPENDENCIES.get(completed_pattern, []))

    def get_recovery_patterns(self, failed_pattern: str) -> List[str]:
        """Return the recovery patterns for ``failed_pattern`` (possibly empty)."""
        # Copy so callers cannot mutate the shared class-level table.
        return list(self.RECOVERY_PATTERNS.get(failed_pattern, []))

    def all_patterns_complete(self) -> bool:
        """Return True when no pattern is in an active state."""
        return not any(
            state in self.ACTIVE_STATES for state in self.pattern_states.values()
        )

    async def check_triggers(self):
        """Check for external triggers that might start new patterns.

        Placeholder: this could poll external systems, databases, APIs, etc.
        """
        pass

    def _emit_event(self, event_type: EventType, source_pattern: str, data: dict) -> None:
        """Append a UTC-timestamped event to the queue."""
        self.event_queue.append(WorkflowEvent(
            event_type=event_type,
            source_pattern=source_pattern,
            data=data,
            timestamp=datetime.utcnow().isoformat(),
        ))

Best Practices for Pattern Composition

Next Steps