service manager impl

This commit is contained in:
Timur Gordon
2025-09-01 15:54:34 +02:00
parent 7856fc0a4e
commit 6c2d96c9a5
4 changed files with 807 additions and 11 deletions

View File

@@ -10,7 +10,7 @@ license = "Apache-2.0"
[dependencies] [dependencies]
# Use workspace dependencies for consistency # Use workspace dependencies for consistency
thiserror = "1.0" thiserror = "1.0"
tokio = { workspace = true } tokio = { workspace = true, features = ["process", "time", "sync"] }
log = { workspace = true } log = { workspace = true }
serde = { workspace = true } serde = { workspace = true }
serde_json = { workspace = true } serde_json = { workspace = true }
@@ -20,6 +20,9 @@ once_cell = { workspace = true }
zinit-client = { version = "0.4.0" } zinit-client = { version = "0.4.0" }
# Optional Rhai integration # Optional Rhai integration
rhai = { workspace = true, optional = true } rhai = { workspace = true, optional = true }
# Process manager dependencies
async-trait = "0.1"
chrono = "0.4"
[target.'cfg(target_os = "macos")'.dependencies] [target.'cfg(target_os = "macos")'.dependencies]

View File

@@ -83,20 +83,34 @@ pub trait ServiceManager: Send + Sync {
fn remove(&self, service_name: &str) -> Result<(), ServiceManagerError>; fn remove(&self, service_name: &str) -> Result<(), ServiceManagerError>;
} }
// Platform-specific implementations // Platform-specific implementations (commented out for now to simplify)
#[cfg(target_os = "macos")] // #[cfg(target_os = "macos")]
mod launchctl; // mod launchctl;
#[cfg(target_os = "macos")] // #[cfg(target_os = "macos")]
pub use launchctl::LaunchctlServiceManager; // pub use launchctl::LaunchctlServiceManager;
#[cfg(target_os = "linux")] // #[cfg(target_os = "linux")]
mod systemd; // mod systemd;
#[cfg(target_os = "linux")] // #[cfg(target_os = "linux")]
pub use systemd::SystemdServiceManager; // pub use systemd::SystemdServiceManager;
mod zinit; mod zinit;
pub use zinit::ZinitServiceManager; pub use zinit::ZinitServiceManager;
// Process manager module for actor lifecycle management
pub mod process_manager;
pub use process_manager::{
ProcessManager, ProcessConfig, ProcessStatus, ProcessManagerError, ProcessManagerResult,
SimpleProcessManager, LogInfo,
};
pub mod tmux_manager;
pub use tmux_manager::TmuxProcessManager;
// Re-export process managers for easier access
pub use process_manager::SimpleProcessManager as SimpleManager;
pub use tmux_manager::TmuxProcessManager as TmuxManager;
#[cfg(feature = "rhai")] #[cfg(feature = "rhai")]
pub mod rhai; pub mod rhai;
@@ -206,7 +220,11 @@ fn test_zinit_socket(socket_path: &str) -> bool {
pub fn create_service_manager() -> Result<Box<dyn ServiceManager>, ServiceManagerError> { pub fn create_service_manager() -> Result<Box<dyn ServiceManager>, ServiceManagerError> {
#[cfg(target_os = "macos")] #[cfg(target_os = "macos")]
{ {
Ok(Box::new(LaunchctlServiceManager::new())) // LaunchctlServiceManager is commented out for now
// For now, return an error on macOS since launchctl is disabled
Err(ServiceManagerError::Other(
"Service manager not available on macOS (launchctl disabled for simplification)".to_string(),
))
} }
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
{ {

View File

@@ -0,0 +1,371 @@
//! # Process Manager
//!
//! This module provides process management abstractions specifically designed for
//! actor lifecycle management. It bridges the gap between the service manager
//! and actor-specific process requirements.
//!
//! The ProcessManager trait provides a unified interface for managing actor processes
//! across different process management systems (tmux, zinit, simple spawning, etc.).
use async_trait::async_trait;
use std::collections::HashMap;
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::process::{Child, Command};
use tokio::sync::Mutex;
/// Errors that can occur during process management operations
#[derive(Error, Debug)]
pub enum ProcessManagerError {
#[error("Process '{0}' not found")]
ProcessNotFound(String),
#[error("Process '{0}' already running")]
ProcessAlreadyRunning(String),
#[error("Failed to start process '{0}': {1}")]
StartupFailed(String, String),
#[error("Failed to stop process '{0}': {1}")]
StopFailed(String, String),
#[error("Failed to get process status '{0}': {1}")]
StatusFailed(String, String),
#[error("Failed to get logs for process '{0}': {1}")]
LogsFailed(String, String),
#[error("Process manager error: {0}")]
Other(String),
#[error("IO error: {0}")]
IoError(#[from] std::io::Error),
}
/// Result type for process manager operations
pub type ProcessManagerResult<T> = Result<T, ProcessManagerError>;
/// Represents the current status of a process
#[derive(Debug, Clone, PartialEq)]
pub enum ProcessStatus {
/// Process is not running
Stopped,
/// Process is currently starting up
Starting,
/// Process is running and ready
Running,
/// Process is in the process of stopping
Stopping,
/// Process has encountered an error
Error(String),
}
/// Configuration for a process
#[derive(Debug, Clone)]
pub struct ProcessConfig {
/// Unique identifier for the process
pub process_id: String,
/// Path to the binary to execute
pub binary_path: PathBuf,
/// Command line arguments
pub args: Vec<String>,
/// Working directory (optional)
pub working_dir: Option<PathBuf>,
/// Environment variables
pub env_vars: HashMap<String, String>,
}
impl ProcessConfig {
/// Create a new process configuration
pub fn new(process_id: String, binary_path: PathBuf) -> Self {
Self {
process_id,
binary_path,
args: Vec::new(),
working_dir: None,
env_vars: HashMap::new(),
}
}
/// Add a command line argument
pub fn with_arg(mut self, arg: String) -> Self {
self.args.push(arg);
self
}
/// Add multiple command line arguments
pub fn with_args(mut self, args: Vec<String>) -> Self {
self.args.extend(args);
self
}
/// Set the working directory
pub fn with_working_dir(mut self, working_dir: PathBuf) -> Self {
self.working_dir = Some(working_dir);
self
}
/// Add an environment variable
pub fn with_env_var(mut self, key: String, value: String) -> Self {
self.env_vars.insert(key, value);
self
}
}
/// Log information for a process
#[derive(Debug, Clone)]
pub struct LogInfo {
/// Timestamp of the log entry
pub timestamp: String,
/// Log level (info, warn, error, etc.)
pub level: String,
/// Log message content
pub message: String,
}
/// Process manager abstraction for different process management systems
#[async_trait]
pub trait ProcessManager: Send + Sync {
/// Start a process with the given configuration
async fn start_process(&mut self, config: &ProcessConfig) -> ProcessManagerResult<()>;
/// Stop a process by process ID
async fn stop_process(&mut self, process_id: &str, force: bool) -> ProcessManagerResult<()>;
/// Get the status of a process
async fn process_status(&self, process_id: &str) -> ProcessManagerResult<ProcessStatus>;
/// Get logs for a process
async fn process_logs(&self, process_id: &str, lines: Option<usize>, follow: bool) -> ProcessManagerResult<Vec<LogInfo>>;
/// Check if the process manager is available and working
async fn health_check(&self) -> ProcessManagerResult<()>;
/// List all managed processes
async fn list_processes(&self) -> ProcessManagerResult<Vec<String>>;
}
/// Simple process manager implementation using direct process spawning
/// This is useful for development and testing, but production should use
/// more robust process managers like tmux or zinit.
pub struct SimpleProcessManager {
processes: Arc<Mutex<HashMap<String, Child>>>,
}
impl SimpleProcessManager {
/// Create a new simple process manager
pub fn new() -> Self {
Self {
processes: Arc::new(Mutex::new(HashMap::new())),
}
}
fn build_command(&self, config: &ProcessConfig) -> Command {
let mut cmd = Command::new(&config.binary_path);
// Add arguments
for arg in &config.args {
cmd.arg(arg);
}
// Set working directory
if let Some(working_dir) = &config.working_dir {
cmd.current_dir(working_dir);
}
// Set environment variables
for (key, value) in &config.env_vars {
cmd.env(key, value);
}
// Configure stdio
cmd.stdout(Stdio::piped())
.stderr(Stdio::piped())
.stdin(Stdio::null());
cmd
}
}
impl Default for SimpleProcessManager {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl ProcessManager for SimpleProcessManager {
async fn start_process(&mut self, config: &ProcessConfig) -> ProcessManagerResult<()> {
let mut processes = self.processes.lock().await;
if processes.contains_key(&config.process_id) {
return Err(ProcessManagerError::ProcessAlreadyRunning(config.process_id.clone()));
}
let mut cmd = self.build_command(config);
log::debug!("Starting process for {}: {:?}", config.process_id, cmd);
let child = cmd.spawn().map_err(|e| ProcessManagerError::StartupFailed(
config.process_id.clone(),
format!("Failed to spawn process: {}", e),
))?;
processes.insert(config.process_id.clone(), child);
// Wait a moment to ensure the process started successfully
drop(processes);
tokio::time::sleep(Duration::from_millis(100)).await;
let mut processes = self.processes.lock().await;
// Check if the process is still running
if let Some(child) = processes.get_mut(&config.process_id) {
match child.try_wait() {
Ok(Some(status)) => {
processes.remove(&config.process_id);
return Err(ProcessManagerError::StartupFailed(
config.process_id.clone(),
format!("Process exited immediately with status: {}", status),
));
}
Ok(None) => {
// Process is still running
log::info!("Successfully started process {}", config.process_id);
}
Err(e) => {
processes.remove(&config.process_id);
return Err(ProcessManagerError::StartupFailed(
config.process_id.clone(),
format!("Failed to check process status: {}", e),
));
}
}
}
Ok(())
}
async fn stop_process(&mut self, process_id: &str, force: bool) -> ProcessManagerResult<()> {
let mut processes = self.processes.lock().await;
let mut child = processes.remove(process_id)
.ok_or_else(|| ProcessManagerError::ProcessNotFound(process_id.to_string()))?;
if force {
child.kill().await.map_err(|e| ProcessManagerError::StopFailed(
process_id.to_string(),
format!("Failed to kill process: {}", e),
))?;
} else {
// Try graceful shutdown first
if let Some(id) = child.id() {
#[cfg(unix)]
{
use std::process::Command as StdCommand;
let _ = StdCommand::new("kill")
.arg("-TERM")
.arg(id.to_string())
.output();
// Wait a bit for graceful shutdown
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
// Force kill if still running
let _ = child.kill().await;
}
// Wait for the process to exit
let _ = child.wait().await;
log::info!("Successfully stopped process {}", process_id);
Ok(())
}
async fn process_status(&self, process_id: &str) -> ProcessManagerResult<ProcessStatus> {
let mut processes = self.processes.lock().await;
if let Some(child) = processes.get_mut(process_id) {
match child.try_wait() {
Ok(Some(_)) => {
// Process has exited
processes.remove(process_id);
Ok(ProcessStatus::Stopped)
}
Ok(None) => {
// Process is still running
Ok(ProcessStatus::Running)
}
Err(e) => {
Ok(ProcessStatus::Error(format!("Failed to check status: {}", e)))
}
}
} else {
Ok(ProcessStatus::Stopped)
}
}
async fn process_logs(&self, process_id: &str, _lines: Option<usize>, _follow: bool) -> ProcessManagerResult<Vec<LogInfo>> {
// Simple process manager doesn't capture logs by default
// This would require more sophisticated process management
log::warn!("Log retrieval not implemented for SimpleProcessManager");
Ok(vec![LogInfo {
timestamp: chrono::Utc::now().to_rfc3339(),
level: "info".to_string(),
message: format!("Log retrieval not available for process {}", process_id),
}])
}
async fn health_check(&self) -> ProcessManagerResult<()> {
// Simple process manager is always healthy if we can lock the processes
let _processes = self.processes.lock().await;
Ok(())
}
async fn list_processes(&self) -> ProcessManagerResult<Vec<String>> {
let processes = self.processes.lock().await;
Ok(processes.keys().cloned().collect())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
#[tokio::test]
async fn test_process_config_creation() {
let config = ProcessConfig::new(
"test_process".to_string(),
PathBuf::from("/usr/bin/echo"),
)
.with_arg("hello".to_string())
.with_arg("world".to_string())
.with_env_var("TEST_VAR".to_string(), "test_value".to_string());
assert_eq!(config.process_id, "test_process");
assert_eq!(config.binary_path, PathBuf::from("/usr/bin/echo"));
assert_eq!(config.args, vec!["hello", "world"]);
assert_eq!(config.env_vars.get("TEST_VAR"), Some(&"test_value".to_string()));
}
#[tokio::test]
async fn test_simple_process_manager_creation() {
let pm = SimpleProcessManager::new();
assert!(pm.health_check().await.is_ok());
}
#[tokio::test]
async fn test_process_status_types() {
let status1 = ProcessStatus::Running;
let status2 = ProcessStatus::Stopped;
let status3 = ProcessStatus::Error("test error".to_string());
assert_eq!(status1, ProcessStatus::Running);
assert_eq!(status2, ProcessStatus::Stopped);
assert_ne!(status1, status2);
if let ProcessStatus::Error(msg) = status3 {
assert_eq!(msg, "test error");
} else {
panic!("Expected Error status");
}
}
}

View File

@@ -0,0 +1,404 @@
//! # Tmux Process Manager
//!
//! This module provides a tmux-based process manager implementation that manages
//! processes within tmux sessions and windows. This is useful for production
//! environments where you need persistent, manageable processes.
use async_trait::async_trait;
use chrono::Utc;
use std::process::Output;
use tokio::process::Command;
use crate::process_manager::{
LogInfo, ProcessConfig, ProcessManager, ProcessManagerError, ProcessManagerResult,
ProcessStatus,
};
/// Tmux-based process manager implementation
///
/// This manager creates and manages processes within tmux sessions, providing
/// better process isolation and management capabilities compared to simple spawning.
pub struct TmuxProcessManager {
/// Name of the tmux session to use
session_name: String,
}
impl TmuxProcessManager {
/// Create a new tmux process manager with the specified session name
pub fn new(session_name: String) -> Self {
Self { session_name }
}
/// Execute a tmux command and return the output
async fn tmux_command(&self, args: &[&str]) -> ProcessManagerResult<Output> {
let output = Command::new("tmux")
.args(args)
.output()
.await
.map_err(|e| ProcessManagerError::Other(format!("Failed to execute tmux command: {}", e)))?;
log::debug!("Tmux command: tmux {}", args.join(" "));
log::debug!("Tmux output: {}", String::from_utf8_lossy(&output.stdout));
if !output.stderr.is_empty() {
log::debug!("Tmux stderr: {}", String::from_utf8_lossy(&output.stderr));
}
Ok(output)
}
/// Create the tmux session if it doesn't exist
async fn create_session_if_needed(&self) -> ProcessManagerResult<()> {
// Check if session exists
let output = self
.tmux_command(&["has-session", "-t", &self.session_name])
.await?;
if !output.status.success() {
// Session doesn't exist, create it
log::info!("Creating tmux session: {}", self.session_name);
let output = self
.tmux_command(&["new-session", "-d", "-s", &self.session_name])
.await?;
if !output.status.success() {
return Err(ProcessManagerError::Other(format!(
"Failed to create tmux session '{}': {}",
self.session_name,
String::from_utf8_lossy(&output.stderr)
)));
}
}
Ok(())
}
/// Build the command string for running a process
fn build_process_command(&self, config: &ProcessConfig) -> String {
let mut cmd_parts = vec![config.binary_path.to_string_lossy().to_string()];
cmd_parts.extend(config.args.clone());
cmd_parts.join(" ")
}
/// Get the window name for a process
fn get_window_name(&self, process_id: &str) -> String {
format!("proc-{}", process_id)
}
}
#[async_trait]
impl ProcessManager for TmuxProcessManager {
async fn start_process(&mut self, config: &ProcessConfig) -> ProcessManagerResult<()> {
self.create_session_if_needed().await?;
let window_name = self.get_window_name(&config.process_id);
let command = self.build_process_command(config);
// Check if window already exists
let check_output = self
.tmux_command(&[
"list-windows",
"-t",
&self.session_name,
"-F",
"#{window_name}",
])
.await?;
let existing_windows = String::from_utf8_lossy(&check_output.stdout);
if existing_windows.lines().any(|line| line.trim() == window_name) {
return Err(ProcessManagerError::ProcessAlreadyRunning(config.process_id.clone()));
}
// Create new window and run the process
let mut tmux_args = vec![
"new-window",
"-t",
&self.session_name,
"-n",
&window_name,
];
// Set working directory if specified
let working_dir_arg;
if let Some(working_dir) = &config.working_dir {
working_dir_arg = working_dir.to_string_lossy().to_string();
tmux_args.extend(&["-c", &working_dir_arg]);
}
tmux_args.push(&command);
let output = self.tmux_command(&tmux_args).await?;
if !output.status.success() {
return Err(ProcessManagerError::StartupFailed(
config.process_id.clone(),
format!(
"Failed to create tmux window: {}",
String::from_utf8_lossy(&output.stderr)
),
));
}
// Wait a moment and check if the process is still running
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
match self.process_status(&config.process_id).await? {
ProcessStatus::Running => {
log::info!("Successfully started process {} in tmux window {}", config.process_id, window_name);
Ok(())
}
ProcessStatus::Stopped => {
Err(ProcessManagerError::StartupFailed(
config.process_id.clone(),
"Process exited immediately after startup".to_string(),
))
}
ProcessStatus::Error(msg) => {
Err(ProcessManagerError::StartupFailed(
config.process_id.clone(),
format!("Process failed to start: {}", msg),
))
}
_ => Ok(()),
}
}
async fn stop_process(&mut self, process_id: &str, force: bool) -> ProcessManagerResult<()> {
let window_name = self.get_window_name(process_id);
// Check if window exists
let check_output = self
.tmux_command(&[
"list-windows",
"-t",
&self.session_name,
"-F",
"#{window_name}",
])
.await?;
let existing_windows = String::from_utf8_lossy(&check_output.stdout);
if !existing_windows.lines().any(|line| line.trim() == window_name) {
return Err(ProcessManagerError::ProcessNotFound(process_id.to_string()));
}
if force {
// Kill the window immediately
let output = self
.tmux_command(&["kill-window", "-t", &format!("{}:{}", self.session_name, window_name)])
.await?;
if !output.status.success() {
return Err(ProcessManagerError::StopFailed(
process_id.to_string(),
format!(
"Failed to kill tmux window: {}",
String::from_utf8_lossy(&output.stderr)
),
));
}
} else {
// Send SIGTERM to the process in the window
let output = self
.tmux_command(&[
"send-keys",
"-t",
&format!("{}:{}", self.session_name, window_name),
"C-c",
])
.await?;
if !output.status.success() {
log::warn!("Failed to send SIGTERM, trying force kill");
// Fallback to force kill
return self.stop_process(process_id, true).await;
}
// Wait a bit for graceful shutdown
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
// Check if process is still running, force kill if needed
if let Ok(ProcessStatus::Running) = self.process_status(process_id).await {
log::info!("Process {} didn't stop gracefully, force killing", process_id);
return self.stop_process(process_id, true).await;
}
}
log::info!("Successfully stopped process {}", process_id);
Ok(())
}
async fn process_status(&self, process_id: &str) -> ProcessManagerResult<ProcessStatus> {
let window_name = self.get_window_name(process_id);
// Check if window exists
let check_output = self
.tmux_command(&[
"list-windows",
"-t",
&self.session_name,
"-F",
"#{window_name}",
])
.await?;
let existing_windows = String::from_utf8_lossy(&check_output.stdout);
if !existing_windows.lines().any(|line| line.trim() == window_name) {
return Ok(ProcessStatus::Stopped);
}
// Check if there are any panes in the window (process running)
let pane_output = self
.tmux_command(&[
"list-panes",
"-t",
&format!("{}:{}", self.session_name, window_name),
"-F",
"#{pane_pid}",
])
.await?;
if pane_output.status.success() && !pane_output.stdout.is_empty() {
Ok(ProcessStatus::Running)
} else {
Ok(ProcessStatus::Stopped)
}
}
async fn process_logs(&self, process_id: &str, lines: Option<usize>, _follow: bool) -> ProcessManagerResult<Vec<LogInfo>> {
let window_name = self.get_window_name(process_id);
// Capture the pane content (this is the best we can do with tmux)
let target_window = format!("{}:{}", self.session_name, window_name);
let mut tmux_args = vec![
"capture-pane",
"-t",
&target_window,
"-p",
];
// Add line limit if specified
let lines_arg;
if let Some(line_count) = lines {
lines_arg = format!("-S -{}", line_count);
tmux_args.push(&lines_arg);
}
let output = self.tmux_command(&tmux_args).await?;
if !output.status.success() {
return Err(ProcessManagerError::LogsFailed(
process_id.to_string(),
format!(
"Failed to capture tmux pane: {}",
String::from_utf8_lossy(&output.stderr)
),
));
}
let content = String::from_utf8_lossy(&output.stdout);
let timestamp = Utc::now().to_rfc3339();
let logs = content
.lines()
.filter(|line| !line.trim().is_empty())
.map(|line| LogInfo {
timestamp: timestamp.clone(),
level: "info".to_string(),
message: line.to_string(),
})
.collect();
Ok(logs)
}
async fn health_check(&self) -> ProcessManagerResult<()> {
// Check if tmux is available
let output = Command::new("tmux")
.arg("list-sessions")
.output()
.await
.map_err(|e| ProcessManagerError::Other(format!("Tmux not available: {}", e)))?;
if !output.status.success() {
let error_msg = String::from_utf8_lossy(&output.stderr);
if error_msg.contains("no server running") {
// This is fine, tmux server will start when needed
Ok(())
} else {
Err(ProcessManagerError::Other(format!("Tmux health check failed: {}", error_msg)))
}
} else {
Ok(())
}
}
async fn list_processes(&self) -> ProcessManagerResult<Vec<String>> {
// List all windows in our session that match our process naming pattern
let output = self
.tmux_command(&[
"list-windows",
"-t",
&self.session_name,
"-F",
"#{window_name}",
])
.await?;
if !output.status.success() {
// Session might not exist
return Ok(Vec::new());
}
let windows = String::from_utf8_lossy(&output.stdout);
let processes = windows
.lines()
.filter_map(|line| {
let window_name = line.trim();
if window_name.starts_with("proc-") {
Some(window_name.strip_prefix("proc-").unwrap().to_string())
} else {
None
}
})
.collect();
Ok(processes)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
#[tokio::test]
async fn test_tmux_manager_creation() {
let manager = TmuxProcessManager::new("test_session".to_string());
assert_eq!(manager.session_name, "test_session");
}
#[tokio::test]
async fn test_window_name_generation() {
let manager = TmuxProcessManager::new("test_session".to_string());
let window_name = manager.get_window_name("test_process");
assert_eq!(window_name, "proc-test_process");
}
#[tokio::test]
async fn test_command_building() {
let manager = TmuxProcessManager::new("test_session".to_string());
let config = ProcessConfig::new(
"test_process".to_string(),
PathBuf::from("/usr/bin/echo"),
)
.with_arg("hello".to_string())
.with_arg("world".to_string());
let command = manager.build_process_command(&config);
assert!(command.contains("/usr/bin/echo"));
assert!(command.contains("hello"));
assert!(command.contains("world"));
}
}