Source code for process_pilot.process

import json  # noqa: D100
import logging
import os
import shutil
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Any, cast

import psutil
import yaml
from pydantic import BaseModel, Field, model_validator

from process_pilot.plugin import LifecycleHookType, ReadyStrategyType, StatHandlerType
from process_pilot.types import ProcessHookType, ShutdownStrategy


class ProcessState(str, Enum):
    """
    Lifecycle states of a managed process.

    Every member is also a ``str``, so it compares equal to its plain string value.
    """

    # Process has been launched but is not yet considered running.
    STARTING = "starting"
    # Process is up.
    RUNNING = "running"
    # Process is in the middle of being shut down.
    STOPPING = "stopping"
    # Process is not running.
    STOPPED = "stopped"
class ProcessStatus(BaseModel):
    """Snapshot of a single process: its name, PID, state and return code."""

    name: str
    """The process name."""

    pid: int
    """The process ID."""

    status: ProcessState
    """The state the process is currently in."""

    return_code: int | None
    """The return code of the process once it has exited."""
@dataclass
class ProcessStats:
    """Plain record of the resource statistics gathered for one process."""

    name: str
    """The process name."""

    path: Path
    """Location of the process executable."""

    memory_usage_mb: float
    """Memory currently in use, in megabytes."""

    cpu_usage_percent: float
    """CPU currently in use, as a percentage."""

    max_memory_usage_mb: float
    """Highest memory usage recorded so far, in megabytes."""

    max_cpu_usage_percent: float
    """Highest CPU usage recorded so far, as a percentage."""
class ProcessRuntimeInfo:
    """
    Mutable holder for a process's live resource usage.

    Assigning a new memory or CPU reading also updates the corresponding
    high-water mark, which is exposed through a read-only property.
    """

    def __init__(self) -> None:
        """Start with every reading and every high-water mark at zero."""
        self._memory_usage_mb = 0.0
        self._cpu_usage_percent = 0.0
        self._max_memory_usage_mb = 0.0
        self._max_cpu_usage = 0.0

    # --- memory -----------------------------------------------------------

    @property
    def memory_usage_mb(self) -> float:
        """Return the most recent memory reading in megabytes."""
        return self._memory_usage_mb

    @memory_usage_mb.setter
    def memory_usage_mb(self, value: float) -> None:
        previous_peak = self._max_memory_usage_mb
        self._memory_usage_mb = value
        self._max_memory_usage_mb = max(value, previous_peak)

    @property
    def max_memory_usage_mb(self) -> float:
        """Return the largest memory reading seen so far, in megabytes."""
        return self._max_memory_usage_mb

    # --- cpu --------------------------------------------------------------

    @property
    def cpu_usage_percent(self) -> float:
        """Return the most recent CPU utilization reading as a percentage."""
        return self._cpu_usage_percent

    @cpu_usage_percent.setter
    def cpu_usage_percent(self, value: float) -> None:
        previous_peak = self._max_cpu_usage
        self._cpu_usage_percent = value
        self._max_cpu_usage = max(value, previous_peak)

    @property
    def max_cpu_usage(self) -> float:
        """Return the largest CPU utilization reading seen so far (as a %)."""
        return self._max_cpu_usage
