OpenAI Agents Adapter

Wrap an OpenAI Agents SDK Agent with KitaruRunner so calls are durable and replayable inside Kitaru flows.

Kitaru's OpenAI Agents adapter lets you keep your existing OpenAI Agents SDK agent logic while adding Kitaru durability around it.

from agents import Agent
from kitaru.adapters.openai_agents import KitaruRunner

agent = Agent(name="researcher", model=your_model)
runner = KitaruRunner(agent, checkpoint_strategy="runner_call")

The runtime default is checkpoint_strategy="calls" (per-call checkpoints — see below); pass "runner_call" whenever you want a single terminal checkpoint so flow.run(...).wait() returns the run result directly.

You run the agent through runner.run(...) or runner.run_sync(...) with an OpenAIRunRequest.

Install

uv add "kitaru[openai-agents,local]"

Then initialize/login as usual:

kitaru init
kitaru login
kitaru status

Minimal flow

from kitaru import flow
from kitaru.adapters.openai_agents import KitaruRunner, OpenAIRunRequest

# Reuses the `agent` defined in the first snippet on this page.
runner = KitaruRunner(agent, checkpoint_strategy="runner_call")

@flow
def research(prompt: str) -> str:
    result = runner.run_sync(OpenAIRunRequest.start(prompt))
    return str(result.final_output)

Fresh-run context

OpenAI Agents SDK tools and guardrails often use a local application context: for example, "which team is this user in?", "which thread is this request part of?", or "which plugin settings are active?" Pass that object to Kitaru the same way you pass it to the OpenAI SDK: as a runner-call argument, not as part of the serializable OpenAIRunRequest.

from dataclasses import dataclass
from typing import Any

from agents import RunContextWrapper, function_tool
from kitaru.adapters.openai_agents import KitaruRunner, OpenAIRunRequest

@dataclass(frozen=True)
class WorkerContext:
    team_id: str
    user_id: str
    thread_id: str
    message_id: str
    tool_settings: dict[str, Any]

@function_tool
def lookup_customer(ctx: RunContextWrapper[WorkerContext], customer_id: str) -> str:
    # The context stays local to your Python process. The model only sees what
    # your tool chooses to return.
    return f"team={ctx.context.team_id}, customer={customer_id}"

runner = KitaruRunner(
    agent,
    context_cache_identity=lambda ctx: {
        "team_id": ctx.team_id,
        "user_id": ctx.user_id,
        "thread_id": ctx.thread_id,
        "tool_settings": ctx.tool_settings,
    },
)

result = runner.run_sync(
    OpenAIRunRequest.start("Look up customer 123"),
    context=WorkerContext(
        team_id="team_abc",
        user_id="user_123",
        thread_id="thread_456",
        message_id="msg_this_run_only",
        tool_settings={"include_private_notes": False},
    ),
)

A concrete way to think about this: the OpenAIRunRequest is the written travel plan Kitaru can save and replay. context= is the live badge the worker carries while doing the trip. Tools and guardrails can inspect the badge through RunContextWrapper.context, but Kitaru does not save that badge as a visible artifact or send it to the model automatically. Kitaru still uses the context identity internally for safe replay, without adding your raw context or projection to visible tool input artifacts.

Context does matter for safe replay. Imagine two teams both call lookup_customer(customer_id="123"). The visible tool arguments are identical, but team A and team B may be allowed to see different customer records. Kitaru therefore includes a context identity in adapter cache keys. If your context is plain data, Kitaru can derive a structural identity. For production contexts, prefer context_cache_identity= so you can include stable fields such as team, user, thread, project, plugin, and JSON-primitive tool_settings, while excluding per-run fields such as message_id, trace_id, or a changing document cursor. That keeps replay safe without making every new message miss the cache unnecessarily.
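To make the cache-key idea concrete, here is a minimal sketch in plain Python. It is not Kitaru's implementation: the helper names context_identity and tool_cache_key, and the choice of SHA-256 over canonical JSON, are assumptions made purely for illustration. It only shows why a stable projection separates team A from team B while letting a new message_id still hit the cache.

```python
import hashlib
import json
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class WorkerContext:
    team_id: str
    user_id: str
    thread_id: str
    message_id: str
    tool_settings: dict[str, Any]


def context_identity(ctx: WorkerContext) -> dict[str, Any]:
    # Hypothetical projection: keep stable fields, drop the per-run message_id.
    return {
        "team_id": ctx.team_id,
        "user_id": ctx.user_id,
        "thread_id": ctx.thread_id,
        "tool_settings": ctx.tool_settings,
    }


def tool_cache_key(tool_name: str, arguments: dict[str, Any], ctx: WorkerContext) -> str:
    # Hypothetical derivation: hash canonical JSON of tool name, args, and identity.
    payload = json.dumps(
        {"tool": tool_name, "args": arguments, "ctx": context_identity(ctx)},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


settings = {"include_private_notes": False}
team_a = WorkerContext("team_a", "user_1", "thread_1", "msg_1", settings)
team_a_next = WorkerContext("team_a", "user_1", "thread_1", "msg_2", settings)
team_b = WorkerContext("team_b", "user_1", "thread_1", "msg_1", settings)

args = {"customer_id": "123"}
key_a = tool_cache_key("lookup_customer", args, team_a)
key_a_next = tool_cache_key("lookup_customer", args, team_a_next)
key_b = tool_cache_key("lookup_customer", args, team_b)
```

In this sketch key_a and key_a_next are equal because only message_id differs, while key_b differs because the team does. Only the derived hash would take part in the key, which mirrors the point that the raw context does not need to appear in visible artifacts.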

context= is different from metadata= on OpenAIRunRequest.start(...): metadata is Kitaru run/checkpoint metadata; context is local OpenAI Agents SDK runtime state for your tools, guardrails, handoffs, and hooks.

Fresh context is only for new kind="start" requests. Interrupted/resumed runs use the saved OpenAI RunState; context_serializer= and context_deserializer= on KitaruRunner remain the way to serialize and rebuild context that is already inside an interrupted SDK state. With checkpoint_strategy="calls", tool checkpoint cache keys use that restored SDK context identity when it is available, so an approved resumed tool call for team A does not accidentally reuse a cached tool result from team B. Kitaru uses only the derived cache key for that separation; it does not save the raw context or your context_cache_identity= projection in visible tool input artifacts.

One more boundary to remember: if your guardrail manually calls raw agents.Runner.run(...) for a nested evaluator, that nested call is not managed by Kitaru automatically. Wrap the nested evaluator with its own KitaruRunner if you need Kitaru checkpoints there too.

Checkpoint strategy choices

You choose how Kitaru places checkpoints with checkpoint_strategy=.

checkpoint_strategy="runner_call"

Kitaru places one checkpoint around the outer OpenAI Runner.run(...) call. That single checkpoint becomes the flow's terminal artifact, so flow.run(...).wait() returns the run result cleanly. The name "runner_call" is deliberately specific: it means Kitaru is wrapping the outer OpenAI runner call, not claiming to own every SDK-internal step.

Use this when you want one coarse replay boundary for the whole agent run, or whenever you want a clean Python value back from .wait().

checkpoint_strategy="calls" (default)

Kitaru catches supported model/tool calls individually as separate peer checkpoints under the flow.

Use this when you want finer replay units (for example: if call 6 fails, calls 1–5 can come from cache).
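The "call 6 fails, calls 1–5 come from cache" behavior can be pictured with a small simulation. This is a sketch of the general per-call checkpointing idea, not Kitaru's code: checkpointed_call, run_agent, and the in-memory cache dict are hypothetical stand-ins for the adapter's checkpoints and artifact store.

```python
from typing import Callable, Optional

cache: dict[str, str] = {}   # stand-in for persisted checkpoint results
executed: list[str] = []     # records which calls actually ran


def checkpointed_call(key: str, fn: Callable[[], str]) -> str:
    # Reuse the stored result if this call already completed in a prior attempt.
    if key in cache:
        return cache[key]
    result = fn()            # may raise; nothing is cached in that case
    executed.append(key)
    cache[key] = result
    return result


def run_agent(fail_at: Optional[int]) -> list[str]:
    outputs = []
    for i in range(1, 7):
        def call(i: int = i) -> str:
            if i == fail_at:
                raise RuntimeError(f"call {i} failed")
            return f"result-{i}"
        outputs.append(checkpointed_call(f"call-{i}", call))
    return outputs


# First attempt: call 6 fails after calls 1-5 have been checkpointed.
try:
    run_agent(fail_at=6)
except RuntimeError:
    pass
first_attempt = list(executed)

# Second attempt: calls 1-5 are served from the cache, only call 6 executes.
executed.clear()
outputs = run_agent(fail_at=None)
second_attempt = list(executed)
```

After the second attempt, second_attempt contains only "call-6", which is the finer replay unit this strategy is meant to provide.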

Because the per-call checkpoints are siblings under the flow with no single sink, flow.run(...).wait() cannot pick one as "the" return value and raises KitaruAmbiguousFlowResultError. The per-checkpoint artifacts are still fully visible in the Kitaru UI and retrievable via KitaruClient — the error message points at them. If you need a clean .wait() return value, switch to checkpoint_strategy="runner_call". Wrapping the runner.run_sync() call in your own @checkpoint is not a workaround here — the adapter guards against it and will raise, because per-call checkpoints cannot be nested inside another Kitaru checkpoint.

Structured outputs, guardrails, and nested agents

OpenAI Agents SDK structured outputs work through the adapter. If your agent is created with Agent(output_type=...), Kitaru preserves the SDK result object and its typed final_output in both supported strategies:

  • checkpoint_strategy="runner_call" records the outer runner call and returns the structured result from .wait() cleanly.
  • checkpoint_strategy="calls" records supported model and tool calls individually, while the SDK still produces the typed final output for your Python code.

For tool-input guardrails, use checkpoint_strategy="calls" when you need to see blocked tool attempts. In that strategy, Kitaru records a rejected tool attempt as an existing tool_call event with guardrail metadata before the tool function runs. It does not create a new event type, and it does not save a tool checkpoint for arguments that the guardrail rejected.

Privacy follows the capture policy here too. If save_input=False, Kitaru omits raw tool input artifacts and also redacts guardrail rejection messages and unexpected guardrail exception details from persisted event metadata, because those strings may repeat the user/tool input the guardrail just inspected. The event still shows that a guardrail blocked the call, which guardrail did it, and whether the behavior was reject_content, raise_exception, or an exception.

checkpoint_strategy="runner_call" still only sees the outer Runner.run(...) boundary. That is useful for a single durable result, but it cannot show each individual tool guardrail decision. Choose "calls" when per-tool guardrail observability matters.

The same nested-runner boundary applies here: raw nested agents.Runner.run(...) calls remain outside Kitaru unless you wrap that evaluator agent with its own KitaruRunner. Raw nested agents are fine for quick ephemeral checks. If their inputs, outputs, or guardrail decisions need Kitaru observability, run them through KitaruRunner too.

Important guardrail

checkpoint_strategy="calls" must run from flow scope (not from inside another @checkpoint), because the adapter needs room to open inner checkpoints for model/tool calls.

Approval interruptions

The adapter preserves OpenAI Agents SDK runs that stop for human approval. In story form: the agent reaches a tool approval, the SDK returns an interrupted run, Kitaru stores the serialized run state, and your flow can turn that interruption into a normal durable kitaru.wait().

from kitaru import flow
from kitaru.adapters.openai_agents import (
    KitaruRunner,
    OpenAIRunRequest,
    wait_for_approval,
)

runner = KitaruRunner(agent, checkpoint_strategy="runner_call")

@flow
def publish_with_gate(prompt: str) -> str:
    result = runner.run_sync(OpenAIRunRequest.start(prompt))

    if result.status == "interrupted":
        resume_request = wait_for_approval(
            result,
            name="approve_openai_tool",
            timeout=600,
        )
        result = runner.run_sync(resume_request)

    return str(result.final_output)

wait_for_approval(...) asks Kitaru to wait for a boolean approval and then returns an OpenAIRunRequest.resume(...) object. Approving resumes the saved OpenAI run state; rejecting sends the SDK a rejection message. If you already collected the decision somewhere else, use build_resume_request(result, approve=True) or pass an explicit OpenAIApprovalDecision to OpenAIRunRequest.resume(...).

Keep this bridge at flow scope. If you put it inside a Kitaru checkpoint, the flow would be trying to pause from inside a step that is meant to finish or fail as one unit.

Capture and checkpoint configuration

KitaruRunner exposes the two kinds of knobs most teams need in production:

  • Capture policy: what gets saved for observability.
  • Checkpoint policy: how retries and dashboard grouping apply to adapter-created checkpoints.

from kitaru.adapters.openai_agents import KitaruRunner, OpenAICapturePolicy

runner = KitaruRunner(
    agent,
    checkpoint_strategy="calls",
    capture=OpenAICapturePolicy(
        save_input=False,              # privacy: do not persist full user input
        save_final_output=True,
        save_run_state=True,           # needed for approval resume
        save_interruption_payloads=True,
        save_response_items=False,     # opt in only when you need raw SDK items
        save_usage=True,
    ),
    model_checkpoint_config={"retries": 2},
    tool_checkpoint_config={"retries": 1},
    tool_checkpoint_config_by_name={
        "charge_card": False,          # do not checkpoint side-effectful tool
        "search_docs": {"retries": 3},
    },
)

OpenAICapturePolicy defaults are designed for useful traces: child events, input, final output, run state, interruption payloads, usage, and OTel correlation are on; raw response items are off by default because they can be noisy.

Two privacy switches are worth calling out:

  • save_input=False keeps raw model/tool inputs out of artifacts and redacts tool-input guardrail messages or exception text that may contain those inputs.
  • save_interruption_payloads=False keeps approval interruption summaries usable for resume decisions — index, kind, tool name, call ID, and message when the SDK exposes them — but omits raw arguments and arguments_preview.

Checkpoint config accepts retries, type, and runtime. runtime="isolated" is rejected for adapter-managed checkpoints today because those synthetic checkpoint closures capture live OpenAI SDK objects; use inline runtime or omit runtime.
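One way to reason about how the default tool config, the per-name overrides, and the runtime restriction interact is the sketch below. It is a mental model only: resolve_tool_checkpoint_config is a hypothetical helper, and it assumes a per-name entry replaces the default rather than merging with it, which this page does not specify.

```python
from typing import Any, Optional, Union

CheckpointConfig = dict[str, Any]


def resolve_tool_checkpoint_config(
    tool_name: str,
    default: Optional[CheckpointConfig],
    by_name: dict[str, Union[CheckpointConfig, bool]],
) -> Optional[CheckpointConfig]:
    """Return the config for one tool, or None when the tool is not checkpointed."""
    entry = by_name.get(tool_name, True)
    if entry is False:
        return None  # explicitly disabled, e.g. a side-effectful tool
    # Assumption: a per-name dict replaces the default instead of merging with it.
    config = dict(default or {}) if entry is True else dict(entry)
    if config.get("runtime") == "isolated":
        raise ValueError('runtime="isolated" is rejected for adapter-managed checkpoints')
    return config


default_config = {"retries": 1}
overrides = {"charge_card": False, "search_docs": {"retries": 3}}

charge_card = resolve_tool_checkpoint_config("charge_card", default_config, overrides)
search_docs = resolve_tool_checkpoint_config("search_docs", default_config, overrides)
other_tool = resolve_tool_checkpoint_config("summarize", default_config, overrides)
```

Under these assumptions charge_card resolves to None (no checkpoint), search_docs gets three retries, and any tool without an override falls back to the default.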

For interrupted OpenAI runs, the SDK stores its own RunState so the run can resume later. If that saved RunState contains context objects that are not JSON-serializable, pass context_serializer= and context_deserializer= to KitaruRunner. These hooks are for serializing resume state after an interruption; they do not control the fresh-run context= object you pass when starting a new run. By default strict_context=True, so Kitaru fails loudly instead of saving a resume state that cannot be reconstructed later.
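A serializer/deserializer pair for a dataclass context might look like the sketch below. The function names serialize_context and deserialize_context are illustrative, and the exact callable signatures that context_serializer= and context_deserializer= expect are an assumption here (one argument in, one value out); check the adapter's API reference before relying on them.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any


@dataclass(frozen=True)
class WorkerContext:
    team_id: str
    user_id: str
    thread_id: str
    message_id: str
    tool_settings: dict[str, Any]


def serialize_context(ctx: WorkerContext) -> dict[str, Any]:
    # Turn the context into JSON-friendly primitives.
    return asdict(ctx)


def deserialize_context(data: dict[str, Any]) -> WorkerContext:
    # Rebuild the context from the saved primitives.
    return WorkerContext(**data)


original = WorkerContext(
    team_id="team_abc",
    user_id="user_123",
    thread_id="thread_456",
    message_id="msg_this_run_only",
    tool_settings={"include_private_notes": False},
)

# Round-trip through JSON to confirm nothing is lost on the way to storage.
wire = json.dumps(serialize_context(original))
restored = deserialize_context(json.loads(wire))

# Assumed wiring (signature not confirmed on this page):
# KitaruRunner(agent, context_serializer=serialize_context,
#              context_deserializer=deserialize_context)
```

A round-trip check like this is a cheap way to catch non-serializable fields early, before an interrupted run needs to be resumed.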

Runnable example

This example uses the real OpenAI API (not a stub model), so set your key:

uv sync --extra local --extra openai-agents
export OPENAI_API_KEY='OPENAI_API_KEY_VALUE'
# default model in the example is gpt-5-nano
# optional override: any OpenAI model you have access to
# export OPENAI_AGENTS_MODEL='<another-openai-model>'
uv run examples/integrations/openai_agents_agent/openai_agents_adapter.py

End-to-end research bot example

For a larger example, run the OpenAI research bot:

cd examples/end_to_end/openai_research_bot
uv sync --extra local --extra openai-agents
uv run kitaru init
export OPENAI_API_KEY='OPENAI_API_KEY_VALUE'
uv run python research_bot.py "AI agent durability" --max-searches 2

The workflow keeps the original research-bot shape:

planner → submitted search fan-out → writer report

The planner and writer run at flow scope through KitaruRunner with checkpoint_strategy="runner_call". The planned searches fan out with run_search_item.submit(...), so each search is its own durable checkpoint. Ordinary Kitaru checkpoints publish stable dashboard artifacts such as the normalized research plan, search summaries, and final report.

The example also uses a local OpenAI Agents SDK @function_tool named search_web instead of the hosted WebSearchTool. The local tool calls the OpenAI Responses API with web_search, which produces a clearer checkpoint trace under the adapter's current public behavior.

Look for these artifacts in the Kitaru UI:

  • research_plan
  • search_summaries
  • durability_drill
  • final_report
  • research_report_metadata

To test the durable-retry story directly, set KITARU_RESEARCH_BOT_FAIL_AFTER_SEARCHES=1 before running the example. The run will fail after the submitted searches complete. Then unset the flag and run kitaru executions replay <EXECUTION_ID> --from durability_drill_gate; the replay should reuse the completed planner/search checkpoints and continue into the writer. By contrast, retry attempts to restart the same failed execution and may be unavailable on server-backed stacks after a run has concluded.

See also: Replay and overrides.
