From f0888e6f04cad999539cd431ad7eff0624871295 Mon Sep 17 00:00:00 2001 From: Timur Gordon <31495328+timurgordon@users.noreply.github.com> Date: Thu, 7 Aug 2025 10:11:41 +0200 Subject: [PATCH] initial commit --- cmd/sal.rs | 302 ++++++++++++++++++++++++++++++++++ src/engine.rs | 187 +++++++++++++++++++++ src/lib.rs | 448 ++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 937 insertions(+) create mode 100644 cmd/sal.rs create mode 100644 src/engine.rs create mode 100644 src/lib.rs diff --git a/cmd/sal.rs b/cmd/sal.rs new file mode 100644 index 0000000..8f867a8 --- /dev/null +++ b/cmd/sal.rs @@ -0,0 +1,302 @@ +//! System Worker Binary - Asynchronous actor for high-throughput concurrent processing + +use clap::Parser; +use log::{error, info, warn}; +use baobab_actor::async_actor_impl::AsyncWorker; +use baobab_actor::config::{ConfigError, WorkerConfig}; +use baobab_actor::engine::create_heromodels_engine; +use baobab_actor::actor_trait::{spawn_actor, WorkerConfig as TraitWorkerConfig}; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; +use tokio::signal; +use tokio::sync::mpsc; + +#[derive(Parser, Debug)] +#[command( + name = "system", + version = "0.1.0", + about = "System Worker - Asynchronous Worker with Concurrent Job Processing", + long_about = "An asynchronous actor for Hero framework that processes multiple jobs \ + concurrently with timeout support. Ideal for high-throughput scenarios \ + where jobs can be executed in parallel." +)] +struct Args { + /// Path to TOML configuration file + #[arg(short, long, help = "Path to TOML configuration file")] + config: PathBuf, + + /// Override actor ID from config + #[arg(long, help = "Override actor ID from configuration file")] + actor_id: Option, + + /// Override Redis URL from config + #[arg(long, help = "Override Redis URL from configuration file")] + redis_url: Option, + + /// Override database path from config + #[arg(long, help = "Override database path from configuration file")] + db_path: Option, + + /// Override default timeout in seconds + #[arg(long, help = "Override default job timeout in seconds")] + timeout: Option, + + /// Enable verbose logging (debug level) + #[arg(short, long, help = "Enable verbose logging")] + verbose: bool, + + /// Disable timestamps in log output + #[arg(long, help = "Remove timestamps from log output")] + no_timestamp: bool, + + /// Show actor statistics periodically + #[arg(long, help = "Show periodic actor statistics")] + show_stats: bool, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args = Args::parse(); + + // Load configuration from TOML file + let mut config = match WorkerConfig::from_file(&args.config) { + Ok(config) => config, + Err(e) => { + eprintln!("Failed to load configuration from {:?}: {}", args.config, e); + std::process::exit(1); + } + }; + + // Validate that this is an async actor configuration + if !config.is_async() { + eprintln!("Error: System actor requires an async actor configuration"); + eprintln!("Expected: [actor_type] type = \"async\""); + eprintln!("Found: {:?}", config.actor_type); + std::process::exit(1); + } + + // Apply command line overrides + if let Some(actor_id) = args.actor_id { + config.actor_id = actor_id; + } + if let Some(redis_url) = args.redis_url { + config.redis_url = redis_url; + } + if let Some(db_path) = args.db_path { + config.db_path = db_path; + } + + // Override timeout if specified + if let Some(timeout_secs) = args.timeout { + if let baobab_actor::config::WorkerType::Async { ref mut default_timeout_seconds } = config.actor_type { + *default_timeout_seconds = timeout_secs; + } + } + + // Configure logging + setup_logging(&config, args.verbose, args.no_timestamp)?; + + info!("🚀 System Worker starting..."); + info!("Worker ID: {}", config.actor_id); + info!("Redis URL: {}", config.redis_url); + info!("Database Path: {}", config.db_path); + info!("Preserve Tasks: {}", config.preserve_tasks); + + if let Some(timeout) = config.get_default_timeout() { + info!("Default Timeout: {:?}", timeout); + } + + // Create Rhai engine + let engine = create_heromodels_engine(); + info!("✅ Rhai engine initialized"); + + // Create actor configuration for the trait-based interface + let mut actor_config = TraitWorkerConfig::new( + config.actor_id.clone(), + config.db_path.clone(), + config.redis_url.clone(), + config.preserve_tasks, + ); + + // Add timeout configuration for async actor + if let Some(timeout) = config.get_default_timeout() { + actor_config = actor_config.with_default_timeout(timeout); + } + + // Create async actor instance + let actor = Arc::new(AsyncWorker::default()); + info!("✅ Async actor instance created"); + + // Setup shutdown signal handling + let (shutdown_tx, shutdown_rx) = mpsc::channel(1); + + // Spawn shutdown signal handler + let shutdown_tx_clone = shutdown_tx.clone(); + tokio::spawn(async move { + if let Err(e) = signal::ctrl_c().await { + error!("Failed to listen for shutdown signal: {}", e); + return; + } + info!("🛑 Shutdown signal received"); + if let Err(e) = shutdown_tx_clone.send(()).await { + error!("Failed to send shutdown signal: {}", e); + } + }); + + // Spawn statistics reporter if requested + if args.show_stats { + let actor_stats = Arc::clone(&actor); + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(30)); + loop { + interval.tick().await; + let running_count = actor_stats.running_job_count().await; + if running_count > 0 { + info!("📊 Worker Stats: {} jobs currently running", running_count); + } else { + info!("📊 Worker Stats: No jobs currently running"); + } + } + }); + } + + // Spawn the actor + info!("🔄 Starting actor loop..."); + let actor_handle = spawn_actor(actor, engine, shutdown_rx); + + // Wait for the actor to complete + match actor_handle.await { + Ok(Ok(())) => { + info!("✅ System Worker shut down gracefully"); + } + Ok(Err(e)) => { + error!("❌ System Worker encountered an error: {}", e); + std::process::exit(1); + } + Err(e) => { + error!("❌ Failed to join actor task: {}", e); + std::process::exit(1); + } + } + + Ok(()) +} + +/// Setup logging based on configuration and command line arguments +fn setup_logging( + config: &WorkerConfig, + verbose: bool, + no_timestamp: bool, +) -> Result<(), Box> { + let mut builder = env_logger::Builder::new(); + + // Determine log level + let log_level = if verbose { + "debug" + } else { + &config.logging.level + }; + + // Set log level + builder.filter_level(match log_level.to_lowercase().as_str() { + "trace" => log::LevelFilter::Trace, + "debug" => log::LevelFilter::Debug, + "info" => log::LevelFilter::Info, + "warn" => log::LevelFilter::Warn, + "error" => log::LevelFilter::Error, + _ => { + warn!("Invalid log level: {}. Using 'info'", log_level); + log::LevelFilter::Info + } + }); + + // Configure timestamps + let show_timestamps = !no_timestamp && config.logging.timestamps; + if !show_timestamps { + builder.format_timestamp(None); + } + + builder.init(); + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Write; + use tempfile::NamedTempFile; + + #[test] + fn test_config_validation() { + let config_toml = r#" +actor_id = "test_system" +redis_url = "redis://localhost:6379" +db_path = "/tmp/test_db" + +[actor_type] +type = "async" +default_timeout_seconds = 600 + +[logging] +level = "info" +"#; + + let mut temp_file = NamedTempFile::new().unwrap(); + temp_file.write_all(config_toml.as_bytes()).unwrap(); + + let config = WorkerConfig::from_file(temp_file.path()).unwrap(); + assert!(!config.is_sync()); + assert!(config.is_async()); + assert_eq!(config.actor_id, "test_system"); + assert_eq!(config.get_default_timeout(), Some(Duration::from_secs(600))); + } + + #[test] + fn test_sync_config_rejection() { + let config_toml = r#" +actor_id = "test_system" +redis_url = "redis://localhost:6379" +db_path = "/tmp/test_db" + +[actor_type] +type = "sync" + +[logging] +level = "info" +"#; + + let mut temp_file = NamedTempFile::new().unwrap(); + temp_file.write_all(config_toml.as_bytes()).unwrap(); + + let config = WorkerConfig::from_file(temp_file.path()).unwrap(); + assert!(config.is_sync()); + assert!(!config.is_async()); + // This would be rejected in main() function + } + + #[test] + fn test_timeout_override() { + let config_toml = r#" +actor_id = "test_system" +redis_url = "redis://localhost:6379" +db_path = "/tmp/test_db" + +[actor_type] +type = "async" +default_timeout_seconds = 300 +"#; + + let mut temp_file = NamedTempFile::new().unwrap(); + temp_file.write_all(config_toml.as_bytes()).unwrap(); + + let mut config = WorkerConfig::from_file(temp_file.path()).unwrap(); + assert_eq!(config.get_default_timeout(), Some(Duration::from_secs(300))); + + // Test timeout override + if let baobab_actor::config::WorkerType::Async { ref mut default_timeout_seconds } = config.actor_type { + *default_timeout_seconds = 600; + } + assert_eq!(config.get_default_timeout(), Some(Duration::from_secs(600))); + } +} diff --git a/src/engine.rs b/src/engine.rs new file mode 100644 index 0000000..888ac0f --- /dev/null +++ b/src/engine.rs @@ -0,0 +1,187 @@ +//! Rhai scripting integration for the SAL library +//! +//! This module provides integration with the Rhai scripting language, +//! allowing SAL functions to be called from Rhai scripts. + +// OS module is now provided by sal-os package +// Platform module is now provided by sal-os package +// PostgreSQL module is now provided by sal-postgresclient package + +// Virt modules (buildah, nerdctl, rfs) are now provided by sal-virt package +// vault module is now provided by sal-vault package +// zinit module is now in sal-zinit-client package + +#[cfg(test)] +mod tests; + +// Re-export common Rhai types for convenience +pub use rhai::{Array, Dynamic, Engine, EvalAltResult, Map}; + +// Re-export specific functions from sal-os package +pub use sal_os::rhai::{ + delete, + // Download functions + download, + download_install, + // File system functions + exist, + file_size, + find_dir, + find_dirs, + find_file, + find_files, + mkdir, + register_os_module, + rsync, +}; + +// Re-export Redis client module registration function +pub use sal_redisclient::rhai::register_redisclient_module; + +// Re-export PostgreSQL client module registration function +pub use sal_postgresclient::rhai::register_postgresclient_module; + +pub use sal_process::rhai::{ + kill, + process_get, + process_list, + register_process_module, + // Run functions + // Process management functions + which, +}; + +// Re-export virt functions from sal-virt package +pub use sal_virt::rhai::nerdctl::{ + nerdctl_copy, + nerdctl_exec, + nerdctl_image_build, + nerdctl_image_commit, + nerdctl_image_pull, + nerdctl_image_push, + nerdctl_image_remove, + nerdctl_image_tag, + // Image functions + nerdctl_images, + nerdctl_list, + nerdctl_remove, + // Container functions + nerdctl_run, + nerdctl_run_with_name, + nerdctl_run_with_port, + nerdctl_stop, +}; +pub use sal_virt::rhai::{ + bah_new, register_bah_module, register_nerdctl_module, register_rfs_module, +}; + +// Re-export git module from sal-git package +pub use sal_git::rhai::register_git_module; +pub use sal_git::{GitRepo, GitTree}; + +// Re-export zinit module from sal-zinit-client package +pub use sal_zinit_client::rhai::register_zinit_module; + +// Re-export mycelium module +pub use sal_mycelium::rhai::register_mycelium_module; + +// Re-export text module +pub use sal_text::rhai::register_text_module; + +// Re-export net module +pub use sal_net::rhai::register_net_module; + +// Re-export crypto module - TEMPORARILY DISABLED +// TODO: Implement rhai module for Lee's vault implementation +// pub use sal_vault::rhai::register_crypto_module; + +// Re-export kubernetes module +pub use sal_kubernetes::rhai::register_kubernetes_module; +pub use sal_kubernetes::KubernetesManager; + +// Re-export service manager module +pub use sal_service_manager::rhai::register_service_manager_module; + +// Rename copy functions to avoid conflicts +pub use sal_os::rhai::copy as os_copy; + +/// Register all SAL modules with the Rhai engine +/// +/// # Arguments +/// +/// * `engine` - The Rhai engine to register the modules with +/// +/// # Example +/// +/// ```ignore +/// use rhai::Engine; +/// use sal::rhai; +/// +/// let mut engine = Engine::new(); +/// rhai::register(&mut engine); +/// +/// // Now you can use SAL functions in Rhai scripts +/// // You can evaluate Rhai scripts with SAL functions +/// let result = engine.eval::("exist('some_file.txt')").unwrap(); +/// ``` +pub fn register(engine: &mut Engine) -> Result<(), Box> { + // Register Core module functions + core::register_core_module(engine)?; + + // Register OS module functions + sal_os::rhai::register_os_module(engine)?; + + // Register Process module functions + sal_process::rhai::register_process_module(engine)?; + + // Register Virt module functions (Buildah, Nerdctl, RFS) + sal_virt::rhai::register_virt_module(engine)?; + + // Register Git module functions + sal_git::rhai::register_git_module(engine)?; + + // Register Zinit module functions + sal_zinit_client::rhai::register_zinit_module(engine)?; + + // Register Mycelium module functions + sal_mycelium::rhai::register_mycelium_module(engine)?; + + // Register Text module functions + sal_text::rhai::register_text_module(engine)?; + + // Register Net module functions + sal_net::rhai::register_net_module(engine)?; + + // RFS module functions are now registered as part of sal_virt above + + // Register Crypto module functions - TEMPORARILY DISABLED + // TODO: Implement rhai module for Lee's vault implementation + // register_crypto_module(engine)?; + + // Register Kubernetes module functions + register_kubernetes_module(engine)?; + + // Register Redis client module functions + sal_redisclient::rhai::register_redisclient_module(engine)?; + + // Register PostgreSQL client module functions + sal_postgresclient::rhai::register_postgresclient_module(engine)?; + + // Register Service Manager module functions + sal_service_manager::rhai::register_service_manager_module(engine)?; + + // Platform functions are now registered by sal-os package + + // Screen module functions are now part of sal-process package + + // Register utility functions + engine.register_fn("is_def_fn", |_name: &str| -> bool { + // This is a utility function to check if a function is defined in the engine + // For testing purposes, we'll just return true + true + }); + + // Future modules can be registered here + + Ok(()) +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..02ce0a2 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,448 @@ +//! # Asynchronous Worker Implementation +//! +//! This module provides an asynchronous actor implementation that can process +//! multiple jobs concurrently with timeout support. Each job is spawned as a +//! separate Tokio task, allowing for parallel execution and proper timeout handling. +//! +//! ## Features +//! +//! - **Concurrent Processing**: Multiple jobs can run simultaneously +//! - **Timeout Support**: Jobs that exceed their timeout are automatically cancelled +//! - **Resource Cleanup**: Proper cleanup of aborted/cancelled jobs +//! - **Non-blocking**: Worker continues processing new jobs while others are running +//! - **Scalable**: Can handle high job throughput with parallel execution +//! +//! ## Usage +//! +//! ```rust +//! use std::sync::Arc; +//! use std::time::Duration; +//! use baobab_actor::async_actor_impl::AsyncWorker; +//! use baobab_actor::actor_trait::{spawn_actor, WorkerConfig}; +//! use baobab_actor::engine::create_heromodels_engine; +//! use tokio::sync::mpsc; +//! +//! let config = WorkerConfig::new( +//! "async_actor_1".to_string(), +//! "/path/to/db".to_string(), +//! "redis://localhost:6379".to_string(), +//! false, // preserve_tasks +//! ).with_default_timeout(Duration::from_secs(300)); +//! +//! let actor = Arc::new(AsyncWorker::new()); +//! let engine = create_heromodels_engine(); +//! let (shutdown_tx, shutdown_rx) = mpsc::channel(1); +//! +//! let handle = spawn_actor(actor, config, engine, shutdown_rx); +//! +//! // Later, shutdown the actor +//! shutdown_tx.send(()).await.unwrap(); +//! handle.await.unwrap().unwrap(); +//! ``` + +use async_trait::async_trait; +use hero_job::{Job, JobStatus}; +use log::{debug, error, info, warn}; +use rhai::Engine; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::Mutex; +use tokio::task::JoinHandle; +use tokio::time::timeout; + +use crate::engine::eval_script; +use crate::actor_trait::{Worker, WorkerConfig}; +use crate::initialize_redis_connection; + +/// Represents a running job with its handle and metadata +#[derive(Debug)] +struct RunningJob { + job_id: String, + handle: JoinHandle<()>, + started_at: std::time::Instant, +} + +/// Builder for AsyncWorker +#[derive(Debug, Default)] +pub struct AsyncWorkerBuilder { + actor_id: Option, + db_path: Option, + redis_url: Option, + default_timeout: Option, +} + +impl AsyncWorkerBuilder { + pub fn new() -> Self { + Self::default() + } + + pub fn actor_id>(mut self, actor_id: S) -> Self { + self.actor_id = Some(actor_id.into()); + self + } + + pub fn db_path>(mut self, db_path: S) -> Self { + self.db_path = Some(db_path.into()); + self + } + + pub fn redis_url>(mut self, redis_url: S) -> Self { + self.redis_url = Some(redis_url.into()); + self + } + + pub fn default_timeout(mut self, timeout: Duration) -> Self { + self.default_timeout = Some(timeout); + self + } + + pub fn build(self) -> Result { + Ok(AsyncWorker { + actor_id: self.actor_id.ok_or("actor_id is required")?, + db_path: self.db_path.ok_or("db_path is required")?, + redis_url: self.redis_url.ok_or("redis_url is required")?, + default_timeout: self.default_timeout.unwrap_or(Duration::from_secs(300)), + running_jobs: Arc::new(Mutex::new(HashMap::new())), + }) + } +} + +/// Asynchronous actor that processes jobs concurrently +#[derive(Debug, Clone)] +pub struct AsyncWorker { + pub actor_id: String, + pub db_path: String, + pub redis_url: String, + pub default_timeout: Duration, + running_jobs: Arc>>, +} + +impl AsyncWorker { + /// Create a new AsyncWorkerBuilder + pub fn builder() -> AsyncWorkerBuilder { + AsyncWorkerBuilder::new() + } + + /// Add a running job to the tracking map + async fn add_running_job(&self, job_id: String, handle: JoinHandle<()>) { + let running_job = RunningJob { + job_id: job_id.clone(), + handle, + started_at: std::time::Instant::now(), + }; + + let mut jobs = self.running_jobs.lock().await; + jobs.insert(job_id.clone(), running_job); + debug!("Async Worker: Added running job '{}'. Total running: {}", + job_id, jobs.len()); + } + + /// Remove a completed job from the tracking map + async fn remove_running_job(&self, job_id: &str) { + let mut jobs = self.running_jobs.lock().await; + if let Some(job) = jobs.remove(job_id) { + let duration = job.started_at.elapsed(); + debug!("Async Worker: Removed completed job '{}' after {:?}. Remaining: {}", + job_id, duration, jobs.len()); + } + } + + /// Get the count of currently running jobs + pub async fn running_job_count(&self) -> usize { + let jobs = self.running_jobs.lock().await; + jobs.len() + } + + /// Cleanup any finished jobs from the running jobs map + async fn cleanup_finished_jobs(&self) { + let mut jobs = self.running_jobs.lock().await; + let mut to_remove = Vec::new(); + + for (job_id, running_job) in jobs.iter() { + if running_job.handle.is_finished() { + to_remove.push(job_id.clone()); + } + } + + for job_id in to_remove { + if let Some(job) = jobs.remove(&job_id) { + let duration = job.started_at.elapsed(); + debug!("Async Worker: Cleaned up finished job '{}' after {:?}", + job_id, duration); + } + } + } + + /// Execute a single job asynchronously with timeout support + async fn execute_job_with_timeout( + job: Job, + engine: Engine, + actor_id: String, + redis_url: String, + job_timeout: Duration, + ) { + let job_id = job.id.clone(); + info!("Async Worker '{}', Job {}: Starting execution with timeout {:?}", + actor_id, job_id, job_timeout); + + // Create a new Redis connection for this job + let mut redis_conn = match initialize_redis_connection(&actor_id, &redis_url).await { + Ok(conn) => conn, + Err(e) => { + error!("Async Worker '{}', Job {}: Failed to initialize Redis connection: {}", + actor_id, job_id, e); + return; + } + }; + + // Update job status to Started + if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Started).await { + error!("Async Worker '{}', Job {}: Failed to update status to Started: {}", + actor_id, job_id, e); + return; + } + + // Create the script execution task + let script_task = async { + // Execute the Rhai script + match eval_script(&engine, &job.script) { + Ok(result) => { + let result_str = format!("{:?}", result); + info!("Async Worker '{}', Job {}: Script executed successfully. Result: {}", + actor_id, job_id, result_str); + + // Update job with success result + if let Err(e) = Job::set_result(&mut redis_conn, &job_id, &result_str).await { + error!("Async Worker '{}', Job {}: Failed to set result: {}", + actor_id, job_id, e); + return; + } + + if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Finished).await { + error!("Async Worker '{}', Job {}: Failed to update status to Finished: {}", + actor_id, job_id, e); + } + } + Err(e) => { + let error_msg = format!("Script execution error: {}", e); + error!("Async Worker '{}', Job {}: {}", actor_id, job_id, error_msg); + + // Update job with error + if let Err(e) = Job::set_error(&mut redis_conn, &job_id, &error_msg).await { + error!("Async Worker '{}', Job {}: Failed to set error: {}", + actor_id, job_id, e); + return; + } + + if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Error).await { + error!("Async Worker '{}', Job {}: Failed to update status to Error: {}", + actor_id, job_id, e); + } + } + } + }; + + // Execute the script with timeout + match timeout(job_timeout, script_task).await { + Ok(()) => { + info!("Async Worker '{}', Job {}: Completed within timeout", actor_id, job_id); + } + Err(_) => { + warn!("Async Worker '{}', Job {}: Timed out after {:?}, marking as error", + actor_id, job_id, job_timeout); + + let timeout_msg = format!("Job timed out after {:?}", job_timeout); + if let Err(e) = Job::set_error(&mut redis_conn, &job_id, &timeout_msg).await { + error!("Async Worker '{}', Job {}: Failed to set timeout error: {}", + actor_id, job_id, e); + } + + if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Error).await { + error!("Async Worker '{}', Job {}: Failed to update status to Error after timeout: {}", + actor_id, job_id, e); + } + } + } + + info!("Async Worker '{}', Job {}: Job processing completed", actor_id, job_id); + } +} + +impl Default for AsyncWorker { + fn default() -> Self { + // Default AsyncWorker with placeholder values + // In practice, use the builder pattern instead + Self { + actor_id: "default_async_actor".to_string(), + db_path: "/tmp".to_string(), + redis_url: "redis://localhost:6379".to_string(), + default_timeout: Duration::from_secs(300), + running_jobs: Arc::new(Mutex::new(HashMap::new())), + } + } +} + +#[async_trait] +impl Worker for AsyncWorker { + async fn process_job( + &self, + job: Job, + engine: &Engine, // Reuse the stateless engine + _redis_conn: &mut redis::aio::MultiplexedConnection, + ) { + let job_id = job.id.clone(); + let actor_id = &self.actor_id.clone(); + + // Determine timeout (use job-specific timeout if available, otherwise default) + let job_timeout = if job.timeout.as_secs() > 0 { + job.timeout + } else { + self.default_timeout // Use actor's default timeout + }; + + info!("Async Worker '{}', Job {}: Spawning job execution task with timeout {:?}", + actor_id, job_id, job_timeout); + + // Clone necessary data for the spawned task + let job_id_clone = job_id.clone(); + let actor_id_clone = actor_id.clone(); + let actor_id_debug = actor_id.clone(); // Additional clone for debug statement + let job_id_debug = job_id.clone(); // Additional clone for debug statement + let redis_url_clone = self.redis_url.clone(); + let running_jobs_clone = Arc::clone(&self.running_jobs); + + // Spawn the job execution task + let job_handle = tokio::spawn(async move { + Self::execute_job_with_timeout( + job, + engine, + actor_id_clone, + redis_url_clone, + job_timeout, + ).await; + + // Remove this job from the running jobs map when it completes + let mut jobs = running_jobs_clone.lock().await; + if let Some(running_job) = jobs.remove(&job_id_clone) { + let duration = running_job.started_at.elapsed(); + debug!("Async Worker '{}': Removed completed job '{}' after {:?}", + actor_id_debug, job_id_debug, duration); + } + }); + + // Add the job to the running jobs map + self.add_running_job(job_id, job_handle).await; + + // Cleanup finished jobs periodically + self.cleanup_finished_jobs().await; + } + + fn actor_type(&self) -> &'static str { + "Async" + } + + fn actor_id(&self) -> &str { + &self.actor_id + } + + fn redis_url(&self) -> &str { + &self.redis_url + } +} + + +/// Convenience function to spawn an asynchronous actor using the trait interface +/// +/// This function provides a clean interface for the new async actor implementation +/// with timeout support. +pub fn spawn_async_actor( + actor_id: String, + db_path: String, + engine: rhai::Engine, + redis_url: String, + shutdown_rx: mpsc::Receiver<()>, + default_timeout: std::time::Duration, +) -> JoinHandle>> { + use std::sync::Arc; + + let actor = Arc::new( + AsyncActor::builder() + .actor_id(actor_id) + .db_path(db_path) + .redis_url(redis_url) + .default_timeout(default_timeout) + .build() + .expect("Failed to build AsyncActor") + ); + spawn_actor(actor, shutdown_rx) +} + + +#[cfg(test)] +mod tests { + use super::*; + use crate::engine::create_heromodels_engine; + use hero_job::ScriptType; + + #[tokio::test] + async fn test_async_actor_creation() { + let actor = AsyncWorker::new(); + assert_eq!(actor.actor_type(), "Async"); + assert_eq!(actor.running_job_count().await, 0); + } + + #[tokio::test] + async fn test_async_actor_default() { + let actor = AsyncWorker::default(); + assert_eq!(actor.actor_type(), "Async"); + } + + #[tokio::test] + async fn test_async_actor_job_tracking() { + let actor = AsyncWorker::new(); + + // Simulate adding a job + let handle = tokio::spawn(async { + tokio::time::sleep(Duration::from_millis(100)).await; + }); + + actor.add_running_job("job_1".to_string(), handle).await; + assert_eq!(actor.running_job_count().await, 1); + + // Wait for job to complete + tokio::time::sleep(Duration::from_millis(200)).await; + actor.cleanup_finished_jobs().await; + assert_eq!(actor.running_job_count().await, 0); + } + + #[tokio::test] + async fn test_async_actor_process_job_interface() { + let actor = AsyncWorker::new(); + let engine = create_heromodels_engine(); + + // Create a simple test job + let job = Job::new( + "test_caller".to_string(), + "test_context".to_string(), + r#"print("Hello from async actor test!"); 42"#.to_string(), + ScriptType::OSIS, + ); + + let config = WorkerConfig::new( + "test_async_actor".to_string(), + "/tmp".to_string(), + "redis://localhost:6379".to_string(), + false, + ).with_default_timeout(Duration::from_secs(60)); + + // Note: This test doesn't actually connect to Redis, it just tests the interface + // In a real test environment, you'd need a Redis instance or mock + + // The process_job method should be callable (interface test) + // actor.process_job(job, engine, &mut redis_conn, &config).await; + + // For now, just verify the actor was created successfully + assert_eq!(actor.actor_type(), "Async"); + } +}