Cloud deployments turn every tool decorator and workflow definition into a first-class MCP endpoint. This page explains how each decorator maps onto the managed runtime, how Temporal keeps work durable, and how to observe long-running runs in production.

[Figure: Temporal workflow execution timeline for an mcp-agent deployment]

Three building blocks

Each decorator differs in its execution model, when to use it, and how it is exposed over MCP:
  • @app.tool: runs inline inside the MCP server process; the caller blocks until the tool returns. Use it for quick actions that finish within a few seconds: fan-out RPCs, data lookups, or deterministic helpers. Registered as <tool_name>.
  • @app.async_tool: starts a Temporal workflow and returns {workflow_id, run_id} immediately; the caller polls for completion. Use it for any async or long-running operation: multi-step plans, heavy LLM conversations, human-in-the-loop validations. Exposed as <tool_name> (async), plus helper tools such as workflows-get_status and workflows-cancel.
  • @app.workflow / @app.workflow_run: an explicit workflow class, useful for complex logic, reusability, or exposing multiple entrypoints. Use it for multi-agent orchestration, routers, evaluator-optimizer loops, and deep orchestrators. Exposed as workflows-<ClassName>-run.
All three share the same contextual features:
  • Access to context.server_registry, context.logger, and configured MCP servers.
  • agent.attach_llm(...) to work with Augmented LLMs.
  • Token counting when tracing is enabled.
  • Human input via await context.request_human_input(...).

Example: synchronous vs async tool

from mcp_agent.app import MCPApp
from mcp_agent.core.context import Context

app = MCPApp(name="reporting_agent")

# Fast helper – returns immediately
@app.tool(description="Fetch metrics for a repository")
async def get_metrics(repo: str, ctx: Context | None = None) -> dict:
    stats = await ctx.server_registry.call_tool(
        server_name="github", tool_name="get_repo_stats", arguments={"repo": repo}
    )
    return stats

# Long-running operation – durable workflow
@app.async_tool(description="Generate a weekly engineering report")
async def generate_weekly_report(team: str, ctx: Context | None = None) -> dict:
    agent = ctx.app.create_agent(  # convenience helper – see docs/mcp-agent-sdk
        name="report_writer",
        instruction="Compile GitHub + PagerDuty + Notion into a weekly summary.",
        server_names=["github", "notion", "pagerduty"],
    )
    async with agent:
        llm = await agent.attach_llm()
        summary = await llm.generate_str(
            f"Create a weekly incident & delivery report for {team}. "
            "Include stats from the connected MCP servers."
        )
    # The value is stored in Temporal history and surfaced via workflows-get_status
    return {"report": summary}
What callers experience:
# Synchronous tool – returns result payload immediately
mcp-agent cloud invoke <server> --tool get_metrics --json '{"repo": "lastmile-ai/mcp-agent"}'

# Async tool – returns IDs to poll
mcp-agent cloud invoke <server> --tool generate_weekly_report --json '{"team": "core"}'

# Later…
mcp-agent cloud workflows describe <server> run_9b43be2a
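
Programmatic callers can poll the same helper tools over MCP instead of using the CLI. The sketch below uses the MCP Python SDK to call workflows-get_status until the run reaches a terminal state; the server URL, the run_id argument name, the terminal status strings, and the JSON shape of the payload are assumptions to check against your deployment.

import asyncio
import json

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

SERVER_URL = "https://<your-deployment>/mcp"  # placeholder – use your deployed server URL


async def poll_run(run_id: str, interval_s: float = 5.0) -> dict:
    """Poll workflows-get_status for a run started by an @app.async_tool."""
    async with streamablehttp_client(SERVER_URL) as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()
            while True:
                # Argument name assumed – confirm against the tool's input schema.
                result = await session.call_tool("workflows-get_status", {"run_id": run_id})
                status = json.loads(result.content[0].text)  # assumes a JSON text payload
                # Terminal state names are assumed for illustration.
                if status.get("status") in {"completed", "failed", "cancelled"}:
                    return status
                await asyncio.sleep(interval_s)


# asyncio.run(poll_run("run_9b43be2a"))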

Workflow classes

For intricate flows you can define a workflow class with reusable steps, activities, and signals. This pattern gives you access to the full Temporal API (signals, queries, child workflows, timers).
from mcp_agent.executor.workflow import Workflow, WorkflowResult

@app.workflow
class MultiAgentReview(Workflow[str]):
    """Orchestrate planner, researcher, and writer agents."""

    @app.workflow_run
    async def run(self, topic: str) -> WorkflowResult[str]:
        plan = await self.plan(topic)
        research = await self.research(plan)
        draft = await self.write(research)
        return WorkflowResult(value=draft)

    @app.task
    async def plan(self, topic: str) -> list[str]:
        ...

    @app.task
    async def research(self, plan: list[str]) -> dict:
        ...

    @app.task
    async def write(self, research: dict) -> str:
        ...
Temporal executes each @app.task as an activity. Tasks can run in parallel, include retries/backoff, or call await self.context.request_human_input(...) to pause.
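
As a concrete illustration, independent tasks can be fanned out with asyncio.gather inside the run method. This is a minimal sketch reusing the decorators shown above; the class and task names are illustrative, and whether gathered tasks actually execute concurrently depends on your worker and executor configuration.

import asyncio

from mcp_agent.executor.workflow import Workflow, WorkflowResult

@app.workflow
class ParallelResearch(Workflow[dict]):
    """Illustrative only: fan out independent activities, then combine the results."""

    @app.workflow_run
    async def run(self, topic: str) -> WorkflowResult[dict]:
        # Each task is recorded in Temporal history; gather awaits them together.
        github, notion = await asyncio.gather(
            self.fetch_github(topic),
            self.fetch_notion(topic),
        )
        return WorkflowResult(value={"github": github, "notion": notion})

    @app.task
    async def fetch_github(self, topic: str) -> dict:
        ...

    @app.task
    async def fetch_notion(self, topic: str) -> dict:
        ...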

Monitoring and control

Use the workflow commands to introspect long-running operations:
# List definitions exposed by the app
mcp-agent cloud workflows list app_abc123

# List recent runs (optionally filter by status)
mcp-agent cloud workflows runs app_abc123 --status running --limit 5

# Inspect a specific run
mcp-agent cloud workflows describe app_abc123 run_cf98712

# Pause / resume with additional context
mcp-agent cloud workflows suspend app_abc123 run_cf98712
mcp-agent cloud workflows resume app_abc123 run_cf98712 \
  --payload '{"approved": true, "notes": "Ship it"}'

# Cancel if you need to stop work
mcp-agent cloud workflows cancel app_abc123 run_cf98712
Logs and traces remain available while the workflow executes:
  • mcp-agent cloud logger tail app_abc123 --follow
  • Configure OTEL exporters in mcp_agent.config.yaml or via mcp-agent cloud logger configure.
  • Temporal metadata (start time, attempt count, memo fields) is surfaced in workflows describe.

Best practices

  • Anything that might exceed a client's default request timeout should be an async tool. Claude Desktop and Cursor expect quick responses; returning {run_id} lets them switch to a progress UI.
  • Use context.logger.info for status updates, and context.signal_notification (custom signals) if you need to push progress to the caller. Future versions will surface these in the console.
  • await context.request_human_input(prompt="...") pauses the workflow and stores state in Temporal. Users resume via mcp-agent cloud workflows resume … --payload (see the sketch after this list).
  • Attach lightweight metadata (WorkflowResult(metadata=...)) to make filtering easier (--status, custom reports). Memo values show up in workflows runs.
  • Set unique temporal.task_queue values per application to control worker placement and concurrency. For large deployments you can run additional workers with mcp-agent cloud app workers.
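
Combining the human-input and metadata practices, here is a minimal sketch of a workflow that pauses for approval and attaches memo-friendly metadata. The class is hypothetical, and the shape of the value returned by request_human_input plus the metadata keys are assumptions to verify against the SDK.

@app.workflow
class ReleaseApproval(Workflow[str]):
    """Illustrative only: pause for a human decision, then tag the result."""

    @app.workflow_run
    async def run(self, release: str) -> WorkflowResult[str]:
        draft = await self.draft_notes(release)

        # Pauses the run; resume with:
        #   mcp-agent cloud workflows resume <app_id> <run_id> --payload '{"approved": true}'
        decision = await self.context.request_human_input(
            prompt=f"Approve the release notes for {release}?"
        )

        # Assumes the resume payload is surfaced as a dict-like value.
        status = "approved" if decision.get("approved") else "rejected"

        # Metadata keys are arbitrary; they appear as memo fields in workflows runs.
        return WorkflowResult(
            value=draft,
            metadata={"release": release, "status": status},
        )

    @app.task
    async def draft_notes(self, release: str) -> str:
        ...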

Further reading
