// baobab/core/supervisor/src/lib.rs
use log::{debug, error, info, warn};
use redis::AsyncCommands;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use hero_job::NAMESPACE_PREFIX;
use zinit_client::Client as ZinitClient;
mod job;
mod error;
mod lifecycle;
pub use crate::error::SupervisorError;
pub use crate::job::JobBuilder;
pub use crate::lifecycle::ActorConfig;
// Re-export types from hero_job for public API
pub use hero_job::{Job, JobStatus, ScriptType};
pub struct Supervisor {
redis_client: redis::Client,
zinit_client: ZinitClient,
builder_data: Option<SupervisorBuilderData>,
}
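/// Fluent builder for [`Supervisor`].
///
/// A minimal usage sketch, assuming this crate is imported as
/// `hero_supervisor` (the crate name and binary path are illustrative):
///
/// ```no_run
/// use hero_supervisor::SupervisorBuilder; // assumed crate name
///
/// # async fn demo() -> Result<(), hero_supervisor::SupervisorError> {
/// let supervisor = SupervisorBuilder::new()
///     .redis_url("redis://127.0.0.1/")
///     .osis_actor("/usr/local/bin/osis_actor")
///     .build()
///     .await?;
/// supervisor.start_actors().await?;
/// # Ok(())
/// # }
/// ```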
pub struct SupervisorBuilder {
redis_url: Option<String>,
osis_actor: Option<String>,
sal_actor: Option<String>,
v_actor: Option<String>,
python_actor: Option<String>,
actor_env_vars: HashMap<String, String>,
websocket_config: Option<WebSocketServerConfig>,
}
/// Helper struct to pass builder data to the actor launch method
#[derive(Clone)]
struct SupervisorBuilderData {
osis_actor: Option<String>,
sal_actor: Option<String>,
v_actor: Option<String>,
python_actor: Option<String>,
actor_env_vars: HashMap<String, String>,
websocket_config: Option<WebSocketServerConfig>,
}
/// TOML configuration structure for the supervisor
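///
/// A minimal sketch of the expected layout (paths and addresses are
/// illustrative; every section except `[global]` is optional):
///
/// ```toml
/// [global]
/// redis_url = "redis://127.0.0.1/"
///
/// [osis_actor]
/// binary_path = "/usr/local/bin/osis_actor"
///
/// [websocket_server]
/// host = "127.0.0.1"
/// port = 8443
/// ```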
#[derive(Debug, Deserialize, Serialize)]
pub struct SupervisorConfig {
pub global: GlobalConfig,
pub websocket_server: Option<WebSocketServerConfig>,
pub osis_actor: Option<ActorConfigToml>,
pub sal_actor: Option<ActorConfigToml>,
pub v_actor: Option<ActorConfigToml>,
pub python_actor: Option<ActorConfigToml>,
}
/// Global configuration section
#[derive(Debug, Deserialize, Serialize)]
pub struct GlobalConfig {
pub redis_url: String,
}
/// Actor configuration section in TOML
#[derive(Debug, Deserialize, Serialize)]
pub struct ActorConfigToml {
pub binary_path: String,
}
/// WebSocket server configuration section in TOML
/// This mirrors the ServerConfig from hero_websocket_server but avoids a circular dependency
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct WebSocketServerConfig {
/// Server host address
#[serde(default = "default_host")]
pub host: String,
/// Server port
#[serde(default = "default_port")]
pub port: u16,
/// Redis connection URL
#[serde(default = "default_redis_url")]
pub redis_url: String,
/// Enable authentication
#[serde(default)]
pub auth: bool,
/// Enable TLS/WSS
#[serde(default)]
pub tls: bool,
/// Path to TLS certificate file
pub cert: Option<String>,
/// Path to TLS private key file
pub key: Option<String>,
/// Separate port for TLS connections
pub tls_port: Option<u16>,
/// Circles configuration - maps circle names to lists of member public keys
#[serde(default)]
pub circles: HashMap<String, Vec<String>>,
}
// Default value functions for WebSocket server config
fn default_host() -> String {
"127.0.0.1".to_string()
}
fn default_port() -> u16 {
8443
}
fn default_redis_url() -> String {
"redis://127.0.0.1/".to_string()
}
impl SupervisorBuilder {
pub fn new() -> Self {
Self {
redis_url: None,
osis_actor: None,
sal_actor: None,
v_actor: None,
python_actor: None,
actor_env_vars: HashMap::new(),
websocket_config: None,
}
}
/// Create a SupervisorBuilder from a TOML configuration file
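    ///
    /// A minimal sketch, assuming the crate is imported as `hero_supervisor`
    /// and a `config.toml` in the layout documented on [`SupervisorConfig`]:
    ///
    /// ```no_run
    /// # async fn demo() -> Result<(), hero_supervisor::SupervisorError> {
    /// let supervisor = hero_supervisor::SupervisorBuilder::from_toml("config.toml")? // assumed crate name
    ///     .build()
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```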
pub fn from_toml<P: AsRef<Path>>(toml_path: P) -> Result<Self, SupervisorError> {
let toml_content = fs::read_to_string(toml_path)
.map_err(|e| SupervisorError::ConfigError(format!("Failed to read TOML file: {}", e)))?;
let config: SupervisorConfig = toml::from_str(&toml_content)
.map_err(|e| SupervisorError::ConfigError(format!("Failed to parse TOML: {}", e)))?;
let mut builder = Self::new()
.redis_url(&config.global.redis_url);
// Configure actors based on TOML config
if let Some(osis_config) = config.osis_actor {
builder = builder.osis_actor(&osis_config.binary_path);
}
if let Some(sal_config) = config.sal_actor {
builder = builder.sal_actor(&sal_config.binary_path);
}
if let Some(v_config) = config.v_actor {
builder = builder.v_actor(&v_config.binary_path);
}
if let Some(python_config) = config.python_actor {
builder = builder.python_actor(&python_config.binary_path);
}
// Store WebSocket configuration for later use
if let Some(ws_config) = config.websocket_server {
builder.websocket_config = Some(ws_config);
}
Ok(builder)
}
/// Validate that all configured actor binaries exist and are executable
fn validate_actor_binaries(&self) -> Result<(), SupervisorError> {
let actors = [
("OSIS", &self.osis_actor),
("SAL", &self.sal_actor),
("V", &self.v_actor),
("Python", &self.python_actor),
];
for (actor_type, binary_path) in actors {
if let Some(path) = binary_path {
let path_obj = Path::new(path);
if !path_obj.exists() {
return Err(SupervisorError::ConfigError(
format!("{} actor binary does not exist: {}", actor_type, path)
));
}
if !path_obj.is_file() {
return Err(SupervisorError::ConfigError(
format!("{} actor path is not a file: {}", actor_type, path)
));
}
// Check if the file is executable (Unix-like systems)
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let metadata = path_obj.metadata().map_err(|e| {
SupervisorError::ConfigError(
format!("Failed to read metadata for {} actor binary {}: {}", actor_type, path, e)
)
})?;
let permissions = metadata.permissions();
if permissions.mode() & 0o111 == 0 {
return Err(SupervisorError::ConfigError(
format!("{} actor binary is not executable: {}", actor_type, path)
));
}
}
info!("Validated {} actor binary: {}", actor_type, path);
}
}
Ok(())
}
pub fn redis_url(mut self, url: &str) -> Self {
self.redis_url = Some(url.to_string());
self
}
pub fn osis_actor(mut self, binary_path: &str) -> Self {
self.osis_actor = Some(binary_path.to_string());
self
}
pub fn sal_actor(mut self, binary_path: &str) -> Self {
self.sal_actor = Some(binary_path.to_string());
self
}
pub fn v_actor(mut self, binary_path: &str) -> Self {
self.v_actor = Some(binary_path.to_string());
self
}
pub fn python_actor(mut self, binary_path: &str) -> Self {
self.python_actor = Some(binary_path.to_string());
self
}
pub fn actor_env_var(mut self, key: &str, value: &str) -> Self {
self.actor_env_vars.insert(key.to_string(), value.to_string());
self
}
pub fn actor_env_vars(mut self, env_vars: HashMap<String, String>) -> Self {
self.actor_env_vars.extend(env_vars);
self
}
    /// Builds the final `Supervisor` instance.
    ///
    /// This method validates the configuration, checks that the configured actor
    /// binaries exist, creates the Redis client, and connects to Zinit. Actor
    /// launching is deferred to `start_actors()`.
    ///
    /// # Returns
    ///
    /// * `Ok(Supervisor)` - Successfully configured supervisor with validated binaries
    /// * `Err(SupervisorError)` - Configuration, binary validation, or connection error
pub async fn build(self) -> Result<Supervisor, SupervisorError> {
// Validate that all configured actor binaries exist first
        self.validate_actor_binaries()?;
let url = self.redis_url
.unwrap_or_else(|| "redis://127.0.0.1/".to_string());
let client = redis::Client::open(url)?;
let zinit_client = ZinitClient::unix_socket("/tmp/zinit.sock").await
.map_err(|e| SupervisorError::ZinitError(format!("Failed to create Zinit client: {}", e)))?;
// Store builder data for later use in start_actors()
let builder_data = SupervisorBuilderData {
osis_actor: self.osis_actor,
sal_actor: self.sal_actor,
v_actor: self.v_actor,
python_actor: self.python_actor,
actor_env_vars: self.actor_env_vars,
websocket_config: self.websocket_config,
};
let supervisor = Supervisor {
redis_client: client,
zinit_client,
builder_data: Some(builder_data),
};
Ok(supervisor)
}
}
impl Supervisor {
/// Start all configured actors asynchronously.
/// This method should be called after build() to launch the actors.
pub async fn start_actors(&self) -> Result<(), SupervisorError> {
info!("Starting Hero Supervisor actors...");
// Test Zinit connection first
info!("Testing Zinit connection at /tmp/zinit.sock...");
match self.zinit_client.list().await {
Ok(services) => {
info!("Successfully connected to Zinit. Current services: {:?}", services);
}
Err(e) => {
error!("Failed to connect to Zinit: {:?}", e);
return Err(SupervisorError::ZinitError(format!("Zinit connection failed: {}", e)));
}
}
// Clean up any existing actor services first
info!("Cleaning up existing actor services...");
self.cleanup_existing_actors().await?;
// Launch configured actors if builder data is available
if let Some(builder_data) = &self.builder_data {
info!("Launching configured actors...");
self.launch_configured_actors(builder_data).await?;
} else {
warn!("No builder data available, no actors to start");
}
info!("All actors started successfully!");
Ok(())
}
/// Clean up all actor services from zinit on program exit
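    ///
    /// Typically called from a shutdown path; a minimal sketch using tokio's
    /// ctrl-c handler (the surrounding setup is assumed):
    ///
    /// ```no_run
    /// # async fn demo(supervisor: &hero_supervisor::Supervisor) -> Result<(), hero_supervisor::SupervisorError> {
    /// tokio::signal::ctrl_c().await.ok(); // requires tokio's "signal" feature
    /// supervisor.cleanup_and_shutdown().await?;
    /// # Ok(())
    /// # }
    /// ```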
pub async fn cleanup_and_shutdown(&self) -> Result<(), SupervisorError> {
info!("Cleaning up actor services before shutdown...");
let actor_names = vec![
"osis_actor_1",
"sal_actor_1",
"v_actor_1",
"python_actor_1"
];
for actor_name in actor_names {
if let Err(e) = self.stop_and_delete_actor(actor_name).await {
warn!("Failed to cleanup actor {}: {}", actor_name, e);
}
}
info!("Actor cleanup completed");
Ok(())
}
/// Clean up any existing actor services on startup
async fn cleanup_existing_actors(&self) -> Result<(), SupervisorError> {
info!("Cleaning up any existing actor services...");
let actor_names = vec![
"osis_actor_1",
"sal_actor_1",
"v_actor_1",
"python_actor_1"
];
for actor_name in actor_names {
// Try to stop and delete, but don't fail if they don't exist
info!("Attempting to cleanup actor: {}", actor_name);
match self.stop_and_delete_actor(actor_name).await {
Ok(_) => info!("Successfully cleaned up actor: {}", actor_name),
Err(e) => debug!("Failed to cleanup actor {}: {}", actor_name, e),
}
}
info!("Existing actor cleanup completed");
Ok(())
}
    /// Stop and delete an actor service from zinit
async fn stop_and_delete_actor(&self, actor_name: &str) -> Result<(), SupervisorError> {
info!("Starting cleanup for actor: {}", actor_name);
// First try to stop the actor
info!("Attempting to stop actor: {}", actor_name);
if let Err(e) = self.zinit_client.stop(actor_name).await {
debug!("Actor {} was not running or failed to stop: {}", actor_name, e);
} else {
info!("Successfully stopped actor: {}", actor_name);
}
// Then forget the service to stop monitoring it
info!("Attempting to forget actor: {}", actor_name);
if let Err(e) = self.zinit_client.forget(actor_name).await {
info!("Actor {} was not being monitored or failed to forget: {}", actor_name, e);
} else {
info!("Successfully forgot actor service: {}", actor_name);
}
// Finally, delete the service configuration
info!("Attempting to delete service for actor: {}", actor_name);
if let Err(e) = self.zinit_client.delete_service(actor_name).await {
debug!("Actor {} service did not exist or failed to delete: {}", actor_name, e);
} else {
info!("Successfully deleted actor service: {}", actor_name);
}
info!("Completed cleanup for actor: {}", actor_name);
Ok(())
}
/// Get the hardcoded actor queue key for the script type
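    /// For example, with a namespace prefix of `hero:` and a suffix of `osis`
    /// (both illustrative), the resulting key would be `hero:actor_queue:osis`.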
fn get_actor_queue_key(&self, script_type: &ScriptType) -> String {
format!("{}actor_queue:{}", NAMESPACE_PREFIX, script_type.actor_queue_suffix())
}
pub fn new_job(&self) -> JobBuilder {
JobBuilder::new(self)
}
/// Get WebSocket server configuration from TOML config
pub fn get_websocket_config(&self) -> Result<WebSocketServerConfig, SupervisorError> {
let builder_data = self.builder_data.as_ref().ok_or_else(|| {
SupervisorError::ConfigError("No builder data available for WebSocket config".to_string())
})?;
builder_data.websocket_config.clone().ok_or_else(|| {
SupervisorError::ConfigError("No WebSocket server configuration found in TOML config".to_string())
})
}
/// Extract actor configurations from the supervisor's builder data
pub fn get_actor_configs(&self) -> Result<Vec<ActorConfig>, SupervisorError> {
let builder_data = self.builder_data.as_ref().ok_or_else(|| {
SupervisorError::ConfigError("No builder data available for actor configs".to_string())
})?;
let mut configs = Vec::new();
if let Some(osis_path) = &builder_data.osis_actor {
configs.push(
ActorConfig::new("osis_actor_1".to_string(), PathBuf::from(osis_path), ScriptType::OSIS)
);
}
if let Some(sal_path) = &builder_data.sal_actor {
configs.push(
ActorConfig::new("sal_actor_1".to_string(), PathBuf::from(sal_path), ScriptType::SAL)
);
}
if let Some(v_path) = &builder_data.v_actor {
configs.push(
ActorConfig::new("v_actor_1".to_string(), PathBuf::from(v_path), ScriptType::V)
);
}
if let Some(python_path) = &builder_data.python_actor {
configs.push(
ActorConfig::new("python_actor_1".to_string(), PathBuf::from(python_path), ScriptType::Python)
);
}
Ok(configs)
}
/// Spawn a background lifecycle manager that continuously monitors and maintains actor health
/// Returns a JoinHandle that can be used to stop the lifecycle manager
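    ///
    /// A minimal sketch (the 30-second interval is illustrative):
    ///
    /// ```no_run
    /// # use std::{sync::Arc, time::Duration};
    /// # async fn demo(supervisor: Arc<hero_supervisor::Supervisor>) -> Result<(), hero_supervisor::SupervisorError> {
    /// let configs = supervisor.get_actor_configs()?;
    /// let handle = supervisor.spawn_lifecycle_manager(configs, Duration::from_secs(30));
    /// // ... later, stop the background loop on shutdown:
    /// handle.abort();
    /// # Ok(())
    /// # }
    /// ```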
pub fn spawn_lifecycle_manager(
self: Arc<Self>,
actor_configs: Vec<ActorConfig>,
health_check_interval: Duration,
) -> tokio::task::JoinHandle<Result<(), SupervisorError>> {
let supervisor = self;
tokio::spawn(async move {
info!("Starting background lifecycle manager with {} actors", actor_configs.len());
info!("Health check interval: {:?}", health_check_interval);
// Initial actor startup
info!("Performing initial actor startup...");
if let Err(e) = supervisor.start_actors().await {
error!("Failed to start actors during initialization: {}", e);
return Err(e);
}
// Start the monitoring loop
let mut interval = tokio::time::interval(health_check_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
info!("Running periodic actor health check...");
// Check each actor's health and restart if needed
for actor_config in &actor_configs {
if let Err(e) = supervisor.check_and_restart_actor(actor_config).await {
error!("Failed to check/restart actor {}: {}", actor_config.name, e);
}
}
info!("Health check cycle completed");
}
})
}
/// Check a single actor's health and restart if needed
async fn check_and_restart_actor(&self, actor_config: &ActorConfig) -> Result<(), SupervisorError> {
let actor_name = &actor_config.name;
// Get actor status
match self.zinit_client.status(actor_name).await {
Ok(status) => {
let is_healthy = status.state == "running" && status.pid > 0;
if is_healthy {
debug!("Actor {} is healthy (state: {}, pid: {})", actor_name, status.state, status.pid);
// Optionally send a ping job for deeper health check
if let Err(e) = self.send_ping_job(actor_config.script_type.clone()).await {
warn!("Ping job failed for actor {}: {}", actor_name, e);
// Note: We don't restart on ping failure as it might be temporary
}
} else {
warn!("Actor {} is unhealthy (state: {}, pid: {}), restarting...",
actor_name, status.state, status.pid);
// Attempt to restart the actor
if let Err(e) = self.restart_actor(actor_name).await {
error!("Failed to restart unhealthy actor {}: {}", actor_name, e);
// If restart fails, try a full stop/start cycle
warn!("Attempting full stop/start cycle for actor: {}", actor_name);
if let Err(e) = self.stop_and_delete_actor(actor_name).await {
error!("Failed to stop actor {} during recovery: {}", actor_name, e);
}
if let Err(e) = self.start_actor(actor_config).await {
error!("Failed to start actor {} during recovery: {}", actor_name, e);
return Err(e);
}
info!("Successfully recovered actor: {}", actor_name);
} else {
info!("Successfully restarted actor: {}", actor_name);
}
}
}
Err(e) => {
warn!("Could not get status for actor {} (may not exist): {}", actor_name, e);
// Actor doesn't exist, try to start it
info!("Attempting to start missing actor: {}", actor_name);
if let Err(e) = self.start_actor(actor_config).await {
error!("Failed to start missing actor {}: {}", actor_name, e);
return Err(e);
}
info!("Successfully started missing actor: {}", actor_name);
}
}
Ok(())
}
    // Internal helper to store job details in Redis
async fn create_job_using_connection(
&self,
conn: &mut redis::aio::MultiplexedConnection,
job: &Job,
) -> Result<(), SupervisorError> {
debug!(
"Submitting play request: {} for script type: {:?} with namespace prefix: {}",
job.id, job.script_type, NAMESPACE_PREFIX
);
// Use the shared Job struct's Redis storage method
job.store_in_redis(conn).await
.map_err(|e| SupervisorError::InvalidInput(format!("Failed to store job in Redis: {}", e)))?;
Ok(())
}
    // Internal helper to push a job ID onto the work queue for its script type
async fn start_job_using_connection(
&self,
conn: &mut redis::aio::MultiplexedConnection,
job_id: String,
script_type: &ScriptType
) -> Result<(), SupervisorError> {
let actor_queue_key = self.get_actor_queue_key(script_type);
        // Push the job ID onto the actor queue; LPUSH returns the new list length.
        let _: i64 = conn.lpush(&actor_queue_key, job_id).await?;
Ok(())
}
    // Internal helper to await the response from an actor
async fn await_response_from_connection(
&self,
conn: &mut redis::aio::MultiplexedConnection,
        job_key: &str,
        reply_queue_key: &str,
timeout: Duration,
) -> Result<String, SupervisorError> {
        // BLPOP on the reply queue. redis-rs takes the timeout as f64 seconds;
        // clamp to at least 1 second, since 0 would block forever.
        let blpop_timeout_secs = timeout.as_secs().max(1);
        match conn
            .blpop::<&str, Option<(String, String)>>(reply_queue_key, blpop_timeout_secs as f64)
.await
{
Ok(Some((_queue, result_message_str))) => {
Ok(result_message_str)
}
Ok(None) => {
// BLPOP timed out
warn!(
"Timeout waiting for result on reply queue {} for job {}",
reply_queue_key, job_key
);
// Optionally, delete the reply queue
let _: redis::RedisResult<i32> = conn.del(&reply_queue_key).await;
                Err(SupervisorError::Timeout(job_key.to_string()))
}
Err(e) => {
// Redis error
error!(
"Redis error on BLPOP for reply queue {}: {}",
reply_queue_key, e
);
// Optionally, delete the reply queue
let _: redis::RedisResult<i32> = conn.del(&reply_queue_key).await;
Err(SupervisorError::RedisError(e))
}
}
}
    /// Store a job in Redis without dispatching it to an actor queue.
pub async fn create_job(
&self,
job: &Job,
) -> Result<(), SupervisorError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
        self.create_job_using_connection(&mut conn, job).await?;
Ok(())
}
    /// Start a previously created job by pushing its ID onto the appropriate actor queue.
pub async fn start_job(
&self,
job_id: &str,
) -> Result<(), SupervisorError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
// Load the job to get its script type
let job = Job::load_from_redis(&mut conn, job_id).await?;
self.start_job_using_connection(&mut conn, job_id.to_string(), &job.script_type).await?;
Ok(())
}
    /// Create and start a job, then await its result on a dedicated reply queue.
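    ///
    /// A minimal sketch, assuming `job` was built beforehand (e.g. via
    /// [`Supervisor::new_job`]):
    ///
    /// ```no_run
    /// # async fn demo(supervisor: &hero_supervisor::Supervisor, job: &hero_job::Job) -> Result<(), hero_supervisor::SupervisorError> {
    /// let output = supervisor.run_job_and_await_result(job).await?;
    /// println!("job output: {output}");
    /// # Ok(())
    /// # }
    /// ```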
pub async fn run_job_and_await_result(
&self,
job: &Job
) -> Result<String, SupervisorError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
        let reply_queue_key = format!("{}:reply:{}", NAMESPACE_PREFIX, job.id); // reply queue derived from the job ID
        self.create_job_using_connection(&mut conn, job).await?;
self.start_job_using_connection(&mut conn, job.id.clone(), &job.script_type).await?;
        info!(
            "Job {} submitted. Waiting for result on queue {} with timeout {:?}...",
            job.id,
            reply_queue_key,
            job.timeout
        );
self.await_response_from_connection(
&mut conn,
&job.id,
&reply_queue_key,
job.timeout,
)
.await
}
    /// Get the current status of a job.
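    ///
    /// A minimal polling sketch (the poll interval is illustrative):
    ///
    /// ```no_run
    /// # use hero_job::JobStatus;
    /// # async fn demo(supervisor: &hero_supervisor::Supervisor, job_id: &str) -> Result<(), hero_supervisor::SupervisorError> {
    /// loop {
    ///     match supervisor.get_job_status(job_id).await? {
    ///         JobStatus::Finished | JobStatus::Error => break,
    ///         _ => tokio::time::sleep(std::time::Duration::from_secs(1)).await,
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```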
pub async fn get_job_status(
&self,
job_id: &str,
) -> Result<JobStatus, SupervisorError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
let job_key = format!("{}{}", NAMESPACE_PREFIX, job_id);
let result_map: Option<std::collections::HashMap<String, String>> =
conn.hgetall(&job_key).await?;
match result_map {
Some(map) => {
let status_str = map.get("status").cloned().unwrap_or_else(|| {
warn!("Task {}: 'status' field missing from Redis hash, defaulting to empty.", job_id);
String::new()
});
let status = match status_str.as_str() {
"dispatched" => JobStatus::Dispatched,
"started" => JobStatus::Started,
"error" => JobStatus::Error,
"finished" => JobStatus::Finished,
                    _ => JobStatus::Dispatched, // any other/unknown status falls back to Dispatched
};
Ok(status)
}
None => {
warn!("Job {} not found in Redis", job_id);
Ok(JobStatus::Dispatched) // default for missing jobs
}
}
}
    /// Get the stored output of a job, if any.
pub async fn get_job_output(
&self,
job_id: &str,
) -> Result<Option<String>, SupervisorError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
let job_key = format!("{}{}", NAMESPACE_PREFIX, job_id);
let result_map: Option<std::collections::HashMap<String, String>> =
conn.hgetall(&job_key).await?;
match result_map {
Some(map) => {
Ok(map.get("output").cloned())
}
None => {
warn!("Job {} not found in Redis", job_id);
Ok(None)
}
}
}
/// List all jobs in Redis
pub async fn list_jobs(&self) -> Result<Vec<String>, SupervisorError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
// Use the shared Job struct's list method
Job::list_all_job_ids(&mut conn).await
.map_err(|e| SupervisorError::InvalidInput(format!("Failed to list jobs: {}", e)))
}
/// Stop a job by pushing its ID to the stop queue
pub async fn stop_job(&self, job_id: &str) -> Result<(), SupervisorError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
// Get job details to determine script type and appropriate actor
let job_key = format!("{}job:{}", NAMESPACE_PREFIX, job_id);
let job_data: std::collections::HashMap<String, String> = conn.hgetall(&job_key).await?;
if job_data.is_empty() {
return Err(SupervisorError::InvalidInput(format!("Job {} not found", job_id)));
}
// Parse script type from job data
let script_type_str = job_data.get("script_type")
.ok_or_else(|| SupervisorError::InvalidInput("Job missing script_type field".to_string()))?;
let script_type: ScriptType = serde_json::from_str(&format!("\"{}\"", script_type_str))
.map_err(|e| SupervisorError::InvalidInput(format!("Invalid script type: {}", e)))?;
// Use hardcoded stop queue key for this script type
let stop_queue_key = format!("{}stop_queue:{}", NAMESPACE_PREFIX, script_type.actor_queue_suffix());
// Push job ID to the stop queue
conn.lpush::<_, _, ()>(&stop_queue_key, job_id).await?;
info!("Job {} added to stop queue {} for script type {:?}", job_id, stop_queue_key, script_type);
Ok(())
}
/// Get logs for a job by reading from its log file
pub async fn get_job_logs(&self, job_id: &str) -> Result<Option<String>, SupervisorError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
let job_key = format!("{}job:{}", NAMESPACE_PREFIX, job_id);
// Get the job data to find the log path
let result_map: Option<std::collections::HashMap<String, String>> =
conn.hgetall(&job_key).await?;
match result_map {
Some(map) => {
if let Some(log_path) = map.get("log_path") {
// Try to read the log file
match std::fs::read_to_string(log_path) {
Ok(contents) => Ok(Some(contents)),
Err(e) => {
warn!("Failed to read log file {}: {}", log_path, e);
Ok(None)
}
}
} else {
// No log path configured for this job
Ok(None)
}
}
None => {
warn!("Job {} not found in Redis", job_id);
Ok(None)
}
}
}
/// Delete a specific job by ID
pub async fn delete_job(&self, job_id: &str) -> Result<(), SupervisorError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
// Use the shared Job struct's delete method
Job::delete_from_redis(&mut conn, job_id).await
.map_err(|e| SupervisorError::InvalidInput(format!("Failed to delete job: {}", e)))?;
info!("Job {} deleted successfully", job_id);
Ok(())
}
/// Clear all jobs from Redis
pub async fn clear_all_jobs(&self) -> Result<usize, SupervisorError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
// Get all job IDs first
let job_ids = Job::list_all_job_ids(&mut conn).await
.map_err(|e| SupervisorError::InvalidInput(format!("Failed to list jobs: {}", e)))?;
let count = job_ids.len();
// Delete each job using the shared method
for job_id in job_ids {
Job::delete_from_redis(&mut conn, &job_id).await
.map_err(|e| SupervisorError::InvalidInput(format!("Failed to delete job {}: {}", job_id, e)))?;
}
Ok(count)
}
/// Check if all prerequisites for a job are completed
pub async fn check_prerequisites_completed(&self, job_id: &str) -> Result<bool, SupervisorError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
// Load the job using the shared Job struct
let job = Job::load_from_redis(&mut conn, job_id).await
.map_err(|e| SupervisorError::InvalidInput(format!("Failed to load job: {}", e)))?;
// Check each prerequisite job status
for prereq_id in &job.prerequisites {
let status = Job::get_status(&mut conn, prereq_id).await
.map_err(|e| SupervisorError::InvalidInput(format!("Failed to get prerequisite status: {}", e)))?;
if status != JobStatus::Finished {
return Ok(false); // Prerequisite not completed
}
}
Ok(true) // All prerequisites completed (or no prerequisites)
}
/// Update job status and check dependent jobs for readiness
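    ///
    /// Returns the IDs of dependent jobs that became ready. A minimal sketch of
    /// the intended flow (the job ID is illustrative):
    ///
    /// ```no_run
    /// # use hero_job::JobStatus;
    /// # async fn demo(supervisor: &hero_supervisor::Supervisor) -> Result<(), hero_supervisor::SupervisorError> {
    /// let ready = supervisor
    ///     .update_job_status_and_check_dependents("job-123", JobStatus::Finished)
    ///     .await?;
    /// supervisor.dispatch_ready_jobs(ready).await?;
    /// # Ok(())
    /// # }
    /// ```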
pub async fn update_job_status_and_check_dependents(&self, job_id: &str, new_status: JobStatus) -> Result<Vec<String>, SupervisorError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
// Update job status using shared Job method
Job::update_status(&mut conn, job_id, new_status.clone()).await
.map_err(|e| SupervisorError::InvalidInput(format!("Failed to update job status: {}", e)))?;
let mut ready_jobs = Vec::new();
// If job finished, check dependent jobs
if new_status == JobStatus::Finished {
// Load the job to get its dependents
let job = Job::load_from_redis(&mut conn, job_id).await
.map_err(|e| SupervisorError::InvalidInput(format!("Failed to load job: {}", e)))?;
// Check each dependent job
for dependent_id in &job.dependents {
let dependent_status = Job::get_status(&mut conn, dependent_id).await
.map_err(|e| SupervisorError::InvalidInput(format!("Failed to get dependent status: {}", e)))?;
// Only check jobs that are waiting for prerequisites
if dependent_status == JobStatus::WaitingForPrerequisites {
// Check if all prerequisites are now completed
if self.check_prerequisites_completed(dependent_id).await? {
// Update status to dispatched and add to ready jobs
Job::update_status(&mut conn, dependent_id, JobStatus::Dispatched).await
.map_err(|e| SupervisorError::InvalidInput(format!("Failed to update dependent status: {}", e)))?;
ready_jobs.push(dependent_id.clone());
}
}
}
}
Ok(ready_jobs)
}
/// Dispatch jobs that are ready (have all prerequisites completed)
pub async fn dispatch_ready_jobs(&self, ready_job_ids: Vec<String>) -> Result<(), SupervisorError> {
        // Reuse a single connection for all dispatches
        let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
        for job_id in ready_job_ids {
            // Get job data to determine script type and select actor
            let job_key = format!("{}job:{}", NAMESPACE_PREFIX, job_id);
            let job_data: std::collections::HashMap<String, String> = conn.hgetall(&job_key).await?;
if let Some(script_type_str) = job_data.get("script_type") {
// Parse script type (stored as Debug format, e.g., "OSIS")
let script_type = match script_type_str.as_str() {
"OSIS" => ScriptType::OSIS,
"SAL" => ScriptType::SAL,
"V" => ScriptType::V,
"Python" => ScriptType::Python,
_ => return Err(SupervisorError::InvalidInput(format!("Unknown script type: {}", script_type_str))),
};
// Dispatch job using hardcoded queue
self.start_job_using_connection(&mut conn, job_id, &script_type).await?;
}
}
Ok(())
}
}