API Documentation

class process_pilot.pilot.ProcessPilot(manifest: ProcessManifest, plugin_directory: Path | None = None, process_poll_interval: float = 0.1, ready_check_interval: float = 0.1, logger_config: dict[str, Any] | None = None)[source]

Bases: object

Class that manages a manifest-driven set of processes.

static execute_lifecycle_hooks(process: Process, popen: Popen[str] | None, hook_type: Literal['pre_start', 'post_start', 'on_shutdown', 'on_restart']) None[source]

Execute the lifecycle hooks for a particular process.

get_manifest_processes() list[Process][source]

Get all processes specified in the manifest.

get_process_by_name(name: str) Process | None[source]

Get a process’ manifest details by its name.

get_running_process(process_id: None = None) list[ProcessStatus] | None[source]
get_running_process(process_id: int) ProcessStatus | None
get_running_process(process_id: str) ProcessStatus | None

Get a running process by its ID.

Parameters:

process_id – The ID of the process to retrieve (integer PID or string name). If None, return all processes.

is_running() bool[source]

Check if the ProcessPilot is currently running.

load_plugins(plugin_dir: Path) None[source]

Load plugins from the specified directory.

Parameters:

plugin_dir – The directory to load plugins from

register_plugins(plugins: list[Plugin]) None[source]

Register plugins and their hooks/strategies.

restart_processes(process_names: list[str] | str) None[source]

Restart specific processes by name.

Parameters:

process_names – List of process name(s) to restart, or a single process name

Raises:

ValueError – If any process name is not found

set_process_affinity(process: Popen[str], affinity: list[int] | None) None[source]

Set the CPU affinity for a given process. Not supported in Mac OS X.

Parameters:

process – Process to set the affinity for

start() None[source]

Start all services.

start_process(name: str) None[source]

Start a specific process by its manifest name.

stop() None[source]

Stop all services.

stop_process(name: str) None[source]

Stop a specific process by its manifest name.

Parameters:

name – Name of the process to stop

Raises:

ValueError – If process name is not found

Defines the Plugin abstract base class for registering function hooks and strategies.

class process_pilot.plugin.ControlServer(*args, **kwargs)[source]

Bases: Protocol

Protocol defining the interface for control servers.

start() None[source]

Start the control server.

stop() None[source]

Stop the control server.

class process_pilot.plugin.Plugin[source]

Bases: object

Abstract base class for plugins.

get_control_servers() dict[str, Callable[[ProcessPilot], ControlServer]][source]

Register control server implementations.

Control servers provide remote control capabilities for ProcessPilot. The server name must match what is provided in the manifest’s control_server field.

Returns:

A dictionary mapping control server names to their factory functions

get_lifecycle_hooks() dict[str, dict[Literal['pre_start', 'post_start', 'on_shutdown', 'on_restart'], list[Callable[[Process, Popen[str] | None], None]]]][source]

Register custom hooks.

Each hook is applied to a specific process and process event as described in the README. The hooks are tied to a specific process through the name of the lifecycle hook. The hook name must match what is provided in the manifest for it to apply to a given process.

Returns:

A nested dictionary mapping the name of the lifecycle hook to process hook types and their functions.

get_ready_strategies() dict[str, Callable[[Process, float], bool]][source]

Register custom ready strategies.

These strategies are used to determine if a process is ready to be considered healthy and fully started. When a process has dependent processes, the dependency is not considered fulfilled until the ready strategy returns True. The strategies are tied to a specific process through the name of the ready strategy. The ready strategy name must match what is provided in the manifest for it to apply to a given process.

Returns:

A dictionary mapping strategy names to their corresponding functions.

get_stats_handlers() dict[str, list[Callable[[list[ProcessStats]], None]]][source]

Register handlers for process statistics.

These handlers are called periodically to process the statistics of all processes. Each handler function is called everytime the statistics are collected from the process. The handlers are tied to a specific process through the name of the handler. The handler name must match what is provided in the manifest for it to apply to a given process.

Returns:

A dictionary mapping the name of the stat handler to a list of stat handler functions

property name: str

Name of the plugin.

