OpenAI Agents Adapter
Wrap an OpenAI Agents SDK Agent with KitaruRunner so calls are durable and replayable inside Kitaru flows
Kitaru's OpenAI Agents adapter lets you keep your existing OpenAI Agents SDK agent logic while adding Kitaru durability around it.
from agents import Agent
from kitaru.adapters.openai_agents import KitaruRunner
agent = Agent(name="researcher", model=your_model)
runner = KitaruRunner(agent, checkpoint_strategy="runner_call")
The runtime default is checkpoint_strategy="calls" (per-call checkpoints;
see below); pass "runner_call" whenever you want a single terminal
checkpoint so flow.run(...).wait() returns the run result directly.
You run the agent through runner.run(...) or runner.run_sync(...) with an
OpenAIRunRequest.
Install
uv add "kitaru[openai-agents,local]"
Then initialize and log in as usual:
kitaru init
kitaru login
kitaru status
Minimal flow
from kitaru import flow
from kitaru.adapters.openai_agents import KitaruRunner, OpenAIRunRequest
runner = KitaruRunner(agent, checkpoint_strategy="runner_call")
@flow
def research(prompt: str) -> str:
    result = runner.run_sync(OpenAIRunRequest.start(prompt))
    return str(result.final_output)
Fresh-run context
OpenAI Agents SDK tools and guardrails often use a local application context:
for example, "which team is this user in?", "which thread is this request part
of?", or "which plugin settings are active?" Pass that object to Kitaru the same
way you pass it to the OpenAI SDK: as a runner-call argument, not as part of the
serializable OpenAIRunRequest.
from dataclasses import dataclass
from typing import Any
from agents import RunContextWrapper, function_tool
from kitaru.adapters.openai_agents import KitaruRunner, OpenAIRunRequest
@dataclass(frozen=True)
class WorkerContext:
    team_id: str
    user_id: str
    thread_id: str
    message_id: str
    tool_settings: dict[str, Any]
@function_tool
def lookup_customer(ctx: RunContextWrapper[WorkerContext], customer_id: str) -> str:
    # The context stays local to your Python process. The model only sees what
    # your tool chooses to return.
    return f"team={ctx.context.team_id}, customer={customer_id}"
runner = KitaruRunner(
    agent,
    context_cache_identity=lambda ctx: {
        "team_id": ctx.team_id,
        "user_id": ctx.user_id,
        "thread_id": ctx.thread_id,
        "tool_settings": ctx.tool_settings,
    },
)
result = runner.run_sync(
    OpenAIRunRequest.start("Look up customer 123"),
    context=WorkerContext(
        team_id="team_abc",
        user_id="user_123",
        thread_id="thread_456",
        message_id="msg_this_run_only",
        tool_settings={"include_private_notes": False},
    ),
)
A concrete way to think about this: the OpenAIRunRequest is the written travel
plan Kitaru can save and replay. context= is the live badge the worker carries
while doing the trip. Tools and guardrails can inspect the badge through
RunContextWrapper.context, but Kitaru does not save that badge as a visible
artifact or send it to the model automatically. Kitaru still uses the context
identity internally for safe replay, without adding your raw context or projection
to visible tool input artifacts.
Context does matter for safe replay. Imagine two teams both call
lookup_customer(customer_id="123"). The visible tool arguments are identical,
but team A and team B may be allowed to see different customer records. Kitaru
therefore includes a context identity in adapter cache keys. If your context is
plain data, Kitaru can derive a structural identity. For production contexts,
prefer context_cache_identity= so you can include stable fields such as team,
user, thread, project, plugin, and JSON-primitive tool_settings, while
excluding per-run fields such as message_id, trace_id, or a changing
document cursor. That keeps replay safe without making every new message miss
the cache unnecessarily.
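To make the effect of the projection concrete, here is a minimal sketch in plain Python. The `toy_cache_key` helper and its hashing scheme are hypothetical and are not Kitaru's actual key derivation; the point is only that a key built from stable fields stays the same across messages in a thread but changes across teams.

```python
import hashlib
import json
from dataclasses import dataclass, replace
from typing import Any


@dataclass(frozen=True)
class WorkerContext:
    team_id: str
    user_id: str
    thread_id: str
    message_id: str
    tool_settings: dict[str, Any]


def project_identity(ctx: WorkerContext) -> dict[str, Any]:
    # Stable fields only; the per-run message_id is deliberately left out.
    return {
        "team_id": ctx.team_id,
        "user_id": ctx.user_id,
        "thread_id": ctx.thread_id,
        "tool_settings": ctx.tool_settings,
    }


def toy_cache_key(tool_name: str, arguments: dict[str, Any], ctx: WorkerContext) -> str:
    # Hypothetical derivation for illustration, not Kitaru's real algorithm:
    # hash a canonical JSON encoding of the call plus the context identity.
    payload = {
        "tool": tool_name,
        "arguments": arguments,
        "context_identity": project_identity(ctx),
    }
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


base = WorkerContext(
    team_id="team_a",
    user_id="user_123",
    thread_id="thread_456",
    message_id="msg_1",
    tool_settings={"include_private_notes": False},
)
next_message = replace(base, message_id="msg_2")  # same thread, new message
other_team = replace(base, team_id="team_b")      # different team

args = {"customer_id": "123"}
key_base = toy_cache_key("lookup_customer", args, base)
key_next_message = toy_cache_key("lookup_customer", args, next_message)
key_other_team = toy_cache_key("lookup_customer", args, other_team)

print(key_base == key_next_message)  # True: message_id is not part of the identity
print(key_base == key_other_team)    # False: team_id is part of the identity
```

Adding `message_id` to the projection would make every new message produce a new key, which is the unnecessary cache miss the paragraph above warns about.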
context= is different from metadata= on OpenAIRunRequest.start(...):
metadata is Kitaru run/checkpoint metadata; context is local OpenAI Agents SDK
runtime state for your tools, guardrails, handoffs, and hooks.
Fresh context is only for new kind="start" requests. Interrupted/resumed runs
use the saved OpenAI RunState; context_serializer= and
context_deserializer= on KitaruRunner remain the way to serialize and rebuild
context that is already inside an interrupted SDK state. With
checkpoint_strategy="calls", tool checkpoint cache keys use that restored SDK
context identity when it is available, so an approved resumed tool call for team
A does not accidentally reuse a cached tool result from team B. Kitaru uses only
the derived cache key for that separation; it does not save the raw context or
your context_cache_identity= projection in visible tool input artifacts.
One more boundary to remember: if your guardrail manually calls raw
agents.Runner.run(...) for a nested evaluator, that nested call is not managed
by Kitaru automatically. Wrap the nested evaluator with its own KitaruRunner if
you need Kitaru checkpoints there too.
Checkpoint strategy choices
You choose how Kitaru places checkpoints with checkpoint_strategy=.
checkpoint_strategy="runner_call" (recommended for .wait())
Kitaru places one checkpoint around the outer OpenAI Runner.run(...) call. That
single checkpoint becomes the flow's terminal artifact, so
flow.run(...).wait() returns the run result cleanly. The name
"runner_call" is deliberately specific: it means Kitaru is wrapping the outer
OpenAI runner call, not claiming to own every SDK-internal step.
Use this when you want one coarse replay boundary for the whole agent run, or
whenever you want a clean Python value back from .wait().
checkpoint_strategy="calls" (default)
Kitaru records supported model/tool calls individually as separate peer checkpoints under the flow.
Use this when you want finer replay units (for example: if call 6 fails, calls 1–5 can come from cache).
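The "call 6 fails, calls 1–5 come from cache" behavior can be sketched with a toy in-memory cache. `ToyCallCache` and `run_agent` are hypothetical stand-ins written for this illustration; they do not use or mirror Kitaru's internals.

```python
from typing import Callable, Optional


class ToyCallCache:
    """Hypothetical in-memory stand-in for per-call checkpoints."""

    def __init__(self) -> None:
        self.results: dict[str, str] = {}
        self.executed: list[str] = []

    def call(self, key: str, fn: Callable[[], str]) -> str:
        # Reuse a stored result when one exists; otherwise run and store it.
        if key in self.results:
            return self.results[key]
        value = fn()
        self.results[key] = value
        self.executed.append(key)
        return value


def run_agent(cache: ToyCallCache, fail_on: Optional[int]) -> list[str]:
    # Six sequential "model/tool calls", each behind its own cache entry.
    outputs = []
    for step in range(1, 7):
        def work(step: int = step) -> str:
            if step == fail_on:
                raise RuntimeError(f"call {step} failed")
            return f"result-{step}"

        outputs.append(cache.call(f"call-{step}", work))
    return outputs


cache = ToyCallCache()

# First attempt: call 6 raises after calls 1-5 have been stored.
try:
    run_agent(cache, fail_on=6)
except RuntimeError:
    pass
first_attempt = list(cache.executed)

# Second attempt: calls 1-5 come from the cache, only call 6 executes.
cache.executed.clear()
outputs = run_agent(cache, fail_on=None)
second_attempt = list(cache.executed)

print(first_attempt)   # ['call-1', 'call-2', 'call-3', 'call-4', 'call-5']
print(second_attempt)  # ['call-6']
```

With a single outer checkpoint, the same failure would leave nothing stored, so the whole run would execute again.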
Because the per-call checkpoints are siblings under the flow with no single
sink, flow.run(...).wait() cannot pick one as "the" return value and raises
KitaruAmbiguousFlowResultError. The per-checkpoint artifacts are still
fully visible in the Kitaru UI and retrievable via KitaruClient — the
error message points at them. If you need a clean .wait() return value,
switch to checkpoint_strategy="runner_call". Wrapping the runner.run_sync()
call in your own @checkpoint is not a workaround here — the adapter
guards against it and will raise, because per-call checkpoints cannot be
nested inside another Kitaru checkpoint.
Structured outputs, guardrails, and nested agents
OpenAI Agents SDK structured outputs work through the adapter. If your agent is
created with Agent(output_type=...), Kitaru preserves the SDK result object and
its typed final_output in both supported strategies:
- checkpoint_strategy="runner_call" records the outer runner call and returns the structured result from .wait() cleanly.
- checkpoint_strategy="calls" records supported model and tool calls individually, while the SDK still produces the typed final output for your Python code.
For tool-input guardrails, use checkpoint_strategy="calls" when you need to
see blocked tool attempts. In that strategy, Kitaru records a rejected tool
attempt as an existing tool_call event with guardrail metadata before the tool
function runs. It does not create a new event type, and it does not save a tool
checkpoint for arguments that the guardrail rejected.
Privacy follows the capture policy here too. If save_input=False, Kitaru omits
raw tool input artifacts and also redacts guardrail rejection messages and
unexpected guardrail exception details from persisted event metadata, because
those strings may repeat the user/tool input the guardrail just inspected. The
event still shows that a guardrail blocked the call, which guardrail did it, and
whether the behavior was reject_content, raise_exception, or an exception.
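As a rough mental model of that redaction, the sketch below builds a persisted event with and without input capture. The event shape, the field names, and the `toy_guardrail_event` helper are all hypothetical; Kitaru's real metadata layout may differ.

```python
import json
from typing import Any

REDACTED = "[redacted]"


def toy_guardrail_event(
    guardrail_name: str,
    behavior: str,
    message: str,
    save_input: bool,
) -> dict[str, Any]:
    # Hypothetical event shape for illustration only.
    return {
        "event_type": "tool_call",
        "blocked": True,
        "guardrail_name": guardrail_name,
        "behavior": behavior,
        # The rejection message may echo the inspected input, so it is only
        # persisted verbatim when input capture is enabled.
        "message": message if save_input else REDACTED,
    }


rejection = "Rejected: argument contains card number 4111-1111-1111-1111"

private_event = toy_guardrail_event(
    "no_card_numbers", "reject_content", rejection, save_input=False
)
verbose_event = toy_guardrail_event(
    "no_card_numbers", "reject_content", rejection, save_input=True
)

print(json.dumps(private_event, indent=2))
```

In the private variant, the guardrail name and behavior survive, so the block is still observable, while the string that could repeat user input does not.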
checkpoint_strategy="runner_call" still only sees the outer Runner.run(...)
boundary. That is useful for a single durable result, but it cannot show each
individual tool guardrail decision. Choose "calls" when per-tool guardrail
observability matters.
The same boundary applies to nested agents: raw nested agents.Runner.run(...) calls remain
outside Kitaru unless you wrap that evaluator agent with its own KitaruRunner.
Raw nested agents are fine for quick ephemeral checks. If their inputs, outputs,
or guardrail decisions need Kitaru observability, run them through
KitaruRunner too.
Important guardrail
checkpoint_strategy="calls" must run from flow scope (not from inside another
@checkpoint), because the adapter needs room to open inner checkpoints for
model/tool calls.
Approval interruptions
The adapter preserves OpenAI Agents SDK runs that stop for human approval. In story form: the agent reaches a tool approval, the SDK returns an interrupted run, Kitaru stores the serialized run state, and your flow can turn that interruption into a normal durable kitaru.wait().
from kitaru import flow
from kitaru.adapters.openai_agents import (
    KitaruRunner,
    OpenAIRunRequest,
    wait_for_approval,
)
runner = KitaruRunner(agent, checkpoint_strategy="runner_call")
@flow
def publish_with_gate(prompt: str) -> str:
    result = runner.run_sync(OpenAIRunRequest.start(prompt))
    if result.status == "interrupted":
        resume_request = wait_for_approval(
            result,
            name="approve_openai_tool",
            timeout=600,
        )
        result = runner.run_sync(resume_request)
    return str(result.final_output)
wait_for_approval(...) asks Kitaru to wait for a boolean approval and then returns an OpenAIRunRequest.resume(...) object. Approving resumes the saved OpenAI run state; rejecting sends the SDK a rejection message. If you already collected the decision somewhere else, use build_resume_request(result, approve=True) or pass an explicit OpenAIApprovalDecision to OpenAIRunRequest.resume(...).
Keep this bridge at flow scope. If you put it inside a Kitaru checkpoint, the flow would be trying to pause from inside a step that is meant to finish or fail as one unit.
Capture and checkpoint configuration
KitaruRunner exposes the same two kinds of knobs most teams need in production:
- Capture policy: what gets saved for observability.
- Checkpoint policy: how retries and dashboard grouping apply to adapter-created checkpoints.
from kitaru.adapters.openai_agents import KitaruRunner, OpenAICapturePolicy
runner = KitaruRunner(
    agent,
    checkpoint_strategy="calls",
    capture=OpenAICapturePolicy(
        save_input=False,  # privacy: do not persist full user input
        save_final_output=True,
        save_run_state=True,  # needed for approval resume
        save_interruption_payloads=True,
        save_response_items=False,  # opt in only when you need raw SDK items
        save_usage=True,
    ),
    model_checkpoint_config={"retries": 2},
    tool_checkpoint_config={"retries": 1},
    tool_checkpoint_config_by_name={
        "charge_card": False,  # do not checkpoint side-effectful tool
        "search_docs": {"retries": 3},
    },
)
OpenAICapturePolicy defaults are designed for useful traces: child events, input, final output, run state, interruption payloads, usage, and OTel correlation are on; raw response items are off by default because they can be noisy.
Two privacy switches are worth calling out:
- save_input=False keeps raw model/tool inputs out of artifacts and redacts tool-input guardrail messages or exception text that may contain those inputs.
- save_interruption_payloads=False keeps approval interruption summaries usable for resume decisions (index, kind, tool name, call ID, and message when the SDK exposes them) but omits raw arguments and arguments_preview.
Checkpoint config accepts retries, type, and runtime. runtime="isolated" is rejected for adapter-managed checkpoints today because those synthetic checkpoint closures capture live OpenAI SDK objects; use inline runtime or omit runtime.
For interrupted OpenAI runs, the SDK stores its own RunState so the run can resume later. If that saved RunState contains context objects that are not JSON-serializable, pass context_serializer= and context_deserializer= to KitaruRunner. These hooks are for serializing resume state after an interruption; they do not control the fresh-run context= object you pass when starting a new run. By default strict_context=True, so Kitaru fails loudly instead of saving a resume state that cannot be reconstructed later.
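A serializer/deserializer pair for a context that holds a non-JSON value might look like the sketch below. It assumes the hooks take the context object and return a JSON-friendly mapping, and take that mapping back and return the context; check the adapter's actual hook signatures before relying on this shape. `ResumeContext` and both function names are hypothetical.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Mapping


@dataclass(frozen=True)
class ResumeContext:
    team_id: str
    started_at: datetime  # datetime is not JSON-serializable by default


def serialize_context(ctx: ResumeContext) -> dict[str, Any]:
    # Convert every field to a JSON primitive.
    return {
        "team_id": ctx.team_id,
        "started_at": ctx.started_at.isoformat(),
    }


def deserialize_context(data: Mapping[str, Any]) -> ResumeContext:
    # Rebuild the original object from the saved primitives.
    return ResumeContext(
        team_id=data["team_id"],
        started_at=datetime.fromisoformat(data["started_at"]),
    )


original = ResumeContext(
    team_id="team_abc",
    started_at=datetime(2025, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
)

# Round-trip through JSON text, as a saved run state would require.
wire = json.dumps(serialize_context(original))
restored = deserialize_context(json.loads(wire))

print(restored == original)  # True
```

Under that assumption, the pair would be passed as context_serializer=serialize_context and context_deserializer=deserialize_context when constructing KitaruRunner. A round-trip check like the last lines is a cheap way to catch fields that would not survive an interruption.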
Runnable example
This example uses the real OpenAI API (not a stub model), so set your key:
uv sync --extra local --extra openai-agents
export OPENAI_API_KEY='OPENAI_API_KEY_VALUE'
# default model in the example is gpt-5-nano
# optional override: any OpenAI model you have access to
# export OPENAI_AGENTS_MODEL='<another-openai-model>'
uv run examples/integrations/openai_agents_agent/openai_agents_adapter.py
End-to-end research bot example
For a larger example, run the OpenAI research bot:
cd examples/end_to_end/openai_research_bot
uv sync --extra local --extra openai-agents
uv run kitaru init
export OPENAI_API_KEY='OPENAI_API_KEY_VALUE'
uv run python research_bot.py "AI agent durability" --max-searches 2
The workflow keeps the original research-bot shape:
planner → submitted search fan-out → writer report
The planner and writer run at flow scope through KitaruRunner with
checkpoint_strategy="runner_call". The planned searches fan out with
run_search_item.submit(...), so each search is its own durable checkpoint.
Ordinary Kitaru checkpoints publish stable dashboard artifacts such as the
normalized research plan, search summaries, and final report.
The example also uses a local OpenAI Agents SDK @function_tool named
search_web instead of the hosted WebSearchTool. The local tool calls the
OpenAI Responses API with web_search, which makes the checkpoint trace clearer
with the adapter's current public behavior.
Look for these artifacts in the Kitaru UI:
- research_plan
- search_summaries
- durability_drill
- final_report
- research_report_metadata
To test the durable-retry story directly, set
KITARU_RESEARCH_BOT_FAIL_AFTER_SEARCHES=1 before running the example. It will
fail after the submitted searches complete. Unset the flag and run
kitaru executions replay <EXECUTION_ID> --from durability_drill_gate; the
replay should reuse the completed planner/search checkpoints and continue into
the writer. By contrast, retry tries to restart the same failed execution and may be
unavailable on server-backed stacks after a run has concluded.
See also: Replay and overrides.