//! Job client implementation for managing jobs in Redis use chrono::Utc; use redis::AsyncCommands; use crate::job::{Job, JobStatus, JobError}; /// Client for managing jobs in Redis #[derive(Debug, Clone)] pub struct Client { redis_client: redis::Client, namespace: String, } pub struct ClientBuilder { /// Redis URL for connection redis_url: String, /// Namespace for queue keys namespace: String, } impl ClientBuilder { /// Create a new client builder pub fn new() -> Self { Self { redis_url: "redis://localhost:6379".to_string(), namespace: "".to_string(), } } /// Set the Redis URL pub fn redis_url>(mut self, url: S) -> Self { self.redis_url = url.into(); self } /// Set the namespace for queue keys pub fn namespace>(mut self, namespace: S) -> Self { self.namespace = namespace.into(); self } /// Build the client pub async fn build(self) -> Result { // Create Redis client let redis_client = redis::Client::open(self.redis_url.as_str()) .map_err(|e| JobError::Redis(e))?; Ok(Client { redis_client, namespace: self.namespace, }) } } impl Default for Client { fn default() -> Self { // Note: Default implementation creates an empty client // Use Client::builder() for proper initialization Self { redis_client: redis::Client::open("redis://localhost:6379").unwrap(), namespace: "".to_string(), } } } impl Client { /// Create a new client builder pub fn builder() -> ClientBuilder { ClientBuilder::new() } /// List all job IDs from Redis pub async fn list_jobs(&self) -> Result, JobError> { let mut conn = self.redis_client .get_multiplexed_async_connection() .await .map_err(|e| JobError::Redis(e))?; let keys: Vec = conn.keys(format!("{}:*", &self.jobs_key())).await .map_err(|e| JobError::Redis(e))?; let job_ids: Vec = keys .into_iter() .filter_map(|key| { if key.starts_with(&format!("{}:", self.jobs_key())) { key.strip_prefix(&format!("{}:", self.jobs_key())) .map(|s| s.to_string()) } else { None } }) .collect(); Ok(job_ids) } fn jobs_key(&self) -> String { if self.namespace.is_empty() { format!("job") } else { format!("{}:job", self.namespace) } } pub fn job_key(&self, job_id: &str) -> String { if self.namespace.is_empty() { format!("job:{}", job_id) } else { format!("{}:job:{}", self.namespace, job_id) } } pub fn job_reply_key(&self, job_id: &str) -> String { if self.namespace.is_empty() { format!("reply:{}", job_id) } else { format!("{}:reply:{}", self.namespace, job_id) } } pub fn runner_key(&self, runner_name: &str) -> String { if self.namespace.is_empty() { format!("runner:{}", runner_name) } else { format!("{}:runner:{}", self.namespace, runner_name) } } /// Set job error in Redis pub async fn set_error(&self, job_id: &str, error: &str, ) -> Result<(), JobError> { let job_key = self.job_key(job_id); let now = Utc::now(); let mut conn = self.redis_client .get_multiplexed_async_connection() .await .map_err(|e| JobError::Redis(e))?; let _: () = conn.hset_multiple(&job_key, &[ ("error", error), ("status", JobStatus::Error.as_str()), ("updated_at", &now.to_rfc3339()), ]).await .map_err(|e| JobError::Redis(e))?; Ok(()) } /// Set job status in Redis pub async fn set_job_status(&self, job_id: &str, status: JobStatus, ) -> Result<(), JobError> { let job_key = self.job_key(job_id); let now = Utc::now(); let mut conn = self.redis_client .get_multiplexed_async_connection() .await .map_err(|e| JobError::Redis(e))?; let _: () = conn.hset_multiple(&job_key, &[ ("status", status.as_str()), ("updated_at", &now.to_rfc3339()), ]).await .map_err(|e| JobError::Redis(e))?; Ok(()) } /// Get job status from Redis pub async fn get_status( &self, job_id: &str, ) -> Result { let mut conn = self.redis_client .get_multiplexed_async_connection() .await .map_err(|e| JobError::Redis(e))?; let status_str: Option = conn.hget(&self.job_key(job_id), "status").await .map_err(|e| JobError::Redis(e))?; match status_str { Some(s) => JobStatus::from_str(&s).ok_or_else(|| JobError::InvalidStatus(s)), None => Err(JobError::NotFound(job_id.to_string())), } } /// Delete job from Redis pub async fn delete_from_redis( &self, job_id: &str, ) -> Result<(), JobError> { let mut conn = self.redis_client .get_multiplexed_async_connection() .await .map_err(|e| JobError::Redis(e))?; let job_key = self.job_key(job_id); let _: () = conn.del(&job_key).await .map_err(|e| JobError::Redis(e))?; Ok(()) } /// Store this job in Redis with the specified status pub async fn store_job_in_redis_with_status(&self, job: &Job, status: JobStatus) -> Result<(), JobError> { let mut conn = self.redis_client .get_multiplexed_async_connection() .await .map_err(|e| JobError::Redis(e))?; let job_key = self.job_key(&job.id); // Serialize the job data let job_data = serde_json::to_string(job) .map_err(|e| JobError::Serialization(e))?; // Store job data in Redis hash let _: () = conn.hset_multiple(&job_key, &[ ("data", job_data), ("status", status.as_str().to_string()), ("created_at", job.created_at.to_rfc3339()), ("updated_at", job.updated_at.to_rfc3339()), ]).await .map_err(|e| JobError::Redis(e))?; // Set TTL for the job (24 hours) let _: () = conn.expire(&job_key, 86400).await .map_err(|e| JobError::Redis(e))?; Ok(()) } /// Store this job in Redis (defaults to Dispatched status for backwards compatibility) pub async fn store_job_in_redis(&self, job: &Job) -> Result<(), JobError> { self.store_job_in_redis_with_status(job, JobStatus::Dispatched).await } /// Load a job from Redis by ID pub async fn load_job_from_redis( &self, job_id: &str, ) -> Result { let job_key = self.job_key(job_id); let mut conn = self.redis_client .get_multiplexed_async_connection() .await .map_err(|e| JobError::Redis(e))?; // Get job data from Redis let job_data: Option = conn.hget(&job_key, "data").await .map_err(|e| JobError::Redis(e))?; match job_data { Some(data) => { let job: Job = serde_json::from_str(&data) .map_err(|e| JobError::Serialization(e))?; Ok(job) } None => Err(JobError::NotFound(job_id.to_string())), } } /// Delete a job by ID pub async fn delete_job(&mut self, job_id: &str) -> Result<(), JobError> { let mut conn = self.redis_client.get_multiplexed_async_connection().await .map_err(|e| JobError::Redis(e))?; let job_key = self.job_key(job_id); let deleted_count: i32 = conn.del(&job_key).await .map_err(|e| JobError::Redis(e))?; if deleted_count == 0 { return Err(JobError::NotFound(job_id.to_string())); } Ok(()) } /// Set job result in Redis pub async fn set_result( &self, job_id: &str, result: &str, ) -> Result<(), JobError> { let job_key = self.job_key(&job_id); let now = Utc::now(); let mut conn = self.redis_client .get_multiplexed_async_connection() .await .map_err(|e| JobError::Redis(e))?; let _: () = conn.hset_multiple(&job_key, &[ ("result", result), ("status", JobStatus::Finished.as_str()), ("updated_at", &now.to_rfc3339()), ]).await .map_err(|e| JobError::Redis(e))?; Ok(()) } /// Get job result from Redis pub async fn get_result( &self, job_id: &str, ) -> Result, JobError> { let job_key = self.job_key(job_id); let mut conn = self.redis_client .get_multiplexed_async_connection() .await .map_err(|e| JobError::Redis(e))?; let result: Option = conn.hget(&job_key, "result").await .map_err(|e| JobError::Redis(e))?; Ok(result) } /// Get job result from Redis pub async fn get_error( &self, job_id: &str, ) -> Result, JobError> { let job_key = self.job_key(job_id); let mut conn = self.redis_client .get_multiplexed_async_connection() .await .map_err(|e| JobError::Redis(e))?; let result: Option = conn.hget(&job_key, "error").await .map_err(|e| JobError::Redis(e))?; Ok(result) } /// Get a job ID from the work queue (blocking pop) pub async fn get_job_id(&self, queue_key: &str) -> Result, JobError> { let mut conn = self.redis_client .get_multiplexed_async_connection() .await .map_err(|e| JobError::Redis(e))?; // Use BRPOP with a short timeout to avoid blocking indefinitely let result: Option<(String, String)> = conn.brpop(queue_key, 1.0).await .map_err(|e| JobError::Redis(e))?; Ok(result.map(|(_, job_id)| job_id)) } /// Get a job by ID (alias for load_job_from_redis) pub async fn get_job(&self, job_id: &str) -> Result { self.load_job_from_redis(job_id).await } /// Dispatch a job to a runner's queue pub async fn dispatch_job(&self, job_id: &str, runner_name: &str) -> Result<(), JobError> { let mut conn = self.redis_client .get_multiplexed_async_connection() .await .map_err(|e| JobError::Redis(e))?; let queue_key = self.runner_key(runner_name); // Push job ID to the runner's queue (LPUSH for FIFO with BRPOP) let _: () = conn.lpush(&queue_key, job_id).await .map_err(|e| JobError::Redis(e))?; Ok(()) } /// Run a job: dispatch it, wait for completion, and return the result /// /// This is a convenience method that: /// 1. Stores the job in Redis /// 2. Dispatches it to the runner's queue /// 3. Waits for the job to complete (polls status) /// 4. Returns the result or error /// /// # Arguments /// * `job` - The job to run /// * `runner_name` - The name of the runner to dispatch to /// * `timeout_secs` - Maximum time to wait for job completion (in seconds) /// /// # Returns /// * `Ok(String)` - The job result if successful /// * `Err(JobError)` - If the job fails, times out, or encounters an error pub async fn run_job( &self, job: &crate::job::Job, runner_name: &str, timeout_secs: u64, ) -> Result { use tokio::time::{Duration, timeout}; // Store the job in Redis self.store_job_in_redis(job).await?; // Dispatch to runner queue self.dispatch_job(&job.id, runner_name).await?; // Wait for job to complete with timeout let result = timeout( Duration::from_secs(timeout_secs), self.wait_for_job_completion(&job.id) ).await; match result { Ok(Ok(job_result)) => Ok(job_result), Ok(Err(e)) => Err(e), Err(_) => Err(JobError::Timeout(format!( "Job {} did not complete within {} seconds", job.id, timeout_secs ))), } } /// Wait for a job to complete by polling its status /// /// This polls the job status every 500ms until it reaches a terminal state /// (Finished or Error), then returns the result or error. async fn wait_for_job_completion(&self, job_id: &str) -> Result { use tokio::time::{sleep, Duration}; loop { // Check job status let status = self.get_status(job_id).await?; match status { JobStatus::Finished => { // Job completed successfully, get the result let result = self.get_result(job_id).await?; return result.ok_or_else(|| { JobError::InvalidData(format!("Job {} finished but has no result", job_id)) }); } JobStatus::Error => { // Job failed, get the error message let mut conn = self.redis_client .get_multiplexed_async_connection() .await .map_err(|e| JobError::Redis(e))?; let error_msg: Option = conn .hget(&self.job_key(job_id), "error") .await .map_err(|e| JobError::Redis(e))?; return Err(JobError::InvalidData( error_msg.unwrap_or_else(|| format!("Job {} failed with unknown error", job_id)) )); } JobStatus::Stopping => { return Err(JobError::InvalidData(format!("Job {} was stopped", job_id))); } // Job is still running (Dispatched, WaitingForPrerequisites, Started) _ => { // Wait before polling again sleep(Duration::from_millis(500)).await; } } } } }