Merge branch 'development' of https://git.ourworld.tf/herocode/sal into development
This commit is contained in:
404
_archive/service_manager/src/tmux_manager.rs
Normal file
404
_archive/service_manager/src/tmux_manager.rs
Normal file
@@ -0,0 +1,404 @@
|
||||
//! # Tmux Process Manager
|
||||
//!
|
||||
//! This module provides a tmux-based process manager implementation that manages
|
||||
//! processes within tmux sessions and windows. This is useful for production
|
||||
//! environments where you need persistent, manageable processes.
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::Utc;
|
||||
use std::process::Output;
|
||||
use tokio::process::Command;
|
||||
|
||||
use crate::process_manager::{
|
||||
LogInfo, ProcessConfig, ProcessManager, ProcessManagerError, ProcessManagerResult,
|
||||
ProcessStatus,
|
||||
};
|
||||
|
||||
/// Tmux-based process manager implementation
|
||||
///
|
||||
/// This manager creates and manages processes within tmux sessions, providing
|
||||
/// better process isolation and management capabilities compared to simple spawning.
|
||||
pub struct TmuxProcessManager {
|
||||
/// Name of the tmux session to use
|
||||
session_name: String,
|
||||
}
|
||||
|
||||
impl TmuxProcessManager {
|
||||
/// Create a new tmux process manager with the specified session name
|
||||
pub fn new(session_name: String) -> Self {
|
||||
Self { session_name }
|
||||
}
|
||||
|
||||
/// Execute a tmux command and return the output
|
||||
async fn tmux_command(&self, args: &[&str]) -> ProcessManagerResult<Output> {
|
||||
let output = Command::new("tmux")
|
||||
.args(args)
|
||||
.output()
|
||||
.await
|
||||
.map_err(|e| ProcessManagerError::Other(format!("Failed to execute tmux command: {}", e)))?;
|
||||
|
||||
log::debug!("Tmux command: tmux {}", args.join(" "));
|
||||
log::debug!("Tmux output: {}", String::from_utf8_lossy(&output.stdout));
|
||||
|
||||
if !output.stderr.is_empty() {
|
||||
log::debug!("Tmux stderr: {}", String::from_utf8_lossy(&output.stderr));
|
||||
}
|
||||
|
||||
Ok(output)
|
||||
}
|
||||
|
||||
/// Create the tmux session if it doesn't exist
|
||||
async fn create_session_if_needed(&self) -> ProcessManagerResult<()> {
|
||||
// Check if session exists
|
||||
let output = self
|
||||
.tmux_command(&["has-session", "-t", &self.session_name])
|
||||
.await?;
|
||||
|
||||
if !output.status.success() {
|
||||
// Session doesn't exist, create it
|
||||
log::info!("Creating tmux session: {}", self.session_name);
|
||||
let output = self
|
||||
.tmux_command(&["new-session", "-d", "-s", &self.session_name])
|
||||
.await?;
|
||||
|
||||
if !output.status.success() {
|
||||
return Err(ProcessManagerError::Other(format!(
|
||||
"Failed to create tmux session '{}': {}",
|
||||
self.session_name,
|
||||
String::from_utf8_lossy(&output.stderr)
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Build the command string for running a process
|
||||
fn build_process_command(&self, config: &ProcessConfig) -> String {
|
||||
let mut cmd_parts = vec![config.binary_path.to_string_lossy().to_string()];
|
||||
cmd_parts.extend(config.args.clone());
|
||||
cmd_parts.join(" ")
|
||||
}
|
||||
|
||||
/// Get the window name for a process
|
||||
fn get_window_name(&self, process_id: &str) -> String {
|
||||
format!("proc-{}", process_id)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ProcessManager for TmuxProcessManager {
|
||||
async fn start_process(&mut self, config: &ProcessConfig) -> ProcessManagerResult<()> {
|
||||
self.create_session_if_needed().await?;
|
||||
|
||||
let window_name = self.get_window_name(&config.process_id);
|
||||
let command = self.build_process_command(config);
|
||||
|
||||
// Check if window already exists
|
||||
let check_output = self
|
||||
.tmux_command(&[
|
||||
"list-windows",
|
||||
"-t",
|
||||
&self.session_name,
|
||||
"-F",
|
||||
"#{window_name}",
|
||||
])
|
||||
.await?;
|
||||
|
||||
let existing_windows = String::from_utf8_lossy(&check_output.stdout);
|
||||
if existing_windows.lines().any(|line| line.trim() == window_name) {
|
||||
return Err(ProcessManagerError::ProcessAlreadyRunning(config.process_id.clone()));
|
||||
}
|
||||
|
||||
// Create new window and run the process
|
||||
let mut tmux_args = vec![
|
||||
"new-window",
|
||||
"-t",
|
||||
&self.session_name,
|
||||
"-n",
|
||||
&window_name,
|
||||
];
|
||||
|
||||
// Set working directory if specified
|
||||
let working_dir_arg;
|
||||
if let Some(working_dir) = &config.working_dir {
|
||||
working_dir_arg = working_dir.to_string_lossy().to_string();
|
||||
tmux_args.extend(&["-c", &working_dir_arg]);
|
||||
}
|
||||
|
||||
tmux_args.push(&command);
|
||||
|
||||
let output = self.tmux_command(&tmux_args).await?;
|
||||
|
||||
if !output.status.success() {
|
||||
return Err(ProcessManagerError::StartupFailed(
|
||||
config.process_id.clone(),
|
||||
format!(
|
||||
"Failed to create tmux window: {}",
|
||||
String::from_utf8_lossy(&output.stderr)
|
||||
),
|
||||
));
|
||||
}
|
||||
|
||||
// Wait a moment and check if the process is still running
|
||||
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
|
||||
|
||||
match self.process_status(&config.process_id).await? {
|
||||
ProcessStatus::Running => {
|
||||
log::info!("Successfully started process {} in tmux window {}", config.process_id, window_name);
|
||||
Ok(())
|
||||
}
|
||||
ProcessStatus::Stopped => {
|
||||
Err(ProcessManagerError::StartupFailed(
|
||||
config.process_id.clone(),
|
||||
"Process exited immediately after startup".to_string(),
|
||||
))
|
||||
}
|
||||
ProcessStatus::Error(msg) => {
|
||||
Err(ProcessManagerError::StartupFailed(
|
||||
config.process_id.clone(),
|
||||
format!("Process failed to start: {}", msg),
|
||||
))
|
||||
}
|
||||
_ => Ok(()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn stop_process(&mut self, process_id: &str, force: bool) -> ProcessManagerResult<()> {
|
||||
let window_name = self.get_window_name(process_id);
|
||||
|
||||
// Check if window exists
|
||||
let check_output = self
|
||||
.tmux_command(&[
|
||||
"list-windows",
|
||||
"-t",
|
||||
&self.session_name,
|
||||
"-F",
|
||||
"#{window_name}",
|
||||
])
|
||||
.await?;
|
||||
|
||||
let existing_windows = String::from_utf8_lossy(&check_output.stdout);
|
||||
if !existing_windows.lines().any(|line| line.trim() == window_name) {
|
||||
return Err(ProcessManagerError::ProcessNotFound(process_id.to_string()));
|
||||
}
|
||||
|
||||
if force {
|
||||
// Kill the window immediately
|
||||
let output = self
|
||||
.tmux_command(&["kill-window", "-t", &format!("{}:{}", self.session_name, window_name)])
|
||||
.await?;
|
||||
|
||||
if !output.status.success() {
|
||||
return Err(ProcessManagerError::StopFailed(
|
||||
process_id.to_string(),
|
||||
format!(
|
||||
"Failed to kill tmux window: {}",
|
||||
String::from_utf8_lossy(&output.stderr)
|
||||
),
|
||||
));
|
||||
}
|
||||
} else {
|
||||
// Send SIGTERM to the process in the window
|
||||
let output = self
|
||||
.tmux_command(&[
|
||||
"send-keys",
|
||||
"-t",
|
||||
&format!("{}:{}", self.session_name, window_name),
|
||||
"C-c",
|
||||
])
|
||||
.await?;
|
||||
|
||||
if !output.status.success() {
|
||||
log::warn!("Failed to send SIGTERM, trying force kill");
|
||||
// Fallback to force kill
|
||||
return self.stop_process(process_id, true).await;
|
||||
}
|
||||
|
||||
// Wait a bit for graceful shutdown
|
||||
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
||||
|
||||
// Check if process is still running, force kill if needed
|
||||
if let Ok(ProcessStatus::Running) = self.process_status(process_id).await {
|
||||
log::info!("Process {} didn't stop gracefully, force killing", process_id);
|
||||
return self.stop_process(process_id, true).await;
|
||||
}
|
||||
}
|
||||
|
||||
log::info!("Successfully stopped process {}", process_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn process_status(&self, process_id: &str) -> ProcessManagerResult<ProcessStatus> {
|
||||
let window_name = self.get_window_name(process_id);
|
||||
|
||||
// Check if window exists
|
||||
let check_output = self
|
||||
.tmux_command(&[
|
||||
"list-windows",
|
||||
"-t",
|
||||
&self.session_name,
|
||||
"-F",
|
||||
"#{window_name}",
|
||||
])
|
||||
.await?;
|
||||
|
||||
let existing_windows = String::from_utf8_lossy(&check_output.stdout);
|
||||
if !existing_windows.lines().any(|line| line.trim() == window_name) {
|
||||
return Ok(ProcessStatus::Stopped);
|
||||
}
|
||||
|
||||
// Check if there are any panes in the window (process running)
|
||||
let pane_output = self
|
||||
.tmux_command(&[
|
||||
"list-panes",
|
||||
"-t",
|
||||
&format!("{}:{}", self.session_name, window_name),
|
||||
"-F",
|
||||
"#{pane_pid}",
|
||||
])
|
||||
.await?;
|
||||
|
||||
if pane_output.status.success() && !pane_output.stdout.is_empty() {
|
||||
Ok(ProcessStatus::Running)
|
||||
} else {
|
||||
Ok(ProcessStatus::Stopped)
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_logs(&self, process_id: &str, lines: Option<usize>, _follow: bool) -> ProcessManagerResult<Vec<LogInfo>> {
|
||||
let window_name = self.get_window_name(process_id);
|
||||
|
||||
// Capture the pane content (this is the best we can do with tmux)
|
||||
let target_window = format!("{}:{}", self.session_name, window_name);
|
||||
let mut tmux_args = vec![
|
||||
"capture-pane",
|
||||
"-t",
|
||||
&target_window,
|
||||
"-p",
|
||||
];
|
||||
|
||||
// Add line limit if specified
|
||||
let lines_arg;
|
||||
if let Some(line_count) = lines {
|
||||
lines_arg = format!("-S -{}", line_count);
|
||||
tmux_args.push(&lines_arg);
|
||||
}
|
||||
|
||||
let output = self.tmux_command(&tmux_args).await?;
|
||||
|
||||
if !output.status.success() {
|
||||
return Err(ProcessManagerError::LogsFailed(
|
||||
process_id.to_string(),
|
||||
format!(
|
||||
"Failed to capture tmux pane: {}",
|
||||
String::from_utf8_lossy(&output.stderr)
|
||||
),
|
||||
));
|
||||
}
|
||||
|
||||
let content = String::from_utf8_lossy(&output.stdout);
|
||||
let timestamp = Utc::now().to_rfc3339();
|
||||
|
||||
let logs = content
|
||||
.lines()
|
||||
.filter(|line| !line.trim().is_empty())
|
||||
.map(|line| LogInfo {
|
||||
timestamp: timestamp.clone(),
|
||||
level: "info".to_string(),
|
||||
message: line.to_string(),
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(logs)
|
||||
}
|
||||
|
||||
async fn health_check(&self) -> ProcessManagerResult<()> {
|
||||
// Check if tmux is available
|
||||
let output = Command::new("tmux")
|
||||
.arg("list-sessions")
|
||||
.output()
|
||||
.await
|
||||
.map_err(|e| ProcessManagerError::Other(format!("Tmux not available: {}", e)))?;
|
||||
|
||||
if !output.status.success() {
|
||||
let error_msg = String::from_utf8_lossy(&output.stderr);
|
||||
if error_msg.contains("no server running") {
|
||||
// This is fine, tmux server will start when needed
|
||||
Ok(())
|
||||
} else {
|
||||
Err(ProcessManagerError::Other(format!("Tmux health check failed: {}", error_msg)))
|
||||
}
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn list_processes(&self) -> ProcessManagerResult<Vec<String>> {
|
||||
// List all windows in our session that match our process naming pattern
|
||||
let output = self
|
||||
.tmux_command(&[
|
||||
"list-windows",
|
||||
"-t",
|
||||
&self.session_name,
|
||||
"-F",
|
||||
"#{window_name}",
|
||||
])
|
||||
.await?;
|
||||
|
||||
if !output.status.success() {
|
||||
// Session might not exist
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
let windows = String::from_utf8_lossy(&output.stdout);
|
||||
let processes = windows
|
||||
.lines()
|
||||
.filter_map(|line| {
|
||||
let window_name = line.trim();
|
||||
if window_name.starts_with("proc-") {
|
||||
Some(window_name.strip_prefix("proc-").unwrap().to_string())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(processes)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::path::PathBuf;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_tmux_manager_creation() {
|
||||
let manager = TmuxProcessManager::new("test_session".to_string());
|
||||
assert_eq!(manager.session_name, "test_session");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_window_name_generation() {
|
||||
let manager = TmuxProcessManager::new("test_session".to_string());
|
||||
let window_name = manager.get_window_name("test_process");
|
||||
assert_eq!(window_name, "proc-test_process");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_command_building() {
|
||||
let manager = TmuxProcessManager::new("test_session".to_string());
|
||||
let config = ProcessConfig::new(
|
||||
"test_process".to_string(),
|
||||
PathBuf::from("/usr/bin/echo"),
|
||||
)
|
||||
.with_arg("hello".to_string())
|
||||
.with_arg("world".to_string());
|
||||
|
||||
let command = manager.build_process_command(&config);
|
||||
assert!(command.contains("/usr/bin/echo"));
|
||||
assert!(command.contains("hello"));
|
||||
assert!(command.contains("world"));
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user