class Process(BaseModel):
    """Pydantic model of an individual process that is being managed."""

    name: str
    """The name of the process."""

    working_directory: Path | None = None
    """The working directory (cwd) to use when starting the process. Defaults to the location of the executable."""

    path: Path
    """The path to the executable that will be run."""

    args: list[str] = Field(default=[])
    """The arguments to pass to the executable when it is run."""

    env: dict[str, str] = Field(default_factory=dict)
    """Environment variables to pass to the process. These are merged with the parent process environment."""

    timeout: float | None = 5.0
    """The amount of time to wait for the process to exit before forcibly killing it."""

    shutdown_strategy: ShutdownStrategy | None = "restart"
    """The strategy to use when the process exits. If not specified, the default is to restart the process."""

    dependencies: list[str] | list["Process"] = Field(default=[])
    """
    A list of dependencies that must be started before this process can be started.
    This is a list of other names in the manifest.
    """

    lifecycle_hooks: list[str] = Field(default=[])
    """
    An optional series of function names to call at various points in the process lifecycle.
    The function names must match the names of the functions in the provided plugin. That is,
    if you have loaded a plugin that provides function 'on_start' that you want called, the
    manifest entry should include 'on_start' in its list.
    """

    stat_handlers: list[str] = Field(default=[])
    """
    An optional series of function names to call whenever the process statistics are gathered.
    The function names must match the names of the functions in the provided plugin. That is,
    if you have loaded a plugin that provides function 'email_stats' that you want called, the
    manifest entry should include 'email_stats' in its list.
    """

    ready_strategy: str | None = None
    """Optional strategy to determine if the process is ready"""

    affinity: list[int] | None = None
    """Optional list of CPU cores that a given process should run on."""

    _runtime_info: ProcessRuntimeInfo = ProcessRuntimeInfo()
    """Runtime information about the process"""

    ready_timeout_sec: float = 5.0
    """The amount of time to wait for the process to signal readiness before giving up"""

    ready_params: dict[str, Any] = Field(default_factory=dict)
    """Additional parameters for the ready strategy"""

    _ready_strategy_function: ReadyStrategyType | None = None
    """The function that implements the ready strategy - set to private so that it will not be serialized"""

    @property
    def ready_strategy_function(self) -> ReadyStrategyType | None:
        """Return the ready strategy function for the process."""
        return self._ready_strategy_function

    @ready_strategy_function.setter
    def ready_strategy_function(self, strategy: ReadyStrategyType) -> None:
        """Set the ready strategy function for the process."""
        self._ready_strategy_function = strategy

    # Hook callables grouped by the lifecycle point at which they are invoked.
    _lifecycle_hook_functions: dict[ProcessHookType, list[LifecycleHookType]] = {
        "on_restart": [],
        "on_shutdown": [],
        "post_start": [],
        "pre_start": [],
    }

    # Bookkeeping that backs `update_status` / `get_status`.
    _pid: int = 0
    _status: ProcessState = ProcessState.STOPPED
    _return_code: int = -1

    # Value stored as the return code whenever the process enters RUNNING.
    _EXIT_CODE_FOR_RUNNING_PROCESS: int = -1

    # For each current state, the set of states `update_status` accepts without logging a warning.
    _VALID_TRANSITIONS = {
        ProcessState.STOPPED: {ProcessState.STARTING},
        ProcessState.STARTING: {ProcessState.RUNNING, ProcessState.STOPPING},
        ProcessState.RUNNING: {ProcessState.STOPPING, ProcessState.STOPPED},
        ProcessState.STOPPING: {ProcessState.STOPPED},
    }

    @property
    def lifecycle_hook_functions(self) -> dict[ProcessHookType, list[LifecycleHookType]]:
        """Return the lifecycle hooks dictionary."""
        return self._lifecycle_hook_functions

    @lifecycle_hook_functions.setter
    def lifecycle_hook_functions(self, hooks: dict[ProcessHookType, list[LifecycleHookType]]) -> None:
        """Set the lifecycle hooks dictionary."""
        self._lifecycle_hook_functions = hooks

    _stats_handler_functions: list[StatHandlerType] = []

    @property
    def stats_handler_functions(self) -> list[StatHandlerType]:
        """Return the stats handler functions."""
        return self._stats_handler_functions

    @stats_handler_functions.setter
    def stats_handler_functions(self, handlers: list[StatHandlerType]) -> None:
        """Set the stats handler functions."""
        self._stats_handler_functions = handlers

    @property
    def command(self) -> list[str]:
        """
        Return the path to the executable along with all arguments.

        :returns: A combined list of strings that contains both the executable path and all arguments
        """
        return [str(self.path), *self.args]

    def record_process_stats(self, pid: int) -> None:
        """
        Record the memory and CPU usage of a process by its PID.

        The readings are stored on the runtime info object, which also tracks the maxima.
        Failure to read the stats is logged and otherwise ignored, so a process that has
        exited or that cannot be inspected does not raise out of this method.

        :param pid: PID of the operating system process to sample
        """
        try:
            found_process = psutil.Process(pid)
            memory_usage = found_process.memory_info()
            cpu_usage = found_process.cpu_percent()
        except psutil.NoSuchProcess:
            logging.exception("Unable to find process to get stats for with PID %i", pid)
            return
        except psutil.AccessDenied:
            # Stats collection is best-effort: lacking permission to inspect the
            # process must not propagate to whoever is polling for statistics.
            logging.exception("Access denied while getting stats for process with PID %i", pid)
            return
        else:
            self._runtime_info.cpu_usage_percent = cpu_usage
            # rss is reported in bytes; convert to megabytes.
            self._runtime_info.memory_usage_mb = memory_usage.rss / (1024 * 1024)

    def wait_until_ready(self) -> bool:
        """
        Wait for process to signal readiness.

        :returns: True when no ready strategy function is set; otherwise whatever the
            ready strategy function returns when called with this process and 0.1
        """
        # TODO: Don't think we need to wait for processes that have no dependents
        if not self.ready_strategy_function:
            return True

        return self.ready_strategy_function(self, 0.1)

    def get_stats(self) -> ProcessStats:
        """
        Create a ProcessStats object from current process state.

        :returns: A ProcessStats object holding the current and maximum memory/CPU readings
        """
        return ProcessStats(
            name=self.name,
            path=self.path,
            memory_usage_mb=self._runtime_info.memory_usage_mb,
            cpu_usage_percent=self._runtime_info.cpu_usage_percent,
            max_memory_usage_mb=self._runtime_info.max_memory_usage_mb,
            max_cpu_usage_percent=self._runtime_info.max_cpu_usage,
        )

    def update_status(
        self,
        status: ProcessState,
        pid: int | None = None,
        return_code: int | None = None,
    ) -> None:
        """
        Update the process status.

        :param status: The state to move the process into
        :param pid: PID to record; only applied when no PID is currently stored
        :param return_code: Return code to record; ignored when None
        """
        # Just logging a warning for now in case I've missed some edge cases.
        if status not in self._VALID_TRANSITIONS[self._status]:
            logging.warning("Invalid status transition: %s -> %s", self._status, status)

        self._status = status

        if status == ProcessState.STOPPED:
            self._pid = 0

        if status == ProcessState.RUNNING:
            self._return_code = self._EXIT_CODE_FOR_RUNNING_PROCESS

        # Set the PID if provided and not already set
        # NOTE(review): because the PID was just cleared above, passing `pid` together
        # with STOPPED stores that PID again -- confirm whether callers rely on this.
        if pid is not None and self._pid == 0:
            self._pid = pid

        # Only set the return code if it was provided
        if return_code is not None:
            self._return_code = return_code

    def get_status(self) -> ProcessStatus:
        """
        Create a ProcessStatus object from current process state.

        :returns: A ProcessStatus object containing the current state
        """
        return ProcessStatus(
            name=self.name,
            pid=self._pid,
            status=self._status,
            return_code=self._return_code,
        )
