Flows
Define durable execution boundaries for your AI agent workflows
A flow is the outer durable boundary in Kitaru. It marks the top-level function whose execution becomes persistent, replayable, and observable.
Defining a flow
Decorate your orchestration function with @flow:

    from kitaru import flow

    @flow
    def my_agent(url: str) -> str:
        data = fetch_data(url)
        return process_data(data)

The decorated function becomes a callable wrapper object. Inside the flow body, you compose checkpoints: the units of work whose outputs are persisted.
Running a flow
Use .run() to start an execution:

    handle = my_agent.run(url="https://example.com")
    print(handle.exec_id)  # unique execution identifier
    # ... do other work ...
    result = handle.wait()  # block until finished

.run() submits the execution and immediately returns a FlowHandle. The flow
runs in the background while your code continues. Call handle.wait() when you
need the result.
For a synchronous one-liner, chain .wait():

    result = my_agent.run(url="https://example.com").wait()

To target a remote stack for one execution, pass stack=:

    handle = my_agent.run(url="https://example.com", stack="production")

FlowHandle
A FlowHandle is returned by .run(). It gives you
access to the running execution:
| Property / Method | What it does |
|---|---|
| handle.exec_id | The unique execution identifier (a string you can store or log) |
| handle.status | Current execution status (refreshed on each access) |
| handle.wait() | Block until the execution finishes, then return the result |
| handle.get() | Return the result immediately if finished, otherwise raise an error |
handle.get() does not wait. If the execution is still running, it raises
a KitaruStateError. Use handle.wait() when you want to block.
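The difference between wait() and get() can be illustrated with a small stand-in built on Python's concurrent.futures. This is only a sketch of the semantics described above, not Kitaru's implementation. SketchHandle, SketchStateError, and slow_job are hypothetical names, and a thread pool stands in for the real execution backend.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


class SketchStateError(RuntimeError):
    """Hypothetical stand-in for KitaruStateError."""


class SketchHandle:
    """Hypothetical stand-in for a FlowHandle, wrapping a Future."""

    def __init__(self, future: Future) -> None:
        self._future = future

    def wait(self):
        # Block until the work finishes, then return its result.
        return self._future.result()

    def get(self):
        # Never block: fail fast if the result is not ready yet.
        if not self._future.done():
            raise SketchStateError("execution is still running")
        return self._future.result()


release = threading.Event()


def slow_job() -> str:
    release.wait()  # stay "running" until the caller releases the job
    return "done"


with ThreadPoolExecutor(max_workers=1) as pool:
    handle = SketchHandle(pool.submit(slow_job))
    try:
        handle.get()  # the job is still blocked, so this raises
        early_error = None
    except SketchStateError as exc:
        early_error = str(exc)
    release.set()
    result = handle.wait()      # blocks until the job finishes
    late_result = handle.get()  # finished now, so get() succeeds
```

In this sketch, get() is the right call only when you already know the execution has finished, for example after checking its status. Otherwise wait() is the safer default.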
Runtime options
You can configure execution behavior at the decorator level (defaults) or
override per-run via .run():

    from kitaru import flow

    @flow(retries=2, cache=False)
    def my_agent(url: str) -> str:
        ...

    # Override at call time
    handle = my_agent.run(url="https://example.com", retries=3, cache=True)

| Option | Default | What it controls |
|---|---|---|
| retries | 0 | Number of automatic retries on failure |
| cache | True | Whether checkpointed outputs can be reused from previous runs |
| stack | None | Target execution environment for this run (overrides the active stack default) |
| image | None | Container image for remote execution (string, mapping, or settings object). Supports base_image, requirements, environment, apt_packages, replicate_local_python_environment, and dockerfile. |
Per-run values override decorator defaults. If you don't pass an override, the decorator default applies.
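One way to picture this layering is as a dictionary merge in which later layers win. The sketch below is an illustration under that assumption, not Kitaru's actual resolution code. resolve_options and BUILTIN_DEFAULTS are hypothetical names, and the default values are copied from the table above.

```python
from typing import Any

# Defaults copied from the options table above.
BUILTIN_DEFAULTS: dict[str, Any] = {
    "retries": 0,
    "cache": True,
    "stack": None,
    "image": None,
}


def resolve_options(
    decorator_opts: dict[str, Any], run_opts: dict[str, Any]
) -> dict[str, Any]:
    """Hypothetical helper: later layers override earlier ones."""
    # Only options that were actually supplied should appear in each dict,
    # so an omitted per-run option falls back to the decorator default.
    return {**BUILTIN_DEFAULTS, **decorator_opts, **run_opts}


# @flow(retries=2, cache=False) with no per-run override
decorated_only = resolve_options({"retries": 2, "cache": False}, {})

# .run(..., retries=3, cache=True) overrides both decorator values
overridden = resolve_options(
    {"retries": 2, "cache": False},
    {"retries": 3, "cache": True},
)
```

The sketch covers only the two layers described here. For stack, more layers take part in the resolution.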
For stack, the full precedence chain, from highest to lowest priority, is:
- .run(..., stack="...")
- @flow(stack="...")
- kitaru.configure(stack="...")
- KITARU_STACK
- [tool.kitaru].stack in pyproject.toml
- active stack selected via kitaru stack use ...
When a higher layer supplies stack, Kitaru binds that stack only for the
submission of that execution and then restores the previous active stack.
That override does not permanently switch your default stack.
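The two behaviors described here, "first layer that supplies a value wins" and "bind temporarily, then restore", can be sketched with a loop and a context manager. This is an illustrative model rather than Kitaru's implementation. resolve_stack, bound_stack, and the _state dictionary are hypothetical, as are the stack names "local" and "staging".

```python
from contextlib import contextmanager
from typing import Iterator, Optional

# Hypothetical state standing in for the active stack selection.
_state = {"active_stack": "local"}


def resolve_stack(*layers: Optional[str]) -> str:
    """Return the first layer that supplies a stack, else the active stack."""
    for value in layers:
        if value is not None:
            return value
    return _state["active_stack"]


@contextmanager
def bound_stack(name: str) -> Iterator[str]:
    """Temporarily switch the active stack, restoring it afterwards."""
    previous = _state["active_stack"]
    _state["active_stack"] = name
    try:
        yield name
    finally:
        # Restored even if the submission raises.
        _state["active_stack"] = previous


# Layers in precedence order:
# .run(), @flow, kitaru.configure(), KITARU_STACK, pyproject.toml
chosen = resolve_stack(None, "staging", None, "production", None)

with bound_stack(chosen):
    during = _state["active_stack"]  # the override applies to the submission
after = _state["active_stack"]       # the previous default is back
```

Here the decorator-level value "staging" wins because no per-run value was given, and the default reverts to "local" once the block exits.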
Rules to know
- Flow functions should compose checkpoints. The flow body is the orchestration layer; heavy work belongs in checkpoints.
- Use .run() to invoke flows. Direct calls (my_agent(...)) are not supported and raise KitaruUsageError. Use my_agent.run(...) to start an execution.
- Retries must be non-negative. Passing a negative retries value raises a KitaruUsageError.
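To make the last two rules concrete, here is a minimal sketch of a wrapper object that rejects direct calls and validates retries. It only loosely mirrors what @flow might return. SketchFlow, SketchUsageError, shout, and error_of are hypothetical names, and run() calls the function synchronously instead of submitting it and returning a handle.

```python
from typing import Any, Callable, Optional


class SketchUsageError(Exception):
    """Hypothetical stand-in for KitaruUsageError."""


def _check_retries(retries: int) -> None:
    if retries < 0:
        raise SketchUsageError("retries must be non-negative")


class SketchFlow:
    """Hypothetical wrapper object for a decorated function."""

    def __init__(self, func: Callable[..., Any], retries: int = 0) -> None:
        _check_retries(retries)
        self._func = func
        self._retries = retries

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # Direct calls are rejected so every execution goes through .run().
        raise SketchUsageError(f"call {self._func.__name__}.run(...) instead")

    def run(self, *, retries: Optional[int] = None, **kwargs: Any) -> Any:
        effective = self._retries if retries is None else retries
        _check_retries(effective)
        # A real runtime would submit the work and return a handle;
        # this sketch validates the option and runs the function inline.
        return self._func(**kwargs)


def shout(text: str) -> str:
    return text.upper()


def error_of(action: Callable[[], Any]) -> str:
    """Run action and return the usage error message, or "" if none."""
    try:
        action()
    except SketchUsageError as exc:
        return str(exc)
    return ""


flow_obj = SketchFlow(shout)
direct_call_error = error_of(lambda: flow_obj("hi"))
negative_retries_error = error_of(lambda: flow_obj.run(text="hi", retries=-1))
ok_result = flow_obj.run(text="hi")
```

Raising on a direct call, rather than silently running the function, keeps every execution on the path that the runtime can persist and observe.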
Next steps
- Learn how to break work into durable units with Checkpoints
- Attach structured data to your executions with Logging and Metadata
- Understand how results and errors surface in the Execution Model