Kitaru
Guides

PydanticAI Adapter

Make any PydanticAI agent replayable, resumable, and observable by wrapping it once with KitaruAgent

Kitaru's PydanticAI adapter makes any PydanticAI agent durable without changing its code: wrap the agent once with KitaruAgent, and every model request, tool call, MCP invocation, and human-in-the-loop wait is persisted under a Kitaru flow.

from pydantic_ai import Agent
from kitaru.adapters.pydantic_ai import KitaruAgent

agent = Agent("openai:gpt-4o", name="researcher")
durable_agent = KitaruAgent(agent)

result = durable_agent.run_sync("Summarize quantum error correction.")
print(result.output)

No flow decorator, no checkpoint annotations. When called outside a flow, KitaruAgent auto-opens one for you. By default, model, tool, and MCP calls are persisted as separate granular checkpoints. The dashboard shows the run, tool calls, model responses, and wait points.

Install

Add the pydantic-ai extra — and local if you want the local dashboard:

uv add "kitaru[pydantic-ai,local]"

Initialize the project once:

kitaru init
kitaru login        # local server; add a URL to connect to a deployed one
kitaru status

Usage patterns

Zero-config

Wrap the agent and call it directly. The adapter auto-opens a flow and granular per-call checkpoints when you're outside of one.

from pydantic_ai import Agent
from kitaru.adapters.pydantic_ai import KitaruAgent

agent = Agent("openai:gpt-4o", name="researcher")
durable_agent = KitaruAgent(agent)

result = durable_agent.run_sync("What are the open questions in QEC?")

Best for prototyping, porting an existing agent, or single-turn interactions.

Auto-flow is local-only. On remote stacks (Kubernetes, Vertex, SageMaker, AzureML) the in-process registry the adapter uses to stitch the auto-flow isn't visible — wrap the call in an explicit @kitaru.flow there.

Explicit boundaries

For multi-turn workflows, named replay boundaries, or coordinated waits across turns, use @kitaru.flow and @kitaru.checkpoint yourself. Inside a checkpoint, KitaruAgent is a straight passthrough.

Checkpoint an agent turn

This example shows explicit flow/checkpoint boundaries. Human approval waits are covered in the next sections.

import kitaru
from pydantic_ai import Agent
from kitaru.adapters.pydantic_ai import KitaruAgent

agent = Agent("openai:gpt-4o", name="researcher")
durable_agent = KitaruAgent(agent)

@kitaru.checkpoint
def ask(prompt: str) -> str:
    return durable_agent.run_sync(prompt).output

@kitaru.flow
def research(topic: str) -> str:
    overview = ask(f"Overview of {topic}")
    return ask(f"Open questions, given this overview:\n{overview}")

handle = research.run("quantum error correction")
print(handle.wait())

Replay the flow with the original run ID to serve cached outputs for completed checkpoints and re-execute only what changed. See Replay and overrides.

Ask the human from a tool body

kp.wait_for_input(...) is a thin adapter-namespaced wrapper around kitaru.wait(...). The LLM can pick the question, the tool can return the human's typed answer, and the agent can continue with that value as the tool result — but the wait still has to be created at flow scope.

That detail matters in default granular mode. A regular tool body usually runs inside an adapter-created *_tool checkpoint, and kitaru.wait() is intentionally rejected from checkpoint scope. If a regular tool body needs to call kp.wait_for_input(...), opt that tool out of granular checkpointing:

from typing import Literal
from pydantic import BaseModel
from kitaru.adapters import pydantic_ai as kp

class BugReport(BaseModel):
    title: str
    description: str
    severity: Literal["low", "medium", "high"]

def ask_user(question: str) -> str:
    """Ask the human a free-form clarifying question."""
    return kp.wait_for_input(schema=str, question=question)

def collect_bug_report() -> BugReport:
    """Collect a structured bug report."""
    return kp.wait_for_input(
        schema=BugReport,
        question="Describe the bug: title, description, severity.",
    )