class ProcessManifest(BaseModel):
    """
    Pydantic model of each process that is being managed.

    The "after" validators below run in definition order: paths are resolved, dependency
    names are replaced by their Process objects, the process list is put into dependency
    order, and then the ready-strategy, CPU-affinity and working-directory settings are checked.
    """

    processes: list[Process]
    """List of processes to be managed."""

    control_server: str | None = None
    """Name of the control server implementation to use - must be provided by a plugin."""

    kill_timeout: float = 5.0
    """The amount of time to wait for the process to exit before forcibly killing everything."""

    base_directory: Path | None = None
    """Base directory to use for all relative paths in the manifest."""

    _manifest_path: Path | None = None

    @model_validator(mode="after")
    def resolve_paths(self) -> "ProcessManifest":
        """
        Resolve and validate paths for each process in the manifest.

        :returns: The updated manifest with resolved paths
        :raises ValueError: If any path is invalid or executable not found
        """
        # NOTE(review): nothing visible in this class assigns `_manifest_path`, so unless it is
        # set elsewhere this always falls back to the current working directory -- confirm.
        manifest_dir = self._manifest_path.parent if self._manifest_path else Path.cwd()

        for process in self.processes:
            # Check if the path has no separators and if the executable is on the PATH
            if os.sep not in str(process.path):
                executable_path = shutil.which(str(process.path))
                if executable_path:
                    process.path = Path(executable_path)
                else:
                    logging.warning("%s not found in PATH.", process.path)

            # Normalize path separators and resolve relative paths
            if not process.path.is_absolute():
                process.path = (
                    manifest_dir / process.path if self.base_directory is None else self.base_directory / process.path
                )

            process.path = process.path.resolve()

            # Handle wildcard matches
            if "*" in str(process.path):
                matched_paths = (
                    list(process.path.parent.rglob(process.path.name))
                    if self.base_directory is None
                    else list(self.base_directory.rglob(process.path.name))
                )

                if not matched_paths:
                    error_message = f"No matches found for wildcard path: {process.path}"
                    raise ValueError(error_message)

                if len(matched_paths) > 1:
                    logging.warning(
                        "Multiple matches found for wildcard path: %s: %s\n\nChoosing the first match.",
                        process.path.name,
                        matched_paths,
                    )

                process.path = Path(matched_paths[0]).resolve()

            # Validate that the executable exists
            if not process.path.exists() or not process.path.is_file():
                error_message = f"Executable not found: {process.path.resolve()}"
                raise ValueError(error_message)

        return self

    @model_validator(mode="after")
    def resolve_dependencies(self) -> "ProcessManifest":
        """
        Resolve dependencies for each process in the manifest.

        Each dependency is looked up by name among the manifest's processes and replaced by
        the matching Process object. A dependency that is already a Process object is
        resolved through its name as well.

        :returns: The updated manifest with resolved dependencies
        :raises ValueError: If a process name is duplicated or a dependency is not in the manifest
        """
        process_dict = {process.name: process for process in self.processes}
        process_name_set: set[str] = set()

        for process in self.processes:
            resolved_dependencies: list[Process] = []

            # Ensure no duplicate names in the manifest
            if process.name in process_name_set:
                error_message = f"Duplicate process name found: '{process.name}'"
                raise ValueError(error_message)

            process_name_set.add(process.name)

            for dep in process.dependencies:
                # Decide on the type *before* touching the dict: a Process object is not
                # hashable, so using it as a lookup key would raise TypeError.
                dep_name = dep if isinstance(dep, str) else dep.name
                resolved = process_dict.get(dep_name)

                if resolved is None:
                    error_message = f"Dependency '{dep_name}' for process '{process.name}' not found."
                    raise ValueError(error_message)

                resolved_dependencies.append(resolved)

            process.dependencies = resolved_dependencies

        return self

    @model_validator(mode="after")
    def order_dependencies(self) -> "ProcessManifest":
        """
        Orders the process list based on the dependencies of each process.

        A depth-first walk appends every process after all of its dependencies.

        :returns: The updated manifest with ordered dependencies
        :raises ValueError: If circular dependencies are detected
        """
        ordered_processes: list[Process] = []
        visited: set[str] = set()
        visiting: set[str] = set()
        # Names currently being walked, outermost first. The last entry is the process whose
        # dependency list is being followed, which lets the error name the real offender.
        visit_stack: list[str] = []

        def visit(process: Process) -> None:
            if process.name in visited:
                return

            if process.name in visiting:
                error_message = (
                    f"Circular dependency detected involving process {process.name} and process {visit_stack[-1]}"
                )
                raise ValueError(error_message)

            visiting.add(process.name)
            visit_stack.append(process.name)

            process.dependencies = cast(list[Process], process.dependencies)
            for dep in process.dependencies:
                visit(dep)

            visit_stack.pop()
            visiting.remove(process.name)
            visited.add(process.name)
            ordered_processes.append(process)

        for process in self.processes:
            visit(process)

        self.processes = ordered_processes
        return self

    @model_validator(mode="after")
    def validate_ready_config(self) -> "ProcessManifest":
        """
        Validate the ready strategy configuration.

        :returns: The manifest, with "path" ready parameters normalized for file/pipe strategies
        :raises ValueError: If a file/pipe strategy lacks 'path' or a tcp strategy lacks 'port'
        """
        for p in self.processes:
            if p.ready_strategy is None:
                continue

            if p.ready_strategy in ("file", "pipe") and "path" not in p.ready_params:
                error_message = f"File and pipe ready strategies require 'path' parameter: {p.name}"
                raise ValueError(error_message)

            if p.ready_strategy in ("file", "pipe"):
                # We need to normalize paths to their target OS
                p.ready_params["path"] = str(Path(p.ready_params["path"]))

            if p.ready_strategy == "tcp" and "port" not in p.ready_params:
                error_message = f"TCP ready strategy requires 'port' parameter: {p.name}"
                raise ValueError(error_message)

        return self

    @model_validator(mode="after")
    def validate_cpu_affinity(self) -> "ProcessManifest":
        """
        Validate that the CPU affinities that are set align with core counts.

        Affinity indices address logical CPUs, so they are checked against the logical CPU
        count. If that count cannot be determined, every affinity is reset to None.

        :returns: The validated manifest
        :raises ValueError: If an affinity list is empty or contains an out-of-range index
        """
        num_cores = psutil.cpu_count(logical=True)

        if num_cores is None:
            logging.error("Unable to determine hardware core counts--setting all process affinities to their defaults")
            for p in self.processes:
                p.affinity = None
            return self

        for p in self.processes:
            if p.affinity is None:
                continue

            # min()/max() below would fail with an unhelpful message on an empty list.
            if not p.affinity:
                error_message = f"Affinity list must not be empty for process: {p.name}"
                raise ValueError(error_message)

            if min(p.affinity) < 0:
                error_message = f"Affinity values must be between 0 and {num_cores - 1}"
                raise ValueError(error_message)

            if max(p.affinity) >= num_cores:
                error_message = f"Affinity core {max(p.affinity)} is out of range for process: {p.name}"
                raise ValueError(error_message)

        return self

    @model_validator(mode="after")
    def set_working_directory(self) -> "ProcessManifest":
        """
        Set the working directory for each process. Defaults to the executable's parent directory.

        :returns: The updated manifest
        :raises ValueError: If a working directory is not an existing directory
        """
        for p in self.processes:
            if p.working_directory is None:
                p.working_directory = p.path.parent
            else:
                p.working_directory = Path(p.working_directory)

            if not p.working_directory.is_dir():
                error_message = f"Working directory does not exist: {p.working_directory}"
                raise ValueError(error_message)

        return self

    @classmethod
    def from_json(cls, path: Path) -> "ProcessManifest":
        """
        Load a JSON formatted process manifest.

        The file is read as UTF-8. A document that is not a mapping is rejected by model
        validation rather than failing during keyword unpacking.

        :param path: Path to the JSON file
        :returns: The validated manifest
        """
        with path.open("r", encoding="utf-8") as f:
            json_data = json.load(f)

        return cls.model_validate(json_data)

    @classmethod
    def from_yaml(cls, path: Path) -> "ProcessManifest":
        """
        Load a YAML formatted process manifest.

        The file is read as UTF-8 and parsed with the safe loader. An empty file (which
        parses to None) or any other non-mapping document is rejected by model validation
        rather than failing during keyword unpacking.

        :param path: Path to the YAML file
        :returns: The validated manifest
        """
        with path.open("r", encoding="utf-8") as f:
            yaml_data = yaml.safe_load(f)

        return cls.model_validate(yaml_data)