Kitaru

flow

Flow decorator for defining durable executions.

A flow is the outer orchestration boundary in Kitaru. It marks the top-level function whose execution becomes durable, replayable, and observable.

attribute ImageSetting = ImageInput

attribute logger = logging.getLogger(__name__)

func _temporary_active_stack(stack_name_or_id) -> Iterator[None]

Temporarily activate a stack for one flow invocation.

param stack_name_or_id: str | None

Optional stack name or ID. When None, the currently active ZenML stack is used unchanged.

Returns

collections.abc.Iterator[None]
func _register_pipeline_source_alias(*, func, alias, pipeline_obj) -> None

Register the ZenML pipeline object under a module-level alias.

ZenML dynamic runs reload pipelines from their source import path. Kitaru wraps ZenML pipelines, so we expose the underlying pipeline object under a dedicated alias and point source resolution there.

param func: Callable[..., Any]

User flow function.

param alias: str

Module-level alias name.

param pipeline_obj: Pipeline

Underlying ZenML pipeline object.

Returns

None
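One plausible way to expose an object under a module-level alias is to set an attribute on the module where the user function was defined. The sketch below assumes that approach. The alias name, the `demo_flows` module, and the `object()` placeholder for the pipeline are all hypothetical, and the real helper may do more (for example, pointing source resolution at the alias).

```python
import importlib
import sys
import types
from typing import Any, Callable


def register_source_alias(
    *, func: Callable[..., Any], alias: str, pipeline_obj: Any
) -> None:
    """Expose pipeline_obj as a module-level attribute next to func."""
    module = sys.modules[func.__module__]
    setattr(module, alias, pipeline_obj)


# Build a throwaway module so the example does not depend on how it is run.
demo_module = types.ModuleType("demo_flows")
sys.modules["demo_flows"] = demo_module


def my_flow() -> str:
    return "done"


my_flow.__module__ = "demo_flows"  # pretend the flow was defined in demo_flows
pipeline_obj = object()  # stand-in for the wrapped pipeline object

register_source_alias(
    func=my_flow, alias="__kitaru_pipeline_my_flow", pipeline_obj=pipeline_obj
)

# A loader that only knows the import path "demo_flows.<alias>" can now find it.
resolved = getattr(importlib.import_module("demo_flows"), "__kitaru_pipeline_my_flow")
```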
func _wrap_flow_entrypoint(func) -> Callable[..., Any]

Wrap a flow entrypoint with Kitaru flow runtime scope.

param func: Callable[..., Any]

Returns

collections.abc.Callable[..., typing.Any]

func _normalize_retries(retries) -> int

Validate and normalize flow retries.

param retries: int

Retry count.

Returns

int

The normalized retry count.
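The reference does not spell out the validation rules, so the following is only a sketch of what "validate and normalize" could look like. It assumes that non-integers (including `bool`) and negative values are rejected; the actual checks and exception types may differ.

```python
def normalize_retries(retries: int) -> int:
    """Return retries unchanged if it is a non-negative integer."""
    # bool is a subclass of int, so it has to be rejected explicitly.
    if isinstance(retries, bool) or not isinstance(retries, int):
        raise TypeError(f"retries must be an int, got {type(retries).__name__}")
    if retries < 0:
        raise ValueError(f"retries must be >= 0, got {retries}")
    return retries
```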

func _to_retry_config(retries) -> StepRetryConfig | None

Convert a retry count to ZenML retry config.

param retries: int

Retry count.

Returns

StepRetryConfig | None

A ZenML retry config, or None when retries are disabled.
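As a rough illustration of the "config or None" contract, the sketch below maps a retry count to an optional config object. `RetryConfig` is a hypothetical stand-in for ZenML's `StepRetryConfig`, and treating a count of 0 as "retries disabled" is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class RetryConfig:
    """Hypothetical stand-in for ZenML's StepRetryConfig."""

    max_retries: int


def to_retry_config(retries: int) -> Optional[RetryConfig]:
    """Return a retry config, or None when no retries are requested."""
    if retries == 0:
        # Assumed meaning of "disabled": zero retries.
        return None
    return RetryConfig(max_retries=retries)
```

Returning `None` rather than a config with zero retries lets the caller omit the retry setting entirely.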

func _build_settings(image) -> dict[str, DockerSettings]

Build ZenML settings payload for flow execution.

Kitaru is always included in the Docker requirements so that remote containers have the SDK available at runtime.

param image: ImageSettings | None

Optional image configuration.

Returns

dict

Pipeline settings dictionary.
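The "always include Kitaru" rule can be sketched with plain dictionaries. This is an approximation under stated assumptions: the real function returns ZenML `DockerSettings` objects rather than dicts, and the `"kitaru"` requirement string and the `"requirements"` field name on the image input are assumed for illustration.

```python
from typing import Any, Optional

SDK_REQUIREMENT = "kitaru"  # assumed requirement string for the SDK


def build_settings(image: Optional[dict[str, Any]]) -> dict[str, dict[str, Any]]:
    """Build a settings payload whose Docker requirements always include the SDK."""
    docker = dict(image or {})  # copy so the caller's mapping is not mutated
    requirements = list(docker.get("requirements", []))
    if SDK_REQUIREMENT not in requirements:
        requirements.append(SDK_REQUIREMENT)
    docker["requirements"] = requirements
    return {"docker": docker}
```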

func _inject_model_registry_env(image, *, read_local_registry) -> tuple[ImageSettings, ModelRegistryConfig, bool]

Return image settings with a transported model-registry snapshot.

param image: ImageSettings | None

param read_local_registry: Callable[[], ModelRegistryConfig]

Returns

tuple[kitaru.config.ImageSettings, kitaru.config.ModelRegistryConfig, bool]

func _prepare_model_registry_transport(image) -> tuple[ImageSettings, ModelRegistryConfig]

Inject the model registry into image env and log the outcome.

param image: ImageSettings | None

Returns

tuple[kitaru.config.ImageSettings, kitaru.config.ModelRegistryConfig]

func _build_execution_overrides(*, stack=None, image=None, cache=None, retries=None) -> KitaruConfig

Build a partial execution config from flow and invocation overrides.

param stack: str | None = None

param image: ImageSetting | None = None

param cache: bool | None = None

param retries: int | None = None

Returns

kitaru.config.KitaruConfig
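A partial config is one where `None` means "not set at this layer", so that several layers can be merged later. The sketch below illustrates that idea with a hypothetical `PartialConfig` dataclass instead of `KitaruConfig`. The precedence order shown (invocation over decorator over built-in defaults) is an assumption; the default values for `cache` and `retries` are the ones documented for the `flow` decorator.

```python
from dataclasses import dataclass, fields
from typing import Optional


@dataclass(frozen=True)
class PartialConfig:
    """Hypothetical partial config: None means 'not set at this layer'."""

    stack: Optional[str] = None
    cache: Optional[bool] = None
    retries: Optional[int] = None


def merge(*layers: PartialConfig) -> PartialConfig:
    """Merge layers given from highest to lowest precedence."""
    merged = {}
    for f in fields(PartialConfig):
        # Take the first layer that sets this field; compare against None
        # explicitly so that False and 0 still count as real overrides.
        merged[f.name] = next(
            (
                getattr(layer, f.name)
                for layer in layers
                if getattr(layer, f.name) is not None
            ),
            None,
        )
    return PartialConfig(**merged)


invocation = PartialConfig(retries=3)
decorator = PartialConfig(stack="prod", retries=1)
defaults = PartialConfig(cache=True, retries=0)
resolved = merge(invocation, decorator, defaults)
```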
func _extract_values_from_output_specs(run) -> list[Any]

Extract return values using explicit pipeline output specs.

param run: PipelineRunResponse

Returns

list[typing.Any]

func _extract_values_from_terminal_steps(run) -> list[Any]

Extract return values from terminal step outputs as a fallback.

This fallback is intentionally conservative to avoid returning values in an incorrect order when ZenML pipeline-level output specs are unavailable.

param run: PipelineRunResponse

Returns

list[typing.Any]
func _extract_flow_result(run) -> Any

Extract user-facing flow return value from a finished pipeline run.

param run: PipelineRunResponse

The pipeline run.

Returns

typing.Any

The flow result (None, a single value, or a tuple of values).
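The three result shapes follow naturally from a list of extracted values. The helper below is a hypothetical illustration of that last step only; it does not read anything from a pipeline run.

```python
from typing import Any


def collapse_outputs(values: list[Any]) -> Any:
    """Map a list of extracted values to None, a single value, or a tuple."""
    if not values:
        return None
    if len(values) == 1:
        return values[0]
    return tuple(values)
```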

func _raise_for_unsuccessful_run(run) -> None

Raise a typed Kitaru execution error with run failure context.

param run: PipelineRunResponse

Returns

None

func flow(func=None, *, stack=None, image=None, cache=None, retries=None) -> _FlowDefinition | Callable[[Callable[..., Any]], _FlowDefinition]

Mark a function as a durable flow.

Can be used as a bare decorator or with arguments::

    @flow
    def my_flow(...): ...

    @flow(stack="prod", retries=2)
    def my_other_flow(...): ...

param func: Callable[..., Any] | None = None

Optional function for bare decorator use.

param stack: str | None = None

Default execution stack.

param image: ImageSetting | None = None

Default image settings.

param cache: bool | None = None

Optional cache override (when omitted, lower-precedence config sources apply and eventually default to True).

param retries: int | None = None

Optional retry override (when omitted, lower-precedence config sources apply and eventually default to 0).

Returns

_FlowDefinition | Callable[[Callable[..., Any]], _FlowDefinition]

The wrapped flow object or a decorator that returns it.
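The dual return type reflects a common Python pattern: a decorator that works both bare and with keyword arguments. The sketch below shows the mechanics only. `FlowDefinition` is a hypothetical stand-in for `_FlowDefinition` that simply calls the function, whereas the real object makes the execution durable; the `image` and `cache` options are omitted for brevity.

```python
import functools
from typing import Any, Callable, Optional


class FlowDefinition:
    """Hypothetical stand-in for the wrapped flow object."""

    def __init__(
        self, func: Callable[..., Any], *, stack: Optional[str], retries: Optional[int]
    ) -> None:
        functools.update_wrapper(self, func)  # keep __name__, __doc__, etc.
        self.func = func
        self.stack = stack
        self.retries = retries

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # The stand-in just calls the function directly.
        return self.func(*args, **kwargs)


def flow(func=None, *, stack=None, retries=None):
    def decorate(target: Callable[..., Any]) -> FlowDefinition:
        return FlowDefinition(target, stack=stack, retries=retries)

    if func is not None:
        # Bare use: @flow passes the function as the first positional argument.
        return decorate(func)
    # Parameterized use: @flow(...) must return the actual decorator.
    return decorate


@flow
def my_flow(x: int) -> int:
    return x + 1


@flow(stack="prod", retries=2)
def my_other_flow(x: int) -> int:
    return x * 2
```

Making the options keyword-only is what keeps the two call forms unambiguous: the only positional argument the decorator can ever receive is the function itself.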