Parallel workflow diagram

When to use it

  • You need several specialists to analyse the same request from different angles (e.g. proofread, fact-check, style review).
  • You want deterministic aggregation of multiple perspectives into a single response.
  • You have functions or agents that can run concurrently and do not depend on each other’s intermediate state.
  • You want a cheap way to blend deterministic helpers (regex, heuristics) with heavyweight LLM agents.
Common scenarios include content review with multiple rubrics, multi-source research where each worker hits a different MCP server, or evaluation workflows where you want a “best effort” vote across diverse models.

How it works

create_parallel_llm(...) returns a ParallelLLM composed of two primitives:
  • FanOut (fan_out.py) launches every worker concurrently. Workers can be AgentSpec, pre-built Agent/AugmentedLLM, or plain callables via fan_out_functions.
  • FanIn (fan_in.py) collects the responses and hands a FanInInput structure to your aggregator. The aggregator can also be an AgentSpec, an AugmentedLLM, or a deterministic function.
FanOut returns a dictionary keyed by worker name, so the aggregator always knows who produced which output. When the aggregator is an LLM, mcp-agent automatically enters the worker contexts, attaches LLMs, and tracks token usage per branch.
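
For orientation, this is a minimal sketch of the structure the aggregator receives. The worker names and the string values are illustrative; the concrete message objects depend on the provider you configure.

# Illustrative only: FanOut hands FanIn a mapping of worker name -> that worker's outputs.
# Plain strings and a dict stand in for the provider-specific message objects.
fan_out_result = {
    "proofreader": ["Found two typos in the second paragraph."],
    "fact_checker": ["Could not verify the launch-date claim."],
    "metadata": [{"title": "metadata", "details": {"characters": 812}}],
}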

Quick start

import asyncio

from mcp_agent.app import MCPApp
from mcp_agent.workflows.factory import (
    AgentSpec,
    RequestParams,
    create_parallel_llm,
)

app = MCPApp(name="parallel_example")

async def main():
    async with app.run() as running_app:
        parallel = create_parallel_llm(
            name="content_review_parallel",
            fan_in=AgentSpec(
                name="grader",
                instruction=(
                    "Blend the reviewer feedback into a single report. "
                    "Call out disagreements and provide an overall score."
                ),
            ),
            fan_out=[
                AgentSpec(
                    name="proofreader",
                    instruction="Check grammar, spelling, and clarity.",
                ),
                AgentSpec(
                    name="fact_checker",
                    instruction="Verify factual claims using the fetch server.",
                    server_names=["fetch"],
                ),
                AgentSpec(
                    name="style_enforcer",
                    instruction="Compare against the company voice and style guide.",
                ),
            ],
            fan_out_functions=[
                lambda prompt: [
                    {
                        "title": "metadata",
                        "details": {
                            "characters": len(prompt),
                            "keywords": prompt.split()[:5],
                        },
                    }
                ]
            ],
            provider="openai",
            request_params=RequestParams(maxTokens=2000, temperature=0.2),
            context=running_app.context,
        )

        result = await parallel.generate_str("Review this customer email draft.")
        return result


if __name__ == "__main__":
    asyncio.run(main())

create_parallel_llm accepts AgentSpec, already-instantiated Agent/AugmentedLLM, or plain Python callables. Every worker receives the same prompt; the aggregator receives a structured summary of all responses and returns either a list of CreateMessageResult objects or a single string, depending on whether you call generate or generate_str.
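
A minimal sketch of the two call styles, assuming the parallel instance from the Quick start above and running inside main():

# Both calls send the same prompt to every worker; only the return type of the
# final, aggregated answer differs.
messages = await parallel.generate("Review this customer email draft.")    # list of messages
report = await parallel.generate_str("Review this customer email draft.")  # single string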

Working with the aggregator input

FanIn accepts responses in several shapes (dicts keyed by worker name or plain lists). For example, you can supply a deterministic aggregator function:
from mcp_agent.workflows.parallel.fan_in import FanInInput

def aggregate_as_markdown(messages: FanInInput) -> str:
    blocks = []
    for source, outputs in messages.items():
        lines = "\n".join(str(item) for item in outputs)
        blocks.append(f"### {source}\n{lines}")
    return "\n\n".join(blocks)

parallel = create_parallel_llm(
    fan_in=aggregate_as_markdown,
    fan_out=[AgentSpec(name="qa", instruction="Answer with citations.")],
    context=running_app.context,
)
When the aggregator is an agent/LLM, mcp-agent wraps the aggregated string in a system prompt that names each contributor, making it easy to ask for weighted votes, majority decisions, or summarised findings.
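
As a sketch of that pattern, an LLM aggregator that tallies a majority vote could look like the following. It reuses the arguments shown in the Quick start (including running_app.context); the agent names and instructions are purely illustrative.

vote = create_parallel_llm(
    name="release_vote",
    fan_in=AgentSpec(
        name="vote_counter",
        instruction=(
            "Each contributor below voted APPROVE or REJECT. "
            "Report the majority decision and quote any dissenting reasons."
        ),
    ),
    fan_out=[
        AgentSpec(name="security_reviewer", instruction="Vote APPROVE or REJECT on security grounds."),
        AgentSpec(name="compliance_reviewer", instruction="Vote APPROVE or REJECT on compliance grounds."),
    ],
    provider="openai",
    context=running_app.context,
)

decision = await vote.generate_str("Should we ship release 1.4 today?")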

Operational tips

  • Throttle concurrency by setting executor.max_concurrent_activities in mcp_agent.config.yaml. The parallel workflow uses the shared executor, so the limit applies across your entire app.
  • Mix deterministic helpers via fan_out_functions for cheap signals (regex extractors, heuristics) alongside heavyweight LLM calls.
  • Inspect token/latency costs with await parallel.get_token_node(); each fan-out worker appears as a child node, making it easy to spot the expensive branch (see the sketch after this list).
  • Handle stragglers by adjusting RequestParams.timeoutSeconds or adding retry logic inside the worker agents.
  • Reuse workers by instantiating AugmentedLLMs once and passing them directly into fan_out to avoid repeated attachment overhead.
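
A rough sketch of per-branch cost inspection. Only get_token_node() comes from the tip above; the children, name, and usage attributes shown here are assumptions about the returned node.

# Assumed node attributes (children, name, usage) for illustration.
node = await parallel.get_token_node()
for child in node.children:
    print(child.name, child.usage)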

Example projects

I