//! # Asynchronous Worker Implementation //! //! This module provides an asynchronous actor implementation that can process //! multiple jobs concurrently with timeout support. Each job is spawned as a //! separate Tokio task, allowing for parallel execution and proper timeout handling. //! //! ## Features //! //! - **Concurrent Processing**: Multiple jobs can run simultaneously //! - **Timeout Support**: Jobs that exceed their timeout are automatically cancelled //! - **Resource Cleanup**: Proper cleanup of aborted/cancelled jobs //! - **Non-blocking**: Worker continues processing new jobs while others are running //! - **Scalable**: Can handle high job throughput with parallel execution //! //! ## Usage //! //! ```rust //! use std::sync::Arc; //! use std::time::Duration; //! use baobab_actor::async_actor_impl::AsyncWorker; //! use baobab_actor::actor_trait::{spawn_actor, WorkerConfig}; //! use baobab_actor::engine::create_heromodels_engine; //! use tokio::sync::mpsc; //! //! let config = WorkerConfig::new( //! "async_actor_1".to_string(), //! "/path/to/db".to_string(), //! "redis://localhost:6379".to_string(), //! false, // preserve_tasks //! ).with_default_timeout(Duration::from_secs(300)); //! //! let actor = Arc::new(AsyncWorker::new()); //! let engine = create_heromodels_engine(); //! let (shutdown_tx, shutdown_rx) = mpsc::channel(1); //! //! let handle = spawn_actor(actor, config, engine, shutdown_rx); //! //! // Later, shutdown the actor //! shutdown_tx.send(()).await.unwrap(); //! handle.await.unwrap().unwrap(); //! ``` use async_trait::async_trait; use hero_job::{Job, JobStatus}; use log::{debug, error, info, warn}; use rhai::Engine; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tokio::sync::Mutex; use tokio::task::JoinHandle; use tokio::time::timeout; use crate::engine::eval_script; use crate::actor_trait::{Worker, WorkerConfig}; use crate::initialize_redis_connection; /// Represents a running job with its handle and metadata #[derive(Debug)] struct RunningJob { job_id: String, handle: JoinHandle<()>, started_at: std::time::Instant, } /// Builder for AsyncWorker #[derive(Debug, Default)] pub struct AsyncWorkerBuilder { actor_id: Option, db_path: Option, redis_url: Option, default_timeout: Option, } impl AsyncWorkerBuilder { pub fn new() -> Self { Self::default() } pub fn actor_id>(mut self, actor_id: S) -> Self { self.actor_id = Some(actor_id.into()); self } pub fn db_path>(mut self, db_path: S) -> Self { self.db_path = Some(db_path.into()); self } pub fn redis_url>(mut self, redis_url: S) -> Self { self.redis_url = Some(redis_url.into()); self } pub fn default_timeout(mut self, timeout: Duration) -> Self { self.default_timeout = Some(timeout); self } pub fn build(self) -> Result { Ok(AsyncWorker { actor_id: self.actor_id.ok_or("actor_id is required")?, db_path: self.db_path.ok_or("db_path is required")?, redis_url: self.redis_url.ok_or("redis_url is required")?, default_timeout: self.default_timeout.unwrap_or(Duration::from_secs(300)), running_jobs: Arc::new(Mutex::new(HashMap::new())), }) } } /// Asynchronous actor that processes jobs concurrently #[derive(Debug, Clone)] pub struct AsyncWorker { pub actor_id: String, pub db_path: String, pub redis_url: String, pub default_timeout: Duration, running_jobs: Arc>>, } impl AsyncWorker { /// Create a new AsyncWorkerBuilder pub fn builder() -> AsyncWorkerBuilder { AsyncWorkerBuilder::new() } /// Add a running job to the tracking map async fn add_running_job(&self, job_id: String, handle: JoinHandle<()>) { let running_job = RunningJob { job_id: job_id.clone(), handle, started_at: std::time::Instant::now(), }; let mut jobs = self.running_jobs.lock().await; jobs.insert(job_id.clone(), running_job); debug!("Async Worker: Added running job '{}'. Total running: {}", job_id, jobs.len()); } /// Remove a completed job from the tracking map async fn remove_running_job(&self, job_id: &str) { let mut jobs = self.running_jobs.lock().await; if let Some(job) = jobs.remove(job_id) { let duration = job.started_at.elapsed(); debug!("Async Worker: Removed completed job '{}' after {:?}. Remaining: {}", job_id, duration, jobs.len()); } } /// Get the count of currently running jobs pub async fn running_job_count(&self) -> usize { let jobs = self.running_jobs.lock().await; jobs.len() } /// Cleanup any finished jobs from the running jobs map async fn cleanup_finished_jobs(&self) { let mut jobs = self.running_jobs.lock().await; let mut to_remove = Vec::new(); for (job_id, running_job) in jobs.iter() { if running_job.handle.is_finished() { to_remove.push(job_id.clone()); } } for job_id in to_remove { if let Some(job) = jobs.remove(&job_id) { let duration = job.started_at.elapsed(); debug!("Async Worker: Cleaned up finished job '{}' after {:?}", job_id, duration); } } } /// Execute a single job asynchronously with timeout support async fn execute_job_with_timeout( job: Job, engine: Engine, actor_id: String, redis_url: String, job_timeout: Duration, ) { let job_id = job.id.clone(); info!("Async Worker '{}', Job {}: Starting execution with timeout {:?}", actor_id, job_id, job_timeout); // Create a new Redis connection for this job let mut redis_conn = match initialize_redis_connection(&actor_id, &redis_url).await { Ok(conn) => conn, Err(e) => { error!("Async Worker '{}', Job {}: Failed to initialize Redis connection: {}", actor_id, job_id, e); return; } }; // Update job status to Started if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Started).await { error!("Async Worker '{}', Job {}: Failed to update status to Started: {}", actor_id, job_id, e); return; } // Create the script execution task let script_task = async { // Execute the Rhai script match eval_script(&engine, &job.script) { Ok(result) => { let result_str = format!("{:?}", result); info!("Async Worker '{}', Job {}: Script executed successfully. Result: {}", actor_id, job_id, result_str); // Update job with success result if let Err(e) = Job::set_result(&mut redis_conn, &job_id, &result_str).await { error!("Async Worker '{}', Job {}: Failed to set result: {}", actor_id, job_id, e); return; } if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Finished).await { error!("Async Worker '{}', Job {}: Failed to update status to Finished: {}", actor_id, job_id, e); } } Err(e) => { let error_msg = format!("Script execution error: {}", e); error!("Async Worker '{}', Job {}: {}", actor_id, job_id, error_msg); // Update job with error if let Err(e) = Job::set_error(&mut redis_conn, &job_id, &error_msg).await { error!("Async Worker '{}', Job {}: Failed to set error: {}", actor_id, job_id, e); return; } if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Error).await { error!("Async Worker '{}', Job {}: Failed to update status to Error: {}", actor_id, job_id, e); } } } }; // Execute the script with timeout match timeout(job_timeout, script_task).await { Ok(()) => { info!("Async Worker '{}', Job {}: Completed within timeout", actor_id, job_id); } Err(_) => { warn!("Async Worker '{}', Job {}: Timed out after {:?}, marking as error", actor_id, job_id, job_timeout); let timeout_msg = format!("Job timed out after {:?}", job_timeout); if let Err(e) = Job::set_error(&mut redis_conn, &job_id, &timeout_msg).await { error!("Async Worker '{}', Job {}: Failed to set timeout error: {}", actor_id, job_id, e); } if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Error).await { error!("Async Worker '{}', Job {}: Failed to update status to Error after timeout: {}", actor_id, job_id, e); } } } info!("Async Worker '{}', Job {}: Job processing completed", actor_id, job_id); } } impl Default for AsyncWorker { fn default() -> Self { // Default AsyncWorker with placeholder values // In practice, use the builder pattern instead Self { actor_id: "default_async_actor".to_string(), db_path: "/tmp".to_string(), redis_url: "redis://localhost:6379".to_string(), default_timeout: Duration::from_secs(300), running_jobs: Arc::new(Mutex::new(HashMap::new())), } } } #[async_trait] impl Worker for AsyncWorker { async fn process_job( &self, job: Job, engine: &Engine, // Reuse the stateless engine _redis_conn: &mut redis::aio::MultiplexedConnection, ) { let job_id = job.id.clone(); let actor_id = &self.actor_id.clone(); // Determine timeout (use job-specific timeout if available, otherwise default) let job_timeout = if job.timeout.as_secs() > 0 { job.timeout } else { self.default_timeout // Use actor's default timeout }; info!("Async Worker '{}', Job {}: Spawning job execution task with timeout {:?}", actor_id, job_id, job_timeout); // Clone necessary data for the spawned task let job_id_clone = job_id.clone(); let actor_id_clone = actor_id.clone(); let actor_id_debug = actor_id.clone(); // Additional clone for debug statement let job_id_debug = job_id.clone(); // Additional clone for debug statement let redis_url_clone = self.redis_url.clone(); let running_jobs_clone = Arc::clone(&self.running_jobs); // Spawn the job execution task let job_handle = tokio::spawn(async move { Self::execute_job_with_timeout( job, engine, actor_id_clone, redis_url_clone, job_timeout, ).await; // Remove this job from the running jobs map when it completes let mut jobs = running_jobs_clone.lock().await; if let Some(running_job) = jobs.remove(&job_id_clone) { let duration = running_job.started_at.elapsed(); debug!("Async Worker '{}': Removed completed job '{}' after {:?}", actor_id_debug, job_id_debug, duration); } }); // Add the job to the running jobs map self.add_running_job(job_id, job_handle).await; // Cleanup finished jobs periodically self.cleanup_finished_jobs().await; } fn actor_type(&self) -> &'static str { "Async" } fn actor_id(&self) -> &str { &self.actor_id } fn redis_url(&self) -> &str { &self.redis_url } } /// Convenience function to spawn an asynchronous actor using the trait interface /// /// This function provides a clean interface for the new async actor implementation /// with timeout support. pub fn spawn_async_actor( actor_id: String, db_path: String, engine: rhai::Engine, redis_url: String, shutdown_rx: mpsc::Receiver<()>, default_timeout: std::time::Duration, ) -> JoinHandle>> { use std::sync::Arc; let actor = Arc::new( AsyncActor::builder() .actor_id(actor_id) .db_path(db_path) .redis_url(redis_url) .default_timeout(default_timeout) .build() .expect("Failed to build AsyncActor") ); spawn_actor(actor, shutdown_rx) } #[cfg(test)] mod tests { use super::*; use crate::engine::create_heromodels_engine; use hero_job::ScriptType; #[tokio::test] async fn test_async_actor_creation() { let actor = AsyncWorker::new(); assert_eq!(actor.actor_type(), "Async"); assert_eq!(actor.running_job_count().await, 0); } #[tokio::test] async fn test_async_actor_default() { let actor = AsyncWorker::default(); assert_eq!(actor.actor_type(), "Async"); } #[tokio::test] async fn test_async_actor_job_tracking() { let actor = AsyncWorker::new(); // Simulate adding a job let handle = tokio::spawn(async { tokio::time::sleep(Duration::from_millis(100)).await; }); actor.add_running_job("job_1".to_string(), handle).await; assert_eq!(actor.running_job_count().await, 1); // Wait for job to complete tokio::time::sleep(Duration::from_millis(200)).await; actor.cleanup_finished_jobs().await; assert_eq!(actor.running_job_count().await, 0); } #[tokio::test] async fn test_async_actor_process_job_interface() { let actor = AsyncWorker::new(); let engine = create_heromodels_engine(); // Create a simple test job let job = Job::new( "test_caller".to_string(), "test_context".to_string(), r#"print("Hello from async actor test!"); 42"#.to_string(), ScriptType::OSIS, ); let config = WorkerConfig::new( "test_async_actor".to_string(), "/tmp".to_string(), "redis://localhost:6379".to_string(), false, ).with_default_timeout(Duration::from_secs(60)); // Note: This test doesn't actually connect to Redis, it just tests the interface // In a real test environment, you'd need a Redis instance or mock // The process_job method should be callable (interface test) // actor.process_job(job, engine, &mut redis_conn, &config).await; // For now, just verify the actor was created successfully assert_eq!(actor.actor_type(), "Async"); } }