Manage Executions with KitaruClient and CLI
Inspect execution status, fetch runtime logs, resolve waits, and manage lifecycle actions
KitaruClient is the programmatic API for managing and inspecting executions
outside your flow functions.
KitaruClient and the CLI use your current Kitaru connection context. If you
want to inspect executions from a deployed Kitaru server, connect first with
kitaru login ... or provide KITARU_* connection variables in the current
environment.
Create a client
import kitaru
client = kitaru.KitaruClient()

The client uses your current Kitaru connection/project context.
Inspect a single execution
execution = client.executions.get(exec_id)
print(execution.exec_id)
print(execution.flow_name)
print(execution.status) # running/waiting/completed/failed/cancelled
if execution.pending_wait:
    print(execution.pending_wait.name, execution.pending_wait.question)
if execution.failure:
    print(execution.failure.origin, execution.failure.message)

Execution details include:
- start/end timestamps
- stack name
- summary metadata
- checkpoint calls
- pending wait details (execution.pending_wait)
- execution failure details (execution.failure) when status is failed
- checkpoint retry/failure attempt history (checkpoint.attempts)
- artifacts
- frozen execution spec (when available)
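For scripting, these fields condense naturally into a one-line status summary. A minimal sketch (the summarize helper is ours, not part of the SDK; it is duck-typed and only assumes the attributes listed above):

```python
def summarize(execution):
    """Build a one-line summary from the execution fields listed above."""
    parts = [execution.exec_id, execution.flow_name, execution.status]
    # Surface the blocking wait name for waiting executions.
    if getattr(execution, "pending_wait", None):
        parts.append(f"waiting on {execution.pending_wait.name!r}")
    # Surface the failure origin and message for failed executions.
    if getattr(execution, "failure", None):
        parts.append(f"failed in {execution.failure.origin}: {execution.failure.message}")
    return " | ".join(parts)
```

Because the helper only reads attributes, it works on any snapshot returned by client.executions.get(...) or list(...).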
List and query executions
recent = client.executions.list(limit=20)
completed_for_flow = client.executions.list(
flow="content_pipeline",
status="completed",
limit=10,
)
latest = client.executions.latest(flow="content_pipeline")

Fetch runtime logs
entries = client.executions.logs(exec_id, checkpoint="write_draft", limit=100)
for entry in entries:
    print(entry.timestamp, entry.level, entry.checkpoint_name, entry.message)

Runtime log retrieval requires a server-backed connection. For CLI options, follow mode, grouped output, and retrieval caveats, see View Execution Runtime Logs.
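For quick triage, fetched entries can be grouped by checkpoint on the client side. A small sketch (the helper is ours; it only assumes the entry attributes shown above):

```python
from collections import defaultdict

def group_by_checkpoint(entries):
    """Bucket log entries by checkpoint_name for per-step review."""
    groups = defaultdict(list)
    for entry in entries:
        groups[entry.checkpoint_name].append(entry)
    return dict(groups)
```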
Resolve wait input
On local interactive runs, the runtime prompts for input in the same terminal. For non-interactive or timed-out executions, resolve the pending wait externally:
execution = client.executions.input(
exec_id,
wait="approve_deploy",
value=True,
)

If the execution does not continue automatically after input (e.g. the original
runner already exited), call resume(...):

execution = client.executions.resume(exec_id)

Retry, replay, and cancel
# Same-execution retry (failed executions only)
retried = client.executions.retry(exec_id)
# Replay into a new execution from a checkpoint boundary
replayed = client.executions.replay(
exec_id,
from_="write_draft",
overrides={"checkpoint.research": "Edited notes"},
topic="New topic",
)
# Cancel a running execution
cancelled = client.executions.cancel(exec_id)

Execution convenience methods
Execution objects returned by client.executions.get(...) also expose
convenience methods that call back into the same client:
execution = client.executions.get(exec_id)
fresh = execution.refresh() # re-fetch latest state
retried = execution.retry() # retry a failed execution
resumed = execution.resume() # resume after wait input
cancelled = execution.cancel() # cancel a running execution
replayed = execution.replay(from_="write_draft", overrides={...})
checkpoints = execution.list_checkpoints()
artifacts = execution.list_artifacts()

These are equivalent to calling client.executions.retry(exec_id) etc. — they
return a new Execution snapshot rather than mutating the existing object.
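Because each call returns a fresh snapshot, a polling loop simply keeps the latest one. A minimal sketch (the helper, its defaults, and the terminal-status set are ours; it only assumes the refresh() method and status values documented above):

```python
import time

TERMINAL = {"completed", "failed", "cancelled"}

def wait_until_terminal(execution, poll_seconds=2.0, timeout=300.0):
    """Poll execution.refresh() until a terminal status or the timeout expires."""
    deadline = time.monotonic() + timeout
    while execution.status not in TERMINAL:
        if time.monotonic() > deadline:
            raise TimeoutError(f"{execution.exec_id} still {execution.status}")
        time.sleep(poll_seconds)
        # refresh() returns a new snapshot; rebind rather than mutate.
        execution = execution.refresh()
    return execution
```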
Inspect or abort waits programmatically
List all pending wait conditions for an execution:
waits = client.executions.pending_waits(exec_id)
for w in waits:
    print(w.name, w.question, w.schema)

Abort a pending wait instead of continuing it:

execution = client.executions.abort_wait(exec_id, wait="approve_deploy")

Browse and load artifacts
artifacts = client.artifacts.list(exec_id)
for artifact in artifacts:
    print(artifact.name, artifact.kind, artifact.save_type)

context_artifact = client.artifacts.get(artifact_id)
value = context_artifact.load()

You can also filter artifact lists:
client.artifacts.list(exec_id, kind="context")
client.artifacts.list(exec_id, producing_call="research")
client.artifacts.list(exec_id, name="research_context", limit=1)

Manage executions from the CLI
# Inspect and filter executions
kitaru executions get kr-a8f3c2
kitaru executions get kr-a8f3c2 --output json
kitaru executions list
kitaru executions list --status waiting --flow content_pipeline --limit 20
kitaru executions list --status waiting --output json
kitaru executions logs kr-a8f3c2 --checkpoint write_draft
kitaru executions logs kr-a8f3c2 --output json
# Agent/script-friendly status and stack inspection
kitaru status --output json
kitaru stack list --output json
# Wait-input and lifecycle actions
kitaru executions input kr-a8f3c2 --value true
kitaru executions input kr-a8f3c2 --abort
kitaru executions input kr-a8f3c2 --interactive
kitaru executions input --interactive # sweep all waiting executions
kitaru executions resume kr-a8f3c2
kitaru executions replay kr-a8f3c2 --from write_draft --args '{"topic":"New topic"}' --overrides '{"checkpoint.research":"Edited notes"}'
kitaru executions retry kr-a8f3c2
kitaru executions cancel kr-a8f3c2

Query executions through MCP
If you want assistant-native tooling (Claude Code, Cursor, etc.), install and run the MCP server:
pip install "kitaru[mcp]"
kitaru-mcp

Then use tool calls like:
- kitaru_executions_list(status="waiting")
- kitaru_executions_input(exec_id=..., wait=..., value=...) (MCP requires explicit wait)
- get_execution_logs(exec_id=...)
- kitaru_artifacts_get(artifact_id=...)
- kitaru_status()
If the execution does not continue automatically after wait input is resolved
(e.g. the original runner already exited), use the CLI or SDK resume(...) call.
MCP does not currently expose a separate resume tool.
See the full setup guide at MCP Server.
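When scripting that fallback with the SDK, the two calls chain naturally. A sketch (the helper is ours and duck-typed over any client exposing the input(...) and resume(...) calls shown earlier; treating a lingering "waiting" status as the signal to resume is our assumption):

```python
def resolve_and_resume(client, exec_id, wait, value):
    """Resolve a pending wait, then resume if the execution did not continue."""
    execution = client.executions.input(exec_id, wait=wait, value=value)
    # Assumption: if the original runner already exited, the snapshot still
    # reports "waiting", so resume(...) is needed to pick the execution back up.
    if execution.status == "waiting":
        execution = client.executions.resume(exec_id)
    return execution
```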
Try the examples
For the broader catalog, see Examples.
uv sync --extra local
uv run examples/execution_management/client_execution_management.py
uv run pytest tests/test_phase11_client_example.py
uv run examples/execution_management/wait_and_resume.py
uv run pytest tests/test_phase15_wait_example.py
uv run examples/replay/replay_with_overrides.py
uv run pytest tests/test_phase16_replay_example.py
uv sync --extra local --extra mcp
uv run examples/mcp/mcp_query_tools.py
uv run pytest tests/mcp/test_phase19_mcp_example.py