feat: Add service manager support

- Add a new service manager crate for dynamic service management
- Integrate service manager with Rhai for scripting
- Provide examples for circle worker management and basic usage
- Add comprehensive tests for service lifecycle and error handling
- Implement cross-platform support for macOS and Linux (zinit/systemd)
This commit is contained in:
Mahmoud-Emad
2025-07-01 18:00:21 +03:00
parent 46ad848e7e
commit 131d978450
28 changed files with 3562 additions and 192 deletions

View File

@@ -1,42 +1,435 @@
use crate::{ServiceConfig, ServiceManager, ServiceManagerError, ServiceStatus};
use async_trait::async_trait;
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use std::process::Command;
#[derive(Debug)]
pub struct SystemdServiceManager;
pub struct SystemdServiceManager {
service_prefix: String,
user_mode: bool,
}
impl SystemdServiceManager {
pub fn new() -> Self {
Self
Self {
service_prefix: "sal".to_string(),
user_mode: true, // Default to user services for safety
}
}
pub fn new_system() -> Self {
Self {
service_prefix: "sal".to_string(),
user_mode: false, // System-wide services (requires root)
}
}
fn get_service_name(&self, service_name: &str) -> String {
format!("{}-{}.service", self.service_prefix, service_name)
}
fn get_unit_file_path(&self, service_name: &str) -> PathBuf {
let service_file = self.get_service_name(service_name);
if self.user_mode {
// User service directory
let home = std::env::var("HOME").unwrap_or_else(|_| "/tmp".to_string());
PathBuf::from(home)
.join(".config")
.join("systemd")
.join("user")
.join(service_file)
} else {
// System service directory
PathBuf::from("/etc/systemd/system").join(service_file)
}
}
fn run_systemctl(&self, args: &[&str]) -> Result<String, ServiceManagerError> {
let mut cmd = Command::new("systemctl");
if self.user_mode {
cmd.arg("--user");
}
cmd.args(args);
let output = cmd
.output()
.map_err(|e| ServiceManagerError::Other(format!("Failed to run systemctl: {}", e)))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(ServiceManagerError::Other(format!(
"systemctl command failed: {}",
stderr
)));
}
Ok(String::from_utf8_lossy(&output.stdout).to_string())
}
fn create_unit_file(&self, config: &ServiceConfig) -> Result<(), ServiceManagerError> {
let unit_path = self.get_unit_file_path(&config.name);
// Ensure the directory exists
if let Some(parent) = unit_path.parent() {
fs::create_dir_all(parent).map_err(|e| {
ServiceManagerError::Other(format!("Failed to create unit directory: {}", e))
})?;
}
// Create the unit file content
let mut unit_content = String::new();
unit_content.push_str("[Unit]\n");
unit_content.push_str(&format!("Description={} service\n", config.name));
unit_content.push_str("After=network.target\n\n");
unit_content.push_str("[Service]\n");
unit_content.push_str("Type=simple\n");
// Build the ExecStart command
let mut exec_start = config.binary_path.clone();
for arg in &config.args {
exec_start.push(' ');
exec_start.push_str(arg);
}
unit_content.push_str(&format!("ExecStart={}\n", exec_start));
if let Some(working_dir) = &config.working_directory {
unit_content.push_str(&format!("WorkingDirectory={}\n", working_dir));
}
// Add environment variables
for (key, value) in &config.environment {
unit_content.push_str(&format!("Environment=\"{}={}\"\n", key, value));
}
if config.auto_restart {
unit_content.push_str("Restart=always\n");
unit_content.push_str("RestartSec=5\n");
}
unit_content.push_str("\n[Install]\n");
unit_content.push_str("WantedBy=default.target\n");
// Write the unit file
fs::write(&unit_path, unit_content)
.map_err(|e| ServiceManagerError::Other(format!("Failed to write unit file: {}", e)))?;
// Reload systemd to pick up the new unit file
self.run_systemctl(&["daemon-reload"])?;
Ok(())
}
}
#[async_trait]
impl ServiceManager for SystemdServiceManager {
async fn start(&self, _config: &ServiceConfig) -> Result<(), ServiceManagerError> {
Err(ServiceManagerError::Other("Systemd implementation not yet complete".to_string()))
fn exists(&self, service_name: &str) -> Result<bool, ServiceManagerError> {
let unit_path = self.get_unit_file_path(service_name);
Ok(unit_path.exists())
}
async fn stop(&self, _service_name: &str) -> Result<(), ServiceManagerError> {
Err(ServiceManagerError::Other("Systemd implementation not yet complete".to_string()))
fn start(&self, config: &ServiceConfig) -> Result<(), ServiceManagerError> {
let service_name = self.get_service_name(&config.name);
// Check if service already exists and is running
if self.exists(&config.name)? {
match self.status(&config.name)? {
ServiceStatus::Running => {
return Err(ServiceManagerError::ServiceAlreadyExists(
config.name.clone(),
));
}
_ => {
// Service exists but not running, we can start it
}
}
} else {
// Create the unit file
self.create_unit_file(config)?;
}
// Enable and start the service
self.run_systemctl(&["enable", &service_name])
.map_err(|e| ServiceManagerError::StartFailed(config.name.clone(), e.to_string()))?;
self.run_systemctl(&["start", &service_name])
.map_err(|e| ServiceManagerError::StartFailed(config.name.clone(), e.to_string()))?;
Ok(())
}
async fn restart(&self, _service_name: &str) -> Result<(), ServiceManagerError> {
Err(ServiceManagerError::Other("Systemd implementation not yet complete".to_string()))
fn start_existing(&self, service_name: &str) -> Result<(), ServiceManagerError> {
let service_unit = self.get_service_name(service_name);
// Check if unit file exists
if !self.exists(service_name)? {
return Err(ServiceManagerError::ServiceNotFound(
service_name.to_string(),
));
}
// Check if already running
match self.status(service_name)? {
ServiceStatus::Running => {
return Ok(()); // Already running, nothing to do
}
_ => {
// Start the service
self.run_systemctl(&["start", &service_unit]).map_err(|e| {
ServiceManagerError::StartFailed(service_name.to_string(), e.to_string())
})?;
}
}
Ok(())
}
async fn status(&self, _service_name: &str) -> Result<ServiceStatus, ServiceManagerError> {
Err(ServiceManagerError::Other("Systemd implementation not yet complete".to_string()))
fn start_and_confirm(
&self,
config: &ServiceConfig,
timeout_secs: u64,
) -> Result<(), ServiceManagerError> {
// Start the service first
self.start(config)?;
// Wait for confirmation with timeout
let start_time = std::time::Instant::now();
let timeout_duration = std::time::Duration::from_secs(timeout_secs);
while start_time.elapsed() < timeout_duration {
match self.status(&config.name) {
Ok(ServiceStatus::Running) => return Ok(()),
Ok(ServiceStatus::Failed) => {
return Err(ServiceManagerError::StartFailed(
config.name.clone(),
"Service failed to start".to_string(),
));
}
Ok(_) => {
// Still starting, wait a bit
std::thread::sleep(std::time::Duration::from_millis(100));
}
Err(_) => {
// Service might not exist yet, wait a bit
std::thread::sleep(std::time::Duration::from_millis(100));
}
}
}
Err(ServiceManagerError::StartFailed(
config.name.clone(),
format!("Service did not start within {} seconds", timeout_secs),
))
}
async fn logs(&self, _service_name: &str, _lines: Option<usize>) -> Result<String, ServiceManagerError> {
Err(ServiceManagerError::Other("Systemd implementation not yet complete".to_string()))
fn start_existing_and_confirm(
&self,
service_name: &str,
timeout_secs: u64,
) -> Result<(), ServiceManagerError> {
// Start the existing service first
self.start_existing(service_name)?;
// Wait for confirmation with timeout
let start_time = std::time::Instant::now();
let timeout_duration = std::time::Duration::from_secs(timeout_secs);
while start_time.elapsed() < timeout_duration {
match self.status(service_name) {
Ok(ServiceStatus::Running) => return Ok(()),
Ok(ServiceStatus::Failed) => {
return Err(ServiceManagerError::StartFailed(
service_name.to_string(),
"Service failed to start".to_string(),
));
}
Ok(_) => {
// Still starting, wait a bit
std::thread::sleep(std::time::Duration::from_millis(100));
}
Err(_) => {
// Service might not exist yet, wait a bit
std::thread::sleep(std::time::Duration::from_millis(100));
}
}
}
Err(ServiceManagerError::StartFailed(
service_name.to_string(),
format!("Service did not start within {} seconds", timeout_secs),
))
}
async fn list(&self) -> Result<Vec<String>, ServiceManagerError> {
Err(ServiceManagerError::Other("Systemd implementation not yet complete".to_string()))
fn stop(&self, service_name: &str) -> Result<(), ServiceManagerError> {
let service_unit = self.get_service_name(service_name);
// Check if service exists
if !self.exists(service_name)? {
return Err(ServiceManagerError::ServiceNotFound(
service_name.to_string(),
));
}
// Stop the service
self.run_systemctl(&["stop", &service_unit]).map_err(|e| {
ServiceManagerError::StopFailed(service_name.to_string(), e.to_string())
})?;
Ok(())
}
async fn remove(&self, _service_name: &str) -> Result<(), ServiceManagerError> {
Err(ServiceManagerError::Other("Systemd implementation not yet complete".to_string()))
fn restart(&self, service_name: &str) -> Result<(), ServiceManagerError> {
let service_unit = self.get_service_name(service_name);
// Check if service exists
if !self.exists(service_name)? {
return Err(ServiceManagerError::ServiceNotFound(
service_name.to_string(),
));
}
// Restart the service
self.run_systemctl(&["restart", &service_unit])
.map_err(|e| {
ServiceManagerError::RestartFailed(service_name.to_string(), e.to_string())
})?;
Ok(())
}
}
fn status(&self, service_name: &str) -> Result<ServiceStatus, ServiceManagerError> {
let service_unit = self.get_service_name(service_name);
// Check if service exists
if !self.exists(service_name)? {
return Err(ServiceManagerError::ServiceNotFound(
service_name.to_string(),
));
}
// Get service status
let output = self
.run_systemctl(&["is-active", &service_unit])
.unwrap_or_else(|_| "unknown".to_string());
let status = match output.trim() {
"active" => ServiceStatus::Running,
"inactive" => ServiceStatus::Stopped,
"failed" => ServiceStatus::Failed,
_ => ServiceStatus::Unknown,
};
Ok(status)
}
fn logs(
&self,
service_name: &str,
lines: Option<usize>,
) -> Result<String, ServiceManagerError> {
let service_unit = self.get_service_name(service_name);
// Check if service exists
if !self.exists(service_name)? {
return Err(ServiceManagerError::ServiceNotFound(
service_name.to_string(),
));
}
// Build journalctl command
let mut args = vec!["--unit", &service_unit, "--no-pager"];
let lines_arg;
if let Some(n) = lines {
lines_arg = format!("--lines={}", n);
args.push(&lines_arg);
}
// Use journalctl to get logs
let mut cmd = std::process::Command::new("journalctl");
if self.user_mode {
cmd.arg("--user");
}
cmd.args(&args);
let output = cmd.output().map_err(|e| {
ServiceManagerError::LogsFailed(
service_name.to_string(),
format!("Failed to run journalctl: {}", e),
)
})?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(ServiceManagerError::LogsFailed(
service_name.to_string(),
format!("journalctl command failed: {}", stderr),
));
}
Ok(String::from_utf8_lossy(&output.stdout).to_string())
}
fn list(&self) -> Result<Vec<String>, ServiceManagerError> {
// List all services with our prefix
let output =
self.run_systemctl(&["list-units", "--type=service", "--all", "--no-pager"])?;
let mut services = Vec::new();
for line in output.lines() {
if line.contains(&format!("{}-", self.service_prefix)) {
// Extract service name from the line
if let Some(unit_name) = line.split_whitespace().next() {
if let Some(service_name) = unit_name.strip_suffix(".service") {
if let Some(name) =
service_name.strip_prefix(&format!("{}-", self.service_prefix))
{
services.push(name.to_string());
}
}
}
}
}
Ok(services)
}
fn remove(&self, service_name: &str) -> Result<(), ServiceManagerError> {
let service_unit = self.get_service_name(service_name);
// Check if service exists
if !self.exists(service_name)? {
return Err(ServiceManagerError::ServiceNotFound(
service_name.to_string(),
));
}
// Try to stop the service first, but don't fail if it's already stopped
if let Err(e) = self.stop(service_name) {
log::warn!(
"Failed to stop service '{}' before removal: {}",
service_name,
e
);
}
// Disable the service
if let Err(e) = self.run_systemctl(&["disable", &service_unit]) {
log::warn!("Failed to disable service '{}': {}", service_name, e);
}
// Remove the unit file
let unit_path = self.get_unit_file_path(service_name);
if unit_path.exists() {
std::fs::remove_file(&unit_path).map_err(|e| {
ServiceManagerError::Other(format!("Failed to remove unit file: {}", e))
})?;
}
// Reload systemd to pick up the changes
self.run_systemctl(&["daemon-reload"])?;
Ok(())
}
}