Then construct the durable agent with a per-tool checkpoint opt-out:

durable_agent = KitaruAgent(
    agent,
    tool_checkpoint_config_by_name={"ask_user": False, "collect_bug_report": False},
)

Both question and schema are ordinary arguments, so the tool body can compute them, branch on agent state, prepend context, or call multiple waits. The adapter attaches identifying metadata (adapter=pydantic_ai, source=tool_body) so these waits are distinguishable from hand-written kitaru.wait() calls in flow code.

If you do not opt the tool out, the adapter fails early with an actionable KitaruUsageError rather than creating a checkpoint-contained wait that would be hard to resume safely. Another safe option is to move the human gate out of the tool body and call kitaru.wait() directly before or after the agent turn in your @kitaru.flow code.

Running locally, Kitaru prompts in the terminal. Running against a deployed server, the execution pauses and can be resumed from anywhere:

kitaru executions input <exec_id> --value '{"title": "login broken", "description": "500 on /auth", "severity": "high"}'
kitaru executions resume <exec_id>

Declarative sugar: @hitl_tool

When a tool is purely a wait — nothing computed in the body, no branching — the @hitl_tool decorator is a compact alternative. The body is skipped entirely.

from kitaru.adapters.pydantic_ai import hitl_tool

@hitl_tool(question="Approve publish?", schema=bool)
def approve(summary: str) -> bool: ...

@hitl_tool(schema=str)
def ask_user(question: str) -> str: ...   # LLM-supplied question via `question_arg`

@hitl_tool(
    name="collect_bug_report",
    question="Describe the bug: title, description, severity.",
    schema=BugReport,
)
def collect_bug_report() -> BugReport: ...

@hitl_tool(schema=..., question_arg=...) picks up the LLM-supplied argument at runtime (defaults to looking for question). Pass question_arg=None to force the static prompt.

Replay semantics

Waits belong at flow scope. kitaru.wait() is rejected inside @checkpoint bodies because the flow can pause while the enclosing checkpoint step is recorded as failed or incomplete. The default granular mode splits each top-level model, tool, and MCP call into its own checkpoint, which improves visibility and retry isolation, but it also means regular tool-body waits need an explicit per-tool opt-out as shown above.

Runtime behavior and guardrails

Human-in-the-loop tools

The adapter bridges every PydanticAI deferred pattern into kitaru.wait(). A paused flow is visible from kitaru executions list, the dashboard, and the REST API; once input is supplied the flow resumes from the exact same point.

from kitaru.adapters.pydantic_ai import hitl_tool

@hitl_tool(question="Approve publishing this brief?", schema=bool)
def publish_brief(headline: str, sources: list[str]) -> str:
    return f"published: {headline} ({len(sources)} sources)"

Other PydanticAI deferred patterns also route through kitaru.wait() when they run at flow scope:

  • @agent.tool(requires_approval=True) — PydanticAI's native approval flag
  • raising pydantic_ai.exceptions.ApprovalRequired or CallDeferred from a tool body
  • calling kp.wait_for_input(...) from a tool body

In granular mode, @hitl_tool stays flow-scope safe because the adapter deliberately skips the synthetic *_tool checkpoint for that call. Regular tools that use native approval/deferred exceptions or wait_for_input() must either opt out with tool_checkpoint_config_by_name={"tool_name": False} or move the wait to explicit flow code.

See Wait, Input, and Resume for how paused flows are resolved.

MCP servers

MCP servers attached to the agent are wrapped automatically. Their tool calls are tracked alongside native tools; in the default granular mode, each top-level MCP call gets its own adapter checkpoint. MCPServer.cache_tools=True is honored to skip redundant tools/list round-trips on replay.

from pydantic_ai import Agent
from pydantic_ai.mcp import MCPServerStdio
from kitaru.adapters.pydantic_ai import KitaruAgent

