initial commit

This commit is contained in:
Timur Gordon
2025-08-26 14:49:21 +02:00
commit 767c66fb6a
66 changed files with 22035 additions and 0 deletions

234
src/runner.rs Normal file
View File

@@ -0,0 +1,234 @@
//! Runner implementation for actor process management.
use crate::job::{Job};
use log::{debug, info};
use redis::AsyncCommands;
use sal_service_manager::{ProcessManager, ProcessManagerError as ServiceProcessManagerError, ProcessStatus, ProcessConfig};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;
/// Represents the current status of an actor/runner (alias for ProcessStatus)
pub type RunnerStatus = ProcessStatus;
/// Log information structure
#[derive(Debug, Clone)]
pub struct LogInfo {
pub timestamp: String,
pub level: String,
pub message: String,
}
/// Runner configuration and state (merged from RunnerConfig)
#[derive(Debug, Clone)]
pub struct Runner {
/// Unique identifier for the runner
pub id: String,
pub name: String,
pub namespace: String,
/// Path to the actor binary
pub command: PathBuf, // Command to run runner by, used only if supervisor is used to run runners
/// Redis URL for job queue
pub redis_url: String,
}
impl Runner {
/// Create a new runner from configuration
pub fn from_config(config: RunnerConfig) -> Self {
Self {
id: config.id,
name: config.name,
namespace: config.namespace,
command: config.command,
redis_url: config.redis_url,
}
}
/// Create a new runner with the given parameters
pub fn new(
id: String,
name: String,
namespace: String,
command: PathBuf,
redis_url: String,
) -> Self {
Self {
id,
name,
namespace,
command,
redis_url,
}
}
/// Get the queue key for this runner with the given namespace
pub fn get_queue(&self) -> String {
if self.namespace == "" {
format!("runner:{}", self.name)
} else {
format!("{}:runner:{}", self.namespace, self.name)
}
}
}
/// Result type for runner operations
pub type RunnerResult<T> = Result<T, RunnerError>;
/// Errors that can occur during runner operations
#[derive(Debug, thiserror::Error)]
pub enum RunnerError {
#[error("Actor '{actor_id}' not found")]
ActorNotFound { actor_id: String },
#[error("Actor '{actor_id}' is already running")]
ActorAlreadyRunning { actor_id: String },
#[error("Actor '{actor_id}' is not running")]
ActorNotRunning { actor_id: String },
#[error("Failed to start actor '{actor_id}': {reason}")]
StartupFailed { actor_id: String, reason: String },
#[error("Failed to stop actor '{actor_id}': {reason}")]
StopFailed { actor_id: String, reason: String },
#[error("Timeout waiting for actor '{actor_id}' to start")]
StartupTimeout { actor_id: String },
#[error("Job queue error for actor '{actor_id}': {reason}")]
QueueError { actor_id: String, reason: String },
#[error("Process manager error: {source}")]
ProcessManagerError {
#[from]
source: ServiceProcessManagerError,
},
#[error("Configuration error: {reason}")]
ConfigError { reason: String },
#[error("Invalid secret: {0}")]
InvalidSecret(String),
#[error("IO error: {source}")]
IoError {
#[from]
source: std::io::Error,
},
#[error("Redis error: {source}")]
RedisError {
#[from]
source: redis::RedisError,
},
#[error("Job error: {source}")]
JobError {
#[from]
source: crate::JobError,
},
}
/// Convert Runner to ProcessConfig
pub fn runner_to_process_config(config: &Runner) -> ProcessConfig {
ProcessConfig::new(config.id.clone(), config.command.clone())
.with_arg("--id".to_string())
.with_arg(config.id.clone())
.with_arg("--redis-url".to_string())
.with_arg(config.redis_url.clone())
}
// Type alias for backward compatibility
pub type RunnerConfig = Runner;
#[cfg(test)]
mod tests {
use super::*;
use sal_service_manager::{ProcessManagerError, SimpleProcessManager};
use std::collections::HashMap;
#[derive(Debug)]
struct MockProcessManager {
processes: HashMap<String, ProcessStatus>,
}
impl MockProcessManager {
fn new() -> Self {
Self {
processes: HashMap::new(),
}
}
}
#[async_trait::async_trait]
impl ProcessManager for MockProcessManager {
async fn start_process(&mut self, config: &ProcessConfig) -> Result<(), ProcessManagerError> {
self.processes.insert(config.id.clone(), ProcessStatus::Running);
Ok(())
}
async fn stop_process(&mut self, process_id: &str, _force: bool) -> Result<(), ProcessManagerError> {
self.processes.insert(process_id.to_string(), ProcessStatus::Stopped);
Ok(())
}
async fn process_status(&self, process_id: &str) -> Result<ProcessStatus, ProcessManagerError> {
Ok(self.processes.get(process_id).cloned().unwrap_or(ProcessStatus::Stopped))
}
async fn process_logs(&self, _process_id: &str, _lines: Option<usize>, _follow: bool) -> Result<Vec<LogInfo>, ProcessManagerError> {
Ok(vec![])
}
async fn health_check(&self) -> Result<(), ProcessManagerError> {
Ok(())
}
async fn list_processes(&self) -> Result<Vec<String>, ProcessManagerError> {
Ok(self.processes.keys().cloned().collect())
}
}
#[test]
fn test_runner_creation() {
let runner = Runner::new(
"test_actor".to_string(),
"test_runner".to_string(),
"".to_string(),
PathBuf::from("/path/to/binary"),
"redis://localhost:6379".to_string(),
);
assert_eq!(runner.id, "test_actor");
assert_eq!(runner.name, "test_runner");
assert_eq!(runner.command, PathBuf::from("/path/to/binary"));
assert_eq!(runner.redis_url, "redis://localhost:6379");
}
#[test]
fn test_runner_get_queue() {
let runner = Runner::new(
"test_actor".to_string(),
"test_runner".to_string(),
"".to_string(),
PathBuf::from("/path/to/binary"),
"redis://localhost:6379".to_string(),
);
let queue_key = runner.get_queue();
assert_eq!(queue_key, "runner:test_runner");
}
#[test]
fn test_runner_error_types() {
let error = RunnerError::ActorNotFound {
actor_id: "test".to_string(),
};
assert!(error.to_string().contains("test"));
let error = RunnerError::ActorAlreadyRunning {
actor_id: "test".to_string(),
};
assert!(error.to_string().contains("already running"));
}
}