flow
Flow decorator for defining durable executions.
A flow is the outer orchestration boundary in Kitaru. It marks the top-level function whose execution becomes durable, replayable, and observable.
attribute ImageSetting = ImageInput
attribute logger = logging.getLogger(__name__)

func _temporary_active_stack(stack_name_or_id) -> Iterator[None]
Temporarily activate a stack for one flow invocation.
param stack_name_or_id: str | None
Optional stack name or ID. When None, the currently active ZenML stack is used unchanged.
Returns
collections.abc.Iterator[None]

func _register_pipeline_source_alias(*, func, alias, pipeline_obj) -> None
Register the ZenML pipeline object under a module-level alias.
ZenML dynamic runs reload pipelines from their source import path. Kitaru wraps ZenML pipelines, so we expose the underlying pipeline object under a dedicated alias and point source resolution there.
param func: Callable[..., Any]
User flow function.
param alias: str
Module-level alias name.
param pipeline_obj: Pipeline
Underlying ZenML pipeline object.
Returns
None

func _wrap_flow_entrypoint(func) -> Callable[..., Any]
Wrap a flow entrypoint with Kitaru flow runtime scope.
param func: Callable[..., Any]
Returns
collections.abc.Callable[..., typing.Any]

func _normalize_retries(retries) -> int
Validate and normalize flow retries.
param retries: int
Retry count.
Returns
int
The normalized retry count.
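The validation described for `_normalize_retries` might look roughly like the sketch below. The rules shown (reject booleans and non-integers, reject negative counts) and the name `normalize_retries_sketch` are assumptions for illustration; the actual Kitaru implementation may differ.

```python
def normalize_retries_sketch(retries: int) -> int:
    """Hypothetical stand-in for a retry-count validator."""
    # bool is a subclass of int in Python, so exclude it explicitly.
    if isinstance(retries, bool) or not isinstance(retries, int):
        raise TypeError(f"retries must be an int, got {type(retries).__name__}")
    # Assumption: a negative retry count is never meaningful.
    if retries < 0:
        raise ValueError(f"retries must be >= 0, got {retries}")
    return retries
```

Failing early at definition time, rather than when a run is submitted, keeps a bad retry value from surfacing only after a remote execution has started.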

func _to_retry_config(retries) -> StepRetryConfig | None
Convert a retry count to ZenML retry config.
param retries: int
Retry count.
Returns
StepRetryConfig | None
A ZenML retry config, or None when retries are disabled.
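The "config or None" return shape can be illustrated with a small sketch. `RetryConfigSketch` is a hypothetical stand-in dataclass, not ZenML's `StepRetryConfig`, and treating a count of 0 as "retries disabled" is an assumption based on the documented default.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class RetryConfigSketch:
    """Hypothetical stand-in for a retry config object."""

    max_retries: int


def to_retry_config_sketch(retries: int) -> Optional[RetryConfigSketch]:
    # Assumption: a count of 0 means retries are disabled, so no config is built.
    if retries == 0:
        return None
    return RetryConfigSketch(max_retries=retries)
```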

func _build_settings(image) -> dict[str, DockerSettings]
Build ZenML settings payload for flow execution.
Kitaru is always included in the Docker requirements so that remote containers have the SDK available at runtime.
param image: ImageSettings | None
Optional image configuration.
Returns
dict
Pipeline settings dictionary.
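The "always included" rule for the SDK requirement could be sketched as follows. The helper name, the plain-list representation of requirements, and the handling of pinned entries are all assumptions for illustration; the real function builds ZenML `DockerSettings` rather than a list.

```python
from typing import Optional

SDK_REQUIREMENT = "kitaru"  # assumed package name for the SDK requirement


def build_requirements_sketch(
    user_requirements: Optional[list[str]] = None,
) -> list[str]:
    """Return the requirements with the SDK entry guaranteed to be present."""
    requirements = list(user_requirements or [])
    # Treat a pinned entry such as "kitaru==<version>" as already satisfying the rule.
    has_sdk = any(
        req.split("==")[0].strip().lower() == SDK_REQUIREMENT
        for req in requirements
    )
    if not has_sdk:
        requirements.append(SDK_REQUIREMENT)
    return requirements
```

Copying the input list keeps the caller's configuration untouched, so the same image settings can be reused across flows.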

func _inject_model_registry_env(image, *, read_local_registry) -> tuple[ImageSettings, ModelRegistryConfig, bool]
Return image settings with a transported model-registry snapshot.
param image: ImageSettings | None
param read_local_registry: Callable[[], ModelRegistryConfig]
Returns
tuple[kitaru.config.ImageSettings, kitaru.config.ModelRegistryConfig, bool]

func _prepare_model_registry_transport(image) -> tuple[ImageSettings, ModelRegistryConfig]
Inject the model registry into image env and log the outcome.
param image: ImageSettings | None
Returns
tuple[kitaru.config.ImageSettings, kitaru.config.ModelRegistryConfig]

func _build_execution_overrides(*, stack=None, image=None, cache=None, retries=None) -> KitaruConfig
Build a partial execution config from flow and invocation overrides.
param stack: str | None = None
param image: ImageSetting | None = None
param cache: bool | None = None
param retries: int | None = None
Returns
kitaru.config.KitaruConfig

func _extract_values_from_output_specs(run) -> list[Any]
Extract return values using explicit pipeline output specs.
param run: PipelineRunResponse
Returns
list[typing.Any]

func _extract_values_from_terminal_steps(run) -> list[Any]
Extract return values from terminal step outputs as a fallback.
This fallback is intentionally conservative to avoid returning values in an incorrect order when ZenML pipeline-level output specs are unavailable.
param run: PipelineRunResponse
Returns
list[typing.Any]

func _extract_flow_result(run) -> Any
Extract user-facing flow return value from a finished pipeline run.
param run: PipelineRunResponse
The pipeline run.
Returns
typing.Any
The flow result (None, a single value, or a tuple of values).
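The three possible result shapes suggest a simple mapping from a list of extracted values. The sketch below assumes that an empty list maps to None, one value is returned as-is, and several values become a tuple; the function name is hypothetical and the real extraction from a `PipelineRunResponse` is not shown.

```python
from typing import Any


def collapse_outputs_sketch(values: list[Any]) -> Any:
    """Map extracted output values to None, a single value, or a tuple."""
    if not values:
        # Assumption: a flow with no outputs yields None.
        return None
    if len(values) == 1:
        return values[0]
    # Multiple outputs keep their order, mirroring a Python multi-value return.
    return tuple(values)
```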

func _raise_for_unsuccessful_run(run) -> None
Raise a typed Kitaru execution error with run failure context.
param run: PipelineRunResponse
Returns
None

func flow(func=None, *, stack=None, image=None, cache=None, retries=None) -> _FlowDefinition | Callable[[Callable[..., Any]], _FlowDefinition]
Mark a function as a durable flow.
Can be used as a bare decorator or with arguments::

    @flow
    def my_flow(...): ...

    @flow(stack="prod", retries=2)
    def my_other_flow(...): ...
param func: Callable[..., Any] | None = None
Optional function for bare decorator use.
param stack: str | None = None
Default execution stack.
param image: ImageSetting | None = None
Default image settings.
param cache: bool | None = None
Optional cache override (when omitted, lower-precedence config sources apply and eventually default to True).
param retries: int | None = None
Optional retry override (when omitted, lower-precedence config sources apply and eventually default to 0).
Returns
_FlowDefinition | Callable[[Callable[..., Any]], _FlowDefinition]
The wrapped flow object or a decorator that returns it.
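The `func=None` first parameter is the usual Python pattern for a decorator that works both bare and with arguments. The sketch below shows that pattern in isolation. `FlowSketch` and `flow_sketch` are hypothetical names, the `image` parameter is omitted, and the fallback to True and 0 skips the intermediate config sources the real decorator consults.

```python
import functools


class FlowSketch:
    """Hypothetical wrapper object; not Kitaru's _FlowDefinition."""

    def __init__(self, func, *, stack, cache, retries):
        # Copy __name__, __doc__, etc. from the wrapped function.
        functools.update_wrapper(self, func)
        self.func = func
        self.stack = stack
        # Assumption: None means "not overridden", so fall back to the
        # documented final defaults (cache=True, retries=0).
        self.cache = True if cache is None else cache
        self.retries = 0 if retries is None else retries

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)


def flow_sketch(func=None, *, stack=None, cache=None, retries=None):
    def decorator(target):
        return FlowSketch(target, stack=stack, cache=cache, retries=retries)

    # Bare use: @flow_sketch passes the function directly.
    if func is not None:
        return decorator(func)
    # Parameterized use: @flow_sketch(...) returns the decorator itself.
    return decorator


@flow_sketch
def my_flow(x):
    return x + 1


@flow_sketch(stack="prod", retries=2)
def my_other_flow(x):
    return x * 2
```

Making every option keyword-only (after `*`) is what lets the decorator tell the two uses apart: the only positional argument it can ever receive is the function being decorated.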