Kitaru

replay

Replay planning utilities.

This module translates Kitaru replay semantics (from_ + overrides) into the ZenML replay inputs consumed by Pipeline.replay(...).

Ordering uses DAG topology derived from StepSpec.upstream_steps, matching ZenML's own Compiler._get_sorted_invocations() strategy. Timestamps are used only as a last-resort fallback for legacy runs missing topology metadata.

func _timestamp(value) -> float

param value: datetime | None

Returns

float
func _topo_sort_steps(steps) -> list[StepRunResponse]

Topologically sort steps using upstream_steps from step specs.

Within a topological layer, steps are sorted by invocation ID for determinism. Falls back to timestamp ordering when upstream metadata is missing from all steps.

param steps: Mapping[str, StepRunResponse]

Returns

list[zenml.models.StepRunResponse]
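
The ordering described above (topological layers, invocation ID as the tie-breaker within a layer, timestamps only when no step has upstream metadata) can be sketched as follows. This is a minimal illustration, not Kitaru's implementation: the `Step` dataclass and its `upstream` and `started_at` fields are hypothetical stand-ins for `StepRunResponse` and its step spec, and the cycle check is an added assumption.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Step:
    """Hypothetical stand-in for a step run; not the ZenML StepRunResponse."""

    invocation_id: str
    upstream: list[str] | None  # None models missing topology metadata
    started_at: float = 0.0


def topo_sort(steps: dict[str, Step]) -> list[Step]:
    # Fallback: no step carries upstream metadata, so order by timestamp.
    if all(s.upstream is None for s in steps.values()):
        return sorted(steps.values(), key=lambda s: (s.started_at, s.invocation_id))

    # Only count edges that point at steps present in the mapping.
    pending = {
        name: {u for u in (s.upstream or []) if u in steps}
        for name, s in steps.items()
    }
    ordered: list[Step] = []
    while pending:
        # A layer is every step whose upstream steps have all been emitted.
        # Sorting the layer by invocation ID keeps the result deterministic.
        layer = sorted(name for name, deps in pending.items() if not deps)
        if not layer:
            raise ValueError("cycle detected in upstream_steps")
        for name in layer:
            ordered.append(steps[name])
            del pending[name]
        for deps in pending.values():
            deps.difference_update(layer)
    return ordered
```

Processing whole layers at a time, rather than popping one ready step at a time, is what makes the "sorted within a layer" guarantee easy to state: two steps with no dependency between them always appear in invocation-ID order.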
func _ordered_checkpoints(run) -> list[_OrderedCheckpoint]

Build checkpoint list in DAG-topological order.

param run: PipelineRunResponse

Returns

list[kitaru.replay._OrderedCheckpoint]
func _available_checkpoint_selectors(checkpoints) -> str

param checkpoints: Sequence[_OrderedCheckpoint]

Returns

str
func _resolve_checkpoint_selector(selector, checkpoints) -> _OrderedCheckpoint

param selector: str
param checkpoints: Sequence[_OrderedCheckpoint]

Returns

kitaru.replay._OrderedCheckpoint
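
Since a selector may be a checkpoint name, an invocation ID, or a call ID, resolution amounts to matching one string against several identifiers. The sketch below shows one way that could work. The `Checkpoint` dataclass, its field names, and the choice to raise on zero or multiple matches are all assumptions made for illustration; the real `_OrderedCheckpoint` and its error handling may differ.

```python
from __future__ import annotations

from collections.abc import Sequence
from dataclasses import dataclass


@dataclass(frozen=True)
class Checkpoint:
    """Hypothetical stand-in for _OrderedCheckpoint; field names are assumed."""

    name: str
    invocation_id: str
    call_id: str


def resolve(selector: str, checkpoints: Sequence[Checkpoint]) -> Checkpoint:
    # A checkpoint matches if the selector equals any of its identifiers.
    matches = [
        c for c in checkpoints
        if selector in (c.name, c.invocation_id, c.call_id)
    ]
    if len(matches) != 1:
        # Assumed behavior: refuse to guess, and list what is available.
        available = ", ".join(c.invocation_id for c in checkpoints)
        problem = "matched no checkpoint" if not matches else "is ambiguous"
        raise ValueError(f"selector {selector!r} {problem}; available: {available}")
    return matches[0]
```

With two calls to the same checkpoint, the shared name is ambiguous under this sketch, while the invocation ID or call ID still picks out exactly one.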
func _iter_step_input_specs(step) -> Iterator[tuple[str, Any]]

param step: StepRunResponse

Returns

collections.abc.Iterator[tuple[str, typing.Any]]
func _single_checkpoint_output_name(checkpoint) -> str

param checkpoint: _OrderedCheckpoint

Returns

str
func _find_downstream_consumers(*, source, checkpoints) -> tuple[list[tuple[str, str]], list[int]]

param source: _OrderedCheckpoint
param checkpoints: Sequence[_OrderedCheckpoint]

Returns

tuple[list[tuple[str, str]], list[int]]
func _split_overrides(overrides) -> dict[str, Any]

param overrides: Mapping[str, Any] | None

Returns

dict[str, typing.Any]
func build_replay_plan(*, run, from_, overrides=None, flow_inputs=None) -> ReplayPlan

Build a replay plan for a completed or paused execution.

param run: PipelineRunResponse

Source execution to replay from.

param from_: str

Checkpoint selector (checkpoint name, invocation ID, or call ID).

param overrides: Mapping[str, Any] | None = None

Optional checkpoint override map (checkpoint.* keys).

param flow_inputs: Mapping[str, Any] | None = None

Optional flow input overrides.

Returns

kitaru.replay.ReplayPlan

A resolved replay plan.