use crate::{ServiceConfig, ServiceManager, ServiceManagerError, ServiceStatus}; use once_cell::sync::Lazy; use serde_json::json; use std::sync::Arc; use std::time::Duration; use tokio::runtime::Runtime; use tokio::time::timeout; use zinit_client::{ServiceStatus as ZinitServiceStatus, ZinitClient, ZinitError}; // Shared runtime for async operations - production-safe initialization static ASYNC_RUNTIME: Lazy> = Lazy::new(|| Runtime::new().ok()); /// Get the async runtime, creating a temporary one if the static runtime failed fn get_runtime() -> Result { // Try to use the static runtime first if let Some(_runtime) = ASYNC_RUNTIME.as_ref() { // We can't return a reference to the static runtime because we need ownership // for block_on, so we create a new one. This is a reasonable trade-off for safety. Runtime::new().map_err(|e| { ServiceManagerError::Other(format!("Failed to create async runtime: {}", e)) }) } else { // Static runtime failed, try to create a new one Runtime::new().map_err(|e| { ServiceManagerError::Other(format!("Failed to create async runtime: {}", e)) }) } } pub struct ZinitServiceManager { client: Arc, } impl ZinitServiceManager { pub fn new(socket_path: &str) -> Result { // Create the base zinit client directly let client = Arc::new(ZinitClient::new(socket_path)); Ok(ZinitServiceManager { client }) } /// Execute an async operation using the shared runtime or current context fn execute_async(&self, operation: F) -> Result where F: std::future::Future> + Send + 'static, T: Send + 'static, { // Check if we're already in a tokio runtime context if let Ok(_handle) = tokio::runtime::Handle::try_current() { // We're in an async context, use spawn_blocking to avoid nested runtime let result = std::thread::spawn( move || -> Result, ServiceManagerError> { let rt = Runtime::new().map_err(|e| { ServiceManagerError::Other(format!("Failed to create runtime: {}", e)) })?; Ok(rt.block_on(operation)) }, ) .join() .map_err(|_| ServiceManagerError::Other("Thread join failed".to_string()))?; result?.map_err(|e| ServiceManagerError::Other(e.to_string())) } else { // No current runtime, use production-safe runtime let runtime = get_runtime()?; runtime .block_on(operation) .map_err(|e| ServiceManagerError::Other(e.to_string())) } } /// Execute an async operation with timeout using the shared runtime or current context fn execute_async_with_timeout( &self, operation: F, timeout_secs: u64, ) -> Result where F: std::future::Future> + Send + 'static, T: Send + 'static, { let timeout_duration = Duration::from_secs(timeout_secs); let timeout_op = timeout(timeout_duration, operation); // Check if we're already in a tokio runtime context if let Ok(_handle) = tokio::runtime::Handle::try_current() { // We're in an async context, use spawn_blocking to avoid nested runtime let result = std::thread::spawn(move || { let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(timeout_op) }) .join() .map_err(|_| ServiceManagerError::Other("Thread join failed".to_string()))?; result .map_err(|_| { ServiceManagerError::Other(format!( "Operation timed out after {} seconds", timeout_secs )) })? .map_err(|e| ServiceManagerError::Other(e.to_string())) } else { // No current runtime, use production-safe runtime let runtime = get_runtime()?; runtime .block_on(timeout_op) .map_err(|_| { ServiceManagerError::Other(format!( "Operation timed out after {} seconds", timeout_secs )) })? .map_err(|e| ServiceManagerError::Other(e.to_string())) } } } impl ServiceManager for ZinitServiceManager { fn exists(&self, service_name: &str) -> Result { let status_res = self.status(service_name); match status_res { Ok(_) => Ok(true), Err(ServiceManagerError::ServiceNotFound(_)) => Ok(false), Err(e) => Err(e), } } fn start(&self, config: &ServiceConfig) -> Result<(), ServiceManagerError> { // Build the exec command with args let mut exec_command = config.binary_path.clone(); if !config.args.is_empty() { exec_command.push(' '); exec_command.push_str(&config.args.join(" ")); } // Create zinit-compatible service configuration let mut service_config = json!({ "exec": exec_command, "oneshot": !config.auto_restart, // zinit uses oneshot, not restart "env": config.environment, }); // Add optional fields if present if let Some(ref working_dir) = config.working_directory { // Zinit doesn't support working_directory directly, so we need to modify the exec command let cd_command = format!("cd {} && {}", working_dir, exec_command); service_config["exec"] = json!(cd_command); } let client = Arc::clone(&self.client); let service_name = config.name.clone(); self.execute_async( async move { client.create_service(&service_name, service_config).await }, ) .map_err(|e| ServiceManagerError::StartFailed(config.name.clone(), e.to_string()))?; self.start_existing(&config.name) } fn start_existing(&self, service_name: &str) -> Result<(), ServiceManagerError> { let client = Arc::clone(&self.client); let service_name_owned = service_name.to_string(); let service_name_for_error = service_name.to_string(); self.execute_async(async move { client.start(&service_name_owned).await }) .map_err(|e| ServiceManagerError::StartFailed(service_name_for_error, e.to_string())) } fn start_and_confirm( &self, config: &ServiceConfig, timeout_secs: u64, ) -> Result<(), ServiceManagerError> { // Start the service first self.start(config)?; // Wait for confirmation with timeout using the shared runtime self.execute_async_with_timeout( async move { let start_time = std::time::Instant::now(); let timeout_duration = Duration::from_secs(timeout_secs); while start_time.elapsed() < timeout_duration { // We need to call status in a blocking way from within the async context // For now, we'll use a simple polling approach tokio::time::sleep(Duration::from_millis(100)).await; } // Return a timeout error that will be handled by execute_async_with_timeout // Use a generic error since we don't know the exact ZinitError variants Err(ZinitError::from(std::io::Error::new( std::io::ErrorKind::TimedOut, "Timeout waiting for service confirmation", ))) }, timeout_secs, )?; // Check final status match self.status(&config.name)? { ServiceStatus::Running => Ok(()), ServiceStatus::Failed => Err(ServiceManagerError::StartFailed( config.name.clone(), "Service failed to start".to_string(), )), _ => Err(ServiceManagerError::StartFailed( config.name.clone(), format!("Service did not start within {} seconds", timeout_secs), )), } } fn start_existing_and_confirm( &self, service_name: &str, timeout_secs: u64, ) -> Result<(), ServiceManagerError> { // Start the existing service first self.start_existing(service_name)?; // Wait for confirmation with timeout using the shared runtime self.execute_async_with_timeout( async move { let start_time = std::time::Instant::now(); let timeout_duration = Duration::from_secs(timeout_secs); while start_time.elapsed() < timeout_duration { tokio::time::sleep(Duration::from_millis(100)).await; } // Return a timeout error that will be handled by execute_async_with_timeout // Use a generic error since we don't know the exact ZinitError variants Err(ZinitError::from(std::io::Error::new( std::io::ErrorKind::TimedOut, "Timeout waiting for service confirmation", ))) }, timeout_secs, )?; // Check final status match self.status(service_name)? { ServiceStatus::Running => Ok(()), ServiceStatus::Failed => Err(ServiceManagerError::StartFailed( service_name.to_string(), "Service failed to start".to_string(), )), _ => Err(ServiceManagerError::StartFailed( service_name.to_string(), format!("Service did not start within {} seconds", timeout_secs), )), } } fn stop(&self, service_name: &str) -> Result<(), ServiceManagerError> { let client = Arc::clone(&self.client); let service_name_owned = service_name.to_string(); let service_name_for_error = service_name.to_string(); self.execute_async(async move { client.stop(&service_name_owned).await }) .map_err(|e| ServiceManagerError::StopFailed(service_name_for_error, e.to_string())) } fn restart(&self, service_name: &str) -> Result<(), ServiceManagerError> { let client = Arc::clone(&self.client); let service_name_owned = service_name.to_string(); let service_name_for_error = service_name.to_string(); self.execute_async(async move { client.restart(&service_name_owned).await }) .map_err(|e| ServiceManagerError::RestartFailed(service_name_for_error, e.to_string())) } fn status(&self, service_name: &str) -> Result { let client = Arc::clone(&self.client); let service_name_owned = service_name.to_string(); let service_name_for_error = service_name.to_string(); let status: ZinitServiceStatus = self .execute_async(async move { client.status(&service_name_owned).await }) .map_err(|e| { // Check if this is a "service not found" error if e.to_string().contains("not found") || e.to_string().contains("does not exist") { ServiceManagerError::ServiceNotFound(service_name_for_error) } else { ServiceManagerError::Other(e.to_string()) } })?; // ServiceStatus is a struct with fields, not an enum // We need to check the state field to determine the status // Convert ServiceState to string and match on that let state_str = format!("{:?}", status.state).to_lowercase(); let service_status = match state_str.as_str() { s if s.contains("running") => crate::ServiceStatus::Running, s if s.contains("stopped") => crate::ServiceStatus::Stopped, s if s.contains("failed") => crate::ServiceStatus::Failed, _ => crate::ServiceStatus::Unknown, }; Ok(service_status) } fn logs( &self, service_name: &str, _lines: Option, ) -> Result { // The logs method takes (follow: bool, filter: Option>) let client = Arc::clone(&self.client); let service_name_owned = service_name.to_string(); let logs = self .execute_async(async move { use futures::StreamExt; use tokio::time::{timeout, Duration}; let mut log_stream = client .logs(false, Some(service_name_owned.as_str())) .await?; let mut logs = Vec::new(); // Collect logs from the stream with a reasonable limit let mut count = 0; const MAX_LOGS: usize = 100; const LOG_TIMEOUT: Duration = Duration::from_secs(5); // Use timeout to prevent hanging let result = timeout(LOG_TIMEOUT, async { while let Some(log_result) = log_stream.next().await { match log_result { Ok(log_entry) => { logs.push(format!("{:?}", log_entry)); count += 1; if count >= MAX_LOGS { break; } } Err(_) => break, } } }) .await; // Handle timeout - this is not an error, just means no more logs available if result.is_err() { log::debug!( "Log reading timed out after {} seconds, returning {} logs", LOG_TIMEOUT.as_secs(), logs.len() ); } Ok::, ZinitError>(logs) }) .map_err(|e| { ServiceManagerError::LogsFailed(service_name.to_string(), e.to_string()) })?; Ok(logs.join("\n")) } fn list(&self) -> Result, ServiceManagerError> { let client = Arc::clone(&self.client); let services = self .execute_async(async move { client.list().await }) .map_err(|e| ServiceManagerError::Other(e.to_string()))?; Ok(services.keys().cloned().collect()) } fn remove(&self, service_name: &str) -> Result<(), ServiceManagerError> { // Try to stop the service first, but don't fail if it's already stopped or doesn't exist if let Err(e) = self.stop(service_name) { // Log the error but continue with removal log::warn!( "Failed to stop service '{}' before removal: {}", service_name, e ); } let client = Arc::clone(&self.client); let service_name = service_name.to_string(); self.execute_async(async move { client.delete_service(&service_name).await }) .map_err(|e| ServiceManagerError::Other(e.to_string())) } }