server = MCPServerStdio(
    "npx",
    args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
    cache_tools=True,
)
agent = Agent("openai:gpt-4o", name="researcher", toolsets=[server])
durable_agent = KitaruAgent(agent)

Checkpoint modes

The adapter offers two strategies for how agent work maps onto Kitaru checkpoints. Pick per agent based on how you want to replay and retry.

ModeHow it mapsReplay unitBest for
Granular (default)No turn checkpoint; each model/tool/MCP call becomes its own checkpointPer callExpensive model calls, flaky tools, long tool-call chains where one failure shouldn't rewind everything
Turn (granular_checkpoints=False)One checkpoint per agent run; model/tool/MCP calls are child eventsThe full turnAgents where one aggregated checkpoint and checkpoint artifacts like event_log / run_summary are more useful than per-call boundaries

Replay semantics in one sentence. If a flow crashes on the 8th model call of a turn, turn mode re-runs the whole turn; granular mode gives the earlier calls their own completed checkpoint boundaries. If you set cache=True on adapter-created checkpoint configs, repeated runs can reuse completed checkpoints when the logical inputs are the same; changed prompts, message history, model settings, tool arguments, tool call IDs, or behavior-changing run options produce different cache keys and should miss cache.

Granular mode is the default. granular_checkpoints=True is shown here for clarity when setting per-call checkpoint configs:

durable_agent = KitaruAgent(
    agent,
    granular_checkpoints=True,
    model_checkpoint_config={"retries": 3, "cache": True},
    tool_checkpoint_config={"retries": 2, "cache": True},
    tool_checkpoint_config_by_name={
        "lookup_price": {"retries": 5, "cache": True},   # flaky external API
        "fetch_secret": False,             # never checkpoint this tool
    },
    mcp_checkpoint_config={"retries": 3},
)

Each config is a CheckpointConfig TypedDict accepting:

  • cache: bool | None — passed through to @kitaru.checkpoint(cache=...). Use True to opt adapter-created checkpoints into step caching, False to disable caching for that boundary, or omit it / use None to inherit the stack default.
  • runtime: "inline" — run in-process. runtime="isolated" is not yet supported on adapter-managed checkpoints and raises KitaruUsageError.
  • retries: int — auto-retry the call on failure.
  • type: str — dashboard grouping. Defaults to "llm_call", "tool_call", or "mcp_call" so adapter checkpoints group with native kitaru.llm() / @kitaru.checkpoint(type="tool_call") calls.

The turn checkpoint itself is configured via turn_checkpoint_config= in turn mode. To opt into the previous one-checkpoint-per-run behavior, pass granular_checkpoints=False:

durable_agent = KitaruAgent(agent, granular_checkpoints=False)

Streaming exception. Granular mode cannot apply to streamed turns — per-call checkpointing around an async context manager would require draining and replaying the stream inside a sync checkpoint. When an event_stream_handler is supplied, KitaruAgent transparently falls back to a turn checkpoint for that call. That fallback disables turn-checkpoint caching for the call, because a cached final result would skip the handler's progress side effects. run_stream() and iter() always require an explicit @kitaru.checkpoint.

Streaming

Two patterns are supported:

  • event_stream_handler (recommended) — pass a handler to run() / run_sync() for live progress updates; auto-flow and auto-checkpoint still work.
  • run_stream() / iter() — these return context managers and cannot auto-open a checkpoint. Wrap them in an explicit @kitaru.checkpoint yourself.

Stream transcripts are persisted as artifacts when CapturePolicy.save_stream_transcripts=True (the default).

Capture policy

CapturePolicy controls what the adapter stores per run. Defaults favor full observability. Wait records always keep minimal routing metadata (adapter, tool_name, tool_call_id), but tool args and exception payloads are only stored in wait metadata when tool_capture="full".

