API Documentation
- class process_pilot.pilot.ProcessPilot(manifest: ProcessManifest, plugin_directory: Path | None = None, process_poll_interval: float = 0.1, ready_check_interval: float = 0.1, logger_config: dict[str, Any] | None = None)[source]
Bases:
objectClass that manages a manifest-driven set of processes.
- static execute_lifecycle_hooks(process: Process, popen: Popen[str] | None, hook_type: Literal['pre_start', 'post_start', 'on_shutdown', 'on_restart']) None[source]
Execute the lifecycle hooks for a particular process.
- get_running_process(process_id: None = None) list[ProcessStatus] | None[source]
- get_running_process(process_id: int) ProcessStatus | None
- get_running_process(process_id: str) ProcessStatus | None
Get a running process by its ID.
- Parameters:
process_id – The ID of the process to retrieve (integer PID or string name). If None, return all processes.
- load_plugins(plugin_dir: Path) None[source]
Load plugins from the specified directory.
- Parameters:
plugin_dir – The directory to load plugins from
- restart_processes(process_names: list[str] | str) None[source]
Restart specific processes by name.
- Parameters:
process_names – List of process name(s) to restart, or a single process name
- Raises:
ValueError – If any process name is not found
Defines the Plugin abstract base class for registering function hooks and strategies.
- class process_pilot.plugin.ControlServer(*args, **kwargs)[source]
Bases:
ProtocolProtocol defining the interface for control servers.
- class process_pilot.plugin.Plugin[source]
Bases:
objectAbstract base class for plugins.
- get_control_servers() dict[str, Callable[[ProcessPilot], ControlServer]][source]
Register control server implementations.
Control servers provide remote control capabilities for ProcessPilot. The server name must match what is provided in the manifest’s control_server field.
- Returns:
A dictionary mapping control server names to their factory functions
- get_lifecycle_hooks() dict[str, dict[Literal['pre_start', 'post_start', 'on_shutdown', 'on_restart'], list[Callable[[Process, Popen[str] | None], None]]]][source]
Register custom hooks.
Each hook is applied to a specific process and process event as described in the README. The hooks are tied to a specific process through the name of the lifecycle hook. The hook name must match what is provided in the manifest for it to apply to a given process.
- Returns:
A nested dictionary mapping the name of the lifecycle hook to process hook types and their functions.
- get_ready_strategies() dict[str, Callable[[Process, float], bool]][source]
Register custom ready strategies.
These strategies are used to determine if a process is ready to be considered healthy and fully started. When a process has dependent processes, the dependency is not considered fulfilled until the ready strategy returns True. The strategies are tied to a specific process through the name of the ready strategy. The ready strategy name must match what is provided in the manifest for it to apply to a given process.
- Returns:
A dictionary mapping strategy names to their corresponding functions.
- get_stats_handlers() dict[str, list[Callable[[list[ProcessStats]], None]]][source]
Register handlers for process statistics.
These handlers are called periodically to process the statistics of all processes. Each handler function is called everytime the statistics are collected from the process. The handlers are tied to a specific process through the name of the handler. The handler name must match what is provided in the manifest for it to apply to a given process.
- Returns:
A dictionary mapping the name of the stat handler to a list of stat handler functions
- property name: str
Name of the plugin.
The name is only used to identify the plugin in logging statements and is not used for any other purpose. The base implementation returns the name of the class.
- Returns:
The name of the plugin
- class process_pilot.process.Process(*, name: str, working_directory: Path | None = None, path: Path, args: list[str] = [], env: dict[str, str]=<factory>, timeout: float | None = 5.0, shutdown_strategy: Literal['restart', 'do_not_restart', 'shutdown_everything'] | None='restart', dependencies: list[str] | list[Process] = [], lifecycle_hooks: list[str] = [], stat_handlers: list[str] = [], ready_strategy: str | None = None, affinity: list[int] | None = None, ready_timeout_sec: float = 5.0, ready_params: dict[str, ~typing.Any]=<factory>)[source]
Bases:
BaseModelPydantic model of an individual process that is being managed.
- affinity: list[int] | None
Optional list of CPU cores that a given process should run on.
- args: list[str]
The arguments to pass to the executable when it is run.
- property command: list[str]
Return the path to the executable along with all arguments.
- Returns:
A combined list of strings that contains both the executable path and all arguments
- dependencies: list[str] | list[Process]
A list of dependencies that must be started before this process can be started. This is a list of other names in the manifest.
- env: dict[str, str]
Environment variables to pass to the process. These are merged with the parent process environment.
- get_stats() ProcessStats[source]
Create a ProcessStats object from current process state.
- get_status() ProcessStatus[source]
Create a ProcessStatus object from current process state.
- Returns:
A ProcessStatus object containing the current state
- property lifecycle_hook_functions: dict[Literal['pre_start', 'post_start', 'on_shutdown', 'on_restart'], list[Callable[[Process, Popen[str] | None], None]]]
Return the lifecycle hooks dictionary.
- lifecycle_hooks: list[str]
An optional series of function names to call at various points in the process lifecycle. The function names must match the names of the functions in the provided plugin. That is, if you have loaded a plugin that provides function ‘on_start’ that you want called, the manifest entry should include ‘on_start’ in its list.
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_post_init(context: Any, /) None
This function is meant to behave like a BaseModel method to initialize private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Parameters:
self – The BaseModel instance.
context – The context.
- name: str
The name of the process.
- path: Path
The path to the executable that will be run.
- ready_params: dict[str, Any]
Additional parameters for the ready strategy
- ready_strategy: str | None
Optional strategy to determine if the process is ready
- property ready_strategy_function: Callable[[Process, float], bool] | None
Return the ready strategy function for the process.
- ready_timeout_sec: float
The amount of time to wait for the process to signal readiness before giving up
- shutdown_strategy: Literal['restart', 'do_not_restart', 'shutdown_everything'] | None
The strategy to use when the process exits. If not specified, the default is to restart the process.
- stat_handlers: list[str]
An optional series of function names to call whenever the process statistics are gathered. The function names must match the names of the functions in the provided plugin. That is, if you have loaded a plugin that provides function ‘email_stats’ that you want called, the manifest entry should include ‘email_stats’ in its list.
- property stats_handler_functions: list[Callable[[list[ProcessStats]], None]]
Return the stats handler functions.
- timeout: float | None
The amount of time to wait for the process to exit before forcibly killing it.
- update_status(status: ProcessState, pid: int | None = None, return_code: int | None = None) None[source]
Update the process status.
- working_directory: Path | None
The working directory (cwd) to use when starting the process. Defaults to the location of the executable.
- class process_pilot.process.ProcessManifest(*, processes: list[Process], control_server: str | None = None, kill_timeout: float = 5.0, base_directory: Path | None = None)[source]
Bases:
BaseModelPydantic model of each process that is being managed.
- base_directory: Path | None
Base directory to use for all relative paths in the manifest.
- control_server: str | None
Name of the control server implementation to use - must be provided by a plugin.
- classmethod from_json(path: Path) ProcessManifest[source]
Load a JSON formatted process manifest.
- Parameters:
path – Path to the JSON file
- classmethod from_yaml(path: Path) ProcessManifest[source]
Load a YAML formatted process manifest.
- Parameters:
path – Path to the YAML file
- kill_timeout: float
The amount of time to wait for the process to exit before forcibly killing everything.
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_post_init(context: Any, /) None
This function is meant to behave like a BaseModel method to initialize private attributes.
It takes context as an argument since that’s what pydantic-core passes when calling it.
- Parameters:
self – The BaseModel instance.
context – The context.
- order_dependencies() ProcessManifest[source]
Orders the process list based on the dependencies of each process.
- Returns:
The updated manifest with ordered dependencies
- Raises:
ValueError if circular dependencies are detected
- resolve_dependencies() ProcessManifest[source]
Resolve dependencies for each process in the manifest.
- Returns:
The updated manifest with resolved dependencies
- resolve_paths() ProcessManifest[source]
Resolve and validate paths for each process in the manifest.
- Returns:
The updated manifest with resolved paths
- Raises:
ValueError – If any path is invalid or executable not found
- set_working_directory() ProcessManifest[source]
Set the working directory for each process. Defaults to the executable’s parent directory.
- validate_cpu_affinity() ProcessManifest[source]
Validate that the CPU affinities that are set align with core counts.
- validate_ready_config() ProcessManifest[source]
Validate the ready strategy configuration.
- class process_pilot.process.ProcessRuntimeInfo[source]
Bases:
objectContains process-related runtime information.
- property cpu_usage_percent: float
Return the current CPU utilization as a percentage.
- property max_cpu_usage: float
Return the maximum CPU usage (as a %).
- property max_memory_usage_mb: float
Return the maximum memory usage in megabytes.
- property memory_usage_mb: float
Return the current memory usage in megabytes.
- class process_pilot.process.ProcessState(value)[source]
Bases:
str,EnumEnumeration for the state of a process.
- RUNNING = 'running'
- STARTING = 'starting'
- STOPPED = 'stopped'
- STOPPING = 'stopping'
- class process_pilot.process.ProcessStats(name: str, path: Path, memory_usage_mb: float, cpu_usage_percent: float, max_memory_usage_mb: float, max_cpu_usage_percent: float)[source]
Bases:
objectContainer for process statistics.
- cpu_usage_percent: float
Current CPU usage as a percentage.
- max_cpu_usage_percent: float
Maximum CPU usage recorded as a percentage.
- max_memory_usage_mb: float
Maximum memory usage recorded in megabytes.
- memory_usage_mb: float
Current memory usage in megabytes.
- name: str
Name of the process.
- path: Path
Path to the process executable.
- class process_pilot.process.ProcessStatus(*, name: str, pid: int, status: ProcessState, return_code: int | None)[source]
Bases:
BaseModelModel for the status of a process.
- model_config = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- name: str
Name of the process.
- pid: int
Process ID of the process.
- return_code: int | None
Return code of the process if it has exited.
- status: ProcessState
Current state of the process.
Defines type aliases for various strategies and hook types used in the process pilot.
The FileReadyPlugin class which checks if a process is ready by verifying the existence of a file.
- class process_pilot.plugins.file_ready.FileReadyPlugin[source]
Bases:
PluginPlugin to check if a process is ready by checking if a file exists.
TCPReadyPlugin class.
The TCPReadyPlugin class which checks if a process is ready by verifying a TCP connection with the child process.
- class process_pilot.plugins.tcp_ready.TCPReadyPlugin[source]
Bases:
PluginPlugin that provides TCP-based readiness check strategies.
PipeReadyPlugin class.
The PipeReadyPlugin class which checks if a process is ready by verifying the existence of ‘ready’ in a named pipe.