From e66bee09cfcd3461bbbcba94c4f0ecfc0d7e9c5f Mon Sep 17 00:00:00 2001 From: Maxime Van Hees Date: Thu, 7 Aug 2025 11:56:44 +0200 Subject: [PATCH] ... --- Cargo.toml | 5 +- src/lib.rs | 140 ++++++----------------------------------------------- 2 files changed, 19 insertions(+), 126 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3b01799..8bb61fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ path = "examples/actor.rs" [dependencies] redis = { version = "0.25.0", features = ["tokio-comp"] } -rhai = { version = "1.21.0", features = ["std", "sync", "decimal", "internals"] } +rhai = { version = "1.21.0", features = ["std", "sync", "decimal", "internals", "serde"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] } @@ -36,11 +36,12 @@ toml = "0.8" thiserror = "1.0" async-trait = "0.1" hero_job = { git = "https://git.ourworld.tf/herocode/baobab.git"} -baobab_actor = { git = "https://git.ourworld.tf/herocode/baobab.git"} +baobab_actor = { git = "https://git.ourworld.tf/herocode/baobab.git", branch = "logger"} heromodels = { git = "https://git.ourworld.tf/herocode/db.git" } heromodels_core = { git = "https://git.ourworld.tf/herocode/db.git" } heromodels-derive = { git = "https://git.ourworld.tf/herocode/db.git" } rhailib_dsl = { git = "https://git.ourworld.tf/herocode/rhailib.git" } +hero_logger = { git = "https://git.ourworld.tf/herocode/baobab.git", branch = "logger" } [features] default = ["calendar", "finance"] diff --git a/src/lib.rs b/src/lib.rs index db5d1c1..2eb187e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,45 +1,3 @@ -//! # Asynchronous Worker Implementation -//! -//! This module provides an asynchronous actor implementation that can process -//! multiple jobs concurrently with timeout support. Each job is spawned as a -//! separate Tokio task, allowing for parallel execution and proper timeout handling. -//! -//! ## Features -//! -//! - **Concurrent Processing**: Multiple jobs can run simultaneously -//! - **Timeout Support**: Jobs that exceed their timeout are automatically cancelled -//! - **Resource Cleanup**: Proper cleanup of aborted/cancelled jobs -//! - **Non-blocking**: Worker continues processing new jobs while others are running -//! - **Scalable**: Can handle high job throughput with parallel execution -//! -//! ## Usage -//! -//! ```rust -//! use std::sync::Arc; -//! use std::time::Duration; -//! use baobab_actor::async_actor_impl::AsyncWorker; -//! use baobab_actor::actor_trait::{spawn_actor, WorkerConfig}; -//! use baobab_actor::engine::create_heromodels_engine; -//! use tokio::sync::mpsc; -//! -//! let config = WorkerConfig::new( -//! "async_actor_1".to_string(), -//! "/path/to/db".to_string(), -//! "redis://localhost:6379".to_string(), -//! false, // preserve_tasks -//! ).with_default_timeout(Duration::from_secs(300)); -//! -//! let actor = Arc::new(AsyncWorker::new()); -//! let engine = create_heromodels_engine(); -//! let (shutdown_tx, shutdown_rx) = mpsc::channel(1); -//! -//! let handle = spawn_actor(actor, config, engine, shutdown_rx); -//! -//! // Later, shutdown the actor -//! shutdown_tx.send(()).await.unwrap(); -//! handle.await.unwrap().unwrap(); -//! ``` - use async_trait::async_trait; use hero_job::{Job, JobStatus}; use log::{debug, error, info, warn}; @@ -47,15 +5,11 @@ use rhai::Engine; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; -use tokio::sync::Mutex; +use tokio::sync::{mpsc, Mutex}; use tokio::task::JoinHandle; use tokio::time::timeout; - -use crate::engine::eval_script; -use crate::actor_trait::{Worker, WorkerConfig}; -use crate::initialize_redis_connection; - -use baobab_actor::{actor_trait::Actor, spawn_actor}; +use baobab_actor::{actor_trait::Actor, spawn_actor, initialize_redis_connection}; +mod engine; /// Represents a running job with its handle and metadata #[derive(Debug)] @@ -208,7 +162,7 @@ impl AsyncWorker { // Create the script execution task let script_task = async { // Execute the Rhai script - match eval_script(&engine, &job.script) { + match engine.eval::(&job.script) { Ok(result) => { let result_str = format!("{:?}", result); info!("Async Worker '{}', Job {}: Script executed successfully. Result: {}", @@ -286,11 +240,10 @@ impl Default for AsyncWorker { } #[async_trait] -impl Worker for AsyncWorker { +impl Actor for AsyncWorker { async fn process_job( &self, - job: Job, - engine: &Engine, // Reuse the stateless engine + job: hero_job::Job, _redis_conn: &mut redis::aio::MultiplexedConnection, ) { let job_id = job.id.clone(); @@ -316,6 +269,14 @@ impl Worker for AsyncWorker { // Spawn the job execution task let job_handle = tokio::spawn(async move { + // Create engine for this job - we need to get it from somewhere + // For now, let's assume we need to create a new engine instance + let mut engine = rhai::Engine::new(); + if let Err(e) = register_engine(&mut engine) { + error!("Failed to register engine modules: {}", e); + return; + } + Self::execute_job_with_timeout( job, engine, @@ -369,7 +330,7 @@ pub fn spawn_async_actor( use std::sync::Arc; let actor = Arc::new( - AsyncActor::builder() + AsyncWorker::builder() .actor_id(actor_id) .db_path(db_path) .redis_url(redis_url) @@ -378,73 +339,4 @@ pub fn spawn_async_actor( .expect("Failed to build AsyncActor") ); spawn_actor(actor, shutdown_rx) -} - - -#[cfg(test)] -mod tests { - use super::*; - use crate::engine::create_heromodels_engine; - use hero_job::ScriptType; - - #[tokio::test] - async fn test_async_actor_creation() { - let actor = AsyncWorker::new(); - assert_eq!(actor.actor_type(), "Async"); - assert_eq!(actor.running_job_count().await, 0); - } - - #[tokio::test] - async fn test_async_actor_default() { - let actor = AsyncWorker::default(); - assert_eq!(actor.actor_type(), "Async"); - } - - #[tokio::test] - async fn test_async_actor_job_tracking() { - let actor = AsyncWorker::new(); - - // Simulate adding a job - let handle = tokio::spawn(async { - tokio::time::sleep(Duration::from_millis(100)).await; - }); - - actor.add_running_job("job_1".to_string(), handle).await; - assert_eq!(actor.running_job_count().await, 1); - - // Wait for job to complete - tokio::time::sleep(Duration::from_millis(200)).await; - actor.cleanup_finished_jobs().await; - assert_eq!(actor.running_job_count().await, 0); - } - - #[tokio::test] - async fn test_async_actor_process_job_interface() { - let actor = AsyncWorker::new(); - let engine = create_heromodels_engine(); - - // Create a simple test job - let job = Job::new( - "test_caller".to_string(), - "test_context".to_string(), - r#"print("Hello from async actor test!"); 42"#.to_string(), - ScriptType::OSIS, - ); - - let config = WorkerConfig::new( - "test_async_actor".to_string(), - "/tmp".to_string(), - "redis://localhost:6379".to_string(), - false, - ).with_default_timeout(Duration::from_secs(60)); - - // Note: This test doesn't actually connect to Redis, it just tests the interface - // In a real test environment, you'd need a Redis instance or mock - - // The process_job method should be callable (interface test) - // actor.process_job(job, engine, &mut redis_conn, &config).await; - - // For now, just verify the actor was created successfully - assert_eq!(actor.actor_type(), "Async"); - } -} +} \ No newline at end of file