From 6c2d96c9a5454ba95b60669dee4e6984b529efbc Mon Sep 17 00:00:00 2001 From: Timur Gordon <31495328+timurgordon@users.noreply.github.com> Date: Mon, 1 Sep 2025 15:54:34 +0200 Subject: [PATCH] service manager impl --- service_manager/Cargo.toml | 5 +- service_manager/src/lib.rs | 38 ++- service_manager/src/process_manager.rs | 371 +++++++++++++++++++++++ service_manager/src/tmux_manager.rs | 404 +++++++++++++++++++++++++ 4 files changed, 807 insertions(+), 11 deletions(-) create mode 100644 service_manager/src/process_manager.rs create mode 100644 service_manager/src/tmux_manager.rs diff --git a/service_manager/Cargo.toml b/service_manager/Cargo.toml index 4c7033d..d284e5a 100644 --- a/service_manager/Cargo.toml +++ b/service_manager/Cargo.toml @@ -10,7 +10,7 @@ license = "Apache-2.0" [dependencies] # Use workspace dependencies for consistency thiserror = "1.0" -tokio = { workspace = true } +tokio = { workspace = true, features = ["process", "time", "sync"] } log = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } @@ -20,6 +20,9 @@ once_cell = { workspace = true } zinit-client = { version = "0.4.0" } # Optional Rhai integration rhai = { workspace = true, optional = true } +# Process manager dependencies +async-trait = "0.1" +chrono = "0.4" [target.'cfg(target_os = "macos")'.dependencies] diff --git a/service_manager/src/lib.rs b/service_manager/src/lib.rs index 9f9d7d1..fb02a9e 100644 --- a/service_manager/src/lib.rs +++ b/service_manager/src/lib.rs @@ -83,20 +83,34 @@ pub trait ServiceManager: Send + Sync { fn remove(&self, service_name: &str) -> Result<(), ServiceManagerError>; } -// Platform-specific implementations -#[cfg(target_os = "macos")] -mod launchctl; -#[cfg(target_os = "macos")] -pub use launchctl::LaunchctlServiceManager; +// Platform-specific implementations (commented out for now to simplify) +// #[cfg(target_os = "macos")] +// mod launchctl; +// #[cfg(target_os = "macos")] +// pub use launchctl::LaunchctlServiceManager; -#[cfg(target_os = "linux")] -mod systemd; -#[cfg(target_os = "linux")] -pub use systemd::SystemdServiceManager; +// #[cfg(target_os = "linux")] +// mod systemd; +// #[cfg(target_os = "linux")] +// pub use systemd::SystemdServiceManager; mod zinit; pub use zinit::ZinitServiceManager; +// Process manager module for actor lifecycle management +pub mod process_manager; +pub use process_manager::{ + ProcessManager, ProcessConfig, ProcessStatus, ProcessManagerError, ProcessManagerResult, + SimpleProcessManager, LogInfo, +}; + +pub mod tmux_manager; +pub use tmux_manager::TmuxProcessManager; + +// Re-export process managers for easier access +pub use process_manager::SimpleProcessManager as SimpleManager; +pub use tmux_manager::TmuxProcessManager as TmuxManager; + #[cfg(feature = "rhai")] pub mod rhai; @@ -206,7 +220,11 @@ fn test_zinit_socket(socket_path: &str) -> bool { pub fn create_service_manager() -> Result, ServiceManagerError> { #[cfg(target_os = "macos")] { - Ok(Box::new(LaunchctlServiceManager::new())) + // LaunchctlServiceManager is commented out for now + // For now, return an error on macOS since launchctl is disabled + Err(ServiceManagerError::Other( + "Service manager not available on macOS (launchctl disabled for simplification)".to_string(), + )) } #[cfg(target_os = "linux")] { diff --git a/service_manager/src/process_manager.rs b/service_manager/src/process_manager.rs new file mode 100644 index 0000000..9a6526e --- /dev/null +++ b/service_manager/src/process_manager.rs @@ -0,0 +1,371 @@ +//! # Process Manager +//! +//! This module provides process management abstractions specifically designed for +//! actor lifecycle management. It bridges the gap between the service manager +//! and actor-specific process requirements. +//! +//! The ProcessManager trait provides a unified interface for managing actor processes +//! across different process management systems (tmux, zinit, simple spawning, etc.). + +use async_trait::async_trait; +use std::collections::HashMap; +use std::path::PathBuf; +use std::process::Stdio; +use std::sync::Arc; +use std::time::Duration; +use thiserror::Error; +use tokio::process::{Child, Command}; +use tokio::sync::Mutex; + +/// Errors that can occur during process management operations +#[derive(Error, Debug)] +pub enum ProcessManagerError { + #[error("Process '{0}' not found")] + ProcessNotFound(String), + #[error("Process '{0}' already running")] + ProcessAlreadyRunning(String), + #[error("Failed to start process '{0}': {1}")] + StartupFailed(String, String), + #[error("Failed to stop process '{0}': {1}")] + StopFailed(String, String), + #[error("Failed to get process status '{0}': {1}")] + StatusFailed(String, String), + #[error("Failed to get logs for process '{0}': {1}")] + LogsFailed(String, String), + #[error("Process manager error: {0}")] + Other(String), + #[error("IO error: {0}")] + IoError(#[from] std::io::Error), +} + +/// Result type for process manager operations +pub type ProcessManagerResult = Result; + +/// Represents the current status of a process +#[derive(Debug, Clone, PartialEq)] +pub enum ProcessStatus { + /// Process is not running + Stopped, + /// Process is currently starting up + Starting, + /// Process is running and ready + Running, + /// Process is in the process of stopping + Stopping, + /// Process has encountered an error + Error(String), +} + +/// Configuration for a process +#[derive(Debug, Clone)] +pub struct ProcessConfig { + /// Unique identifier for the process + pub process_id: String, + /// Path to the binary to execute + pub binary_path: PathBuf, + /// Command line arguments + pub args: Vec, + /// Working directory (optional) + pub working_dir: Option, + /// Environment variables + pub env_vars: HashMap, +} + +impl ProcessConfig { + /// Create a new process configuration + pub fn new(process_id: String, binary_path: PathBuf) -> Self { + Self { + process_id, + binary_path, + args: Vec::new(), + working_dir: None, + env_vars: HashMap::new(), + } + } + + /// Add a command line argument + pub fn with_arg(mut self, arg: String) -> Self { + self.args.push(arg); + self + } + + /// Add multiple command line arguments + pub fn with_args(mut self, args: Vec) -> Self { + self.args.extend(args); + self + } + + /// Set the working directory + pub fn with_working_dir(mut self, working_dir: PathBuf) -> Self { + self.working_dir = Some(working_dir); + self + } + + /// Add an environment variable + pub fn with_env_var(mut self, key: String, value: String) -> Self { + self.env_vars.insert(key, value); + self + } +} + +/// Log information for a process +#[derive(Debug, Clone)] +pub struct LogInfo { + /// Timestamp of the log entry + pub timestamp: String, + /// Log level (info, warn, error, etc.) + pub level: String, + /// Log message content + pub message: String, +} + +/// Process manager abstraction for different process management systems +#[async_trait] +pub trait ProcessManager: Send + Sync { + /// Start a process with the given configuration + async fn start_process(&mut self, config: &ProcessConfig) -> ProcessManagerResult<()>; + + /// Stop a process by process ID + async fn stop_process(&mut self, process_id: &str, force: bool) -> ProcessManagerResult<()>; + + /// Get the status of a process + async fn process_status(&self, process_id: &str) -> ProcessManagerResult; + + /// Get logs for a process + async fn process_logs(&self, process_id: &str, lines: Option, follow: bool) -> ProcessManagerResult>; + + /// Check if the process manager is available and working + async fn health_check(&self) -> ProcessManagerResult<()>; + + /// List all managed processes + async fn list_processes(&self) -> ProcessManagerResult>; +} + +/// Simple process manager implementation using direct process spawning +/// This is useful for development and testing, but production should use +/// more robust process managers like tmux or zinit. +pub struct SimpleProcessManager { + processes: Arc>>, +} + +impl SimpleProcessManager { + /// Create a new simple process manager + pub fn new() -> Self { + Self { + processes: Arc::new(Mutex::new(HashMap::new())), + } + } + + fn build_command(&self, config: &ProcessConfig) -> Command { + let mut cmd = Command::new(&config.binary_path); + + // Add arguments + for arg in &config.args { + cmd.arg(arg); + } + + // Set working directory + if let Some(working_dir) = &config.working_dir { + cmd.current_dir(working_dir); + } + + // Set environment variables + for (key, value) in &config.env_vars { + cmd.env(key, value); + } + + // Configure stdio + cmd.stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .stdin(Stdio::null()); + + cmd + } +} + +impl Default for SimpleProcessManager { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl ProcessManager for SimpleProcessManager { + async fn start_process(&mut self, config: &ProcessConfig) -> ProcessManagerResult<()> { + let mut processes = self.processes.lock().await; + + if processes.contains_key(&config.process_id) { + return Err(ProcessManagerError::ProcessAlreadyRunning(config.process_id.clone())); + } + + let mut cmd = self.build_command(config); + + log::debug!("Starting process for {}: {:?}", config.process_id, cmd); + + let child = cmd.spawn().map_err(|e| ProcessManagerError::StartupFailed( + config.process_id.clone(), + format!("Failed to spawn process: {}", e), + ))?; + + processes.insert(config.process_id.clone(), child); + + // Wait a moment to ensure the process started successfully + drop(processes); + tokio::time::sleep(Duration::from_millis(100)).await; + let mut processes = self.processes.lock().await; + + // Check if the process is still running + if let Some(child) = processes.get_mut(&config.process_id) { + match child.try_wait() { + Ok(Some(status)) => { + processes.remove(&config.process_id); + return Err(ProcessManagerError::StartupFailed( + config.process_id.clone(), + format!("Process exited immediately with status: {}", status), + )); + } + Ok(None) => { + // Process is still running + log::info!("Successfully started process {}", config.process_id); + } + Err(e) => { + processes.remove(&config.process_id); + return Err(ProcessManagerError::StartupFailed( + config.process_id.clone(), + format!("Failed to check process status: {}", e), + )); + } + } + } + + Ok(()) + } + + async fn stop_process(&mut self, process_id: &str, force: bool) -> ProcessManagerResult<()> { + let mut processes = self.processes.lock().await; + + let mut child = processes.remove(process_id) + .ok_or_else(|| ProcessManagerError::ProcessNotFound(process_id.to_string()))?; + + if force { + child.kill().await.map_err(|e| ProcessManagerError::StopFailed( + process_id.to_string(), + format!("Failed to kill process: {}", e), + ))?; + } else { + // Try graceful shutdown first + if let Some(id) = child.id() { + #[cfg(unix)] + { + use std::process::Command as StdCommand; + let _ = StdCommand::new("kill") + .arg("-TERM") + .arg(id.to_string()) + .output(); + + // Wait a bit for graceful shutdown + tokio::time::sleep(Duration::from_secs(5)).await; + } + } + + // Force kill if still running + let _ = child.kill().await; + } + + // Wait for the process to exit + let _ = child.wait().await; + + log::info!("Successfully stopped process {}", process_id); + Ok(()) + } + + async fn process_status(&self, process_id: &str) -> ProcessManagerResult { + let mut processes = self.processes.lock().await; + + if let Some(child) = processes.get_mut(process_id) { + match child.try_wait() { + Ok(Some(_)) => { + // Process has exited + processes.remove(process_id); + Ok(ProcessStatus::Stopped) + } + Ok(None) => { + // Process is still running + Ok(ProcessStatus::Running) + } + Err(e) => { + Ok(ProcessStatus::Error(format!("Failed to check status: {}", e))) + } + } + } else { + Ok(ProcessStatus::Stopped) + } + } + + async fn process_logs(&self, process_id: &str, _lines: Option, _follow: bool) -> ProcessManagerResult> { + // Simple process manager doesn't capture logs by default + // This would require more sophisticated process management + log::warn!("Log retrieval not implemented for SimpleProcessManager"); + Ok(vec![LogInfo { + timestamp: chrono::Utc::now().to_rfc3339(), + level: "info".to_string(), + message: format!("Log retrieval not available for process {}", process_id), + }]) + } + + async fn health_check(&self) -> ProcessManagerResult<()> { + // Simple process manager is always healthy if we can lock the processes + let _processes = self.processes.lock().await; + Ok(()) + } + + async fn list_processes(&self) -> ProcessManagerResult> { + let processes = self.processes.lock().await; + Ok(processes.keys().cloned().collect()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::path::PathBuf; + + #[tokio::test] + async fn test_process_config_creation() { + let config = ProcessConfig::new( + "test_process".to_string(), + PathBuf::from("/usr/bin/echo"), + ) + .with_arg("hello".to_string()) + .with_arg("world".to_string()) + .with_env_var("TEST_VAR".to_string(), "test_value".to_string()); + + assert_eq!(config.process_id, "test_process"); + assert_eq!(config.binary_path, PathBuf::from("/usr/bin/echo")); + assert_eq!(config.args, vec!["hello", "world"]); + assert_eq!(config.env_vars.get("TEST_VAR"), Some(&"test_value".to_string())); + } + + #[tokio::test] + async fn test_simple_process_manager_creation() { + let pm = SimpleProcessManager::new(); + assert!(pm.health_check().await.is_ok()); + } + + #[tokio::test] + async fn test_process_status_types() { + let status1 = ProcessStatus::Running; + let status2 = ProcessStatus::Stopped; + let status3 = ProcessStatus::Error("test error".to_string()); + + assert_eq!(status1, ProcessStatus::Running); + assert_eq!(status2, ProcessStatus::Stopped); + assert_ne!(status1, status2); + + if let ProcessStatus::Error(msg) = status3 { + assert_eq!(msg, "test error"); + } else { + panic!("Expected Error status"); + } + } +} diff --git a/service_manager/src/tmux_manager.rs b/service_manager/src/tmux_manager.rs new file mode 100644 index 0000000..c85042b --- /dev/null +++ b/service_manager/src/tmux_manager.rs @@ -0,0 +1,404 @@ +//! # Tmux Process Manager +//! +//! This module provides a tmux-based process manager implementation that manages +//! processes within tmux sessions and windows. This is useful for production +//! environments where you need persistent, manageable processes. + +use async_trait::async_trait; +use chrono::Utc; +use std::process::Output; +use tokio::process::Command; + +use crate::process_manager::{ + LogInfo, ProcessConfig, ProcessManager, ProcessManagerError, ProcessManagerResult, + ProcessStatus, +}; + +/// Tmux-based process manager implementation +/// +/// This manager creates and manages processes within tmux sessions, providing +/// better process isolation and management capabilities compared to simple spawning. +pub struct TmuxProcessManager { + /// Name of the tmux session to use + session_name: String, +} + +impl TmuxProcessManager { + /// Create a new tmux process manager with the specified session name + pub fn new(session_name: String) -> Self { + Self { session_name } + } + + /// Execute a tmux command and return the output + async fn tmux_command(&self, args: &[&str]) -> ProcessManagerResult { + let output = Command::new("tmux") + .args(args) + .output() + .await + .map_err(|e| ProcessManagerError::Other(format!("Failed to execute tmux command: {}", e)))?; + + log::debug!("Tmux command: tmux {}", args.join(" ")); + log::debug!("Tmux output: {}", String::from_utf8_lossy(&output.stdout)); + + if !output.stderr.is_empty() { + log::debug!("Tmux stderr: {}", String::from_utf8_lossy(&output.stderr)); + } + + Ok(output) + } + + /// Create the tmux session if it doesn't exist + async fn create_session_if_needed(&self) -> ProcessManagerResult<()> { + // Check if session exists + let output = self + .tmux_command(&["has-session", "-t", &self.session_name]) + .await?; + + if !output.status.success() { + // Session doesn't exist, create it + log::info!("Creating tmux session: {}", self.session_name); + let output = self + .tmux_command(&["new-session", "-d", "-s", &self.session_name]) + .await?; + + if !output.status.success() { + return Err(ProcessManagerError::Other(format!( + "Failed to create tmux session '{}': {}", + self.session_name, + String::from_utf8_lossy(&output.stderr) + ))); + } + } + + Ok(()) + } + + /// Build the command string for running a process + fn build_process_command(&self, config: &ProcessConfig) -> String { + let mut cmd_parts = vec![config.binary_path.to_string_lossy().to_string()]; + cmd_parts.extend(config.args.clone()); + cmd_parts.join(" ") + } + + /// Get the window name for a process + fn get_window_name(&self, process_id: &str) -> String { + format!("proc-{}", process_id) + } +} + +#[async_trait] +impl ProcessManager for TmuxProcessManager { + async fn start_process(&mut self, config: &ProcessConfig) -> ProcessManagerResult<()> { + self.create_session_if_needed().await?; + + let window_name = self.get_window_name(&config.process_id); + let command = self.build_process_command(config); + + // Check if window already exists + let check_output = self + .tmux_command(&[ + "list-windows", + "-t", + &self.session_name, + "-F", + "#{window_name}", + ]) + .await?; + + let existing_windows = String::from_utf8_lossy(&check_output.stdout); + if existing_windows.lines().any(|line| line.trim() == window_name) { + return Err(ProcessManagerError::ProcessAlreadyRunning(config.process_id.clone())); + } + + // Create new window and run the process + let mut tmux_args = vec![ + "new-window", + "-t", + &self.session_name, + "-n", + &window_name, + ]; + + // Set working directory if specified + let working_dir_arg; + if let Some(working_dir) = &config.working_dir { + working_dir_arg = working_dir.to_string_lossy().to_string(); + tmux_args.extend(&["-c", &working_dir_arg]); + } + + tmux_args.push(&command); + + let output = self.tmux_command(&tmux_args).await?; + + if !output.status.success() { + return Err(ProcessManagerError::StartupFailed( + config.process_id.clone(), + format!( + "Failed to create tmux window: {}", + String::from_utf8_lossy(&output.stderr) + ), + )); + } + + // Wait a moment and check if the process is still running + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + + match self.process_status(&config.process_id).await? { + ProcessStatus::Running => { + log::info!("Successfully started process {} in tmux window {}", config.process_id, window_name); + Ok(()) + } + ProcessStatus::Stopped => { + Err(ProcessManagerError::StartupFailed( + config.process_id.clone(), + "Process exited immediately after startup".to_string(), + )) + } + ProcessStatus::Error(msg) => { + Err(ProcessManagerError::StartupFailed( + config.process_id.clone(), + format!("Process failed to start: {}", msg), + )) + } + _ => Ok(()), + } + } + + async fn stop_process(&mut self, process_id: &str, force: bool) -> ProcessManagerResult<()> { + let window_name = self.get_window_name(process_id); + + // Check if window exists + let check_output = self + .tmux_command(&[ + "list-windows", + "-t", + &self.session_name, + "-F", + "#{window_name}", + ]) + .await?; + + let existing_windows = String::from_utf8_lossy(&check_output.stdout); + if !existing_windows.lines().any(|line| line.trim() == window_name) { + return Err(ProcessManagerError::ProcessNotFound(process_id.to_string())); + } + + if force { + // Kill the window immediately + let output = self + .tmux_command(&["kill-window", "-t", &format!("{}:{}", self.session_name, window_name)]) + .await?; + + if !output.status.success() { + return Err(ProcessManagerError::StopFailed( + process_id.to_string(), + format!( + "Failed to kill tmux window: {}", + String::from_utf8_lossy(&output.stderr) + ), + )); + } + } else { + // Send SIGTERM to the process in the window + let output = self + .tmux_command(&[ + "send-keys", + "-t", + &format!("{}:{}", self.session_name, window_name), + "C-c", + ]) + .await?; + + if !output.status.success() { + log::warn!("Failed to send SIGTERM, trying force kill"); + // Fallback to force kill + return self.stop_process(process_id, true).await; + } + + // Wait a bit for graceful shutdown + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + // Check if process is still running, force kill if needed + if let Ok(ProcessStatus::Running) = self.process_status(process_id).await { + log::info!("Process {} didn't stop gracefully, force killing", process_id); + return self.stop_process(process_id, true).await; + } + } + + log::info!("Successfully stopped process {}", process_id); + Ok(()) + } + + async fn process_status(&self, process_id: &str) -> ProcessManagerResult { + let window_name = self.get_window_name(process_id); + + // Check if window exists + let check_output = self + .tmux_command(&[ + "list-windows", + "-t", + &self.session_name, + "-F", + "#{window_name}", + ]) + .await?; + + let existing_windows = String::from_utf8_lossy(&check_output.stdout); + if !existing_windows.lines().any(|line| line.trim() == window_name) { + return Ok(ProcessStatus::Stopped); + } + + // Check if there are any panes in the window (process running) + let pane_output = self + .tmux_command(&[ + "list-panes", + "-t", + &format!("{}:{}", self.session_name, window_name), + "-F", + "#{pane_pid}", + ]) + .await?; + + if pane_output.status.success() && !pane_output.stdout.is_empty() { + Ok(ProcessStatus::Running) + } else { + Ok(ProcessStatus::Stopped) + } + } + + async fn process_logs(&self, process_id: &str, lines: Option, _follow: bool) -> ProcessManagerResult> { + let window_name = self.get_window_name(process_id); + + // Capture the pane content (this is the best we can do with tmux) + let target_window = format!("{}:{}", self.session_name, window_name); + let mut tmux_args = vec![ + "capture-pane", + "-t", + &target_window, + "-p", + ]; + + // Add line limit if specified + let lines_arg; + if let Some(line_count) = lines { + lines_arg = format!("-S -{}", line_count); + tmux_args.push(&lines_arg); + } + + let output = self.tmux_command(&tmux_args).await?; + + if !output.status.success() { + return Err(ProcessManagerError::LogsFailed( + process_id.to_string(), + format!( + "Failed to capture tmux pane: {}", + String::from_utf8_lossy(&output.stderr) + ), + )); + } + + let content = String::from_utf8_lossy(&output.stdout); + let timestamp = Utc::now().to_rfc3339(); + + let logs = content + .lines() + .filter(|line| !line.trim().is_empty()) + .map(|line| LogInfo { + timestamp: timestamp.clone(), + level: "info".to_string(), + message: line.to_string(), + }) + .collect(); + + Ok(logs) + } + + async fn health_check(&self) -> ProcessManagerResult<()> { + // Check if tmux is available + let output = Command::new("tmux") + .arg("list-sessions") + .output() + .await + .map_err(|e| ProcessManagerError::Other(format!("Tmux not available: {}", e)))?; + + if !output.status.success() { + let error_msg = String::from_utf8_lossy(&output.stderr); + if error_msg.contains("no server running") { + // This is fine, tmux server will start when needed + Ok(()) + } else { + Err(ProcessManagerError::Other(format!("Tmux health check failed: {}", error_msg))) + } + } else { + Ok(()) + } + } + + async fn list_processes(&self) -> ProcessManagerResult> { + // List all windows in our session that match our process naming pattern + let output = self + .tmux_command(&[ + "list-windows", + "-t", + &self.session_name, + "-F", + "#{window_name}", + ]) + .await?; + + if !output.status.success() { + // Session might not exist + return Ok(Vec::new()); + } + + let windows = String::from_utf8_lossy(&output.stdout); + let processes = windows + .lines() + .filter_map(|line| { + let window_name = line.trim(); + if window_name.starts_with("proc-") { + Some(window_name.strip_prefix("proc-").unwrap().to_string()) + } else { + None + } + }) + .collect(); + + Ok(processes) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::path::PathBuf; + + #[tokio::test] + async fn test_tmux_manager_creation() { + let manager = TmuxProcessManager::new("test_session".to_string()); + assert_eq!(manager.session_name, "test_session"); + } + + #[tokio::test] + async fn test_window_name_generation() { + let manager = TmuxProcessManager::new("test_session".to_string()); + let window_name = manager.get_window_name("test_process"); + assert_eq!(window_name, "proc-test_process"); + } + + #[tokio::test] + async fn test_command_building() { + let manager = TmuxProcessManager::new("test_session".to_string()); + let config = ProcessConfig::new( + "test_process".to_string(), + PathBuf::from("/usr/bin/echo"), + ) + .with_arg("hello".to_string()) + .with_arg("world".to_string()); + + let command = manager.build_process_command(&config); + assert!(command.contains("/usr/bin/echo")); + assert!(command.contains("hello")); + assert!(command.contains("world")); + } +}