checkpoint
Checkpoint decorator for durable work boundaries.
A checkpoint is a unit of work inside a flow whose outcome is persisted. Successful outputs become artifacts; failures are recorded for retry.
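The persistence behaviour described above can be pictured with a small in-memory model. This is only a sketch under stated assumptions: `ToyStore` and `run_checkpoint` are hypothetical names that do not exist in Kitaru or ZenML, a plain dict stands in for durable storage, and the retry loop is one plausible reading of "recorded for retry" rather than the library's actual mechanism.

```python
from typing import Any, Callable


class ToyStore:
    """Hypothetical in-memory stand-in for durable storage."""

    def __init__(self) -> None:
        self.artifacts: dict[str, Any] = {}
        self.failures: dict[str, list[str]] = {}


def run_checkpoint(
    store: ToyStore, name: str, work: Callable[[], Any], retries: int = 0
) -> Any:
    """Run `work` once, plus up to `retries` further attempts on failure."""
    if retries < 0:
        raise ValueError("retries must be >= 0")
    for attempt in range(retries + 1):
        try:
            result = work()
        except Exception as exc:
            # Record the failure so a later attempt (or a person) can inspect it.
            store.failures.setdefault(name, []).append(f"attempt {attempt}: {exc!r}")
            if attempt == retries:
                raise  # retry budget exhausted
        else:
            # A successful output is stored as the checkpoint's artifact.
            store.artifacts[name] = result
            return result
```

In this model a function that fails twice and then succeeds, run with `retries=2`, leaves two entries in `store.failures` and one artifact in `store.artifacts`.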
func _register_checkpoint_source_alias(*, func, alias, step_obj) -> None
Register the ZenML step object under a module-level alias.
param func: Callable[..., Any]
param alias: str
param step_obj: Any
Returns
None

func _normalize_retries(retries) -> int
Validate and normalize checkpoint retries.
param retries: int
Returns
int

func _to_retry_config(retries) -> StepRetryConfig | None
Convert retry count to ZenML retry config.
param retries: int
Returns
zenml.config.retry_config.StepRetryConfig | None

func _build_checkpoint_extra(checkpoint_type) -> dict[str, Any]
Build namespaced step metadata for dashboard rendering.
param checkpoint_type: str | None
Returns
dict[str, typing.Any]

func _to_step_type(checkpoint_type) -> StepType | None
Map well-known checkpoint types to ZenML's StepType enum.
param checkpoint_type: str | None
Returns
zenml.enums.StepType | None

func _wrap_entrypoint(func, *, checkpoint_type) -> Callable[..., Any]
Wrap the user function with Kitaru checkpoint runtime scope.
param func: Callable[..., Any]
param checkpoint_type: str | None
Returns
collections.abc.Callable[..., typing.Any]

func checkpoint(func=None, *, retries=0, type=None) -> _CheckpointDefinition | Callable[[Callable[..., Any]], _CheckpointDefinition]
Mark a function as a durable checkpoint.
Can be used as a bare decorator or with arguments::
    from kitaru import checkpoint

    @checkpoint
    def my_step(): ...

    @checkpoint(retries=3, type="llm_call")
    def my_step(): ...
param func: Callable[..., Any] | None = None
Optional function for bare decorator use.
param retries: int = 0
Number of checkpoint-level retries on failure.
param type: str | None = None
Checkpoint type for dashboard visualization.
Returns
_CheckpointDefinition | Callable[[Callable[..., Any]], _CheckpointDefinition]
The wrapped checkpoint object or a decorator that returns it.
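The two-form return type reflects a common Python pattern: a decorator that works both bare and with keyword arguments. The following is a minimal sketch of that pattern, not Kitaru's implementation. `ToyCheckpoint` and `toy_checkpoint` are hypothetical names, and the validation rule (a non-negative integer) is an assumption about what normalizing `retries` might involve.

```python
from typing import Any, Callable, Optional


class ToyCheckpoint:
    """Hypothetical stand-in for the object the decorator returns."""

    def __init__(self, func: Callable[..., Any], retries: int, type: Optional[str]) -> None:
        self.func = func
        self.retries = retries
        self.type = type

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        return self.func(*args, **kwargs)


def toy_checkpoint(
    func: Optional[Callable[..., Any]] = None,
    *,
    retries: int = 0,
    type: Optional[str] = None,
):
    # Assumed rule: retries must be a non-negative int (bool is rejected).
    if isinstance(retries, bool) or not isinstance(retries, int) or retries < 0:
        raise ValueError("retries must be a non-negative integer")

    def decorate(f: Callable[..., Any]) -> ToyCheckpoint:
        return ToyCheckpoint(f, retries, type)

    if func is not None:
        # Bare use (@toy_checkpoint): Python passes the function directly.
        return decorate(func)
    # Parameterised use (@toy_checkpoint(...)): return the real decorator.
    return decorate


@toy_checkpoint
def plain_step() -> str:
    return "plain"


@toy_checkpoint(retries=3, type="llm_call")
def llm_step() -> str:
    return "llm"
```

Making `retries` and `type` keyword-only is what keeps the two forms unambiguous: the only positional argument the decorator can ever receive is the decorated function itself.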