replay
Replay planning utilities.
This module translates Kitaru replay semantics (from_ + overrides) into the
ZenML replay inputs consumed by Pipeline.replay(...).
Ordering uses DAG topology derived from StepSpec.upstream_steps, matching
ZenML's own Compiler._get_sorted_invocations() strategy. Timestamps are
used only as a last-resort fallback for legacy runs missing topology metadata.
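The ordering rule described above can be sketched as follows. This is a minimal illustration under stated assumptions, not Kitaru's or ZenML's actual code: `StepInfo` and `order_steps` are hypothetical names, and plain strings and floats stand in for `StepRunResponse` objects and their start times. Steps are emitted one topological layer at a time, each layer sorted by invocation ID, and timestamps are used only when no step carries upstream metadata.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Mapping


@dataclass
class StepInfo:
    # Hypothetical stand-in for the fields read from a step run.
    upstream: list[str] | None  # None means "topology metadata missing"
    started_at: float           # start time as a POSIX timestamp


def order_steps(steps: Mapping[str, StepInfo]) -> list[str]:
    """Return invocation IDs in layer-by-layer topological order."""
    # Legacy fallback: no step has upstream metadata, so order by time.
    if all(info.upstream is None for info in steps.values()):
        return sorted(steps, key=lambda name: (steps[name].started_at, name))

    # Track the not-yet-emitted parents of every step, ignoring
    # references to steps that are not part of this run.
    remaining = {
        name: {parent for parent in (info.upstream or []) if parent in steps}
        for name, info in steps.items()
    }
    ordered: list[str] = []
    while remaining:
        # A layer is every step whose parents have all been emitted;
        # sorting by invocation ID keeps the result deterministic.
        layer = sorted(name for name, parents in remaining.items() if not parents)
        if not layer:
            raise ValueError("Cycle detected in upstream_steps")
        ordered.extend(layer)
        for name in layer:
            del remaining[name]
        for parents in remaining.values():
            parents.difference_update(layer)
    return ordered


run_steps = {
    "train": StepInfo(upstream=["load"], started_at=3.0),
    "load": StepInfo(upstream=[], started_at=1.0),
    "evaluate": StepInfo(upstream=["train"], started_at=4.0),
    "audit": StepInfo(upstream=["load"], started_at=2.0),
}
print(order_steps(run_steps))  # ['load', 'audit', 'train', 'evaluate']
```

Here `audit` and `train` share a layer because both depend only on `load`, so the invocation ID decides their relative order regardless of which one started first.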
func _timestamp(value) -> float
param value (datetime | None)
Returns float

func _topo_sort_steps(steps) -> list[StepRunResponse]
Topologically sort steps using upstream_steps from step specs.
Within a topological layer, steps are sorted by invocation ID for determinism. Falls back to timestamp ordering when upstream metadata is missing from all steps.
param steps (Mapping[str, StepRunResponse])
Returns list[zenml.models.StepRunResponse]

func _ordered_checkpoints(run) -> list[_OrderedCheckpoint]
Build checkpoint list in DAG-topological order.
param run (PipelineRunResponse)
Returns list[kitaru.replay._OrderedCheckpoint]

func _available_checkpoint_selectors(checkpoints) -> str
param checkpoints (Sequence[_OrderedCheckpoint])
Returns str

func _resolve_checkpoint_selector(selector, checkpoints) -> _OrderedCheckpoint
param selector (str)
param checkpoints (Sequence[_OrderedCheckpoint])
Returns kitaru.replay._OrderedCheckpoint

func _iter_step_input_specs(step) -> Iterator[tuple[str, Any]]
param step (StepRunResponse)
Returns collections.abc.Iterator[tuple[str, typing.Any]]

func _single_checkpoint_output_name(checkpoint) -> str
param checkpoint (_OrderedCheckpoint)
Returns str

func _find_downstream_consumers(*, source, checkpoints) -> tuple[list[tuple[str, str]], list[int]]
param source (_OrderedCheckpoint)
param checkpoints (Sequence[_OrderedCheckpoint])
Returns tuple[list[tuple[str, str]], list[int]]

func _split_overrides(overrides) -> dict[str, Any]
param overrides (Mapping[str, Any] | None)
Returns dict[str, typing.Any]

func build_replay_plan(*, run, from_, overrides=None, flow_inputs=None) -> ReplayPlan
Build a replay plan for a completed/paused execution.
param run (PipelineRunResponse): Source execution to replay from.
param from_ (str): Checkpoint selector (checkpoint name, invocation ID, or call ID).
param overrides (Mapping[str, Any] | None, default None): Optional checkpoint override map (checkpoint.* keys).
param flow_inputs (Mapping[str, Any] | None, default None): Optional flow input overrides.
Returns kitaru.replay.ReplayPlan: A resolved replay plan.
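
To make the two `build_replay_plan` inputs concrete, here is a small self-contained sketch of how a `from_` selector and a `checkpoint.*` override map might be interpreted. Everything in it is an assumption made for illustration: the `Checkpoint` fields, the function names, the error on ambiguous selectors, and the stripping of the `checkpoint.` prefix are hypothetical and are not taken from Kitaru's implementation.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Mapping, Sequence


@dataclass(frozen=True)
class Checkpoint:
    # Hypothetical stand-in for kitaru.replay._OrderedCheckpoint.
    name: str
    invocation_id: str
    call_id: str


def resolve_selector(selector: str, checkpoints: Sequence[Checkpoint]) -> Checkpoint:
    """Match a selector against checkpoint name, invocation ID, or call ID."""
    matches = [
        cp for cp in checkpoints
        if selector in (cp.name, cp.invocation_id, cp.call_id)
    ]
    if len(matches) == 1:
        return matches[0]
    # Assumed behavior: refuse to guess, and list what could be selected.
    available = ", ".join(cp.invocation_id for cp in checkpoints)
    problem = "is ambiguous" if matches else "matched no checkpoint"
    raise ValueError(f"Selector {selector!r} {problem}; available: {available}")


def split_checkpoint_overrides(overrides: Mapping[str, Any] | None) -> dict[str, Any]:
    """Keep only 'checkpoint.<target>' keys, returned without the prefix."""
    result: dict[str, Any] = {}
    for key, value in (overrides or {}).items():
        prefix, _, target = key.partition(".")
        if prefix != "checkpoint" or not target:
            raise ValueError(f"Unsupported override key: {key!r}")
        result[target] = value
    return result


checkpoints = [
    Checkpoint(name="fetch", invocation_id="fetch", call_id="c1"),
    Checkpoint(name="summarize", invocation_id="summarize", call_id="c2"),
    Checkpoint(name="summarize", invocation_id="summarize_2", call_id="c3"),
]
# A call ID or invocation ID pins down exactly one checkpoint.
start = resolve_selector("summarize_2", checkpoints)
overrides = split_checkpoint_overrides({"checkpoint.fetch": {"text": "cached"}})
```

In this sketch the bare name `summarize` would be rejected because two checkpoints share it, which is why the more specific invocation ID is used as the selector.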