Kitaru

Flows

Define durable execution boundaries for your AI agent workflows

A flow is the outer durable boundary in Kitaru — the unit of work your platform invokes and the runner executes. Everything inside the flow is tracked at checkpoint boundaries: persisted outputs, retry, replay, resume, and wait. Your harness (Pydantic AI, LangGraph, Claude Agent SDK, raw Python) lives inside the checkpoints. Your platform sits in front of the flow's invocation API. See Harness, Runtime, Platform for the bigger picture.

The shape of a flow

The @flow boundary defines the durable execution. Checkpoints inside it are the persisted replay boundaries; the flow body is ordinary Python that orchestrates those checkpoints. Side effects in the flow body itself are not automatically durable — put anything that needs to survive a crash, be replayed, or be rolled back behind a @checkpoint.

@flow
  input
    ↓
  @checkpoint      persisted output · replay boundary
  @checkpoint      persisted output · replay boundary
  kitaru.wait()    suspends · compute released · resumes on input
    ↓ input arrives
  @checkpoint      persisted output · replay boundary
    ↓
  result

The flow body orchestrates; checkpoints are durable replay boundaries.


Defining a flow

Decorate your orchestration function with @flow:

from kitaru import flow

@flow
def my_agent(url: str) -> str:
    data = fetch_data(url)
    return process_data(data)

The decorated function becomes a callable wrapper object. Inside the flow body, you compose checkpoints — the units of work whose outputs are persisted.
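The helpers in the example above (fetch_data, process_data) would typically be checkpoints. The sketch below shows that composition with no-op stand-in decorators so it runs anywhere — the stubs drop Kitaru's real wrapper behavior (persistence, replay, .run()), and the exact checkpoint import path is an assumption, not confirmed by this page:

```python
# Toy stand-ins; in real code these come from kitaru
# (the checkpoint import path is an assumption -- see the Checkpoints page).
def flow(fn):
    return fn

def checkpoint(fn):
    return fn

@checkpoint
def fetch_data(url: str) -> str:
    # In a real flow this output is persisted at the checkpoint boundary
    return f"payload from {url}"

@checkpoint
def process_data(data: str) -> str:
    return data.upper()

@flow
def my_agent(url: str) -> str:
    # Flow body: plain Python orchestrating the two checkpoints
    data = fetch_data(url)
    return process_data(data)
```

With the real decorators, each checkpoint's output would be persisted, so a replayed execution skips completed work instead of re-running it.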

Running a flow

Use .run() to start an execution:

handle = my_agent.run(url="https://example.com")
print(handle.exec_id)   # unique execution identifier
# ... do other work ...
result = handle.wait()   # block until finished

.run() submits the execution and immediately returns a FlowHandle. The flow runs in the background while your code continues. Call handle.wait() when you need the result.

For a synchronous one-liner, chain .wait():

result = my_agent.run(url="https://example.com").wait()

To target a remote stack for one execution, pass stack=:

handle = my_agent.run(url="https://example.com", stack="production")

Deploying and invoking flows

Use .run() when the current source process is starting a flow directly. Use .deploy() when you want to save a reusable, versioned flow entrypoint that other processes can invoke later.

# Producer side: create a reusable deployment version.
my_agent.deploy(url="https://example.com")

# Consumer side: start a new execution from the deployed default route.
handle = my_agent.invoke(url="https://example.com")

.invoke() is the remote invocation verb for deployed flows. If you do not pass version= or tag=, it routes through the reserved default tag. To pin a specific version, pass version=2; to route through a named tag, pass tag="stable".
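The routing can be pictured as a mapping from tag names to version numbers — a toy illustration of the documented semantics, not Kitaru internals:

```python
# Toy picture of deployment routing: each deploy appends a version,
# and tags point at versions. Illustration only, not Kitaru internals.
deployments = {
    "versions": {1: "snapshot-1", 2: "snapshot-2", 3: "snapshot-3"},
    "tags": {"default": 3, "stable": 2},
}

def resolve(version=None, tag=None):
    # An explicit version= pins the exact snapshot.
    if version is not None:
        return deployments["versions"][version]
    # Otherwise route through a tag; no tag means the reserved "default" tag.
    return deployments["versions"][deployments["tags"][tag or "default"]]
```

Under this picture, resolve() follows the default tag, resolve(tag="stable") follows a named tag, and resolve(version=2) bypasses tags entirely.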

See Deployments for auto-versioning, tag semantics, auth context, and worked producer/consumer examples.

FlowHandle

A FlowHandle is returned by .run(). It gives you access to the running execution:

Property / Method   What it does
handle.exec_id      The unique execution identifier (a string you can store or log)
handle.status       Current execution status (refreshed on each access)
handle.wait()       Block until the execution finishes, then return the result
handle.get()        Return the result immediately if finished, otherwise raise an error

handle.get() does not wait. If the execution is still running, it raises a KitaruStateError. Use handle.wait() when you want to block.
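The get()/wait() distinction can be sketched with a toy stand-in for the handle — this models only the documented non-blocking semantics, not the real FlowHandle:

```python
class KitaruStateError(RuntimeError):
    """Stand-in for Kitaru's error type; the name is taken from the docs."""

class ToyHandle:
    # Toy model of the documented FlowHandle semantics, not the real class.
    def __init__(self):
        self._finished = False
        self._result = None

    def _finish(self, result):
        # Simulates the background execution completing.
        self._finished = True
        self._result = result

    def get(self):
        # get() never blocks: if the execution is still running, raise.
        if not self._finished:
            raise KitaruStateError("execution still running")
        return self._result

handle = ToyHandle()
try:
    handle.get()                 # still running -> raises
except KitaruStateError:
    status = "running"
handle._finish("done")
result = handle.get()            # finished -> returns immediately
```

The real handle.wait() covers the blocking case; get() is for the "give me the result only if it's already there" pattern shown above.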

How errors surface

If the flow execution fails, handle.wait() raises a typed KitaruExecutionError (or a more specific subclass) with the execution ID, final status, and the failure origin attached:

import kitaru

try:
    result = my_agent.run(url="bad-input").wait()
except kitaru.KitaruExecutionError as exc:
    print(exc.exec_id, exc.status, exc.failure_origin)

Runtime options

You can configure execution behavior at the decorator level (defaults) or override per-run via .run():

from kitaru import flow

@flow(retries=2, cache=False)
def my_agent(url: str) -> str:
    ...

# Override at call time
handle = my_agent.run(url="https://example.com", retries=3, cache=True)

Option    Default   What it controls
retries   0         Number of automatic retries on failure
cache     True      Whether checkpointed outputs can be reused from previous runs. Set False to disable.
stack     None      Target execution environment for this run (overrides the active stack default)
image     None      Container image for remote execution (string, mapping, or settings object). Supports base_image, requirements, environment, apt_packages, replicate_local_python_environment, and dockerfile.

Per-run values override decorator defaults. If you don't pass an override, the decorator default applies.

For stack, the full precedence chain is:

  1. .run(..., stack="...")
  2. @flow(stack="...")
  3. kitaru.configure(stack="...")
  4. KITARU_STACK
  5. [tool.kitaru].stack in pyproject.toml
  6. active stack selected via kitaru stack use ...

When a higher layer supplies stack, Kitaru binds that stack only for the submission of that execution and then restores the previous active stack. That override does not permanently switch your default stack.
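The precedence chain amounts to a first-non-None lookup over the six layers. A sketch of that lookup order only — Kitaru's real resolution also handles the temporary bind-and-restore behavior described above:

```python
# The six documented layers, highest precedence first.
def resolve_stack(run_kw=None, decorator_kw=None, configure_kw=None,
                  env_var=None, pyproject=None, active_stack=None):
    for candidate in (run_kw, decorator_kw, configure_kw,
                      env_var, pyproject, active_stack):
        if candidate is not None:
            return candidate
    return None
```

So a stack= passed to .run() beats everything, and the stack selected via kitaru stack use is only reached when no higher layer supplies a value.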

Run, then deploy

.run() starts an ad-hoc execution from the current process. It's the right loop for iteration and for calls made from your own code.

For production, a flow is deployed: .deploy() captures an immutable versioned snapshot that consumers invoke by flow name. Tags route traffic between versions (.invoke() with no tag routes through the reserved default tag), so you can roll out a new version without changing the invocation surface. Auth is workspace-scoped; there are no per-deployment tokens to rotate.

# Ad-hoc run from the current process
handle = my_agent.run(url="https://example.com")

# Deploy an immutable snapshot (new version each call).
# Parameterized flows take representative deployment-time inputs;
# consumers can override them at invocation time.
my_agent.deploy(url="https://example.com")

# Consumers that have the flow imported can invoke by attribute
handle = my_agent.invoke(url="https://example.com")

# Or invoke source-free by flow name (from Python, CLI, MCP, or HTTP)
from kitaru import KitaruClient
handle = KitaruClient().deployments.invoke(
    flow="my_agent",
    inputs={"url": "https://example.com"},
)

Rules to know

  • Flow functions should compose checkpoints. The flow body is the orchestration layer — heavy work belongs in checkpoints.
  • Use .run() to start flows directly from source. Direct calls (my_agent(...)) are not supported and raise KitaruUsageError. Use my_agent.run(...) for source-backed executions, or .invoke(...) for deployment-backed executions.
  • Retries must be non-negative. Passing a negative retries value raises a KitaruUsageError.
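The last two rules can be illustrated with a toy wrapper — a stand-in showing why my_agent(...) raises while my_agent.run(...) works, not Kitaru's actual implementation:

```python
class KitaruUsageError(TypeError):
    """Stand-in for Kitaru's usage error; the name is taken from the docs."""

class ToyFlowWrapper:
    # Toy model of the callable wrapper object @flow produces.
    def __init__(self, fn, retries=0):
        if retries < 0:
            raise KitaruUsageError("retries must be non-negative")
        self._fn = fn

    def __call__(self, *args, **kwargs):
        # Direct calls are rejected, per the documented rule.
        raise KitaruUsageError("call .run() or .invoke(), not the flow directly")

    def run(self, **kwargs):
        # The real .run() submits the execution and returns a FlowHandle;
        # the toy just calls through.
        return self._fn(**kwargs)

def greet(name: str) -> str:
    return f"hello {name}"

wrapped = ToyFlowWrapper(greet)
```

Here wrapped.run(name="...") succeeds, while wrapped(name="...") and ToyFlowWrapper(greet, retries=-1) both raise KitaruUsageError.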