The name is only used to identify the plugin in logging statements and is not used for any other purpose. The base implementation returns the name of the class.

Returns:

The name of the plugin

class process_pilot.process.Process(*, name: str, working_directory: Path | None = None, path: Path, args: list[str] = [], env: dict[str, str]=<factory>, timeout: float | None = 5.0, shutdown_strategy: Literal['restart', 'do_not_restart', 'shutdown_everything'] | None='restart', dependencies: list[str] | list[Process] = [], lifecycle_hooks: list[str] = [], stat_handlers: list[str] = [], ready_strategy: str | None = None, affinity: list[int] | None = None, ready_timeout_sec: float = 5.0, ready_params: dict[str, ~typing.Any]=<factory>)[source]

Bases: BaseModel

Pydantic model of an individual process that is being managed.

affinity: list[int] | None

Optional list of CPU cores that a given process should run on.

args: list[str]

The arguments to pass to the executable when it is run.

property command: list[str]

Return the path to the executable along with all arguments.

Returns:

A combined list of strings that contains both the executable path and all arguments

dependencies: list[str] | list[Process]

A list of dependencies that must be started before this process can be started. This is a list of other names in the manifest.

env: dict[str, str]

Environment variables to pass to the process. These are merged with the parent process environment.

get_stats() ProcessStats[source]

Create a ProcessStats object from current process state.

get_status() ProcessStatus[source]

Create a ProcessStatus object from current process state.

Returns:

A ProcessStatus object containing the current state

property lifecycle_hook_functions: dict[Literal['pre_start', 'post_start', 'on_shutdown', 'on_restart'], list[Callable[[Process, Popen[str] | None], None]]]

Return the lifecycle hooks dictionary.

lifecycle_hooks: list[str]

An optional series of function names to call at various points in the process lifecycle. The function names must match the names of the functions in the provided plugin. That is, if you have loaded a plugin that provides function ‘on_start’ that you want called, the manifest entry should include ‘on_start’ in its list.

model_config = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_post_init(context: Any, /) None

This function is meant to behave like a BaseModel method to initialize private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self – The BaseModel instance.

  • context – The context.

name: str

The name of the process.

path: Path

The path to the executable that will be run.

ready_params: dict[str, Any]

Additional parameters for the ready strategy

ready_strategy: str | None

Optional strategy to determine if the process is ready

property ready_strategy_function: Callable[[Process, float], bool] | None

Return the ready strategy function for the process.

ready_timeout_sec: float

The amount of time to wait for the process to signal readiness before giving up

record_process_stats(pid: int) None[source]

Get the memory and cpu usage of a process by its PID.

shutdown_strategy: Literal['restart', 'do_not_restart', 'shutdown_everything'] | None

The strategy to use when the process exits. If not specified, the default is to restart the process.

stat_handlers: list[str]

An optional series of function names to call whenever the process statistics are gathered. The function names must match the names of the functions in the provided plugin. That is, if you have loaded a plugin that provides function ‘email_stats’ that you want called, the manifest entry should include ‘email_stats’ in its list.

property stats_handler_functions: list[Callable[[list[ProcessStats]], None]]

Return the stats handler functions.

timeout: float | None

The amount of time to wait for the process to exit before forcibly killing it.

update_status(status: ProcessState, pid: int | None = None, return_code: int | None = None) None[source]

Update the process status.

wait_until_ready() bool[source]

Wait for process to signal readiness.

working_directory: Path | None

The working directory (cwd) to use when starting the process. Defaults to the location of the executable.

class process_pilot.process.ProcessManifest(*, processes: list[Process], control_server: str | None = None, kill_timeout: float = 5.0, base_directory: Path | None = None)[source]

Bases: BaseModel

Pydantic model of each process that is being managed.

base_directory: Path | None

Base directory to use for all relative paths in the manifest.

control_server: str | None

Name of the control server implementation to use - must be provided by a plugin.

classmethod from_json(path: Path) ProcessManifest[source]

Load a JSON formatted process manifest.

Parameters:

path – Path to the JSON file

classmethod from_yaml(path: Path) ProcessManifest[source]

Load a YAML formatted process manifest.

Parameters:

