Kitaru

events

Best-effort live event publishing from inside Kitaru checkpoints.

attributelogger
= logging.getLogger(__name__)
attributeCHECKPOINT_STARTED_KIND
= 'kitaru.checkpoint.started'
attributeCHECKPOINT_PROGRESS_KIND
= 'kitaru.checkpoint.progress'
attributeCHECKPOINT_RETURNED_KIND
= 'kitaru.checkpoint.returned'
attributeCHECKPOINT_FAILED_KIND
= 'kitaru.checkpoint.failed'
funcflush(timeout=2.0) -> bool

Flush pending live events when ZenML streaming is available.

Flushing does not require checkpoint scope because it only drains the process-local ZenML publisher queue. It returns False if flushing itself fails or times out.

paramtimeoutfloat
= 2.0

Returns

bool
funcpublish(kind, payload=None, *, message=None, correlation_id=None, index=None, flush=False) -> None

Publish a custom live event from inside the current checkpoint.

Publishing is best effort: transport failures are logged and dropped, but misuse such as calling this outside @checkpoint raises immediately.

paramkindstr
parampayloadMapping[str, Any] | None
= None
parammessagestr | None
= None
paramcorrelation_idstr | None
= None
paramindexint | None
= None
paramflushbool
= False

Returns

None
funcprogress(message, *, percent=None, correlation_id=None, flush=False, **fields) -> None

Publish a standard checkpoint progress event.

parammessagestr
parampercentfloat | None
= None
paramcorrelation_idstr | None
= None
paramflushbool
= False
paramfieldsAny
= {}

Returns

None