mod engine; use async_trait::async_trait; use baobab_actor::execute_job_with_engine; use hero_job::{Job, JobStatus, ScriptType}; use hero_logger::{create_job_logger, create_job_logger_with_guard}; use log::{error, info}; use redis::AsyncCommands; use rhai::Engine; use std::sync::Arc; use tokio::sync::mpsc; use tokio::task::JoinHandle; use tracing::subscriber::with_default; use baobab_actor::{actor_trait::Actor, spawn_actor}; /// Constant actor ID for OSIS actor const OSIS: &str = "osis"; /// Builder for OSISActor #[derive(Debug)] pub struct OSISActorBuilder { engine: Option>, db_path: Option, redis_url: Option, } impl Default for OSISActorBuilder { fn default() -> Self { Self { engine: None, db_path: None, redis_url: Some("redis://localhost:6379".to_string()), } } } impl OSISActorBuilder { pub fn new() -> Self { Self::default() } pub fn engine(mut self, engine: Engine) -> Self { self.engine = Some(Arc::new(engine)); self } pub fn shared_engine(mut self, engine: Arc) -> Self { self.engine = Some(engine); self } pub fn db_path>(mut self, db_path: S) -> Self { self.db_path = Some(db_path.into()); self } pub fn redis_url>(mut self, redis_url: S) -> Self { self.redis_url = Some(redis_url.into()); self } pub fn build(self) -> Result { let engine = self .engine .unwrap_or_else(|| crate::engine::create_osis_engine()); Ok(OSISActor { engine, db_path: self.db_path.ok_or("db_path is required")?, redis_url: self .redis_url .unwrap_or("redis://localhost:6379".to_string()), }) } } /// OSIS actor that processes jobs in a blocking, synchronized manner #[derive(Debug, Clone)] pub struct OSISActor { pub engine: Arc, pub db_path: String, pub redis_url: String, } impl OSISActor { /// Create a new OSISActorBuilder pub fn builder() -> OSISActorBuilder { OSISActorBuilder::new() } } impl Default for OSISActor { fn default() -> Self { Self { engine: crate::engine::create_osis_engine(), db_path: "/tmp".to_string(), redis_url: "redis://localhost:6379".to_string(), } } } #[async_trait] impl Actor for OSISActor { async fn process_job(&self, job: Job, redis_conn: &mut redis::aio::MultiplexedConnection) { let job_id = &job.id; let _db_path = &self.db_path; // Debug: Log job details info!( "OSIS Actor '{}', Job {}: Processing job with context_id: {}, script length: {}", OSIS, job_id, job.context_id, job.script.len() ); // Create job-specific logger let (job_logger, guard) = match create_job_logger_with_guard("logs", "osis", job_id) { Ok((logger, guard)) => { info!( "OSIS Actor '{}', Job {}: Job logger created successfully", OSIS, job_id ); (logger, guard) }, Err(e) => { error!( "OSIS Actor '{}', Job {}: Failed to create job logger: {}", OSIS, job_id, e ); return; } }; info!( "OSIS Actor '{}', Job {}: Starting sequential processing", OSIS, job_id ); // Update job status to Started if let Err(e) = Job::update_status(redis_conn, job_id, JobStatus::Started).await { error!( "OSIS Actor '{}', Job {}: Failed to update status to Started: {}", OSIS, job_id, e ); return; } // Execute ALL job processing within logging context let job_result = with_default(job_logger, || { tracing::info!(target: "osis_actor", "Job {} started", job_id); // Move the Rhai script execution inside this scope // IMPORTANT: Create a new engine and configure Rhai logging for this job context let mut job_engine = Engine::new(); register_dsl_modules(&mut job_engine); // Configure Rhai logging integration for this engine instance hero_logger::rhai_integration::configure_rhai_logging(&mut job_engine, "osis_actor"); // Execute the script within the job logger context let script_result = tokio::task::block_in_place(|| { tokio::runtime::Handle::current().block_on(async { execute_job_with_engine(&mut job_engine, &job, &self.db_path).await }) }); tracing::info!(target: "osis_actor", "Job {} completed", job_id); script_result // Return the result }); // Handle the result outside the logging context match job_result { Ok(result) => { let result_str = format!("{:?}", result); info!( "OSIS Actor '{}', Job {}: Script executed successfully. Result: {}", OSIS, job_id, result_str ); // Update job with success result (stores in job hash output field) if let Err(e) = Job::set_result(redis_conn, job_id, &result_str).await { error!( "OSIS Actor '{}', Job {}: Failed to set result: {}", OSIS, job_id, e ); return; } // Also push result to result queue for retrieval let result_queue_key = format!("hero:job:{}:result", job_id); if let Err(e) = redis_conn .lpush::<_, _, ()>(&result_queue_key, &result_str) .await { error!( "OSIS Actor '{}', Job {}: Failed to push result to queue {}: {}", OSIS, job_id, result_queue_key, e ); } else { info!( "OSIS Actor '{}', Job {}: Result pushed to queue: {}", OSIS, job_id, result_queue_key ); } if let Err(e) = Job::update_status(redis_conn, job_id, JobStatus::Finished).await { error!( "OSIS Actor '{}', Job {}: Failed to update status to Finished: {}", OSIS, job_id, e ); } } Err(e) => { let error_msg = format!("Script execution error: {}", e); error!("OSIS Actor '{}', Job {}: {}", OSIS, job_id, error_msg); // Update job with error (stores in job hash error field) if let Err(e) = Job::set_error(redis_conn, job_id, &error_msg).await { error!( "OSIS Actor '{}', Job {}: Failed to set error: {}", OSIS, job_id, e ); } // Also push error to error queue for retrieval let error_queue_key = format!("hero:job:{}:error", job_id); if let Err(e) = redis_conn .lpush::<_, _, ()>(&error_queue_key, &error_msg) .await { error!( "OSIS Actor '{}', Job {}: Failed to push error to queue {}: {}", OSIS, job_id, error_queue_key, e ); } else { info!( "OSIS Actor '{}', Job {}: Error pushed to queue: {}", OSIS, job_id, error_queue_key ); } if let Err(e) = Job::update_status(redis_conn, job_id, JobStatus::Error).await { error!( "OSIS Actor '{}', Job {}: Failed to update status to Error: {}", OSIS, job_id, e ); } } } // Force flush logs before dropping guard std::thread::sleep(std::time::Duration::from_millis(100)); // Keep the guard alive until after processing drop(guard); info!( "OSIS Actor '{}', Job {}: Sequential processing completed", OSIS, job_id ); } fn actor_type(&self) -> &'static str { "OSIS" } fn actor_id(&self) -> &str { // Actor ID contains "osis" so the runtime derives ScriptType=OSIS and consumes the canonical type queue. "osis" } fn redis_url(&self) -> &str { &self.redis_url } } /// Convenience function to spawn an OSIS actor using the trait interface /// /// This function provides backward compatibility with the original actor API /// while using the new trait-based implementation. pub fn spawn_osis_actor( db_path: String, redis_url: String, shutdown_rx: mpsc::Receiver<()>, ) -> JoinHandle>> { let actor = Arc::new( OSISActor::builder() .db_path(db_path) .redis_url(redis_url) .build() .expect("Failed to build OSISActor"), ); spawn_actor(actor, shutdown_rx) } // Re-export engine functions for examples and external use pub use crate::engine::{create_osis_engine, register_dsl_modules}; #[cfg(test)] mod tests { use super::*; #[tokio::test] async fn test_osis_actor_creation() { let actor = OSISActor::builder().build().unwrap(); assert_eq!(actor.actor_type(), "OSIS"); } #[tokio::test] async fn test_osis_actor_default() { let actor = OSISActor::default(); assert_eq!(actor.actor_type(), "OSIS"); } #[tokio::test] async fn test_osis_actor_process_job_interface() { let actor = OSISActor::default(); // Create a simple test job let _job = Job::new( "test_caller".to_string(), "test_context".to_string(), r#"print("Hello from sync actor test!"); 42"#.to_string(), ScriptType::OSIS, ); // Note: This test doesn't actually connect to Redis, it just tests the interface // In a real test environment, you'd need a Redis instance or mock // For now, just verify the actor was created successfully assert_eq!(actor.actor_type(), "OSIS"); } }