Source code for process_pilot.plugins.pipe_ready

"""
PipeReadyPlugin class.

The PipeReadyPlugin class which checks if a process is ready by verifying the existence
of 'ready' in a named pipe.
"""

import os
import sys
import time
from pathlib import Path
from typing import TYPE_CHECKING

from process_pilot.plugin import Plugin, ReadyStrategyType

if TYPE_CHECKING:
    from process_pilot.process import Process


[docs] class PipeReadyPlugin(Plugin): """Plugin that implements a named pipe (FIFO) based readiness check strategy."""
[docs] def get_ready_strategies(self) -> dict[str, ReadyStrategyType]: """ Register strategies for the plugin. :returns: A dictionary mapping strategy names to their corresponding functions. """ return { "pipe": self._wait_pipe_ready, }
def _wait_pipe_ready(self, process: "Process", ready_check_interval_secs: float) -> bool: """Wait for ready signal via named pipe.""" if sys.platform == "win32": return self._wait_pipe_ready_windows(process, ready_check_interval_secs) return self._wait_pipe_ready_unix(process, ready_check_interval_secs) def _wait_pipe_ready_windows(self, process: "Process", ready_check_interval_secs: float) -> bool: """Windows-specific named pipe implementation.""" try: if sys.platform != "win32": error_message = "Windows-specific pipe implementation called on non-Windows platform" raise RuntimeError(error_message) # Only import on Windows import win32file except ImportError: error_message = "win32pipe module required for Windows pipe support" raise RuntimeError(error_message) from None pipe_name = process.ready_params.get("path") pipe_path = f"\\\\.\\pipe{pipe_name}" pipe = None start_time = time.time() success = False while (time.time() - start_time) < process.ready_timeout_sec: try: # Open the named pipe pipe = win32file.CreateFile( pipe_path, win32file.GENERIC_READ, 0, # no sharing None, # default security attributes win32file.OPEN_EXISTING, 0, # default attributes None, # no template file ) result, data = win32file.ReadFile(pipe, 64 * 1024) # type: ignore[call-overload, unused-ignore] if result == 0: success = data.decode().strip() == "ready" break except Exception: # noqa: BLE001 TODO: Fix this time.sleep(ready_check_interval_secs) if pipe: win32file.CloseHandle(pipe) # type: ignore[arg-type, unused-ignore] return success def _wait_pipe_ready_unix(self, process: "Process", ready_check_interval_secs: float) -> bool: """Unix-specific FIFO implementation.""" pipe_path = process.ready_params.get("path") if not pipe_path: msg = "Path not specified for pipe ready strategy" raise RuntimeError(msg) pipe_path = Path(pipe_path) try: start_time = time.time() while (time.time() - start_time) < process.ready_timeout_sec: try: pipe_file_id = os.open(pipe_path, os.O_RDONLY | os.O_NONBLOCK) with os.fdopen(pipe_file_id) as fifo: data_read = fifo.read() if data_read.strip() == "ready": return True time.sleep(ready_check_interval_secs) continue except Exception: # noqa: BLE001 time.sleep(ready_check_interval_secs) return False finally: if pipe_path.exists(): pipe_path.unlink()