Flows
Define durable execution boundaries for your AI agent workflows
A flow is the outer durable boundary in Kitaru — the unit of work your platform invokes and the runner executes. Everything inside the flow is tracked at checkpoint boundaries: persisted outputs, retry, replay, resume, and wait. Your harness (Pydantic AI, LangGraph, Claude Agent SDK, raw Python) lives inside the checkpoints. Your platform sits in front of the flow's invocation API. See Harness, Runtime, Platform for the bigger picture.
The shape of a flow
The @flow boundary defines the durable execution. Checkpoints inside it are
the persisted replay boundaries; the flow body is ordinary Python that
orchestrates those checkpoints. Side effects in the flow body itself are not
automatically durable — put anything that needs to survive a crash, be replayed,
or be rolled back behind a @checkpoint.
The flow body is the orchestration layer. The checkpoints inside are the replay boundaries.
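To make the distinction concrete, here is a toy model of replay. It is not Kitaru's implementation: `toy_checkpoint`, `STORE`, and `toy_flow` are hypothetical stand-ins that only mimic the idea of a persisted output being reused while the surrounding orchestration code runs again.

```python
import functools

# Hypothetical in-memory store standing in for persisted checkpoint outputs.
STORE = {}
calls = []  # records every side effect that actually executes


def toy_checkpoint(fn):
    """Toy stand-in for a checkpoint: persist the output, reuse it on replay."""
    @functools.wraps(fn)
    def wrapper(*args):
        key = (fn.__name__, args)
        if key not in STORE:
            STORE[key] = fn(*args)  # runs once; the result is "persisted"
        return STORE[key]
    return wrapper


@toy_checkpoint
def fetch_data(url):
    calls.append("fetch")  # side effect behind a checkpoint
    return f"payload from {url}"


def toy_flow(url):
    calls.append("flow-body")  # side effect in the flow body itself
    return fetch_data(url).upper()


first = toy_flow("https://example.com")
replayed = toy_flow("https://example.com")  # simulate replaying the same execution
print(calls)  # ['flow-body', 'fetch', 'flow-body']
```

In this toy, the flow-body side effect runs on both passes while the checkpointed one runs once, which is the reason to keep anything that must not repeat behind a checkpoint.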
Defining a flow
Decorate your orchestration function with @flow:

```python
from kitaru import flow

# fetch_data and process_data are assumed to be checkpoints defined elsewhere.
@flow
def my_agent(url: str) -> str:
    data = fetch_data(url)
    return process_data(data)
```

The decorated function becomes a callable wrapper object. Inside the flow body, you compose checkpoints, the units of work whose outputs are persisted.
Running a flow
Use .run() to start an execution:

```python
handle = my_agent.run(url="https://example.com")
print(handle.exec_id)  # unique execution identifier

# ... do other work ...

result = handle.wait()  # block until finished
```

.run() submits the execution and immediately returns a FlowHandle. The flow
runs in the background while your code continues. Call handle.wait() when you
need the result.
For a synchronous one-liner, chain .wait():

```python
result = my_agent.run(url="https://example.com").wait()
```

To target a remote stack for one execution, pass stack=:

```python
handle = my_agent.run(url="https://example.com", stack="production")
```

Deploying and invoking flows
Use .run() when the current source process is starting a flow directly. Use
.deploy() when you want to save a reusable, versioned flow entrypoint that
other processes can invoke later.

```python
# Producer side: create a reusable deployment version.
my_agent.deploy(url="https://example.com")

# Consumer side: start a new execution from the deployed default route.
handle = my_agent.invoke(url="https://example.com")
```

.invoke() is the remote invocation verb for deployed flows. If you do not pass
version= or tag=, it invokes the reserved default tag. To pin a specific
version, pass version=2; to route through a named tag, pass tag="stable".
See Deployments for auto-versioning, tag semantics, auth context, and worked producer/consumer examples.
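
The routing rule described above can be sketched in plain Python. This is an illustration only, not Kitaru's resolver: the `TAGS` table and `resolve_version` function are hypothetical, and rejecting a call that passes both `version=` and `tag=` is an assumption of this sketch, since this page does not say how that case is handled.

```python
from typing import Optional

# Hypothetical tag table: tag name -> deployed version number.
TAGS = {"default": 3, "stable": 2}


def resolve_version(version: Optional[int] = None, tag: Optional[str] = None) -> int:
    """Pick the deployment version an invocation would target in this sketch."""
    if version is not None and tag is not None:
        # Assumption of this sketch; the documented behavior is not stated here.
        raise ValueError("pass version= or tag=, not both")
    if version is not None:
        return version  # pinned version
    return TAGS[tag or "default"]  # named tag, else the reserved default tag


print(resolve_version())              # 3
print(resolve_version(version=2))     # 2
print(resolve_version(tag="stable"))  # 2

# Moving the default tag rolls out a new version without changing callers.
TAGS["default"] = 4
print(resolve_version())              # 4
```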
FlowHandle
A FlowHandle is returned by .run(). It gives you
access to the running execution:
| Property / Method | What it does |
|---|---|
| handle.exec_id | The unique execution identifier (a string you can store or log) |
| handle.status | Current execution status (refreshed on each access) |
| handle.wait() | Block until the execution finishes, then return the result |
| handle.get() | Return the result immediately if finished, otherwise raise an error |
handle.get() does not wait. If the execution is still running, it raises
a KitaruStateError. Use handle.wait() when you want to block.
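
As a rough sketch of the difference, the snippet below wraps a non-blocking check around get(). So that it runs without Kitaru installed, it uses a hypothetical `FakeHandle` and a stand-in `NotFinished` exception; with a real handle you would catch KitaruStateError instead. The `try_get` helper is illustrative and not part of Kitaru.

```python
class NotFinished(Exception):
    """Stand-in for KitaruStateError so the sketch runs without Kitaru."""


class FakeHandle:
    """Hypothetical handle that mimics the get()/wait() contract from the table."""

    def __init__(self, result, checks_until_done):
        self._result = result
        self._remaining = checks_until_done

    def get(self):
        if self._remaining > 0:
            self._remaining -= 1  # pretend the execution progresses between checks
            raise NotFinished("execution still running")
        return self._result

    def wait(self):
        self._remaining = 0  # a real handle would block here until finished
        return self._result


def try_get(handle, default=None):
    """Non-blocking check: return the result if ready, otherwise a default."""
    try:
        return handle.get()
    except NotFinished:
        return default


handle = FakeHandle(result="done", checks_until_done=1)
early = try_get(handle)  # still running, so the default (None) comes back
late = handle.wait()     # blocks until the result is available
```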
How errors surface
If the flow execution fails, handle.wait() raises a typed
KitaruExecutionError (or a more specific subclass) with the execution ID,
final status, and the failure origin attached:

```python
import kitaru

try:
    result = my_agent.run(url="bad-input").wait()
except kitaru.KitaruExecutionError as exc:
    print(exc.exec_id, exc.status, exc.failure_origin)
```

Runtime options
You can configure execution behavior at the decorator level (defaults) or
override per-run via .run():

```python
from kitaru import flow

@flow(retries=2, cache=False)
def my_agent(url: str) -> str:
    ...

# Override at call time
handle = my_agent.run(url="https://example.com", retries=3, cache=True)
```

| Option | Default | What it controls |
|---|---|---|
| retries | 0 | Number of automatic retries on failure |
| cache | True | Whether checkpointed outputs can be reused from previous runs. Set False to disable. |
| stack | None | Target execution environment for this run (overrides the active stack default) |
| image | None | Container image for remote execution (string, mapping, or settings object). Supports base_image, requirements, environment, apt_packages, replicate_local_python_environment, and dockerfile. |
Per-run values override decorator defaults. If you don't pass an override, the decorator default applies.
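
The layering can be pictured as a dictionary merge in which later layers win. The sketch below is illustrative only: `effective_options` is a hypothetical helper, not a Kitaru function, and it raises a plain ValueError where Kitaru would raise KitaruUsageError for negative retries.

```python
from typing import Any, Dict

# Built-in defaults copied from the options table above.
BUILTIN_DEFAULTS: Dict[str, Any] = {
    "retries": 0,
    "cache": True,
    "stack": None,
    "image": None,
}


def effective_options(decorator: Dict[str, Any], per_run: Dict[str, Any]) -> Dict[str, Any]:
    """Later layers win: built-in defaults < decorator options < per-run overrides."""
    options = {**BUILTIN_DEFAULTS, **decorator, **per_run}
    if options["retries"] < 0:
        # Stand-in for the usage error described on this page.
        raise ValueError("retries must be non-negative")
    return options


# Decorator sets retries=2, cache=False; the run overrides only retries.
opts = effective_options({"retries": 2, "cache": False}, {"retries": 3})
print(opts)  # {'retries': 3, 'cache': False, 'stack': None, 'image': None}
```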
For stack, the full precedence chain, highest priority first, is:
- .run(..., stack="...")
- @flow(stack="...")
- kitaru.configure(stack="...")
- KITARU_STACK
- [tool.kitaru].stack in pyproject.toml
- active stack selected via kitaru stack use ...
When a higher layer supplies stack, Kitaru binds that stack only for the
submission of that execution and then restores the previous active stack.
That override does not permanently switch your default stack.
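
A minimal sketch of both behaviors, under stated assumptions: `resolve_stack`, `bound_stack`, and the `state` dictionary are hypothetical and do not mirror Kitaru internals, and KITARU_STACK is treated here as an environment variable. The first function walks the chain and returns the first layer that supplies a value; the context manager shows the bind-then-restore pattern.

```python
import os
from contextlib import contextmanager
from typing import Iterator, Mapping, Optional


def resolve_stack(
    run_arg: Optional[str] = None,
    decorator_arg: Optional[str] = None,
    configured: Optional[str] = None,
    pyproject: Optional[str] = None,
    active: Optional[str] = None,
    env: Mapping[str, str] = os.environ,
) -> Optional[str]:
    """Return the first stack supplied, walking the chain from highest priority."""
    candidates = [run_arg, decorator_arg, configured,
                  env.get("KITARU_STACK"), pyproject, active]
    return next((c for c in candidates if c), None)


# Hypothetical process-wide state holding the active stack.
state = {"active_stack": "local"}


@contextmanager
def bound_stack(name: str) -> Iterator[None]:
    """Bind a stack for one submission, then restore the previous one."""
    previous = state["active_stack"]
    state["active_stack"] = name
    try:
        yield
    finally:
        state["active_stack"] = previous


chosen = resolve_stack(decorator_arg="staging", active="local",
                       env={"KITARU_STACK": "ci"})
with bound_stack(chosen):
    during = state["active_stack"]  # "staging" while submitting
after = state["active_stack"]       # back to "local" afterwards
```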
Run, then deploy
.run() starts an ad-hoc execution from the current process. It's the
right loop for iteration and for calls made from your own code.
For production, a flow is deployed: .deploy() captures an immutable
versioned snapshot that consumers invoke by flow name. Tags route traffic
between versions (default is the tag your platform normally targets), so you
can roll a new version out without changing the invocation surface. Auth is
workspace-scoped; there are no per-deployment tokens to rotate.

```python
# Ad-hoc run from the current process
handle = my_agent.run(url="https://example.com")

# Deploy an immutable snapshot (new version each call).
# Parameterized flows take representative deployment-time inputs;
# consumers can override them at invocation time.
my_agent.deploy(url="https://example.com")

# Consumers that have the flow imported can invoke by attribute
handle = my_agent.invoke(url="https://example.com")

# Or invoke source-free by flow name (from Python, CLI, MCP, or HTTP)
from kitaru import KitaruClient

handle = KitaruClient().deployments.invoke(
    flow="my_agent",
    inputs={"url": "https://example.com"},
)
```

Rules to know
- Flow functions should compose checkpoints. The flow body is the orchestration layer — heavy work belongs in checkpoints.
- Use .run() to start flows directly from source. Direct calls (my_agent(...)) are not supported and raise KitaruUsageError. Use my_agent.run(...) for source-backed executions, or .invoke(...) for deployment-backed executions.
- Retries must be non-negative. Passing a negative retries value raises a KitaruUsageError.
Next steps
- Learn how to break work into durable units with Checkpoints
- Attach structured data to your executions with Logging and Metadata