use chrono::Utc; use log::{debug, error, info}; use redis::AsyncCommands; use rhai::{Dynamic, Engine}; use rhai_dispatcher::RhaiTaskDetails; // Import for constructing the reply message use serde_json; use std::collections::HashMap; use tokio::sync::mpsc; // For shutdown signal use tokio::task::JoinHandle; // For serializing the reply message const NAMESPACE_PREFIX: &str = "rhailib:"; const BLPOP_TIMEOUT_SECONDS: usize = 5; // This function updates specific fields in the Redis hash. // It doesn't need to know the full RhaiTaskDetails struct, only the field names. async fn update_task_status_in_redis( conn: &mut redis::aio::MultiplexedConnection, task_id: &str, status: &str, output: Option, error_msg: Option, ) -> redis::RedisResult<()> { let task_key = format!("{}{}", NAMESPACE_PREFIX, task_id); let mut updates: Vec<(&str, String)> = vec![ ("status", status.to_string()), ("updatedAt", Utc::now().timestamp().to_string()), ]; if let Some(out) = output { updates.push(("output", out)); } if let Some(err) = error_msg { updates.push(("error", err)); } debug!( "Updating task {} in Redis with status: {}, updates: {:?}", task_id, status, updates ); conn.hset_multiple::<_, _, _, ()>(&task_key, &updates) .await?; Ok(()) } pub fn spawn_rhai_worker( worker_id: String, db_path: String, mut engine: Engine, redis_url: String, mut shutdown_rx: mpsc::Receiver<()>, // Add shutdown receiver preserve_tasks: bool, // Flag to control task cleanup ) -> JoinHandle>> { tokio::spawn(async move { let queue_key = format!("{}{}", NAMESPACE_PREFIX, worker_id); info!( "Rhai Worker for Worker ID '{}' starting. Connecting to Redis at {}. Listening on queue: {}. Waiting for tasks or shutdown signal.", worker_id, redis_url, queue_key ); let redis_client = match redis::Client::open(redis_url.as_str()) { Ok(client) => client, Err(e) => { error!( "Worker for Worker ID '{}': Failed to open Redis client: {}", worker_id, e ); return Err(Box::new(e) as Box); } }; let mut redis_conn = match redis_client.get_multiplexed_async_connection().await { Ok(conn) => conn, Err(e) => { error!( "Worker for Worker ID '{}': Failed to get Redis connection: {}", worker_id, e ); return Err(Box::new(e) as Box); } }; info!( "Worker for Worker ID '{}' successfully connected to Redis.", worker_id ); loop { let blpop_keys = vec![queue_key.clone()]; tokio::select! { // Listen for shutdown signal _ = shutdown_rx.recv() => { info!("Worker for Worker ID '{}': Shutdown signal received. Terminating loop.", worker_id.clone()); break; } // Listen for tasks from Redis blpop_result = redis_conn.blpop(&blpop_keys, BLPOP_TIMEOUT_SECONDS as f64) => { debug!("Worker for Worker ID '{}': Attempting BLPOP on queue: {}", worker_id.clone(), queue_key); let response: Option<(String, String)> = match blpop_result { Ok(resp) => resp, Err(e) => { error!("Worker '{}': Redis BLPOP error on queue {}: {}. Worker for this circle might stop.", worker_id, queue_key, e); return Err(Box::new(e) as Box); } }; if let Some((_queue_name_recv, task_id)) = response { info!("Worker '{}' received task_id: {} from queue: {}", worker_id, task_id, _queue_name_recv); debug!("Worker '{}', Task {}: Processing started.", worker_id, task_id); let task_details_key = format!("{}{}", NAMESPACE_PREFIX, task_id); debug!("Worker '{}', Task {}: Attempting HGETALL from key: {}", worker_id, task_id, task_details_key); let task_details_map_result: Result, _> = redis_conn.hgetall(&task_details_key).await; match task_details_map_result { Ok(details_map) => { debug!("Worker '{}', Task {}: HGETALL successful. Details: {:?}", worker_id, task_id, details_map); let script_content_opt = details_map.get("script").cloned(); let created_at_str_opt = details_map.get("createdAt").cloned(); let caller_id = details_map.get("callerId").cloned().expect("callerId field missing from Redis hash"); let context_id = details_map.get("contextId").cloned().expect("contextId field missing from Redis hash"); if context_id.is_empty() { error!("Worker '{}', Task {}: contextId field missing from Redis hash", worker_id, task_id); return Err("contextId field missing from Redis hash".into()); } if caller_id.is_empty() { error!("Worker '{}', Task {}: callerId field missing from Redis hash", worker_id, task_id); return Err("callerId field missing from Redis hash".into()); } if let Some(script_content) = script_content_opt { info!("Worker '{}' processing task_id: {}. Script: {:.50}...", context_id, task_id, script_content); debug!("Worker for Context ID '{}', Task {}: Attempting to update status to 'processing'.", context_id, task_id); if let Err(e) = update_task_status_in_redis(&mut redis_conn, &task_id, "processing", None, None).await { error!("Worker for Context ID '{}', Task {}: Failed to update status to 'processing': {}", context_id, task_id, e); } else { debug!("Worker for Context ID '{}', Task {}: Status updated to 'processing'.", context_id, task_id); } let mut db_config = rhai::Map::new(); db_config.insert("DB_PATH".into(), db_path.clone().into()); db_config.insert("CALLER_ID".into(), caller_id.clone().into()); db_config.insert("CONTEXT_ID".into(), context_id.clone().into()); engine.set_default_tag(Dynamic::from(db_config)); // Or pass via CallFnOptions debug!("Worker for Context ID '{}', Task {}: Evaluating script with Rhai engine.", context_id, task_id); let mut final_status = "error".to_string(); // Default to error let mut final_output: Option = None; let mut final_error_msg: Option = None; match engine.eval::(&script_content) { Ok(result) => { let output_str = if result.is::() { // If the result is a string, we can unwrap it directly. // This moves `result`, which is fine because it's the last time we use it in this branch. result.into_string().unwrap() } else { result.to_string() }; info!("Worker for Context ID '{}' task {} completed. Output: {}", context_id, task_id, output_str); final_status = "completed".to_string(); final_output = Some(output_str); } Err(e) => { let error_str = format!("{:?}", *e); error!("Worker for Context ID '{}' task {} script evaluation failed. Error: {}", context_id, task_id, error_str); final_error_msg = Some(error_str); // final_status remains "error" } } debug!("Worker for Context ID '{}', Task {}: Attempting to update status to '{}'.", context_id, task_id, final_status); if let Err(e) = update_task_status_in_redis( &mut redis_conn, &task_id, &final_status, final_output.clone(), // Clone for task hash update final_error_msg.clone(), // Clone for task hash update ).await { error!("Worker for Context ID '{}', Task {}: Failed to update final status to '{}': {}", context_id, task_id, final_status, e); } else { debug!("Worker for Context ID '{}', Task {}: Final status updated to '{}'.", context_id, task_id, final_status); } // Send to reply queue if specified let created_at = created_at_str_opt .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok()) .map(|dt| dt.with_timezone(&Utc)) .unwrap_or_else(Utc::now); // Fallback, though createdAt should exist let reply_details = RhaiTaskDetails { task_id: task_id.to_string(), // Add the task_id script: script_content.clone(), // Include script for context in reply status: final_status, // The final status output: final_output, // The final output error: final_error_msg, // The final error created_at, // Original creation time updated_at: Utc::now(), // Time of this final update/reply caller_id: caller_id.clone(), context_id: context_id.clone(), worker_id: worker_id.clone(), }; let reply_queue_key = format!("{}:reply:{}", NAMESPACE_PREFIX, task_id); match serde_json::to_string(&reply_details) { Ok(reply_json) => { let lpush_result: redis::RedisResult = redis_conn.lpush(&reply_queue_key, &reply_json).await; match lpush_result { Ok(_) => debug!("Worker for Context ID '{}', Task {}: Successfully sent result to reply queue {}", context_id, task_id, reply_queue_key), Err(e_lpush) => error!("Worker for Context ID '{}', Task {}: Failed to LPUSH result to reply queue {}: {}", context_id, task_id, reply_queue_key, e_lpush), } } Err(e_json) => { error!("Worker for Context ID '{}', Task {}: Failed to serialize reply details for queue {}: {}", context_id, task_id, reply_queue_key, e_json); } } // Clean up task details based on preserve_tasks flag if !preserve_tasks { // The worker is responsible for cleaning up the task details hash. if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await { error!("Worker for Context ID '{}', Task {}: Failed to delete task details key '{}': {}", context_id, task_id, task_details_key, e); } else { debug!("Worker for Context ID '{}', Task {}: Cleaned up task details key '{}'.", context_id, task_id, task_details_key); } } else { debug!("Worker for Context ID '{}', Task {}: Preserving task details (preserve_tasks=true)", context_id, task_id); } } else { // Script content not found in hash error!( "Worker for Context ID '{}', Task {}: Script content not found in Redis hash. Details map: {:?}", context_id, task_id, details_map ); // Clean up invalid task details based on preserve_tasks flag if !preserve_tasks { // Even if the script is not found, the worker should clean up the invalid task hash. if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await { error!("Worker for Context ID '{}', Task {}: Failed to delete invalid task details key '{}': {}", context_id, task_id, task_details_key, e); } } else { debug!("Worker for Context ID '{}', Task {}: Preserving invalid task details (preserve_tasks=true)", context_id, task_id); } } } Err(e) => { error!( "Worker '{}', Task {}: Failed to fetch details (HGETALL) from Redis for key {}. Error: {:?}", worker_id, task_id, task_details_key, e ); } } } else { debug!("Worker '{}': BLPOP timed out on queue {}. No new tasks. Checking for shutdown signal again.", &worker_id, &queue_key); } } // End of blpop_result match } // End of tokio::select! } // End of loop info!("Worker '{}' has shut down.", worker_id); Ok(()) }) }