from mcp_agent.app import MCPApp
from mcp_agent.workflows.factory import (
    AgentSpec,
    create_evaluator_optimizer_llm,
    create_parallel_llm,
    create_router_llm,
)

app = MCPApp(name="composed_pattern")
# Cache long-lived components so we don't recreate them per request.
router = None
parallel_research = None
research_loop = None

@app.async_tool(name="answer_question")
async def answer(request: str) -> str:
    global router, parallel_research, research_loop
    async with app.run() as running_app:
        ctx = running_app.context
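
        # Lazily build the triage router on first call; it picks which
        # specialist agent should handle each incoming request.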
        if router is None:
            router = await create_router_llm(
                name="triage",
                agents=[
                    AgentSpec(name="qa", instruction="Answer factual questions concisely."),
                    AgentSpec(
                        name="analysis",
                        instruction="Perform deep research with citations before answering.",
                    ),
                ],
                provider="openai",
                context=ctx,
            )
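
        # Fan-out/fan-in: both researchers run concurrently, and the
        # aggregator agent merges their outputs into a single brief.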
        if parallel_research is None:
            parallel_research = create_parallel_llm(
                name="research_parallel",
                fan_in=AgentSpec(
                    name="aggregator",
                    instruction="Blend researcher outputs into a single structured brief.",
                ),
                fan_out=[
                    AgentSpec(
                        name="news",
                        instruction="Search recent press releases.",
                        server_names=["fetch"],
                    ),
                    AgentSpec(
                        name="financials",
                        instruction="Look up filings and key metrics.",
                        server_names=["fetch"],
                    ),
                ],
                context=ctx,
            )
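
        # Quality gate: the editor agent scores each draft, and the parallel
        # research re-runs until the brief rates at least min_rating (4/5)
        # or max_refinements (3) rounds are exhausted.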
        if research_loop is None:
            research_loop = create_evaluator_optimizer_llm(
                name="research_with_qc",
                optimizer=parallel_research,
                evaluator=AgentSpec(
                    name="editor",
                    instruction=(
                        "Score the brief from 1-5. Demand improvements if it lacks citations, "
                        "actionable insights, or policy compliance."
                    ),
                ),
                min_rating=4,
                max_refinements=3,
                context=ctx,
            )
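
        # Dispatch on the router's top pick: deep-dive "analysis" requests go
        # through the QC'd research loop; other agent matches answer directly.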
        decision = await router.route(request, top_k=1)
        top = decision[0]
        if top.category == "agent" and top.result.name == "analysis":
            return await research_loop.generate_str(request)
        if top.category == "agent":
            async with top.result:
                return await top.result.generate_str(request)
        # Fallback: let the router destination handle the request directly.
        return await top.result.generate_str(request)
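
# Hypothetical smoke test (an assumption, not part of the original example):
# call the tool coroutine directly in-process. This assumes @app.async_tool
# returns the original function, so `answer` remains callable here.
if __name__ == "__main__":
    import asyncio

    print(asyncio.run(answer("Compare ACME Corp's last two earnings reports.")))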