...
This commit is contained in:
379
_archive/service_manager/src/zinit.rs
Normal file
379
_archive/service_manager/src/zinit.rs
Normal file
@@ -0,0 +1,379 @@
|
||||
use crate::{ServiceConfig, ServiceManager, ServiceManagerError, ServiceStatus};
|
||||
use once_cell::sync::Lazy;
|
||||
use serde_json::json;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::runtime::Runtime;
|
||||
use tokio::time::timeout;
|
||||
use zinit_client::{ServiceStatus as ZinitServiceStatus, ZinitClient, ZinitError};
|
||||
|
||||
// Shared runtime for async operations - production-safe initialization
|
||||
static ASYNC_RUNTIME: Lazy<Option<Runtime>> = Lazy::new(|| Runtime::new().ok());
|
||||
|
||||
/// Get the async runtime, creating a temporary one if the static runtime failed
|
||||
fn get_runtime() -> Result<Runtime, ServiceManagerError> {
|
||||
// Try to use the static runtime first
|
||||
if let Some(_runtime) = ASYNC_RUNTIME.as_ref() {
|
||||
// We can't return a reference to the static runtime because we need ownership
|
||||
// for block_on, so we create a new one. This is a reasonable trade-off for safety.
|
||||
Runtime::new().map_err(|e| {
|
||||
ServiceManagerError::Other(format!("Failed to create async runtime: {}", e))
|
||||
})
|
||||
} else {
|
||||
// Static runtime failed, try to create a new one
|
||||
Runtime::new().map_err(|e| {
|
||||
ServiceManagerError::Other(format!("Failed to create async runtime: {}", e))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ZinitServiceManager {
|
||||
client: Arc<ZinitClient>,
|
||||
}
|
||||
|
||||
impl ZinitServiceManager {
|
||||
pub fn new(socket_path: &str) -> Result<Self, ServiceManagerError> {
|
||||
// Create the base zinit client directly
|
||||
let client = Arc::new(ZinitClient::new(socket_path));
|
||||
|
||||
Ok(ZinitServiceManager { client })
|
||||
}
|
||||
|
||||
/// Execute an async operation using the shared runtime or current context
|
||||
fn execute_async<F, T>(&self, operation: F) -> Result<T, ServiceManagerError>
|
||||
where
|
||||
F: std::future::Future<Output = Result<T, ZinitError>> + Send + 'static,
|
||||
T: Send + 'static,
|
||||
{
|
||||
// Check if we're already in a tokio runtime context
|
||||
if let Ok(_handle) = tokio::runtime::Handle::try_current() {
|
||||
// We're in an async context, use spawn_blocking to avoid nested runtime
|
||||
let result = std::thread::spawn(
|
||||
move || -> Result<Result<T, ZinitError>, ServiceManagerError> {
|
||||
let rt = Runtime::new().map_err(|e| {
|
||||
ServiceManagerError::Other(format!("Failed to create runtime: {}", e))
|
||||
})?;
|
||||
Ok(rt.block_on(operation))
|
||||
},
|
||||
)
|
||||
.join()
|
||||
.map_err(|_| ServiceManagerError::Other("Thread join failed".to_string()))?;
|
||||
result?.map_err(|e| ServiceManagerError::Other(e.to_string()))
|
||||
} else {
|
||||
// No current runtime, use production-safe runtime
|
||||
let runtime = get_runtime()?;
|
||||
runtime
|
||||
.block_on(operation)
|
||||
.map_err(|e| ServiceManagerError::Other(e.to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute an async operation with timeout using the shared runtime or current context
|
||||
fn execute_async_with_timeout<F, T>(
|
||||
&self,
|
||||
operation: F,
|
||||
timeout_secs: u64,
|
||||
) -> Result<T, ServiceManagerError>
|
||||
where
|
||||
F: std::future::Future<Output = Result<T, ZinitError>> + Send + 'static,
|
||||
T: Send + 'static,
|
||||
{
|
||||
let timeout_duration = Duration::from_secs(timeout_secs);
|
||||
let timeout_op = timeout(timeout_duration, operation);
|
||||
|
||||
// Check if we're already in a tokio runtime context
|
||||
if let Ok(_handle) = tokio::runtime::Handle::try_current() {
|
||||
// We're in an async context, use spawn_blocking to avoid nested runtime
|
||||
let result = std::thread::spawn(move || {
|
||||
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||
rt.block_on(timeout_op)
|
||||
})
|
||||
.join()
|
||||
.map_err(|_| ServiceManagerError::Other("Thread join failed".to_string()))?;
|
||||
|
||||
result
|
||||
.map_err(|_| {
|
||||
ServiceManagerError::Other(format!(
|
||||
"Operation timed out after {} seconds",
|
||||
timeout_secs
|
||||
))
|
||||
})?
|
||||
.map_err(|e| ServiceManagerError::Other(e.to_string()))
|
||||
} else {
|
||||
// No current runtime, use production-safe runtime
|
||||
let runtime = get_runtime()?;
|
||||
runtime
|
||||
.block_on(timeout_op)
|
||||
.map_err(|_| {
|
||||
ServiceManagerError::Other(format!(
|
||||
"Operation timed out after {} seconds",
|
||||
timeout_secs
|
||||
))
|
||||
})?
|
||||
.map_err(|e| ServiceManagerError::Other(e.to_string()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ServiceManager for ZinitServiceManager {
|
||||
fn exists(&self, service_name: &str) -> Result<bool, ServiceManagerError> {
|
||||
let status_res = self.status(service_name);
|
||||
match status_res {
|
||||
Ok(_) => Ok(true),
|
||||
Err(ServiceManagerError::ServiceNotFound(_)) => Ok(false),
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
|
||||
fn start(&self, config: &ServiceConfig) -> Result<(), ServiceManagerError> {
|
||||
// Build the exec command with args
|
||||
let mut exec_command = config.binary_path.clone();
|
||||
if !config.args.is_empty() {
|
||||
exec_command.push(' ');
|
||||
exec_command.push_str(&config.args.join(" "));
|
||||
}
|
||||
|
||||
// Create zinit-compatible service configuration
|
||||
let mut service_config = json!({
|
||||
"exec": exec_command,
|
||||
"oneshot": !config.auto_restart, // zinit uses oneshot, not restart
|
||||
"env": config.environment,
|
||||
});
|
||||
|
||||
// Add optional fields if present
|
||||
if let Some(ref working_dir) = config.working_directory {
|
||||
// Zinit doesn't support working_directory directly, so we need to modify the exec command
|
||||
let cd_command = format!("cd {} && {}", working_dir, exec_command);
|
||||
service_config["exec"] = json!(cd_command);
|
||||
}
|
||||
|
||||
let client = Arc::clone(&self.client);
|
||||
let service_name = config.name.clone();
|
||||
self.execute_async(
|
||||
async move { client.create_service(&service_name, service_config).await },
|
||||
)
|
||||
.map_err(|e| ServiceManagerError::StartFailed(config.name.clone(), e.to_string()))?;
|
||||
|
||||
self.start_existing(&config.name)
|
||||
}
|
||||
|
||||
fn start_existing(&self, service_name: &str) -> Result<(), ServiceManagerError> {
|
||||
let client = Arc::clone(&self.client);
|
||||
let service_name_owned = service_name.to_string();
|
||||
let service_name_for_error = service_name.to_string();
|
||||
self.execute_async(async move { client.start(&service_name_owned).await })
|
||||
.map_err(|e| ServiceManagerError::StartFailed(service_name_for_error, e.to_string()))
|
||||
}
|
||||
|
||||
fn start_and_confirm(
|
||||
&self,
|
||||
config: &ServiceConfig,
|
||||
timeout_secs: u64,
|
||||
) -> Result<(), ServiceManagerError> {
|
||||
// Start the service first
|
||||
self.start(config)?;
|
||||
|
||||
// Wait for confirmation with timeout using the shared runtime
|
||||
self.execute_async_with_timeout(
|
||||
async move {
|
||||
let start_time = std::time::Instant::now();
|
||||
let timeout_duration = Duration::from_secs(timeout_secs);
|
||||
|
||||
while start_time.elapsed() < timeout_duration {
|
||||
// We need to call status in a blocking way from within the async context
|
||||
// For now, we'll use a simple polling approach
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
|
||||
// Return a timeout error that will be handled by execute_async_with_timeout
|
||||
// Use a generic error since we don't know the exact ZinitError variants
|
||||
Err(ZinitError::from(std::io::Error::new(
|
||||
std::io::ErrorKind::TimedOut,
|
||||
"Timeout waiting for service confirmation",
|
||||
)))
|
||||
},
|
||||
timeout_secs,
|
||||
)?;
|
||||
|
||||
// Check final status
|
||||
match self.status(&config.name)? {
|
||||
ServiceStatus::Running => Ok(()),
|
||||
ServiceStatus::Failed => Err(ServiceManagerError::StartFailed(
|
||||
config.name.clone(),
|
||||
"Service failed to start".to_string(),
|
||||
)),
|
||||
_ => Err(ServiceManagerError::StartFailed(
|
||||
config.name.clone(),
|
||||
format!("Service did not start within {} seconds", timeout_secs),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn start_existing_and_confirm(
|
||||
&self,
|
||||
service_name: &str,
|
||||
timeout_secs: u64,
|
||||
) -> Result<(), ServiceManagerError> {
|
||||
// Start the existing service first
|
||||
self.start_existing(service_name)?;
|
||||
|
||||
// Wait for confirmation with timeout using the shared runtime
|
||||
self.execute_async_with_timeout(
|
||||
async move {
|
||||
let start_time = std::time::Instant::now();
|
||||
let timeout_duration = Duration::from_secs(timeout_secs);
|
||||
|
||||
while start_time.elapsed() < timeout_duration {
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
|
||||
// Return a timeout error that will be handled by execute_async_with_timeout
|
||||
// Use a generic error since we don't know the exact ZinitError variants
|
||||
Err(ZinitError::from(std::io::Error::new(
|
||||
std::io::ErrorKind::TimedOut,
|
||||
"Timeout waiting for service confirmation",
|
||||
)))
|
||||
},
|
||||
timeout_secs,
|
||||
)?;
|
||||
|
||||
// Check final status
|
||||
match self.status(service_name)? {
|
||||
ServiceStatus::Running => Ok(()),
|
||||
ServiceStatus::Failed => Err(ServiceManagerError::StartFailed(
|
||||
service_name.to_string(),
|
||||
"Service failed to start".to_string(),
|
||||
)),
|
||||
_ => Err(ServiceManagerError::StartFailed(
|
||||
service_name.to_string(),
|
||||
format!("Service did not start within {} seconds", timeout_secs),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn stop(&self, service_name: &str) -> Result<(), ServiceManagerError> {
|
||||
let client = Arc::clone(&self.client);
|
||||
let service_name_owned = service_name.to_string();
|
||||
let service_name_for_error = service_name.to_string();
|
||||
self.execute_async(async move { client.stop(&service_name_owned).await })
|
||||
.map_err(|e| ServiceManagerError::StopFailed(service_name_for_error, e.to_string()))
|
||||
}
|
||||
|
||||
fn restart(&self, service_name: &str) -> Result<(), ServiceManagerError> {
|
||||
let client = Arc::clone(&self.client);
|
||||
let service_name_owned = service_name.to_string();
|
||||
let service_name_for_error = service_name.to_string();
|
||||
self.execute_async(async move { client.restart(&service_name_owned).await })
|
||||
.map_err(|e| ServiceManagerError::RestartFailed(service_name_for_error, e.to_string()))
|
||||
}
|
||||
|
||||
fn status(&self, service_name: &str) -> Result<ServiceStatus, ServiceManagerError> {
|
||||
let client = Arc::clone(&self.client);
|
||||
let service_name_owned = service_name.to_string();
|
||||
let service_name_for_error = service_name.to_string();
|
||||
let status: ZinitServiceStatus = self
|
||||
.execute_async(async move { client.status(&service_name_owned).await })
|
||||
.map_err(|e| {
|
||||
// Check if this is a "service not found" error
|
||||
if e.to_string().contains("not found") || e.to_string().contains("does not exist") {
|
||||
ServiceManagerError::ServiceNotFound(service_name_for_error)
|
||||
} else {
|
||||
ServiceManagerError::Other(e.to_string())
|
||||
}
|
||||
})?;
|
||||
|
||||
// ServiceStatus is a struct with fields, not an enum
|
||||
// We need to check the state field to determine the status
|
||||
// Convert ServiceState to string and match on that
|
||||
let state_str = format!("{:?}", status.state).to_lowercase();
|
||||
let service_status = match state_str.as_str() {
|
||||
s if s.contains("running") => crate::ServiceStatus::Running,
|
||||
s if s.contains("stopped") => crate::ServiceStatus::Stopped,
|
||||
s if s.contains("failed") => crate::ServiceStatus::Failed,
|
||||
_ => crate::ServiceStatus::Unknown,
|
||||
};
|
||||
Ok(service_status)
|
||||
}
|
||||
|
||||
fn logs(
|
||||
&self,
|
||||
service_name: &str,
|
||||
_lines: Option<usize>,
|
||||
) -> Result<String, ServiceManagerError> {
|
||||
// The logs method takes (follow: bool, filter: Option<impl AsRef<str>>)
|
||||
let client = Arc::clone(&self.client);
|
||||
let service_name_owned = service_name.to_string();
|
||||
let logs = self
|
||||
.execute_async(async move {
|
||||
use futures::StreamExt;
|
||||
use tokio::time::{timeout, Duration};
|
||||
|
||||
let mut log_stream = client
|
||||
.logs(false, Some(service_name_owned.as_str()))
|
||||
.await?;
|
||||
let mut logs = Vec::new();
|
||||
|
||||
// Collect logs from the stream with a reasonable limit
|
||||
let mut count = 0;
|
||||
const MAX_LOGS: usize = 100;
|
||||
const LOG_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
// Use timeout to prevent hanging
|
||||
let result = timeout(LOG_TIMEOUT, async {
|
||||
while let Some(log_result) = log_stream.next().await {
|
||||
match log_result {
|
||||
Ok(log_entry) => {
|
||||
logs.push(format!("{:?}", log_entry));
|
||||
count += 1;
|
||||
if count >= MAX_LOGS {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
})
|
||||
.await;
|
||||
|
||||
// Handle timeout - this is not an error, just means no more logs available
|
||||
if result.is_err() {
|
||||
log::debug!(
|
||||
"Log reading timed out after {} seconds, returning {} logs",
|
||||
LOG_TIMEOUT.as_secs(),
|
||||
logs.len()
|
||||
);
|
||||
}
|
||||
|
||||
Ok::<Vec<String>, ZinitError>(logs)
|
||||
})
|
||||
.map_err(|e| {
|
||||
ServiceManagerError::LogsFailed(service_name.to_string(), e.to_string())
|
||||
})?;
|
||||
Ok(logs.join("\n"))
|
||||
}
|
||||
|
||||
fn list(&self) -> Result<Vec<String>, ServiceManagerError> {
|
||||
let client = Arc::clone(&self.client);
|
||||
let services = self
|
||||
.execute_async(async move { client.list().await })
|
||||
.map_err(|e| ServiceManagerError::Other(e.to_string()))?;
|
||||
Ok(services.keys().cloned().collect())
|
||||
}
|
||||
|
||||
fn remove(&self, service_name: &str) -> Result<(), ServiceManagerError> {
|
||||
// Try to stop the service first, but don't fail if it's already stopped or doesn't exist
|
||||
if let Err(e) = self.stop(service_name) {
|
||||
// Log the error but continue with removal
|
||||
log::warn!(
|
||||
"Failed to stop service '{}' before removal: {}",
|
||||
service_name,
|
||||
e
|
||||
);
|
||||
}
|
||||
|
||||
let client = Arc::clone(&self.client);
|
||||
let service_name = service_name.to_string();
|
||||
self.execute_async(async move { client.delete_service(&service_name).await })
|
||||
.map_err(|e| ServiceManagerError::Other(e.to_string()))
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user