Checkpoints
Durable work units with persistence and concurrency support
A checkpoint is a unit of work inside a flow whose output is automatically persisted. When a flow re-executes, checkpoints that already succeeded can return their recorded results instead of running again.
Defining a checkpoint
Decorate work functions with @checkpoint:
```python
import requests

from kitaru import checkpoint

@checkpoint
def fetch_data(url: str) -> str:
    return requests.get(url).text

@checkpoint
def process_data(data: str) -> str:
    return data.upper()
```

Checkpoints are reusable — define them once and call them from any flow.
Composing checkpoints in a flow
Call checkpoints from inside a @flow to build your workflow:
```python
from kitaru import flow

@flow
def my_agent(url: str) -> str:
    data = fetch_data(url)
    result = process_data(data)
    return result
```

Checkpoints execute sequentially by default. The return value of one checkpoint can be passed directly as input to the next — standard Python data flow.
Concurrent execution
For independent work that can run in parallel, use .submit():
```python
from kitaru import flow

@flow
def parallel_agent(urls: list[str]) -> list[str]:
    futures = [fetch_data.submit(url) for url in urls]
    return [f.result() for f in futures]
```

`.submit()` returns a runtime future. Call `.result()` on it to collect the checkpoint's return value. You can submit multiple checkpoints and collect their results later, which makes this the primary fan-out / fan-in pattern in Kitaru.
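As a mental model only (not Kitaru's implementation), the submit-then-collect pattern mirrors Python's standard `concurrent.futures` API, which you can run without Kitaru to get a feel for fan-out / fan-in:

```python
from concurrent.futures import ThreadPoolExecutor

def fetch_data(url: str) -> str:
    # stand-in for the real checkpoint body
    return f"content of {url}"

urls = ["url1", "url2", "url3"]
with ThreadPoolExecutor() as pool:
    # fan-out: submit every unit of work, collecting one future each
    futures = [pool.submit(fetch_data, url) for url in urls]
    # fan-in: block on each future; results come back in submission order
    results = [f.result() for f in futures]

print(results)  # ['content of url1', 'content of url2', 'content of url3']
```

The key property in both APIs is that submission returns immediately and only `.result()` blocks, so all submitted work runs concurrently.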
Additional concurrent helpers
Kitaru also provides .map() and .product() for batch concurrent execution:
```python
# .map() — apply checkpoint to each element of an iterable
results = fetch_data.map(["url1", "url2", "url3"])

# .product() — apply checkpoint to the cartesian product of inputs
results = my_checkpoint.product(["a", "b"], [1, 2])
```

These are convenience wrappers over concurrent submission. See the API reference for detailed signatures.
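Assuming `.product()` expands its inputs the way `itertools.product` does (an assumption worth confirming against the API reference), the call above would fan out one checkpoint run per input combination:

```python
from itertools import product

# Hypothetical expansion of my_checkpoint.product(["a", "b"], [1, 2]):
# one concurrent checkpoint run per cartesian-product pair.
combinations = list(product(["a", "b"], [1, 2]))
print(combinations)  # [('a', 1), ('a', 2), ('b', 1), ('b', 2)]
```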
Decorator options
```python
from kitaru import checkpoint

@checkpoint(retries=3, type="llm_call")
def call_model(prompt: str) -> str:
    ...
```

| Option | Default | What it controls |
|---|---|---|
| `retries` | `0` | Automatic retries on checkpoint failure |
| `type` | `None` | A label for UI visualization (e.g. `"llm_call"`, `"tool_call"`) |
Like flow options, retries must be non-negative.
When retries are enabled, Kitaru records each failed attempt before the final checkpoint outcome. You can inspect this history through `KitaruClient().executions.get(exec_id).checkpoints[*].attempts`.
Error handling and retries
When a checkpoint raises an unhandled exception, the flow stops immediately and the execution is marked as failed. No subsequent checkpoints run.
Automatic retries
The retries parameter on @checkpoint tells Kitaru to re-run the checkpoint
automatically before giving up:
```python
@checkpoint(retries=3)
def call_model(prompt: str) -> str:
    return client.chat(prompt)  # retried up to 3 times on failure
```

Each failed attempt is recorded, so you can inspect the full retry history through the execution's checkpoint attempts. If the checkpoint still fails after all retries, the flow fails.
For retrying the entire flow (not just a single checkpoint), see the
retries option on flows.
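The retry semantics above can be sketched in plain Python. This is an illustrative loop, not Kitaru's internals; the `attempts` list stands in for the attempt history Kitaru records:

```python
def run_with_retries(fn, args, retries: int):
    """Sketch: run fn, re-running up to `retries` times on failure."""
    attempts = []  # analogous to the recorded attempt history
    for _ in range(retries + 1):  # the first run plus `retries` re-runs
        try:
            return fn(*args), attempts
        except Exception as exc:
            attempts.append(repr(exc))
    # every attempt failed: surface the failure, as a failed checkpoint would
    raise RuntimeError(f"failed after {retries + 1} attempts: {attempts}")

# Example: a function that succeeds only on its third call
calls = {"n": 0}
def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("transient error")
    return "ok"

result, history = run_with_retries(flaky, (), retries=3)
print(result, len(history))  # ok 2
```

Note that `retries=3` means up to four total attempts: the initial run plus three retries.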
Resuming after failure
When a flow fails, you don't need to re-run everything from scratch. Use replay to re-execute from the point of failure — checkpoints that already succeeded return their recorded results, and execution picks up at the first incomplete checkpoint.
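Replay can be pictured as memoization keyed by checkpoint. In this minimal sketch (the `recorded` dict stands in for Kitaru's persisted results, and `run_checkpoint` is illustrative, not Kitaru's API), the already-succeeded checkpoint is skipped on replay:

```python
recorded = {"fetch_data": "cached page"}  # results persisted from the failed run

def run_checkpoint(name: str, fn, *args):
    # an already-succeeded checkpoint returns its recorded result
    if name in recorded:
        return recorded[name]
    # an incomplete checkpoint actually executes, and its result is persisted
    result = fn(*args)
    recorded[name] = result
    return result

executed = []
def process_data(data: str) -> str:
    executed.append("process_data")
    return data.upper()

data = run_checkpoint("fetch_data", lambda: "never runs")
result = run_checkpoint("process_data", process_data, data)
print(result)    # CACHED PAGE
print(executed)  # ['process_data']; fetch_data was skipped
```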
Return values
Checkpoint return values must be serializable — Kitaru persists them so they can be reused in future executions. Prefer:
- Built-in Python types (`str`, `int`, `float`, `bool`, `list`, `dict`)
- Pydantic models
- JSON-compatible data structures
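If you are unsure whether a return value is safe to persist, a rough stdlib check is to try JSON-encoding it (this uses JSON-compatibility as the bar, which covers the built-in types above; Pydantic models serialize through their own mechanisms):

```python
import json

def is_json_serializable(value) -> bool:
    """Rough check that a checkpoint return value can be persisted."""
    try:
        json.dumps(value)
        return True
    except TypeError:
        return False

print(is_json_serializable({"status": "ok", "items": [1, 2.5, None]}))  # True
print(is_json_serializable(open))  # False: a function object can't be persisted
```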
Rules to know
Kitaru enforces several guardrails in the current release:
- Checkpoints only work inside a flow. Calling a checkpoint outside a `@flow` raises `KitaruContextError`.
- No nested checkpoints. Calling one checkpoint from inside another is not supported and raises `KitaruContextError`.
- `.submit()` requires a running flow. Concurrent submission is only available during flow execution, not during flow compilation.
- `.map()` and `.product()` follow the same rules as `.submit()` — they require a running flow context.
```python
from kitaru import checkpoint

# This raises KitaruContextError — checkpoint called outside a flow
fetch_data("https://example.com")

# This also raises KitaruContextError — nested checkpoint
@checkpoint
def outer():
    return inner()  # inner is also a checkpoint — not allowed
```

Next steps
- Add structured metadata to your checkpoints with Logging and Metadata
- Understand how checkpoint results and errors surface in the Execution Model
- See the full API in the Checkpoint Reference