OptionDefaultDescription
emit_child_eventsTrueTrack per-request / per-tool events. False disables tool-wait correlation.
save_promptsTruePersist prompts sent to the model.
save_responsesTruePersist final model responses.
save_stream_transcriptsTruePersist serialized stream events + final response.
tool_capture"full""full" (args + result), "metadata" (timing only), or None (skip entirely).
tool_capture_overrides{}Per-tool overrides keyed by tool name.
correlate_otel_spansTrueAttach Kitaru event IDs to the current OTel span.
from kitaru.adapters.pydantic_ai import CapturePolicy, KitaruAgent

durable_agent = KitaruAgent(
    agent,
    capture=CapturePolicy(
        save_prompts=False,                               # privacy
        save_stream_transcripts=False,                    # cost
        tool_capture="metadata",                          # default for all tools
        tool_capture_overrides={"fetch_secret": None},    # never capture this tool
    ),
)

Capture policy is observability-only — it never changes tool execution.

Message history

Pass message_history explicitly like any PydanticAI agent, or let the adapter thread it for you:

durable_agent = KitaruAgent(agent, persist_message_history=True)

durable_agent.run_sync("Hi, I am Alice.")
durable_agent.run_sync("What's my name?")  # sees the prior turn automatically

With persist_message_history=True the adapter remembers result.all_messages() on the instance after each run and auto-injects it as message_history on the next call when the caller doesn't pass one. One KitaruAgent instance = one conversation — create separate instances for separate conversations. An explicit message_history= on a single call overrides the remembered history for that call only.

Limits. History lives on the Python instance: a restart, new process, or replay of a prior flow starts with no history. The list grows unbounded — apply your own truncation or summarization for long-lived conversations. Concurrent run / run_sync calls on the same instance race on the stored history; use one instance per conversation. For durable conversation state, persist result.all_messages() yourself via kitaru.memory.

Constraints

  • Concrete model at construction time. The wrapped agent must have a bound Model — late model binding and per-run model= overrides are not supported. To use a different model, wrap a different agent.
  • Stable agent name. name= is required; the adapter uses it for artifact keys and auto-created flow/checkpoint names. Changing it orphans existing executions.
  • No nested checkpoints. Kitaru forbids opening a checkpoint inside another, so granular mode cannot coexist with an enclosing turn checkpoint — the adapter runs the agent body inline at flow scope when granular_checkpoints=True.

Advanced composition

Most users only need KitaruAgent. For custom durable surfaces, the lower-level wrappers are exported:

  • KitaruModel — wrap a PydanticAI Model directly.
  • KitaruToolset / KitaruFunctionToolset / KitaruMCPServer — wrap toolsets or MCP servers independently.
  • kitaruify_toolset(toolset, capture=..., ...) — dispatch helper that picks the right wrapper class.
  • KitaruRunContextRunContext subclass that survives isolated-runtime serialization boundaries.

Troubleshooting

  • "KitaruAgent requires the wrapped agent to define a concrete model" — pass model= to the Agent() constructor, not to run().
  • "requires an explicit @kitaru.checkpoint"run_stream() and iter() return context managers; wrap them in a checkpoint yourself.
  • Auto-flow fails on a remote stack — the in-process registry doesn't cross process boundaries. Use @kitaru.flow explicitly.
  • Too many per-call checkpoints — pass granular_checkpoints=False to group a whole agent run into one turn checkpoint.
  • Replay cost control — granular mode gives per-call checkpoint boundaries, not a billing guarantee. Pair it with provider-side caching or idempotency for expensive calls.
  • Checkpoints not appearing in the dashboard — verify kitaru status shows a running server and that kitaru init has been run in the project root.

Runnable example

uv sync --extra local --extra pydantic-ai
uv run examples/integrations/pydantic_ai_agent/pydantic_ai_adapter.py

The example prints the execution ID, final result, child-event count, and run-summary count. For the broader catalog, see Examples.

On this page