Decorators
MCPApp exposes a small set of decorators that register tools, workflows, and workflow tasks. The decorators are engine-aware: when you switch from the default asyncio executor to Temporal, the same annotations automatically apply the appropriate Temporal SDK wrappers.
| Decorator | Applies to | Purpose |
|---|---|---|
| @app.tool | sync function | Exposes a blocking function as an MCP tool. |
| @app.async_tool | async function | Registers an async tool and publishes it through FastMCP. |
| @app.workflow | Workflow subclass | Declares a workflow class and adapts it for the active execution engine. |
| @app.workflow_run | async method | Marks the primary run coroutine for a workflow. |
| @app.workflow_task | async function/method | Registers an activity task reusable across workflows. |
| @app.workflow_signal | async method | Handles inbound workflow signals (Temporal-friendly). |
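To make "engine-aware" concrete, the toy sketch below shows one way a decorator can defer to the active engine. It is not MCPApp's implementation: ToyApp, its engine argument, and the __engine_role__ attribute are hypothetical names invented for illustration, and the code uses only the standard library.

```python
from typing import Callable, Dict


class ToyApp:
    """Hypothetical stand-in for an app object; not the real MCPApp."""

    def __init__(self, engine: str = "asyncio") -> None:
        self.engine = engine
        self.tasks: Dict[str, Callable] = {}

    def workflow_task(self, fn: Callable) -> Callable:
        # Register the task under its function name for every engine.
        self.tasks[fn.__name__] = fn
        # Tag the function only when the durable engine is selected;
        # under asyncio the function is returned untouched.
        if self.engine == "temporal":
            fn.__engine_role__ = "activity"
        return fn


def build(engine: str) -> Callable:
    """Decorate the same coroutine under the given engine."""
    app = ToyApp(engine=engine)

    @app.workflow_task
    async def fetch(x: int) -> int:
        return x + 1

    return fetch


local_task = build("asyncio")
durable_task = build("temporal")
```

The decorated source is identical in both cases; only the engine chosen at app construction changes what the decorator attaches.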
Tools expose code as MCP functions that agents and LLMs can call. Both decorators accept the same keyword arguments:
| Parameter | Description |
|---|---|
| name | Override the exported MCP tool name (defaults to the function name). |
| title | Short display label for clients. |
| description | Human-readable description; the function docstring is used when omitted. |
| annotations | Supply a ToolAnnotations object or mapping for MCP metadata. |
| icons | Provide one or more Icon instances or mappings for client rendering. |
| meta | Arbitrary metadata forwarded to FastMCP. |
| structured_output | Set to True to hint that the result is structured JSON; some LLMs will choose schema-aware models automatically. |
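The fallback rules for name and description can be sketched in a few lines. The helper resolve_tool_metadata is hypothetical and exists only to illustrate the defaults described in the table; it is not part of mcp-agent or FastMCP.

```python
import inspect
from typing import Callable, Optional


def resolve_tool_metadata(
    fn: Callable,
    name: Optional[str] = None,
    description: Optional[str] = None,
) -> dict:
    """Hypothetical helper mirroring the fallback rules in the table."""
    return {
        # An explicit name wins; otherwise use the function name.
        "name": name or fn.__name__,
        # An explicit description wins; otherwise use the docstring.
        "description": description or inspect.getdoc(fn) or "",
    }


def summarise(text: str) -> str:
    """Summarise content deterministically."""
    return text[:10]


defaults = resolve_tool_metadata(summarise)
overridden = resolve_tool_metadata(
    summarise, name="tl_dr", description="Short summary."
)
```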
from pydantic import BaseModel
from mcp_agent.app import MCPApp

app = MCPApp(name="reporting")

class Summary(BaseModel):
    title: str
    verdict: str

@app.tool(description="Summarise a raw document into a headline and verdict.", structured_output=True)
def summarise_document(text: str) -> Summary:
    """Summarise content deterministically."""
    title = text.splitlines()[0][:120]
    verdict = "APPROVED" if "ship it" in text.lower() else "NEEDS REVIEW"
    return Summary(title=title, verdict=verdict)
- The decorator validates the signature up-front; missing type hints or unsupported default values raise an error during import.
- @app.tool automatically creates a hidden workflow so the tool is reachable via both callTool and the workflow endpoints (run, get_status) exposed by FastMCP.
- The function executes inside the app event loop; heavy work should offload itself (for example, using asyncio.to_thread).
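The offloading advice can be illustrated with the standard library alone. This is a minimal sketch, assuming Python 3.9 or later; checksum and checksum_offloaded are hypothetical names, and because asyncio.to_thread returns an awaitable, the pattern is shown from a coroutine rather than wired into @app.tool itself.

```python
import asyncio
import hashlib


def checksum(data: bytes, rounds: int = 1000) -> str:
    """Blocking, CPU-heavy helper (hypothetical example workload)."""
    digest = data
    for _ in range(rounds):
        digest = hashlib.sha256(digest).digest()
    return digest.hex()


async def checksum_offloaded(data: bytes) -> str:
    # Run the blocking function in a worker thread so the event loop
    # stays free to serve other coroutines in the meantime.
    return await asyncio.to_thread(checksum, data)


async def main() -> str:
    return await checksum_offloaded(b"report")


result = asyncio.run(main())
```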
import httpx

@app.async_tool(
    name="fetch_page",
    description="Fetch raw HTML from an HTTP endpoint.",
    icons=[{"emoji": "🌐"}],
)
async def fetch_page(url: str, *, timeout: int = 20) -> str:
    async with httpx.AsyncClient(timeout=timeout) as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.text
- The coroutine is awaited directly, so you can call other async APIs without wrappers.
- When Temporal is active, the decorated function is wrapped with workflow.activity metadata automatically.
Workflow decorator suite
Workflows orchestrate complex sequences, combining tasks, tools, and signals. Every workflow subclass must inherit from mcp_agent.executor.workflow.Workflow.
from datetime import timedelta
from mcp_agent.executor.workflow import Workflow, WorkflowResult

@app.workflow
class PublishArticle(Workflow[WorkflowResult[str]]):
    @app.workflow_task(schedule_to_close_timeout=timedelta(minutes=5))
    async def generate_outline(self, topic: str) -> str:
        outline = f"- Introduction to {topic}\n- Key findings\n- Next steps"
        self.context.logger.info("Generated outline", data={"topic": topic})
        return outline

    @app.workflow_task(retry_policy={"maximum_attempts": 3})
    async def send_to_editor(self, outline: str) -> None:
        self.context.logger.info("Sending to editor", data={"outline": outline})

    @app.workflow_signal(name="editor_feedback")
    async def editor_feedback(self, notes: str) -> None:
        self.state.feedback = notes or ""

    @app.workflow_run
    async def run(self, topic: str) -> WorkflowResult[str]:
        outline = await self.generate_outline(topic)
        await self.send_to_editor(outline)
        feedback = getattr(self.state, "feedback", "Awaiting review.")
        return WorkflowResult(value=f"{outline}\n\nEditor feedback: {feedback}")
@app.workflow
- Registers the class with the app and applies engine-specific decorators (workflow.defn for Temporal, no-op for asyncio).
- An optional workflow_id parameter lets you export the workflow under a different name when registering.
- The decorator stores a reference to the MCPApp, letting workflow instances access self.context.app.
@app.workflow_run
Wraps the run coroutine so that initialization, tracing, and Temporal-specific instrumentation are handled automatically. You rarely need to apply it manually, because applying @app.workflow and naming the method run is enough, but explicit usage lets you decorate additional entry points.
@app.workflow_task
Registers a coroutine as a reusable activity. Key options:
| Option | Effect |
|---|---|
| name | Override the fully qualified activity name (defaults to <module>.<qualname>). |
| schedule_to_close_timeout | Maximum wall-clock duration allowed for the task. |
| retry_policy | Temporal retry configuration (e.g. {"maximum_attempts": 5}). |
| **meta_kwargs | Arbitrary metadata stored alongside the task for inspection. |
The decorator enforces that the target is async; synchronous functions should wrap their blocking work with asyncio.to_thread.
Tasks defined outside a workflow are also supported: they are registered globally and can be reused across multiple workflows.
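The async-only rule and the default naming scheme can be sketched as follows. This is an illustration under stated assumptions, not the library's code: toy_task and the TASKS registry are hypothetical, and the real decorator may validate or name tasks differently.

```python
import inspect
from typing import Callable, Dict, Optional

TASKS: Dict[str, Callable] = {}  # hypothetical global task registry


def toy_task(name: Optional[str] = None) -> Callable[[Callable], Callable]:
    def decorator(fn: Callable) -> Callable:
        # Reject synchronous callables at decoration time.
        if not inspect.iscoroutinefunction(fn):
            raise TypeError(f"{fn.__qualname__} must be defined with 'async def'")
        # Default to "<module>.<qualname>" when no name is supplied.
        key = name or f"{fn.__module__}.{fn.__qualname__}"
        TASKS[key] = fn
        return fn

    return decorator


@toy_task()
async def gather(query: str) -> list:
    return [query]


try:
    @toy_task(name="blocking")
    def not_async() -> None:
        pass

    rejected = False
except TypeError:
    rejected = True
```

Failing at decoration time means a misdeclared task surfaces when the module is imported rather than when a workflow first tries to schedule it.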
@app.workflow_signal
Signals let external actors (humans, webhooks, other workflows) nudge a running workflow. The decorator accepts an optional name argument and works in both asyncio and Temporal modes.
@app.workflow_signal(name="user_input")
async def collect_user_input(self, payload: str) -> None:
    self.context.logger.info("Received payload", data={"payload": payload})
    self.state.latest_payload = payload
The generated wrapper automatically strips the workflow instance (self) for Temporal’s signal handler signature.
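For intuition about what a signal does to a running workflow, here is a plain-asyncio sketch in which a handler wakes a paused run coroutine. ToyReview and its attributes are hypothetical and do not use the mcp-agent Workflow API; the real signal plumbing is handled by the active executor.

```python
import asyncio
from typing import Optional


class ToyReview:
    """Hypothetical workflow-like class; not the mcp-agent Workflow API."""

    def __init__(self) -> None:
        self.feedback: Optional[str] = None
        self._received = asyncio.Event()

    async def editor_feedback(self, notes: str) -> None:
        # Signal handler: record the payload and wake the waiting run().
        self.feedback = notes
        self._received.set()

    async def run(self) -> str:
        # Pause until a signal arrives, then continue with its payload.
        await self._received.wait()
        return f"Editor feedback: {self.feedback}"


async def main() -> str:
    review = ToyReview()
    runner = asyncio.create_task(review.run())
    await asyncio.sleep(0)  # let run() start and block on the event
    await review.editor_feedback("Looks good")
    return await runner


outcome = asyncio.run(main())
```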
All workflow decorators defer to the active executor. When you switch to Temporal, tasks become activities, run becomes a workflow entry point, and signals map to @workflow.signal—no additional changes required.
Putting it together
from mcp_agent.app import MCPApp
from mcp_agent.executor.workflow import Workflow

app = MCPApp(name="insight-generator")

@app.tool(description="List the MCP servers registered with this app.")
def list_servers() -> list[str]:
    return sorted((app.config.mcp.servers or {}).keys())

@app.workflow
class ResearchWorkflow(Workflow[None]):
    @app.workflow_task()
    async def gather_sources(self, query: str) -> list[str]:
        self.context.logger.info("Gathering sources", data={"query": query})
        return [f"https://example.com/search?q={query}"]

    @app.workflow_run
    async def run(self, topic: str) -> None:
        sources = await self.gather_sources(topic)
        self.context.logger.info("Workflow completed", data={"topic": topic, "sources": sources})
This pattern gives you:
- A reusable list_servers tool that exposes runtime metadata without boilerplate.
- A workflow that can run locally (asyncio) or durably (Temporal) with the same code.
- Optional signals/tasks to pause, resume, or branch as needed.
For CLI-driven workflows and deployment details, continue with the CLI reference. For configuration options that pair with these decorators, review the Configuration reference.