Kitaru

kitaru

Kitaru: durable execution for AI agents.

Kitaru provides primitives for making AI agent workflows persistent, replayable, and observable. Decorate your orchestration function with @flow and your work units with @checkpoint to get automatic durability.

Example::

    import requests

    from kitaru import flow, checkpoint

    @checkpoint
    def fetch_data(url: str) -> str:
        return requests.get(url).text

    @flow
    def my_agent(url: str) -> str:
        data = fetch_data(url)
        return data.upper()

Current status:

  • Implemented: @flow, @checkpoint, kitaru.log(), save(), load(), wait(), llm(), connect(), configure(), stack lifecycle helpers (list_stacks(), current_stack(), use_stack(), create_stack(), delete_stack()), model alias helpers via CLI (kitaru model register/list), KitaruClient execution/artifact APIs (get/list/latest/logs/input/retry/resume/cancel/replay + artifacts), and a typed Kitaru exception hierarchy with failure journaling (Execution.failure, CheckpointCall.attempts).
  • Implemented: replay support (KitaruClient.executions.replay(...)).

The CLI also supports global runtime log-store configuration via kitaru log-store set/show/reset, stack lifecycle via kitaru stack list/current/use/create/delete, and execution lifecycle commands via kitaru executions get/list/logs/input/replay/retry/resume/cancel.

func _safe_save(name, value, *, artifact_type, save_func) -> str

Save an artifact and fall back to a blob repr if serialization fails.

param name: str
param value: Any
param artifact_type: str
param save_func: _SaveFn

Returns

str

func normalize_optional_stack_string(value) -> str | None

Normalize an optional stack input, treating blanks as omitted.

param value: str | None

Returns

str | None

func _render_oxford_comma_list(items) -> str

Render a list of items with Oxford comma formatting.

param items: list[str]

Returns

str
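
Oxford comma formatting can be sketched as follows. This is a standalone illustration, not Kitaru's implementation: the function name `oxford_join`, the conjunction "and", and the empty-string result for an empty list are all assumptions.

```python
def oxford_join(items: list[str]) -> str:
    """Join items as 'a', 'a and b', or 'a, b, and c' (hypothetical helper)."""
    if not items:
        return ""  # assumption: an empty list renders as an empty string
    if len(items) == 1:
        return items[0]
    if len(items) == 2:
        return f"{items[0]} and {items[1]}"
    # Three or more items: comma-separate and keep the comma before "and".
    return f"{', '.join(items[:-1])}, and {items[-1]}"
```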

func _render_supported_stack_types(allowed_stack_types) -> str

Render the supported stack types for a validation error.

param allowed_stack_types: tuple[StackType, ...]

Returns

str

func normalize_stack_type(raw_type, *, allowed_stack_types=_DEFAULT_INTERFACE_STACK_TYPES) -> StackType

Normalize a stack-type input into the internal enum.

param raw_type: str
param allowed_stack_types: tuple[StackType, ...] = _DEFAULT_INTERFACE_STACK_TYPES

Returns

kitaru._config._stacks.StackType

func infer_cloud_provider(artifact_store_uri) -> CloudProvider

Infer the cloud provider from an artifact-store URI.

param artifact_store_uri: str

Returns

kitaru._config._stacks.CloudProvider

func _render_field_labels(field_names, *, labels) -> str

Render interface-specific labels for one or more stack fields.

param field_names: list[str]
param labels: StackOptionLabels

Returns

str

func _render_stack_type_requirement(allowed_stack_types, *, labels) -> str

Render stack-type requirements for validation messages.

param allowed_stack_types: frozenset[StackType]
param labels: StackOptionLabels

Returns

str

func _option_group_label(allowed_stack_types) -> str

Return a human-friendly label for a set of allowed stack types.

param allowed_stack_types: frozenset[StackType]

Returns

str

func _validate_explicit_field_usage(*, stack_type, provided_fields, labels) -> None

Reject fields that were explicitly provided for the wrong stack type.

param stack_type: StackType
param provided_fields: Mapping[str, bool]
param labels: StackOptionLabels

Returns

None

func _deep_merge_config_values(base, override) -> Any

Recursively merge two config values, preferring the later leaf value.

param base: Any
param override: Any

Returns

typing.Any
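
A recursive merge where the later leaf value wins might look like the sketch below. The name `deep_merge` and the choice to merge only mapping-to-mapping pairs (any other pair is simply replaced by the override) are assumptions made for illustration.

```python
from collections.abc import Mapping
from typing import Any


def deep_merge(base: Any, override: Any) -> Any:
    """Merge two config values; on conflict the override leaf wins."""
    if isinstance(base, Mapping) and isinstance(override, Mapping):
        merged = dict(base)  # shallow copy so the input mapping is not mutated
        for key, value in override.items():
            # Recurse only when both sides define the key.
            merged[key] = deep_merge(merged[key], value) if key in merged else value
        return merged
    # Non-mapping values (or mismatched kinds): the later value replaces the earlier.
    return override
```

Returning a fresh dictionary at each level keeps the caller's `base` mapping untouched, which matters when the same defaults are merged repeatedly.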

func _merge_nested_mapping(base, override) -> dict[str, Any]

Merge two nested config mappings into a fresh dictionary.

param base: Mapping[str, Any]
param override: Mapping[str, Any]

Returns

dict[str, typing.Any]

func merge_component_overrides(base, override) -> StackComponentConfigOverrides

Merge two component-override payloads with later values winning.

param base: StackComponentConfigOverrides | None
param override: StackComponentConfigOverrides | None

Returns

kitaru._config._stacks.StackComponentConfigOverrides

func _validate_nested_override_mapping(raw, *, option_name) -> dict[str, Any]

Validate a nested override mapping and normalize it to plain dicts.

param raw: Mapping[str, Any]
param option_name: str

Returns

dict[str, typing.Any]

func _target_field_label(target, *, labels) -> str

Render one component target as an interface-specific extra field label.

param target: StackComponentTarget
param labels: StackOptionLabels

Returns

str

func normalize_component_overrides_mapping(raw, *, labels) -> StackComponentConfigOverrides

Normalize YAML/MCP component overrides into the shared override model.

param raw: Mapping[str, Any]
param labels: StackOptionLabels

Returns

kitaru._config._stacks.StackComponentConfigOverrides

func _assign_nested_override(root, path_parts, value) -> None

Assign a nested override path into a mutable mapping.

param root: dict[str, Any]
param path_parts: list[str]
param value: Any

Returns

None
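
Assigning a value at a nested path can be sketched as below. The name `assign_nested` is hypothetical, and two behaviours are assumptions rather than documented Kitaru behaviour: an empty path raises `ValueError`, and a non-dict intermediate value is replaced by a new dict (the real helper may reject that case instead).

```python
from typing import Any


def assign_nested(root: dict[str, Any], path_parts: list[str], value: Any) -> None:
    """Set root[p0][p1]...[pn] = value, creating intermediate dicts as needed."""
    if not path_parts:
        raise ValueError("path_parts must not be empty")  # assumption
    cursor = root
    for part in path_parts[:-1]:
        child = cursor.get(part)
        if not isinstance(child, dict):
            # Assumption: overwrite missing or non-dict intermediates with a dict.
            child = {}
            cursor[part] = child
        cursor = child
    cursor[path_parts[-1]] = value
```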

func _parse_component_override_value(raw_value, *, labels) -> Any

Parse one CLI override value using YAML scalar/object semantics.

param raw_value: str
param labels: StackOptionLabels

Returns

typing.Any

func parse_cli_component_overrides(assignments, *, labels) -> StackComponentConfigOverrides

Parse repeatable CLI TARGET.FIELD=VALUE assignments into overrides.

param assignments: Sequence[str]
param labels: StackOptionLabels

Returns

kitaru._config._stacks.StackComponentConfigOverrides

func apply_async_override(overrides, *, async_enabled) -> StackComponentConfigOverrides

Inject synchronous=false unless the user already set it explicitly.

param overrides: StackComponentConfigOverrides | None
param async_enabled: bool

Returns

kitaru._config._stacks.StackComponentConfigOverrides

func validate_component_override_targets(stack_type, overrides, *, labels) -> None

Validate that component override targets are legal for the stack type.

param stack_type: StackType
param overrides: StackComponentConfigOverrides | None
param labels: StackOptionLabels

Returns

None

func build_remote_stack_spec(*, stack_type, artifact_store, container_registry, cluster, region, subscription_id, resource_group, workspace, execution_role, namespace, credentials, verify, labels) -> RemoteStackSpec | None

Validate interface inputs and build a remote stack spec when needed.

param stack_type: StackType
param artifact_store: str | None
param container_registry: str | None
param cluster: str | None
param region: str | None
param subscription_id: str | None
param resource_group: str | None
param workspace: str | None
param execution_role: str | None
param namespace: str | None
param credentials: str | None
param verify: bool
param labels: StackOptionLabels

Returns

kitaru._config._stacks.RemoteStackSpec | None

func build_stack_create_request(*, name, activate, stack_type, artifact_store, container_registry, cluster, region, subscription_id, resource_group, workspace, execution_role, namespace, credentials, verify, component_overrides=None, async_enabled=False, labels, allowed_stack_types=_DEFAULT_INTERFACE_STACK_TYPES) -> ManageStackCreateRequest

Validate create inputs and build a structured stack-create request.

param name: str
param activate: bool
param stack_type: str
param artifact_store: str | None
param container_registry: str | None
param cluster: str | None
param region: str | None
param subscription_id: str | None
param resource_group: str | None
param workspace: str | None
param execution_role: str | None
param namespace: str | None
param credentials: str | None
param verify: bool
param component_overrides: StackComponentConfigOverrides | None = None
param async_enabled: bool = False
param labels: StackOptionLabels
param allowed_stack_types: tuple[StackType, ...] = _DEFAULT_INTERFACE_STACK_TYPES

Returns

kitaru._interface_stacks.ManageStackCreateRequest

func build_manage_stack_request(*, action, name, activate, recursive, force, stack_type, artifact_store, container_registry, cluster, region, subscription_id, resource_group, workspace, execution_role, namespace, credentials, verify, extra=None, async_mode=False) -> ManageStackCreateRequest | ManageStackDeleteRequest

Validate MCP manage-stack inputs and build a structured request.

param action: Literal['create', 'delete'] | str
param name: str
param activate: bool
param recursive: bool
param force: bool
param stack_type: str
param artifact_store: str | None
param container_registry: str | None
param cluster: str | None
param region: str | None
param subscription_id: str | None
param resource_group: str | None
param workspace: str | None
param execution_role: str | None
param namespace: str | None
param credentials: str | None
param verify: bool
param extra: Mapping[str, Any] | None = None
param async_mode: bool = False

Returns

kitaru._interface_stacks.ManageStackCreateRequest | kitaru._interface_stacks.ManageStackDeleteRequest

func callable_name(func) -> str

Extract the plain name of a callable.

param func: Callable[..., Any]

Returns

str

func _normalize_callable_name(raw_name, *, fallback) -> str

Sanitize a callable name into a valid Python identifier fragment.

param raw_name: str
param fallback: str

Returns

str
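
One way to sanitize a name into an identifier fragment is sketched below. The function name `sanitize_name` and the exact rules (collapse runs of non-word characters to `_`, trim outer underscores, prefix a leading digit) are assumptions; Kitaru's actual rules may differ.

```python
import re


def sanitize_name(raw_name: str, *, fallback: str) -> str:
    """Turn an arbitrary callable name into an identifier-safe fragment."""
    # Replace every run of non-word characters with a single underscore.
    cleaned = re.sub(r"\W+", "_", raw_name).strip("_")
    if not cleaned:
        return fallback  # nothing usable survived, e.g. "???"
    if cleaned[0].isdigit():
        cleaned = f"_{cleaned}"  # identifiers cannot start with a digit
    return cleaned
```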

func build_pipeline_registration_name(name) -> str

Build the plain ZenML pipeline name for a flow function.

param name: str

Returns

str

func build_checkpoint_registration_name(name) -> str

Build the plain ZenML step name for a checkpoint function.

param name: str

Returns

str

func build_pipeline_source_alias(name) -> str

Build the internal source alias for a flow (pipeline) function.

param name: str

Returns

str

func build_checkpoint_source_alias(name) -> str

Build the internal source alias for a checkpoint (step) function.

param name: str

Returns

str

func normalize_flow_name(value) -> str | None

Strip the pipeline alias prefix from a flow name.

Returns None for None, empty, or whitespace-only input.

param value: object | None

Returns

str | None

func normalize_checkpoint_name(step_name) -> str

Strip the checkpoint alias prefix from a checkpoint (step) name.

param step_name: str

Returns

str

func normalize_aliases_in_text(text) -> str

Replace all alias-prefixed names with their user-facing names in free text.

param text: str

Returns

str

func _safe_rmtree(path, label) -> None

Remove a directory tree, logging a warning on failure.

param path: Path
param label: str

Returns

None

func _resolve_zenml_dashboard_dir() -> Path

Locate ZenML's installed dashboard directory.

Returns

pathlib.Path

func _resolve_bundled_ui_dir() -> Path | None

Return the path to the bundled Kitaru UI dist directory, or None.

Returns

pathlib.Path | None

func _load_manifest_json(path) -> dict[str, Any] | None

Load and validate a manifest JSON file.

Returns the parsed dict if the file exists, is valid JSON, is a dict, and has the expected schema version. Returns None otherwise.

param path: Path

Returns

dict[str, typing.Any] | None
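
The four checks described above (file exists, valid JSON, is a dict, expected schema version) can be sketched like this. The key name `schema_version` and the constant `EXPECTED_SCHEMA_VERSION = 1` are placeholders chosen for illustration; the real manifest layout is not documented here.

```python
import json
from pathlib import Path
from typing import Any, Optional

EXPECTED_SCHEMA_VERSION = 1  # hypothetical value


def load_manifest(path: Path) -> Optional[dict[str, Any]]:
    """Return the parsed manifest, or None if any validation step fails."""
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: missing or unreadable file; ValueError: invalid JSON or encoding.
        return None
    if not isinstance(data, dict):
        return None
    if data.get("schema_version") != EXPECTED_SCHEMA_VERSION:  # assumed key name
        return None
    return data
```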

func _load_bundled_manifest() -> dict[str, Any] | None

Load the bundled bundle_manifest.json, or None if unavailable.

Returns

dict[str, typing.Any] | None

func _load_installed_sentinel(dashboard_dir) -> dict[str, Any] | None

Load the sentinel manifest from the installed dashboard, or None.

param dashboard_dir: Path

Returns

dict[str, typing.Any] | None

func _dashboard_needs_update(dashboard_dir, bundled) -> bool

Check whether the installed dashboard needs to be replaced.

Returns False only when the installed sentinel matches the bundled manifest on both version and checksum, AND index.html is present.

param dashboard_dir: Path
param bundled: dict[str, Any]

Returns

bool
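
The decision rule above can be expressed as a small predicate. In this sketch the installed sentinel is passed in as an argument (the documented function loads it itself), and the key names `version` and `checksum` are assumed from the description rather than taken from the real manifest.

```python
from pathlib import Path
from typing import Any, Optional


def needs_update(
    dashboard_dir: Path,
    bundled: dict[str, Any],
    installed: Optional[dict[str, Any]],
) -> bool:
    """Return False only if the sentinel matches and index.html exists."""
    if installed is None:
        return True  # no sentinel means the dashboard was never patched
    matches = all(
        installed.get(key) == bundled.get(key) for key in ("version", "checksum")
    )
    has_index = (dashboard_dir / "index.html").is_file()
    return not (matches and has_index)
```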

func _apply_dashboard_patch(dashboard_dir, bundled_manifest) -> None

Replace the ZenML dashboard directory with bundled Kitaru UI.

Uses an atomic-ish rename strategy: copy to a temp sibling, rename old to backup, rename temp to target, then remove backup. If the final rename fails, the backup is restored.

param dashboard_dir: Path
param bundled_manifest: dict[str, Any]

Returns

None
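
The rename strategy described above can be sketched with the standard library. The function name `replace_dir_atomicish` and the `.tmp` / `.bak` sibling suffixes are assumptions, and the sketch expects `target` to already exist; writing the sentinel manifest is omitted.

```python
import shutil
from pathlib import Path


def replace_dir_atomicish(target: Path, source: Path) -> None:
    """Swap `target` for a copy of `source`, restoring the old tree on failure."""
    staging = target.with_name(target.name + ".tmp")  # hypothetical suffix
    backup = target.with_name(target.name + ".bak")   # hypothetical suffix
    for leftover in (staging, backup):
        shutil.rmtree(leftover, ignore_errors=True)   # clear debris from earlier runs

    shutil.copytree(source, staging)  # 1. copy to a temp sibling
    target.rename(backup)             # 2. rename old to backup
    try:
        staging.rename(target)        # 3. rename temp to target
    except OSError:
        backup.rename(target)         # final rename failed: restore the backup
        shutil.rmtree(staging, ignore_errors=True)
        raise
    shutil.rmtree(backup, ignore_errors=True)  # 4. remove backup
```

Keeping the staging and backup directories as siblings of the target keeps the renames on the same filesystem, which is what makes each rename a single cheap operation.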

func _ensure_kitaru_dashboard() -> bool

Ensure ZenML's dashboard directory has the Kitaru UI.

Returns True if the dashboard was updated, False if already current. Silently returns False if no bundled UI is available (dev installs).

Returns

bool

func _validate_local_server_inputs(*, port, timeout) -> None

Validate local-server start inputs.

param port: int | None
param timeout: int

Returns

None

func _existing_local_server_url(local_server) -> str | None

Return the current or configured local-server URL, if known.

param local_server: Any

Returns

str | None

func _existing_local_server_port(local_server) -> int | None

Return the configured local-server port, if known.

param local_server: Any

Returns

int | None

func _local_server_log_path() -> str

Resolve the daemon log file path for startup error guidance.

Returns

str

func _local_server_start_error(*, action, exc) -> KitaruBackendError

Build a user-facing startup/restart failure.

param action: str
param exc: Exception

Returns

kitaru.errors.KitaruBackendError

func _load_local_server_runtime() -> tuple[type[Any], type[Any], type[Any], Any]

Load local-server runtime helpers lazily.

Returns

tuple[type[typing.Any], type[typing.Any], type[typing.Any], typing.Any]

func _ensure_local_server_dependencies() -> None

Validate optional local-server dependencies.

Returns

None

func _build_local_server_config(*, deployment_config_cls, provider_type, port) -> Any

Build a daemon-backed local-server deployment config.

param deployment_config_cls: type[Any]
param provider_type: Any
param port: int

Returns

typing.Any

func _is_server_running(local_server) -> bool

Check whether a local server deployment is running.

param local_server: Any

Returns

bool

func _deploy_and_connect(*, deployer, deployment_config_cls, provider_type, port, timeout, action) -> LocalServerConnectionResult

Deploy a new local server, connect, and return the result.

param deployer: Any
param deployment_config_cls: type[Any]
param provider_type: Any
param port: int
param timeout: int
param action: Literal['started', 'restarted']

Returns

kitaru._local_server.LocalServerConnectionResult

func start_or_connect_local_server(*, port, timeout) -> LocalServerConnectionResult

Start a daemon local server or connect to an existing one.

Before starting or connecting, ensures the ZenML dashboard directory contains Kitaru UI assets. If a compatible server is already running but serving a stale dashboard, it is restarted.

param port: int | None
param timeout: int

Returns

kitaru._local_server.LocalServerConnectionResult

func stop_registered_local_server() -> LocalServerStopResult

Stop the registered local server if one exists.

Returns

kitaru._local_server.LocalServerStopResult

func _parse_scope_uuid(scope_id, *, scope_name, api_name) -> UUID

Parse a runtime scope identifier as a UUID.

param scope_id: str

Raw scope identifier from runtime context.

param scope_name: str

Human-readable scope name for error messages.

param api_name: str

Calling Kitaru API name.

Returns

uuid.UUID

Parsed UUID.
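
A minimal sketch of this kind of parsing, using the standard `uuid` module. The name `parse_scope_id` is hypothetical, and the sketch raises a plain `ValueError` as a stand-in: the exception type and message that Kitaru actually uses are not documented here.

```python
from uuid import UUID


def parse_scope_id(scope_id: str, *, scope_name: str, api_name: str) -> UUID:
    """Parse scope_id as a UUID, naming the scope and API in the error."""
    try:
        return UUID(scope_id)
    except ValueError as exc:
        # Stand-in error type; the real helper likely raises a Kitaru exception.
        raise ValueError(
            f"{api_name} expected the {scope_name} ID to be a UUID, got {scope_id!r}."
        ) from exc
```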

func _level_to_kind(levelno) -> _TerminalKind

param levelno: int

Returns

kitaru._terminal_logging._TerminalKind

func _apply_zenml_rules(logger_name, msg, levelno) -> _TerminalDecision | None

Apply rewrite/drop rules to a ZenML log message.

param logger_name: str
param msg: str
param levelno: int

Returns

kitaru._terminal_logging._TerminalDecision | None

func _decide(record) -> _TerminalDecision | None

Decide how to render a log record for the terminal.

Returns None to indicate the record should be dropped (not displayed).

param record: logging.LogRecord

Returns

kitaru._terminal_logging._TerminalDecision | None

func _render(decision, *, interactive) -> str

Render a terminal decision to a display string.

param decision: _TerminalDecision
param interactive: bool

Returns

str

func _get_bypass_write() -> Callable[[str], Any]

Get a write callable that bypasses ZenML's stdout wrapper.

Returns

collections.abc.Callable[[str], typing.Any]

func install_terminal_log_intercept() -> None

Replace ZenML's console handler with a Kitaru terminal handler.

This function is idempotent: calling it multiple times (including across importlib.reload()) will not add duplicate handlers.

Returns

None

func translate_to_user_error(exc) -> InterfaceErrorDetails

Translate one exception into user-facing error details.

param exc: BaseException

Returns

kitaru._interface_errors.InterfaceErrorDetails

func run_with_cli_error_boundary(operation, *, command, output, exit_with_error, handled_exceptions=(Exception,), translator=translate_to_user_error) -> T

Run one CLI operation and emit a consistent error on handled failure.

param operation: Callable[[], T]
param command: str
param output: CLIOutputFormat
param exit_with_error: CLIErrorEmitter
param handled_exceptions: tuple[type[Exception], ...] = (Exception,)
param translator: Callable[[Exception], InterfaceErrorDetails] = translate_to_user_error

Returns

kitaru._interface_errors.T

func run_with_mcp_error_boundary(operation, *, handled_exceptions=(Exception,), translator=None) -> T

Run one MCP operation and preserve passthrough exception behavior.

param operation: Callable[[], T]
param handled_exceptions: tuple[type[Exception], ...] = (Exception,)
param translator: Callable[[Exception], BaseException] | None = None

Returns

kitaru._interface_errors.T

func _load_module_from_python_path(module_path, *, module_name_prefix) -> ModuleType

Load a Python module from a filesystem path.

param module_path: str
param module_name_prefix: str

Returns

types.ModuleType
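
Loading a module from a file path is usually done with `importlib.util`. The sketch below is a generic version of that pattern, not Kitaru's code: the function name `load_module_from_path` and the way the prefix is combined with the file stem to form the module name are assumptions.

```python
import importlib.util
import sys
from pathlib import Path
from types import ModuleType


def load_module_from_path(module_path: str, *, module_name_prefix: str) -> ModuleType:
    """Import the file at module_path under a prefixed, synthetic module name."""
    path = Path(module_path).resolve()
    module_name = f"{module_name_prefix}_{path.stem}"  # assumed naming scheme
    spec = importlib.util.spec_from_file_location(module_name, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load a module from {path}")
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module  # register first so self-references resolve
    try:
        spec.loader.exec_module(module)
    except BaseException:
        sys.modules.pop(module_name, None)  # do not leave a half-initialized module
        raise
    return module
```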

func _load_flow_target(target, *, module_name_prefix) -> _FlowTarget

Load <module_or_file>:<flow_name> into a runnable flow object.

param target: str
param module_name_prefix: str

Returns

kitaru._flow_loading._FlowTarget

func list_executions_filtered(client, *, flow, status, stack, limit) -> list[Execution]

List executions with optional client-side stack filtering.

param client: KitaruClient
param flow: str | None
param status: str | None
param stack: str | None
param limit: int | None

Returns

list[kitaru.client.Execution]

func latest_execution_filtered(client, *, flow, status, stack) -> Execution

Resolve the latest execution with optional client-side stack filtering.

param client: KitaruClient
param flow: str | None
param status: str | None
param stack: str | None

Returns

kitaru.client.Execution

func _validate_schema_type(value, schema_type) -> bool

Validate one value against a shallow JSON-schema type label.

param value: Any
param schema_type: str

Returns

bool
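
A shallow type check against JSON Schema type labels can be sketched as follows. The function name `matches_schema_type` and the choice to accept unknown labels are assumptions. The one subtle point is that `bool` is a subclass of `int` in Python, so `True` must be excluded explicitly from "integer" and "number".

```python
from typing import Any


def matches_schema_type(value: Any, schema_type: str) -> bool:
    """Check a value against one JSON Schema primitive type label."""
    if schema_type == "boolean":
        return isinstance(value, bool)
    if schema_type == "integer":
        return isinstance(value, int) and not isinstance(value, bool)
    if schema_type == "number":
        return isinstance(value, (int, float)) and not isinstance(value, bool)
    if schema_type == "string":
        return isinstance(value, str)
    if schema_type == "array":
        return isinstance(value, list)
    if schema_type == "object":
        return isinstance(value, dict)
    if schema_type == "null":
        return value is None
    return True  # assumption: unknown labels are not enforced
```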

func validate_wait_input_schema(*, wait_schema, value) -> None

Run lightweight wait-input validation when a schema type is present.

param wait_schema: dict[str, Any] | None
param value: Any

Returns

None

func validate_pending_wait_input(*, execution, wait, value) -> None

Validate input against the current pending wait when the target matches.

param execution: Execution
param wait: str
param value: Any

Returns

None

func _format_mcp_log_entry(entry) -> str

Render a readable one-line text representation for MCP log output.

param entry: LogEntry

Returns

str

func format_mcp_execution_logs(entries) -> str

Render execution logs as plain text for MCP agent readability.

param entries: Sequence[LogEntry]

Returns

str

func invoke_flow_target(*, target, args, stack, module_name_prefix) -> FlowInvocationResult

Load and invoke a flow target for CLI or MCP run surfaces.

param target: str
param args: dict[str, Any] | None
param stack: str | None
param module_name_prefix: str

Returns

kitaru._interface_executions.FlowInvocationResult

func resolve_started_execution_details(*, exec_id, client) -> StartedExecutionDetails

Best-effort execution lookup after a flow launch.

param exec_id: str
param client: KitaruClient

Returns

kitaru._interface_executions.StartedExecutionDetails

func build_started_execution_payload(*, target, details) -> dict[str, Any]

Build the shared structured payload for flow launch responses.

param target: str
param details: StartedExecutionDetails

Returns

dict[str, typing.Any]

func resolve_installed_version() -> str

Resolve the installed Kitaru version lazily.

Returns

str

func main() -> None

Show help when invoked without arguments.

Returns

None