Kitaru

Flows

Define durable execution boundaries for your AI agent workflows

A flow is the outer durable boundary in Kitaru. It marks the top-level function whose execution becomes persistent, replayable, and observable.

Defining a flow

Decorate your orchestration function with @flow:

from kitaru import flow

@flow
def my_agent(url: str) -> str:
    # fetch_data and process_data are placeholders for checkpoints defined elsewhere.
    data = fetch_data(url)
    return process_data(data)

The decorated function becomes a wrapper object that you start with .run() rather than by calling it directly. Inside the flow body, you compose checkpoints: the units of work whose outputs are persisted.

Running a flow

Use .run() to start an execution:

handle = my_agent.run(url="https://example.com")
print(handle.exec_id)   # unique execution identifier
# ... do other work ...
result = handle.wait()   # block until finished

.run() submits the execution and immediately returns a FlowHandle. The flow runs in the background while your code continues. Call handle.wait() when you need the result.

For a synchronous one-liner, chain .wait():

result = my_agent.run(url="https://example.com").wait()

To target a remote stack for one execution, pass stack=:

handle = my_agent.run(url="https://example.com", stack="production")

FlowHandle

A FlowHandle is returned by .run(). It gives you access to the running execution:

Property / Method    What it does
handle.exec_id       The unique execution identifier (a string you can store or log)
handle.status        Current execution status (refreshed on each access)
handle.wait()        Block until the execution finishes, then return the result
handle.get()         Return the result immediately if finished, otherwise raise an error

handle.get() does not wait. If the execution is still running, it raises a KitaruStateError. Use handle.wait() when you want to block.
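The difference between a non-blocking get() and a blocking wait() can be illustrated with a toy handle built on the standard library. This is only a sketch of the semantics described above, not Kitaru's implementation: SketchHandle, SketchStateError, and slow_job are hypothetical stand-ins, and the real FlowHandle may behave differently in detail.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


class SketchStateError(RuntimeError):
    """Hypothetical stand-in for KitaruStateError (not the real class)."""


class SketchHandle:
    """Toy handle around a Future; models get() vs wait() only."""

    def __init__(self, future: Future) -> None:
        self._future = future

    def get(self):
        # Never blocks: fail fast if the work has not finished yet.
        if not self._future.done():
            raise SketchStateError("execution is still running")
        return self._future.result()

    def wait(self):
        # Blocks until the work finishes, then returns its result.
        return self._future.result()


release = threading.Event()


def slow_job() -> str:
    # Stay "running" until the caller releases the job (5 s safety timeout).
    release.wait(timeout=5)
    return "done"


with ThreadPoolExecutor(max_workers=1) as pool:
    handle = SketchHandle(pool.submit(slow_job))
    try:
        handle.get()          # job is still held open, so this raises
        early = "returned"
    except SketchStateError:
        early = "raised"
    release.set()             # let the job finish
    final = handle.wait()     # blocks until the result is available
    later = handle.get()      # succeeds now that the job has finished
```

In this sketch, get() is the right choice for polling code that must never stall, while wait() suits code that has nothing else to do until the result arrives.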

Runtime options

You can configure execution behavior at the decorator level (defaults) or override per-run via .run():

from kitaru import flow

@flow(retries=2, cache=False)
def my_agent(url: str) -> str:
    ...

# Override at call time
handle = my_agent.run(url="https://example.com", retries=3, cache=True)

Option     Default    What it controls
retries    0          Number of automatic retries on failure
cache      True       Whether checkpointed outputs can be reused from previous runs
stack      None       Target execution environment for this run (overrides the active stack default)
image      None       Container image for remote execution (string, mapping, or settings object). Supports base_image, requirements, environment, apt_packages, replicate_local_python_environment, and dockerfile.

Per-run values override decorator defaults. If you don't pass an override, the decorator default applies.
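One way to picture the override rule is as a layered dictionary merge, where later layers win. The sketch below is illustrative only and does not reflect how Kitaru stores options internally: BUILTIN_DEFAULTS and effective_options are hypothetical names, the default values are copied from the table above, and ValueError stands in for KitaruUsageError.

```python
from typing import Any, Dict

# Hypothetical defaults copied from the options table; not read from Kitaru.
BUILTIN_DEFAULTS: Dict[str, Any] = {
    "retries": 0,
    "cache": True,
    "stack": None,
    "image": None,
}


def effective_options(
    decorator_opts: Dict[str, Any], run_opts: Dict[str, Any]
) -> Dict[str, Any]:
    """Merge built-in defaults, decorator defaults, and per-run overrides.

    Only options that were actually passed should appear in each dict;
    later layers replace earlier ones key by key.
    """
    merged = {**BUILTIN_DEFAULTS, **decorator_opts, **run_opts}
    if merged["retries"] < 0:
        # Stand-in for the KitaruUsageError raised on negative retries.
        raise ValueError("retries must be non-negative")
    return merged


decorator_opts = {"retries": 2, "cache": False}

# No per-run override: the decorator defaults apply.
no_override = effective_options(decorator_opts, {})

# Per-run values win over the decorator defaults.
with_override = effective_options(decorator_opts, {"retries": 3, "cache": True})
```

Here no_override keeps retries=2 and cache=False, while with_override ends up with retries=3 and cache=True.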

For stack, the full precedence chain is:

  1. .run(..., stack="...")
  2. @flow(stack="...")
  3. kitaru.configure(stack="...")
  4. KITARU_STACK
  5. [tool.kitaru].stack in pyproject.toml
  6. active stack selected via kitaru stack use ...

When a higher layer supplies stack, Kitaru binds that stack only for the submission of that execution and then restores the previous active stack. That override does not permanently switch your default stack.
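The bind-then-restore behavior resembles a context manager that swaps a value and puts the old one back on exit. The following is a sketch under stated assumptions, not Kitaru's code: the state dictionary and bound_stack are hypothetical, and restoring in a finally block (so the previous stack comes back even if submission fails) is a design choice of the sketch.

```python
from contextlib import contextmanager
from typing import Dict, Iterator

# Hypothetical stand-in for wherever the active stack is recorded.
state: Dict[str, str] = {"active_stack": "local"}


@contextmanager
def bound_stack(name: str) -> Iterator[None]:
    """Bind `name` as the active stack for the block, then restore the old one."""
    previous = state["active_stack"]
    state["active_stack"] = name
    try:
        yield
    finally:
        # Restore the previous stack even if the body raises.
        state["active_stack"] = previous


with bound_stack("production"):
    during = state["active_stack"]   # the override is in effect here

afterwards = state["active_stack"]   # the previous default is back
```

After the block, the default is "local" again, which mirrors the point that a per-execution override does not switch the default permanently.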

Rules to know

  • Flow functions should compose checkpoints. The flow body is the orchestration layer — heavy work belongs in checkpoints.
  • Use .run() to invoke flows. Direct calls such as my_agent(...) are not supported and raise a KitaruUsageError.
  • Retries must be non-negative. Passing a negative retries value raises a KitaruUsageError.

