kitaru
Kitaru: durable execution for AI agents.
Kitaru provides primitives for making AI agent workflows persistent,
replayable, and observable. Decorate your orchestration function with
@flow and your work units with @checkpoint to get automatic
durability.
Example::

    import requests

    from kitaru import flow, checkpoint

    @checkpoint
    def fetch_data(url: str) -> str:
        return requests.get(url).text

    @flow
    def my_agent(url: str) -> str:
        data = fetch_data(url)
        return data.upper()

Current status:
- Implemented: @flow, @checkpoint, kitaru.log(), save(), load(), wait(), llm(), connect(), configure(), stack lifecycle helpers (list_stacks(), current_stack(), use_stack(), create_stack(), delete_stack()), model alias helpers via CLI (kitaru model register/list), KitaruClient execution/artifact APIs (get/list/latest/logs/input/retry/resume/cancel/replay + artifacts), and a typed Kitaru exception hierarchy with failure journaling (Execution.failure, CheckpointCall.attempts).
- Implemented: replay support (KitaruClient.executions.replay(...)).
The CLI also supports global runtime log-store configuration via
kitaru log-store set/show/reset, stack lifecycle via
kitaru stack list/current/use/create/delete, and execution lifecycle commands via
kitaru executions get/list/logs/input/replay/retry/resume/cancel.

func _safe_save(name, value, *, artifact_type, save_func) -> str
Save an artifact and fall back to a blob repr if serialization fails.
param name: str
param value: Any
param artifact_type: str
param save_func: _SaveFn
Returns
str

func normalize_optional_stack_string(value) -> str | None
Normalize an optional stack input, treating blanks as omitted.
param value: str | None
Returns
str | None

func _render_oxford_comma_list(items) -> str
Render a list of items with Oxford comma formatting.
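
The Oxford comma rendering described above can be illustrated with a small standalone sketch. It is not Kitaru's implementation: the function name, the choice of "and" as the conjunction, and the empty-list behavior are assumptions made for illustration.

```python
def render_oxford_comma_list(items: list[str]) -> str:
    """Join items as 'a', 'a and b', or 'a, b, and c' (illustrative sketch)."""
    if not items:
        return ""  # assumption: an empty list renders as an empty string
    if len(items) == 1:
        return items[0]
    if len(items) == 2:
        return f"{items[0]} and {items[1]}"
    # Three or more items: comma-separate and keep the comma before "and".
    return f"{', '.join(items[:-1])}, and {items[-1]}"
```

A helper like this is mostly useful for validation messages, where a list of allowed values has to read naturally.
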
param items: list[str]
Returns
str

func _render_supported_stack_types(allowed_stack_types) -> str
Render the supported stack types for a validation error.
param allowed_stack_types: tuple[StackType, ...]
Returns
str

func normalize_stack_type(raw_type, *, allowed_stack_types=_DEFAULT_INTERFACE_STACK_TYPES) -> StackType
Normalize a stack-type input into the internal enum.
param raw_type: str
param allowed_stack_types: tuple[StackType, ...] = _DEFAULT_INTERFACE_STACK_TYPES
Returns
kitaru._config._stacks.StackType

func infer_cloud_provider(artifact_store_uri) -> CloudProvider
Infer the cloud provider from an artifact-store URI.
param artifact_store_uri: str
Returns
kitaru._config._stacks.CloudProvider

func _render_field_labels(field_names, *, labels) -> str
Render interface-specific labels for one or more stack fields.
param field_names: list[str]
param labels: StackOptionLabels
Returns
str

func _render_stack_type_requirement(allowed_stack_types, *, labels) -> str
Render stack-type requirements for validation messages.
param allowed_stack_types: frozenset[StackType]
param labels: StackOptionLabels
Returns
str

func _option_group_label(allowed_stack_types) -> str
Return a human-friendly label for a set of allowed stack types.
param allowed_stack_types: frozenset[StackType]
Returns
str

func _validate_explicit_field_usage(*, stack_type, provided_fields, labels) -> None
Reject fields that were explicitly provided for the wrong stack type.
param stack_type: StackType
param provided_fields: Mapping[str, bool]
param labels: StackOptionLabels
Returns
None

func _deep_merge_config_values(base, override) -> Any
Recursively merge two config values, preferring the later leaf value.
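
A recursive merge of this kind can be sketched as below. This is a minimal illustration under assumptions, not the library's code: the name deep_merge is hypothetical, and the sketch assumes that only mappings are merged key by key while every other value (including lists) is replaced wholesale by the override.

```python
from collections.abc import Mapping
from typing import Any


def deep_merge(base: Any, override: Any) -> Any:
    """Merge override into base, with the later (override) leaf winning."""
    if isinstance(base, Mapping) and isinstance(override, Mapping):
        # Shallow-copy the base level so the caller's mapping is not mutated.
        merged = dict(base)
        for key, value in override.items():
            if key in merged:
                merged[key] = deep_merge(merged[key], value)
            else:
                merged[key] = value
        return merged
    # Non-mapping values (or mismatched types): the override replaces the base.
    return override
```

Nested values that the override never touches are shared by reference with the base in this sketch; a caller that mutates the result afterwards may want to deep-copy first.
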
param base: Any
param override: Any
Returns
typing.Any

func _merge_nested_mapping(base, override) -> dict[str, Any]
Merge two nested config mappings into a fresh dictionary.
param base: Mapping[str, Any]
param override: Mapping[str, Any]
Returns
dict[str, typing.Any]

func merge_component_overrides(base, override) -> StackComponentConfigOverrides
Merge two component-override payloads with later values winning.
param base: StackComponentConfigOverrides | None
param override: StackComponentConfigOverrides | None
Returns
kitaru._config._stacks.StackComponentConfigOverrides

func _validate_nested_override_mapping(raw, *, option_name) -> dict[str, Any]
Validate a nested override mapping and normalize it to plain dicts.
param raw: Mapping[str, Any]
param option_name: str
Returns
dict[str, typing.Any]

func _target_field_label(target, *, labels) -> str
Render one component target as an interface-specific extra field label.
param target: StackComponentTarget
param labels: StackOptionLabels
Returns
str

func normalize_component_overrides_mapping(raw, *, labels) -> StackComponentConfigOverrides
Normalize YAML/MCP component overrides into the shared override model.
param raw: Mapping[str, Any]
param labels: StackOptionLabels
Returns
kitaru._config._stacks.StackComponentConfigOverrides

func _assign_nested_override(root, path_parts, value) -> None
Assign a nested override path into a mutable mapping.
param root: dict[str, Any]
param path_parts: list[str]
param value: Any
Returns
None

func _parse_component_override_value(raw_value, *, labels) -> Any
Parse one CLI override value using YAML scalar/object semantics.
param raw_value: str
param labels: StackOptionLabels
Returns
typing.Any

func parse_cli_component_overrides(assignments, *, labels) -> StackComponentConfigOverrides
Parse repeatable CLI TARGET.FIELD=VALUE assignments into overrides.
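
To make the TARGET.FIELD=VALUE format concrete, here is a rough standalone sketch of how such assignments could be folded into a nested dictionary. It rests on several assumptions: the function names are hypothetical, the result is a plain dict rather than StackComponentConfigOverrides, and json.loads is used as a stdlib-only stand-in for the YAML value parsing mentioned above. The target names in the usage example are made up as well.

```python
import json
from typing import Any


def parse_value(raw: str) -> Any:
    """Parse a scalar/object value; fall back to the raw string."""
    try:
        return json.loads(raw)  # stand-in for YAML parsing (assumption)
    except json.JSONDecodeError:
        return raw


def parse_assignments(assignments: list[str]) -> dict[str, Any]:
    """Fold TARGET.FIELD=VALUE strings into one nested dictionary."""
    root: dict[str, Any] = {}
    for assignment in assignments:
        # Split on the first "=" only, so values may contain "=" themselves.
        path, sep, raw_value = assignment.partition("=")
        parts = path.split(".")
        if not sep or len(parts) < 2 or any(not part for part in parts):
            raise ValueError(f"Expected TARGET.FIELD=VALUE, got {assignment!r}")
        node = root
        for part in parts[:-1]:
            child = node.setdefault(part, {})
            if not isinstance(child, dict):
                raise ValueError(f"{part!r} is already set to a non-mapping value")
            node = child
        node[parts[-1]] = parse_value(raw_value)
    return root
```

For example, `parse_assignments(["orchestrator.synchronous=false"])` yields `{"orchestrator": {"synchronous": False}}`, because the value is parsed as a boolean rather than kept as text.
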
param assignments: Sequence[str]
param labels: StackOptionLabels
Returns
kitaru._config._stacks.StackComponentConfigOverrides

func apply_async_override(overrides, *, async_enabled) -> StackComponentConfigOverrides
Inject synchronous=false unless the user already set it explicitly.
param overrides: StackComponentConfigOverrides | None
param async_enabled: bool
Returns
kitaru._config._stacks.StackComponentConfigOverrides

func validate_component_override_targets(stack_type, overrides, *, labels) -> None
Validate that component override targets are legal for the stack type.
param stack_type: StackType
param overrides: StackComponentConfigOverrides | None
param labels: StackOptionLabels
Returns
None

func build_remote_stack_spec(*, stack_type, artifact_store, container_registry, cluster, region, subscription_id, resource_group, workspace, execution_role, namespace, credentials, verify, labels) -> RemoteStackSpec | None
Validate interface inputs and build a remote stack spec when needed.
param stack_type: StackType
param artifact_store: str | None
param container_registry: str | None
param cluster: str | None
param region: str | None
param subscription_id: str | None
param resource_group: str | None
param workspace: str | None
param execution_role: str | None
param namespace: str | None
param credentials: str | None
param verify: bool
param labels: StackOptionLabels
Returns
kitaru._config._stacks.RemoteStackSpec | None

func build_stack_create_request(*, name, activate, stack_type, artifact_store, container_registry, cluster, region, subscription_id, resource_group, workspace, execution_role, namespace, credentials, verify, component_overrides=None, async_enabled=False, labels, allowed_stack_types=_DEFAULT_INTERFACE_STACK_TYPES) -> ManageStackCreateRequest
Validate create inputs and build a structured stack-create request.
param name: str
param activate: bool
param stack_type: str
param artifact_store: str | None
param container_registry: str | None
param cluster: str | None
param region: str | None
param subscription_id: str | None
param resource_group: str | None
param workspace: str | None
param execution_role: str | None
param namespace: str | None
param credentials: str | None
param verify: bool
param component_overrides: StackComponentConfigOverrides | None = None
param async_enabled: bool = False
param labels: StackOptionLabels
param allowed_stack_types: tuple[StackType, ...] = _DEFAULT_INTERFACE_STACK_TYPES
Returns
kitaru._interface_stacks.ManageStackCreateRequest

func build_manage_stack_request(*, action, name, activate, recursive, force, stack_type, artifact_store, container_registry, cluster, region, subscription_id, resource_group, workspace, execution_role, namespace, credentials, verify, extra=None, async_mode=False) -> ManageStackCreateRequest | ManageStackDeleteRequest
Validate MCP manage-stack inputs and build a structured request.
param action: Literal['create', 'delete'] | str
param name: str
param activate: bool
param recursive: bool
param force: bool
param stack_type: str
param artifact_store: str | None
param container_registry: str | None
param cluster: str | None
param region: str | None
param subscription_id: str | None
param resource_group: str | None
param workspace: str | None
param execution_role: str | None
param namespace: str | None
param credentials: str | None
param verify: bool
param extra: Mapping[str, Any] | None = None
param async_mode: bool = False
Returns
kitaru._interface_stacks.ManageStackCreateRequest | kitaru._interface_stacks.ManageStackDeleteRequest

func callable_name(func) -> str
Extract the plain name of a callable.
param func: Callable[..., Any]
Returns
str

func _normalize_callable_name(raw_name, *, fallback) -> str
Sanitize a callable name into a valid Python identifier fragment.
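
One plausible way to sanitize a name into an identifier fragment is sketched below. The exact rules Kitaru applies are not stated here, so the replacement character, the stripping of edge underscores, and the leading-digit handling are all assumptions; the function name is hypothetical too.

```python
import re


def normalize_callable_name(raw_name: str, *, fallback: str) -> str:
    """Turn an arbitrary callable name into an identifier-safe fragment."""
    # Collapse every run of non [A-Za-z0-9_] characters into one underscore.
    cleaned = re.sub(r"\W+", "_", raw_name, flags=re.ASCII).strip("_")
    if not cleaned:
        return fallback  # nothing usable survived, e.g. for "<>"
    if cleaned[0].isdigit():
        cleaned = f"_{cleaned}"  # identifiers cannot start with a digit
    return cleaned
```

Names such as `<lambda>` come out as `lambda` under these rules, which is why the result is described as a fragment: it is meant to be embedded in a longer generated name, not used alone.
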
param raw_name: str
param fallback: str
Returns
str

func build_pipeline_registration_name(name) -> str
Build the plain ZenML pipeline name for a flow function.
param name: str
Returns
str

func build_checkpoint_registration_name(name) -> str
Build the plain ZenML step name for a checkpoint function.
param name: str
Returns
str

func build_pipeline_source_alias(name) -> str
Build the internal source alias for a flow (pipeline) function.
param name: str
Returns
str

func build_checkpoint_source_alias(name) -> str
Build the internal source alias for a checkpoint (step) function.
param name: str
Returns
str

func normalize_flow_name(value) -> str | None
Strip the pipeline alias prefix from a flow name.
Returns None for None, empty, or whitespace-only input.
param value: object | None
Returns
str | None

func normalize_checkpoint_name(step_name) -> str
Strip the checkpoint alias prefix from a checkpoint (step) name.
param step_name: str
Returns
str

func normalize_aliases_in_text(text) -> str
Replace all alias-prefixed names with their user-facing names in free text.
param text: str
Returns
str

func _safe_rmtree(path, label) -> None
Remove a directory tree, logging a warning on failure.
param path: Path
param label: str
Returns
None

func _resolve_zenml_dashboard_dir() -> Path
Locate ZenML's installed dashboard directory.
Returns
pathlib.Path

func _resolve_bundled_ui_dir() -> Path | None
Return the path to the bundled Kitaru UI dist directory, or None.
Returns
pathlib.Path | None

func _load_manifest_json(path) -> dict[str, Any] | None
Load and validate a manifest JSON file.
Returns the parsed dict if the file exists, is valid JSON, is a dict, and has the expected schema version. Returns None otherwise.
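
The four checks listed above (file exists, valid JSON, is a dict, expected schema version) can be sketched as follows. The key name "schema_version" and the expected value 1 are assumptions chosen for illustration; the real manifest layout is not documented in this section.

```python
from __future__ import annotations

import json
from pathlib import Path
from typing import Any

EXPECTED_SCHEMA_VERSION = 1  # hypothetical value, for illustration only


def load_manifest_json(path: Path) -> dict[str, Any] | None:
    """Return the parsed manifest, or None if any validation step fails."""
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # Missing/unreadable file, or invalid JSON/encoding (both ValueError).
        return None
    if not isinstance(data, dict):
        return None
    if data.get("schema_version") != EXPECTED_SCHEMA_VERSION:
        return None
    return data
```

Returning None for every failure mode keeps callers simple: they only need to distinguish "usable manifest" from "treat as absent".
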
param path: Path
Returns
dict[str, typing.Any] | None

func _load_bundled_manifest() -> dict[str, Any] | None
Load the bundled bundle_manifest.json, or None if unavailable.
Returns
dict[str, typing.Any] | None

func _load_installed_sentinel(dashboard_dir) -> dict[str, Any] | None
Load the sentinel manifest from the installed dashboard, or None.
param dashboard_dir: Path
Returns
dict[str, typing.Any] | None

func _dashboard_needs_update(dashboard_dir, bundled) -> bool
Check whether the installed dashboard needs to be replaced.
Returns False only when the installed sentinel matches the bundled manifest on both version and checksum, AND index.html is present.
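
The decision rule above can be written out as a short sketch. The sentinel file name and the manifest keys "version" and "checksum" are assumptions here (only the concepts are named in the text), and the function name is hypothetical.

```python
from __future__ import annotations

import json
from pathlib import Path
from typing import Any

SENTINEL_NAME = ".kitaru_ui_manifest.json"  # hypothetical sentinel file name


def needs_update(dashboard_dir: Path, bundled: dict[str, Any]) -> bool:
    """Return False only if index.html exists and the sentinel matches."""
    if not (dashboard_dir / "index.html").is_file():
        return True
    try:
        raw = (dashboard_dir / SENTINEL_NAME).read_text(encoding="utf-8")
        installed = json.loads(raw)
    except (OSError, ValueError):
        return True  # no readable sentinel means "unknown", so update
    if not isinstance(installed, dict):
        return True
    # Both fields must be present in the sentinel and equal to the bundle's.
    matches = all(
        key in installed and installed[key] == bundled.get(key)
        for key in ("version", "checksum")
    )
    return not matches
```

Defaulting to True on any doubt is the conservative choice: replacing an up-to-date dashboard is harmless, while serving a stale one is not.
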
param dashboard_dir: Path
param bundled: dict[str, Any]
Returns
bool

func _apply_dashboard_patch(dashboard_dir, bundled_manifest) -> None
Replace the ZenML dashboard directory with bundled Kitaru UI.
Uses an atomic-ish rename strategy: copy to a temp sibling, rename old to backup, rename temp to target, then remove backup. If the final rename fails, the backup is restored.
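
The rename strategy described above can be sketched with the standard library alone. This is an illustration of the general technique, not the function's actual body: the name swap_directory and the ".new" / ".bak" sibling suffixes are assumptions.

```python
import shutil
from pathlib import Path


def swap_directory(target: Path, source: Path) -> None:
    """Replace target with a copy of source using sibling renames."""
    staging = target.with_name(target.name + ".new")  # hypothetical suffix
    backup = target.with_name(target.name + ".bak")   # hypothetical suffix
    # Clear leftovers from an earlier interrupted run.
    for leftover in (staging, backup):
        shutil.rmtree(leftover, ignore_errors=True)

    shutil.copytree(source, staging)      # 1. copy to a temp sibling
    had_target = target.exists()
    if had_target:
        target.rename(backup)             # 2. rename old to backup
    try:
        staging.rename(target)            # 3. rename temp to target
    except OSError:
        if had_target:
            backup.rename(target)         # restore the old directory
        shutil.rmtree(staging, ignore_errors=True)
        raise
    if had_target:
        shutil.rmtree(backup, ignore_errors=True)  # 4. remove backup
```

The slow step (copying) happens while the old directory is still in place, so the window in which the target is missing is limited to the two renames. It is "atomic-ish" rather than atomic because a crash between those renames still leaves only the backup on disk.
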
param dashboard_dir: Path
param bundled_manifest: dict[str, Any]
Returns
None

func _ensure_kitaru_dashboard() -> bool
Ensure ZenML's dashboard directory has the Kitaru UI.
Returns True if the dashboard was updated, False if already current. Silently returns False if no bundled UI is available (dev installs).
Returns
bool

func _validate_local_server_inputs(*, port, timeout) -> None
Validate local-server start inputs.
param port: int | None
param timeout: int
Returns
None

func _existing_local_server_url(local_server) -> str | None
Return the current or configured local-server URL, if known.
param local_server: Any
Returns
str | None

func _existing_local_server_port(local_server) -> int | None
Return the configured local-server port, if known.
param local_server: Any
Returns
int | None

func _local_server_log_path() -> str
Resolve the daemon log file path for startup error guidance.
Returns
str

func _local_server_start_error(*, action, exc) -> KitaruBackendError
Build a user-facing startup/restart failure.
param action: str
param exc: Exception
Returns
kitaru.errors.KitaruBackendError

func _load_local_server_runtime() -> tuple[type[Any], type[Any], type[Any], Any]
Load local-server runtime helpers lazily.
Returns
tuple[type[typing.Any], type[typing.Any], type[typing.Any], typing.Any]

func _ensure_local_server_dependencies() -> None
Validate optional local-server dependencies.
Returns
None

func _build_local_server_config(*, deployment_config_cls, provider_type, port) -> Any
Build a daemon-backed local-server deployment config.
param deployment_config_cls: type[Any]
param provider_type: Any
param port: int
Returns
typing.Any

func _is_server_running(local_server) -> bool
Check whether a local server deployment is running.
param local_server: Any
Returns
bool

func _deploy_and_connect(*, deployer, deployment_config_cls, provider_type, port, timeout, action) -> LocalServerConnectionResult
Deploy a new local server, connect, and return the result.
param deployer: Any
param deployment_config_cls: type[Any]
param provider_type: Any
param port: int
param timeout: int
param action: Literal['started', 'restarted']
Returns
kitaru._local_server.LocalServerConnectionResult

func start_or_connect_local_server(*, port, timeout) -> LocalServerConnectionResult
Start a daemon local server or connect to an existing one.
Before starting or connecting, ensures the ZenML dashboard directory contains Kitaru UI assets. If a compatible server is already running but serving a stale dashboard, it is restarted.
param port: int | None
param timeout: int
Returns
kitaru._local_server.LocalServerConnectionResult

func stop_registered_local_server() -> LocalServerStopResult
Stop the registered local server if one exists.
Returns
kitaru._local_server.LocalServerStopResult

func _parse_scope_uuid(scope_id, *, scope_name, api_name) -> UUID
Parse a runtime scope identifier as a UUID.
param scope_id: str
    Raw scope identifier from runtime context.
param scope_name: str
    Human-readable scope name for error messages.
param api_name: str
    Calling Kitaru API name.
Returns
uuid.UUID
    Parsed UUID.
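
As a rough illustration of the _parse_scope_uuid contract above, the sketch below parses the identifier and wraps a failure in a message that names both the scope and the calling API. Which exception type Kitaru raises is not stated in this section, so the built-in ValueError is used as a stand-in, and the message wording is invented.

```python
from uuid import UUID


def parse_scope_uuid(scope_id: str, *, scope_name: str, api_name: str) -> UUID:
    """Parse scope_id as a UUID, raising a descriptive error on failure."""
    try:
        return UUID(scope_id)
    except ValueError as exc:
        # Stand-in error type and wording (assumptions, not Kitaru's own).
        raise ValueError(
            f"{api_name} expected a UUID for the {scope_name} scope, "
            f"got {scope_id!r}."
        ) from exc
```

Passing scope_name and api_name through to the message lets a user see which call failed and on which identifier, without reading a traceback.
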

func _level_to_kind(levelno) -> _TerminalKind
param levelno: int
Returns
kitaru._terminal_logging._TerminalKind

func _apply_zenml_rules(logger_name, msg, levelno) -> _TerminalDecision | None
Apply rewrite/drop rules to a ZenML log message.
param logger_name: str
param msg: str
param levelno: int
Returns
kitaru._terminal_logging._TerminalDecision | None

func _decide(record) -> _TerminalDecision | None
Decide how to render a log record for the terminal.
Returns None to indicate the record should be dropped (not displayed).
param record: logging.LogRecord
Returns
kitaru._terminal_logging._TerminalDecision | None

func _render(decision, *, interactive) -> str
Render a terminal decision to a display string.
param decision: _TerminalDecision
param interactive: bool
Returns
str

func _get_bypass_write() -> Callable[[str], Any]
Get a write callable that bypasses ZenML's stdout wrapper.
Returns
collections.abc.Callable[[str], typing.Any]

func install_terminal_log_intercept() -> None
Replace ZenML's console handler with a Kitaru terminal handler.
This function is idempotent: calling it multiple times (including across
importlib.reload()) will not add duplicate handlers.
Returns
None

func translate_to_user_error(exc) -> InterfaceErrorDetails
Translate one exception into user-facing error details.
param exc: BaseException
Returns
kitaru._interface_errors.InterfaceErrorDetails

func run_with_cli_error_boundary(operation, *, command, output, exit_with_error, handled_exceptions=(Exception,), translator=translate_to_user_error) -> T
Run one CLI operation and emit a consistent error on handled failure.
param operation: Callable[[], T]
param command: str
param output: CLIOutputFormat
param exit_with_error: CLIErrorEmitter
param handled_exceptions: tuple[type[Exception], ...] = (Exception,)
param translator: Callable[[Exception], InterfaceErrorDetails] = translate_to_user_error
Returns
kitaru._interface_errors.T

func run_with_mcp_error_boundary(operation, *, handled_exceptions=(Exception,), translator=None) -> T
Run one MCP operation and preserve passthrough exception behavior.
param operation: Callable[[], T]
param handled_exceptions: tuple[type[Exception], ...] = (Exception,)
param translator: Callable[[Exception], BaseException] | None = None
Returns
kitaru._interface_errors.T

func _load_module_from_python_path(module_path, *, module_name_prefix) -> ModuleType
Load a Python module from a filesystem path.
param module_path: str
param module_name_prefix: str
Returns
types.ModuleType

func _load_flow_target(target, *, module_name_prefix) -> _FlowTarget
Load <module_or_file>:<flow_name> into a runnable flow object.
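
Resolving a `<module_or_file>:<flow_name>` target can be sketched with importlib. The sketch is an assumption-laden illustration: the function names are hypothetical, a target is treated as a file only when it ends in ".py", the synthetic module name is built as prefix plus file stem, and the result is returned as a plain attribute rather than a _FlowTarget.

```python
import importlib
import importlib.util
import sys
from pathlib import Path
from types import ModuleType
from typing import Any


def load_module_from_path(module_path: str, *, module_name_prefix: str) -> ModuleType:
    """Import a .py file under a synthetic, prefixed module name."""
    path = Path(module_path).resolve()
    module_name = f"{module_name_prefix}_{path.stem}"
    spec = importlib.util.spec_from_file_location(module_name, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load module from {path}")
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module  # register before executing the module
    spec.loader.exec_module(module)
    return module


def load_target(target: str, *, module_name_prefix: str) -> Any:
    """Resolve '<module_or_file>:<name>' to the named attribute."""
    # rpartition keeps earlier colons (e.g. Windows drive letters) in the path.
    module_ref, sep, attr = target.rpartition(":")
    if not sep or not module_ref or not attr:
        raise ValueError(f"Expected <module_or_file>:<name>, got {target!r}")
    if module_ref.endswith(".py"):
        module = load_module_from_path(module_ref, module_name_prefix=module_name_prefix)
    else:
        module = importlib.import_module(module_ref)
    if not hasattr(module, attr):
        raise ValueError(f"{module_ref!r} has no attribute {attr!r}")
    return getattr(module, attr)
```

The name prefix keeps file-loaded modules from colliding with installed packages of the same name in sys.modules.
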
param target: str
param module_name_prefix: str
Returns
kitaru._flow_loading._FlowTarget

func list_executions_filtered(client, *, flow, status, stack, limit) -> list[Execution]
List executions with optional client-side stack filtering.
param client: KitaruClient
param flow: str | None
param status: str | None
param stack: str | None
param limit: int | None
Returns
list[kitaru.client.Execution]

func latest_execution_filtered(client, *, flow, status, stack) -> Execution
Resolve the latest execution with optional client-side stack filtering.
param client: KitaruClient
param flow: str | None
param status: str | None
param stack: str | None
Returns
kitaru.client.Execution

func _validate_schema_type(value, schema_type) -> bool
Validate one value against a shallow JSON-schema type label.
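
A shallow type check of this sort maps each JSON Schema type label to a Python isinstance test. The sketch below is illustrative only: the function name is hypothetical, and treating unknown labels as valid is an assumption about the lenient behavior, not something this section confirms.

```python
from typing import Any, Callable

# JSON Schema type label -> predicate. bool is excluded from the numeric
# labels because bool is a subclass of int in Python.
_TYPE_CHECKS: dict[str, Callable[[Any], bool]] = {
    "string": lambda v: isinstance(v, str),
    "boolean": lambda v: isinstance(v, bool),
    "integer": lambda v: isinstance(v, int) and not isinstance(v, bool),
    "number": lambda v: isinstance(v, (int, float)) and not isinstance(v, bool),
    "object": lambda v: isinstance(v, dict),
    "array": lambda v: isinstance(v, list),
    "null": lambda v: v is None,
}


def validate_schema_type(value: Any, schema_type: str) -> bool:
    """Check value against one top-level type label, without recursing."""
    check = _TYPE_CHECKS.get(schema_type)
    if check is None:
        return True  # assumption: unknown labels are not rejected
    return check(value)
```

"Shallow" here means that only the top-level type is inspected; nested properties, required keys, and item types are not validated.
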
param value: Any
param schema_type: str
Returns
bool

func validate_wait_input_schema(*, wait_schema, value) -> None
Run lightweight wait-input validation when a schema type is present.
param wait_schema: dict[str, Any] | None
param value: Any
Returns
None

func validate_pending_wait_input(*, execution, wait, value) -> None
Validate input against the current pending wait when the target matches.
param execution: Execution
param wait: str
param value: Any
Returns
None

func _format_mcp_log_entry(entry) -> str
Render a readable one-line text representation for MCP log output.
param entry: LogEntry
Returns
str

func format_mcp_execution_logs(entries) -> str
Render execution logs as plain text for MCP agent readability.
param entries: Sequence[LogEntry]
Returns
str

func invoke_flow_target(*, target, args, stack, module_name_prefix) -> FlowInvocationResult
Load and invoke a flow target for CLI or MCP run surfaces.
param target: str
param args: dict[str, Any] | None
param stack: str | None
param module_name_prefix: str
Returns
kitaru._interface_executions.FlowInvocationResult

func resolve_started_execution_details(*, exec_id, client) -> StartedExecutionDetails
Best-effort execution lookup after a flow launch.
param exec_id: str
param client: KitaruClient
Returns
kitaru._interface_executions.StartedExecutionDetails

func build_started_execution_payload(*, target, details) -> dict[str, Any]
Build the shared structured payload for flow launch responses.
param target: str
param details: StartedExecutionDetails
Returns
dict[str, typing.Any]

func resolve_installed_version() -> str
Resolve the installed Kitaru version lazily.
Returns
str

func main() -> None
Show help when invoked without arguments.
Returns
None