use log::{debug, error, info, warn}; use redis::AsyncCommands; use std::time::Duration; use hero_job::NAMESPACE_PREFIX; mod job; mod error; pub use crate::error::DispatcherError; pub use crate::job::JobBuilder; // Re-export types from hero_job for public API pub use hero_job::{Job, JobStatus, ScriptType}; pub struct Dispatcher { redis_client: redis::Client, caller_id: String, context_id: String, heroscript_workers: Vec, rhai_sal_workers: Vec, rhai_dsl_workers: Vec, } pub struct DispatcherBuilder { redis_url: Option, caller_id: Option, context_id: Option, heroscript_workers: Vec, rhai_sal_workers: Vec, rhai_dsl_workers: Vec, } impl DispatcherBuilder { pub fn new() -> Self { Self { redis_url: None, caller_id: Some("default_caller".to_string()), context_id: Some("default_context".to_string()), heroscript_workers: Vec::new(), rhai_sal_workers: Vec::new(), rhai_dsl_workers: Vec::new(), } } pub fn caller_id(mut self, caller_id: &str) -> Self { self.caller_id = Some(caller_id.to_string()); self } pub fn context_id(mut self, context_id: &str) -> Self { self.context_id = Some(context_id.to_string()); self } pub fn heroscript_workers(mut self, workers: Vec) -> Self { self.heroscript_workers = workers; self } pub fn rhai_sal_workers(mut self, workers: Vec) -> Self { self.rhai_sal_workers = workers; self } pub fn rhai_dsl_workers(mut self, workers: Vec) -> Self { self.rhai_dsl_workers = workers; self } pub fn redis_url(mut self, url: &str) -> Self { self.redis_url = Some(url.to_string()); self } /// Builds the final `Dispatcher` instance. /// /// This method validates the configuration and creates the Redis client. /// It will return an error if the caller ID is empty or if the Redis /// connection cannot be established. /// /// # Returns /// /// * `Ok(Dispatcher)` - Successfully configured client /// * `Err(DispatcherError)` - Configuration or connection error pub fn build(self) -> Result { let url = self .redis_url .unwrap_or_else(|| "redis://127.0.0.1/".to_string()); let client = redis::Client::open(url)?; Ok(Dispatcher { redis_client: client, caller_id: self.caller_id.unwrap_or_else(|| "default_caller".to_string()), context_id: self.context_id.unwrap_or_else(|| "default_context".to_string()), heroscript_workers: self.heroscript_workers, rhai_sal_workers: self.rhai_sal_workers, rhai_dsl_workers: self.rhai_dsl_workers, }) } } impl Dispatcher { /// Select a worker ID based on the script type using round-robin or first available fn select_worker_for_script_type(&self, script_type: &ScriptType) -> Result { let workers = match script_type { ScriptType::HeroScript => &self.heroscript_workers, ScriptType::RhaiSAL => &self.rhai_sal_workers, ScriptType::RhaiDSL => &self.rhai_dsl_workers, }; if workers.is_empty() { return Err(DispatcherError::InvalidInput(format!( "No workers configured for script type: {:?}", script_type ))); } // For now, use simple round-robin by selecting first available worker // TODO: Implement proper load balancing Ok(workers[0].clone()) } pub fn new_job(&self) -> JobBuilder { JobBuilder::new(self) } // Internal helper to submit script details and push to work queue async fn create_job_using_connection( &self, conn: &mut redis::aio::MultiplexedConnection, job: &Job, ) -> Result<(), DispatcherError> { debug!( "Submitting play request: {} for script type: {:?} with namespace prefix: {}", job.id, job.script_type, NAMESPACE_PREFIX ); // Use the shared Job struct's Redis storage method job.store_in_redis(conn).await .map_err(|e| DispatcherError::InvalidInput(format!("Failed to store job in Redis: {}", e)))?; Ok(()) } // Internal helper to submit script details and push to work queue async fn start_job_using_connection( &self, conn: &mut redis::aio::MultiplexedConnection, job_id: String, worker_id: String ) -> Result<(), DispatcherError> { let worker_queue_key = format!( "{}{}", NAMESPACE_PREFIX, worker_id.replace(" ", "_").to_lowercase() ); // lpush also infers its types, RV is typically i64 (length of list) or () depending on exact command variant // For `redis::AsyncCommands::lpush`, it's `RedisResult` where R: FromRedisValue // Often this is the length of the list. Let's allow inference or specify if needed. let _: redis::RedisResult = conn.lpush(&worker_queue_key, job_id.clone()).await; Ok(()) } // Internal helper to await response from worker async fn await_response_from_connection( &self, conn: &mut redis::aio::MultiplexedConnection, job_key: &String, reply_queue_key: &String, timeout: Duration, ) -> Result { // BLPOP on the reply queue // The timeout for BLPOP is in seconds (integer) let blpop_timeout_secs = timeout.as_secs().max(1); // Ensure at least 1 second for BLPOP timeout match conn .blpop::<&String, Option<(String, String)>>(reply_queue_key, blpop_timeout_secs as f64) .await { Ok(Some((_queue, result_message_str))) => { Ok(result_message_str) } Ok(None) => { // BLPOP timed out warn!( "Timeout waiting for result on reply queue {} for job {}", reply_queue_key, job_key ); // Optionally, delete the reply queue let _: redis::RedisResult = conn.del(&reply_queue_key).await; Err(DispatcherError::Timeout(job_key.clone())) } Err(e) => { // Redis error error!( "Redis error on BLPOP for reply queue {}: {}", reply_queue_key, e ); // Optionally, delete the reply queue let _: redis::RedisResult = conn.del(&reply_queue_key).await; Err(DispatcherError::RedisError(e)) } } } // New method using dedicated reply queue pub async fn create_job( &self, job: &Job, ) -> Result<(), DispatcherError> { let mut conn = self.redis_client.get_multiplexed_async_connection().await?; self.create_job_using_connection( &mut conn, &job, // Pass the job_id parameter ) .await?; Ok(()) } // New method using dedicated reply queue with automatic worker selection pub async fn run_job_and_await_result( &self, job: &Job ) -> Result { // Select worker based on script type let worker_id = self.select_worker_for_script_type(&job.script_type)?; let mut conn = self.redis_client.get_multiplexed_async_connection().await?; let reply_queue_key = format!("{}:reply:{}", NAMESPACE_PREFIX, job.id); // Derived from the passed job_id self.create_job_using_connection( &mut conn, &job, // Pass the job_id parameter ) .await?; self.start_job_using_connection(&mut conn, job.id.clone(), worker_id).await?; info!( "Task {} submitted. Waiting for result on queue {} with timeout {:?}...", job.id, // This is the UUID reply_queue_key, job.timeout ); self.await_response_from_connection( &mut conn, &job.id, &reply_queue_key, job.timeout, ) .await } // Method to get job status pub async fn get_job_status( &self, job_id: &str, ) -> Result { let mut conn = self.redis_client.get_multiplexed_async_connection().await?; let job_key = format!("{}{}", NAMESPACE_PREFIX, job_id); let result_map: Option> = conn.hgetall(&job_key).await?; match result_map { Some(map) => { let status_str = map.get("status").cloned().unwrap_or_else(|| { warn!("Task {}: 'status' field missing from Redis hash, defaulting to empty.", job_id); String::new() }); let status = match status_str.as_str() { "dispatched" => JobStatus::Dispatched, "started" => JobStatus::Started, "error" => JobStatus::Error, "finished" => JobStatus::Finished, _ => JobStatus::Dispatched, // default }; Ok(status) } None => { warn!("Job {} not found in Redis", job_id); Ok(JobStatus::Dispatched) // default for missing jobs } } } // Method to get job output pub async fn get_job_output( &self, job_id: &str, ) -> Result, DispatcherError> { let mut conn = self.redis_client.get_multiplexed_async_connection().await?; let job_key = format!("{}{}", NAMESPACE_PREFIX, job_id); let result_map: Option> = conn.hgetall(&job_key).await?; match result_map { Some(map) => { Ok(map.get("output").cloned()) } None => { warn!("Job {} not found in Redis", job_id); Ok(None) } } } /// List all jobs in Redis pub async fn list_jobs(&self) -> Result, DispatcherError> { let mut conn = self.redis_client.get_multiplexed_async_connection().await?; // Use the shared Job struct's list method Job::list_all_job_ids(&mut conn).await .map_err(|e| DispatcherError::InvalidInput(format!("Failed to list jobs: {}", e))) } /// Stop a job by pushing its ID to the stop queue pub async fn stop_job(&self, job_id: &str) -> Result<(), DispatcherError> { let mut conn = self.redis_client.get_multiplexed_async_connection().await?; // Get job details to determine script type and appropriate worker let job_key = format!("{}job:{}", NAMESPACE_PREFIX, job_id); let job_data: std::collections::HashMap = conn.hgetall(&job_key).await?; if job_data.is_empty() { return Err(DispatcherError::InvalidInput(format!("Job {} not found", job_id))); } // Parse script type from job data let script_type_str = job_data.get("script_type") .ok_or_else(|| DispatcherError::InvalidInput("Job missing script_type field".to_string()))?; let script_type: ScriptType = serde_json::from_str(&format!("\"{}\"", script_type_str)) .map_err(|e| DispatcherError::InvalidInput(format!("Invalid script type: {}", e)))?; // Select appropriate worker for this script type let worker_id = self.select_worker_for_script_type(&script_type)?; let stop_queue_key = format!("{}stop_queue:{}", NAMESPACE_PREFIX, worker_id); // Push job ID to the stop queue conn.lpush::<_, _, ()>(&stop_queue_key, job_id).await?; info!("Job {} added to stop queue {} for script type {:?}", job_id, stop_queue_key, script_type); Ok(()) } /// Get logs for a job by reading from its log file pub async fn get_job_logs(&self, job_id: &str) -> Result, DispatcherError> { let mut conn = self.redis_client.get_multiplexed_async_connection().await?; let job_key = format!("{}job:{}", NAMESPACE_PREFIX, job_id); // Get the job data to find the log path let result_map: Option> = conn.hgetall(&job_key).await?; match result_map { Some(map) => { if let Some(log_path) = map.get("log_path") { // Try to read the log file match std::fs::read_to_string(log_path) { Ok(contents) => Ok(Some(contents)), Err(e) => { warn!("Failed to read log file {}: {}", log_path, e); Ok(None) } } } else { // No log path configured for this job Ok(None) } } None => { warn!("Job {} not found in Redis", job_id); Ok(None) } } } /// Delete a specific job by ID pub async fn delete_job(&self, job_id: &str) -> Result<(), DispatcherError> { let mut conn = self.redis_client.get_multiplexed_async_connection().await?; // Use the shared Job struct's delete method Job::delete_from_redis(&mut conn, job_id).await .map_err(|e| DispatcherError::InvalidInput(format!("Failed to delete job: {}", e)))?; info!("Job {} deleted successfully", job_id); Ok(()) } /// Clear all jobs from Redis pub async fn clear_all_jobs(&self) -> Result { let mut conn = self.redis_client.get_multiplexed_async_connection().await?; // Get all job IDs first let job_ids = Job::list_all_job_ids(&mut conn).await .map_err(|e| DispatcherError::InvalidInput(format!("Failed to list jobs: {}", e)))?; let count = job_ids.len(); // Delete each job using the shared method for job_id in job_ids { Job::delete_from_redis(&mut conn, &job_id).await .map_err(|e| DispatcherError::InvalidInput(format!("Failed to delete job {}: {}", job_id, e)))?; } Ok(count) } /// Check if all prerequisites for a job are completed pub async fn check_prerequisites_completed(&self, job_id: &str) -> Result { let mut conn = self.redis_client.get_multiplexed_async_connection().await?; // Load the job using the shared Job struct let job = Job::load_from_redis(&mut conn, job_id).await .map_err(|e| DispatcherError::InvalidInput(format!("Failed to load job: {}", e)))?; // Check each prerequisite job status for prereq_id in &job.prerequisites { let status = Job::get_status(&mut conn, prereq_id).await .map_err(|e| DispatcherError::InvalidInput(format!("Failed to get prerequisite status: {}", e)))?; if status != JobStatus::Finished { return Ok(false); // Prerequisite not completed } } Ok(true) // All prerequisites completed (or no prerequisites) } /// Update job status and check dependent jobs for readiness pub async fn update_job_status_and_check_dependents(&self, job_id: &str, new_status: JobStatus) -> Result, DispatcherError> { let mut conn = self.redis_client.get_multiplexed_async_connection().await?; // Update job status using shared Job method Job::update_status(&mut conn, job_id, new_status.clone()).await .map_err(|e| DispatcherError::InvalidInput(format!("Failed to update job status: {}", e)))?; let mut ready_jobs = Vec::new(); // If job finished, check dependent jobs if new_status == JobStatus::Finished { // Load the job to get its dependents let job = Job::load_from_redis(&mut conn, job_id).await .map_err(|e| DispatcherError::InvalidInput(format!("Failed to load job: {}", e)))?; // Check each dependent job for dependent_id in &job.dependents { let dependent_status = Job::get_status(&mut conn, dependent_id).await .map_err(|e| DispatcherError::InvalidInput(format!("Failed to get dependent status: {}", e)))?; // Only check jobs that are waiting for prerequisites if dependent_status == JobStatus::WaitingForPrerequisites { // Check if all prerequisites are now completed if self.check_prerequisites_completed(dependent_id).await? { // Update status to dispatched and add to ready jobs Job::update_status(&mut conn, dependent_id, JobStatus::Dispatched).await .map_err(|e| DispatcherError::InvalidInput(format!("Failed to update dependent status: {}", e)))?; ready_jobs.push(dependent_id.clone()); } } } } Ok(ready_jobs) } /// Dispatch jobs that are ready (have all prerequisites completed) pub async fn dispatch_ready_jobs(&self, ready_job_ids: Vec) -> Result<(), DispatcherError> { for job_id in ready_job_ids { // Get job data to determine script type and select worker let mut conn = self.redis_client.get_multiplexed_async_connection().await?; let job_key = format!("{}job:{}", NAMESPACE_PREFIX, job_id); let job_data: std::collections::HashMap = conn.hgetall(&job_key).await?; if let Some(script_type_str) = job_data.get("script_type") { // Parse script type (stored as Debug format, e.g., "HeroScript") let script_type = match script_type_str.as_str() { "HeroScript" => ScriptType::HeroScript, "RhaiSAL" => ScriptType::RhaiSAL, "RhaiDSL" => ScriptType::RhaiDSL, _ => return Err(DispatcherError::InvalidInput(format!("Unknown script type: {}", script_type_str))), }; // Select worker and dispatch job let worker_id = self.select_worker_for_script_type(&script_type)?; self.start_job_using_connection(&mut conn, job_id, worker_id).await?; } } Ok(()) } }