Execution Model
How flows run, return results, and surface errors
This page explains how Kitaru executes your workflows and what to expect when things succeed or fail.
The mental model
Kitaru tracks your workflow at flow and checkpoint boundaries. Think of it like this:
- A flow defines the outer durable boundary
- Checkpoints inside the flow are the persisted work units
- Kitaru records the outcome of each checkpoint as it runs
- On re-execution, completed checkpoints can return persisted results instead of running again
Your Python code between checkpoints runs normally; Kitaru does not intercept every line. The durability guarantees apply at the checkpoint boundaries.
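
To make the mental model concrete, here is a minimal toy sketch in plain Python. It does not use Kitaru and is not how Kitaru is implemented. The names `toy_checkpoint`, `toy_flow`, `_store`, and `calls` are hypothetical, and an in-memory dict stands in for persisted storage. It only illustrates the idea that outcomes are recorded at checkpoint boundaries and reused on re-execution, while ordinary code between checkpoints runs every time.

```python
import functools
from typing import Any, Callable

# Toy stand-in for persisted checkpoint results (hypothetical, in-memory only).
_store: dict[str, Any] = {}
# Records what actually executed, so the reuse is observable.
calls: list[str] = []


def toy_checkpoint(func: Callable[..., Any]) -> Callable[..., Any]:
    """Record the result of a work unit and reuse it on later executions."""

    @functools.wraps(func)
    def wrapper(*args: Any) -> Any:
        key = f"{func.__name__}:{args!r}"
        if key in _store:
            return _store[key]      # completed earlier: return the recorded result
        result = func(*args)        # run the work unit
        _store[key] = result        # record the outcome at the boundary
        calls.append(func.__name__)
        return result

    return wrapper


@toy_checkpoint
def fetch(url: str) -> str:
    return f"content of {url}"


@toy_checkpoint
def summarize(text: str) -> str:
    return text.upper()


def toy_flow(url: str) -> str:
    page = fetch(url)
    calls.append("plain code")      # ordinary Python between checkpoints, not tracked
    return summarize(page)


first = toy_flow("https://example.com")
second = toy_flow("https://example.com")  # re-execution reuses both checkpoint results
```

After both calls, `calls` contains each checkpoint name once and `"plain code"` twice: the second execution skipped the checkpointed work but still ran the code in between.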
What happens when you run a flow
handle = my_agent.run(url="https://example.com")
# execution runs in the background
result = handle.wait()
Behind the scenes:
1. Kitaru submits a new execution
2. .run() returns a FlowHandle immediately
3. The execution runs in the background
4. Calling handle.wait() blocks until the execution finishes
5. On success, the flow's return value is extracted and returned
6. On failure, handle.wait() raises KitaruExecutionError (or a more specific subclass)
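
The submit, return-a-handle, then wait sequence above can be sketched with the standard library alone. This is an analogy under stated assumptions, not Kitaru's internals: `ToyHandle`, `ToyExecutionError`, `toy_run`, and `agent` are hypothetical names, and a thread pool stands in for whatever actually runs the execution.

```python
import itertools
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable


class ToyExecutionError(Exception):
    """Hypothetical stand-in for a typed execution error."""

    def __init__(self, exec_id: str, status: str) -> None:
        super().__init__(f"execution {exec_id} ended with status {status}")
        self.exec_id = exec_id
        self.status = status


class ToyHandle:
    """Hypothetical handle for an execution running in the background."""

    def __init__(self, exec_id: str, future: Future) -> None:
        self.exec_id = exec_id
        self._future = future

    def wait(self) -> Any:
        try:
            return self._future.result()  # blocks until the execution finishes
        except Exception as exc:
            # Surface any failure as one typed error carrying the execution ID.
            raise ToyExecutionError(self.exec_id, "failed") from exc


_pool = ThreadPoolExecutor(max_workers=2)
_ids = itertools.count(1)


def toy_run(func: Callable[..., Any], **kwargs: Any) -> ToyHandle:
    """Submit the work and return a handle immediately, without waiting."""
    future = _pool.submit(func, **kwargs)
    return ToyHandle(f"exec-{next(_ids)}", future)


def agent(url: str) -> str:
    if not url.startswith("https://"):
        raise ValueError("unsupported URL")
    return f"fetched {url}"


ok = toy_run(agent, url="https://example.com").wait()

try:
    toy_run(agent, url="ftp://example.com").wait()
except ToyExecutionError as exc:
    failure = (exc.exec_id, exc.status)

_pool.shutdown()
```

The point of the pattern is that the caller decides when to block: `toy_run` returns at once, and both the return value and any failure surface only at `wait()`.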
For a synchronous one-liner, chain .wait():
result = my_agent.run(url="https://example.com").wait()
How errors surface
If a flow execution fails, handle.wait() raises a typed KitaruExecutionError:
import kitaru
try:
    result = my_agent.run(topic="bad-input").wait()
except kitaru.KitaruExecutionError as exc:
    print(exc.exec_id, exc.status, exc.failure_origin)
Result reuse
By default, checkpoint result reuse is enabled (cache=True on @flow).
This means:
- Checkpoints that already succeeded in a previous execution can return their recorded output without re-running
- This is especially useful during development when you want to iterate on later checkpoints without repeating expensive earlier work
You can disable result reuse at the flow level or per-run:
from kitaru import flow
# Disable result reuse by default
@flow(cache=False)
def always_fresh(url: str) -> str:
    ...
# Or override per-run
handle = my_agent.run("https://example.com", cache=False)
Related guides
- Learn how to revisit a durable run with Replay and Overrides
- Inspect runtime behavior with Execution Logs
Next steps
- Start building with Flows and Checkpoints
- Add observability with Logging and Metadata
- See the full API in the Python Reference