path – Path to the YAML file

kill_timeout: float

The amount of time to wait for the process to exit before forcibly killing everything.

model_config = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_post_init(context: Any, /) None

This function is meant to behave like a BaseModel method to initialize private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self – The BaseModel instance.

  • context – The context.

order_dependencies() ProcessManifest[source]

Orders the process list based on the dependencies of each process.

Returns:

The updated manifest with ordered dependencies

Raises:

ValueError if circular dependencies are detected

processes: list[Process]

List of processes to be managed.

resolve_dependencies() ProcessManifest[source]

Resolve dependencies for each process in the manifest.

Returns:

The updated manifest with resolved dependencies

resolve_paths() ProcessManifest[source]

Resolve and validate paths for each process in the manifest.

Returns:

The updated manifest with resolved paths

Raises:

ValueError – If any path is invalid or executable not found

set_working_directory() ProcessManifest[source]

Set the working directory for each process. Defaults to the executable’s parent directory.

validate_cpu_affinity() ProcessManifest[source]

Validate that the CPU affinities that are set align with core counts.

validate_ready_config() ProcessManifest[source]

Validate the ready strategy configuration.

class process_pilot.process.ProcessRuntimeInfo[source]

Bases: object

Contains process-related runtime information.

property cpu_usage_percent: float

Return the current CPU utilization as a percentage.

property max_cpu_usage: float

Return the maximum CPU usage (as a %).

property max_memory_usage_mb: float

Return the maximum memory usage in megabytes.

property memory_usage_mb: float

Return the current memory usage in megabytes.

class process_pilot.process.ProcessState(value)[source]

Bases: str, Enum

Enumeration for the state of a process.

RUNNING = 'running'
STARTING = 'starting'
STOPPED = 'stopped'
STOPPING = 'stopping'
class process_pilot.process.ProcessStats(name: str, path: Path, memory_usage_mb: float, cpu_usage_percent: float, max_memory_usage_mb: float, max_cpu_usage_percent: float)[source]

Bases: object

Container for process statistics.

cpu_usage_percent: float

Current CPU usage as a percentage.

max_cpu_usage_percent: float

Maximum CPU usage recorded as a percentage.

max_memory_usage_mb: float

Maximum memory usage recorded in megabytes.

memory_usage_mb: float

Current memory usage in megabytes.

name: str

Name of the process.

path: Path

Path to the process executable.

class process_pilot.process.ProcessStatus(*, name: str, pid: int, status: ProcessState, return_code: int | None)[source]

Bases: BaseModel

Model for the status of a process.

model_config = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

name: str

Name of the process.

pid: int

Process ID of the process.

return_code: int | None

Return code of the process if it has exited.

status: ProcessState

Current state of the process.

Defines type aliases for various strategies and hook types used in the process pilot.

The FileReadyPlugin class which checks if a process is ready by verifying the existence of a file.

class process_pilot.plugins.file_ready.FileReadyPlugin[source]

Bases: Plugin

Plugin to check if a process is ready by checking if a file exists.

get_ready_strategies() dict[str, Callable[[Process, float], bool]][source]

Register strategies for the plugin.

Returns:

A dictionary mapping strategy names to their corresponding functions.

TCPReadyPlugin class.

The TCPReadyPlugin class which checks if a process is ready by verifying a TCP connection with the child process.

class process_pilot.plugins.tcp_ready.TCPReadyPlugin[source]

Bases: Plugin

Plugin that provides TCP-based readiness check strategies.

get_ready_strategies() dict[str, Callable[[Process, float], bool]][source]

Register strategies for the plugin.

Returns:

A dictionary mapping strategy names to their corresponding functions.

PipeReadyPlugin class.

The PipeReadyPlugin class which checks if a process is ready by verifying the existence of ‘ready’ in a named pipe.

class process_pilot.plugins.pipe_ready.PipeReadyPlugin[source]

Bases: Plugin

Plugin that implements a named pipe (FIFO) based readiness check strategy.

get_ready_strategies() dict[str, Callable[[Process, float], bool]][source]

Register strategies for the plugin.

Returns:

A dictionary mapping strategy names to their corresponding functions.