This commit is contained in:
Maxime Van Hees 2025-08-07 11:56:44 +02:00
parent 0a118533c4
commit e66bee09cf
2 changed files with 19 additions and 126 deletions

View File

@ -23,7 +23,7 @@ path = "examples/actor.rs"
[dependencies]
redis = { version = "0.25.0", features = ["tokio-comp"] }
rhai = { version = "1.21.0", features = ["std", "sync", "decimal", "internals"] }
rhai = { version = "1.21.0", features = ["std", "sync", "decimal", "internals", "serde"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
@ -36,11 +36,12 @@ toml = "0.8"
thiserror = "1.0"
async-trait = "0.1"
hero_job = { git = "https://git.ourworld.tf/herocode/baobab.git"}
baobab_actor = { git = "https://git.ourworld.tf/herocode/baobab.git"}
baobab_actor = { git = "https://git.ourworld.tf/herocode/baobab.git", branch = "logger"}
heromodels = { git = "https://git.ourworld.tf/herocode/db.git" }
heromodels_core = { git = "https://git.ourworld.tf/herocode/db.git" }
heromodels-derive = { git = "https://git.ourworld.tf/herocode/db.git" }
rhailib_dsl = { git = "https://git.ourworld.tf/herocode/rhailib.git" }
hero_logger = { git = "https://git.ourworld.tf/herocode/baobab.git", branch = "logger" }
[features]
default = ["calendar", "finance"]

View File

@ -1,45 +1,3 @@
//! # Asynchronous Worker Implementation
//!
//! This module provides an asynchronous actor implementation that can process
//! multiple jobs concurrently with timeout support. Each job is spawned as a
//! separate Tokio task, allowing for parallel execution and proper timeout handling.
//!
//! ## Features
//!
//! - **Concurrent Processing**: Multiple jobs can run simultaneously
//! - **Timeout Support**: Jobs that exceed their timeout are automatically cancelled
//! - **Resource Cleanup**: Proper cleanup of aborted/cancelled jobs
//! - **Non-blocking**: Worker continues processing new jobs while others are running
//! - **Scalable**: Can handle high job throughput with parallel execution
//!
//! ## Usage
//!
//! ```rust
//! use std::sync::Arc;
//! use std::time::Duration;
//! use baobab_actor::async_actor_impl::AsyncWorker;
//! use baobab_actor::actor_trait::{spawn_actor, WorkerConfig};
//! use baobab_actor::engine::create_heromodels_engine;
//! use tokio::sync::mpsc;
//!
//! let config = WorkerConfig::new(
//! "async_actor_1".to_string(),
//! "/path/to/db".to_string(),
//! "redis://localhost:6379".to_string(),
//! false, // preserve_tasks
//! ).with_default_timeout(Duration::from_secs(300));
//!
//! let actor = Arc::new(AsyncWorker::new());
//! let engine = create_heromodels_engine();
//! let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
//!
//! let handle = spawn_actor(actor, config, engine, shutdown_rx);
//!
//! // Later, shutdown the actor
//! shutdown_tx.send(()).await.unwrap();
//! handle.await.unwrap().unwrap();
//! ```
use async_trait::async_trait;
use hero_job::{Job, JobStatus};
use log::{debug, error, info, warn};
@ -47,15 +5,11 @@ use rhai::Engine;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::sync::{mpsc, Mutex};
use tokio::task::JoinHandle;
use tokio::time::timeout;
use crate::engine::eval_script;
use crate::actor_trait::{Worker, WorkerConfig};
use crate::initialize_redis_connection;
use baobab_actor::{actor_trait::Actor, spawn_actor};
use baobab_actor::{actor_trait::Actor, spawn_actor, initialize_redis_connection};
mod engine;
/// Represents a running job with its handle and metadata
#[derive(Debug)]
@ -208,7 +162,7 @@ impl AsyncWorker {
// Create the script execution task
let script_task = async {
// Execute the Rhai script
match eval_script(&engine, &job.script) {
match engine.eval::<rhai::Dynamic>(&job.script) {
Ok(result) => {
let result_str = format!("{:?}", result);
info!("Async Worker '{}', Job {}: Script executed successfully. Result: {}",
@ -286,11 +240,10 @@ impl Default for AsyncWorker {
}
#[async_trait]
impl Worker for AsyncWorker {
impl Actor for AsyncWorker {
async fn process_job(
&self,
job: Job,
engine: &Engine, // Reuse the stateless engine
job: hero_job::Job,
_redis_conn: &mut redis::aio::MultiplexedConnection,
) {
let job_id = job.id.clone();
@ -316,6 +269,14 @@ impl Worker for AsyncWorker {
// Spawn the job execution task
let job_handle = tokio::spawn(async move {
// Create engine for this job - we need to get it from somewhere
// For now, let's assume we need to create a new engine instance
let mut engine = rhai::Engine::new();
if let Err(e) = register_engine(&mut engine) {
error!("Failed to register engine modules: {}", e);
return;
}
Self::execute_job_with_timeout(
job,
engine,
@ -369,7 +330,7 @@ pub fn spawn_async_actor(
use std::sync::Arc;
let actor = Arc::new(
AsyncActor::builder()
AsyncWorker::builder()
.actor_id(actor_id)
.db_path(db_path)
.redis_url(redis_url)
@ -378,73 +339,4 @@ pub fn spawn_async_actor(
.expect("Failed to build AsyncActor")
);
spawn_actor(actor, shutdown_rx)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::engine::create_heromodels_engine;
use hero_job::ScriptType;
#[tokio::test]
async fn test_async_actor_creation() {
let actor = AsyncWorker::new();
assert_eq!(actor.actor_type(), "Async");
assert_eq!(actor.running_job_count().await, 0);
}
#[tokio::test]
async fn test_async_actor_default() {
let actor = AsyncWorker::default();
assert_eq!(actor.actor_type(), "Async");
}
#[tokio::test]
async fn test_async_actor_job_tracking() {
let actor = AsyncWorker::new();
// Simulate adding a job
let handle = tokio::spawn(async {
tokio::time::sleep(Duration::from_millis(100)).await;
});
actor.add_running_job("job_1".to_string(), handle).await;
assert_eq!(actor.running_job_count().await, 1);
// Wait for job to complete
tokio::time::sleep(Duration::from_millis(200)).await;
actor.cleanup_finished_jobs().await;
assert_eq!(actor.running_job_count().await, 0);
}
#[tokio::test]
async fn test_async_actor_process_job_interface() {
let actor = AsyncWorker::new();
let engine = create_heromodels_engine();
// Create a simple test job
let job = Job::new(
"test_caller".to_string(),
"test_context".to_string(),
r#"print("Hello from async actor test!"); 42"#.to_string(),
ScriptType::OSIS,
);
let config = WorkerConfig::new(
"test_async_actor".to_string(),
"/tmp".to_string(),
"redis://localhost:6379".to_string(),
false,
).with_default_timeout(Duration::from_secs(60));
// Note: This test doesn't actually connect to Redis, it just tests the interface
// In a real test environment, you'd need a Redis instance or mock
// The process_job method should be callable (interface test)
// actor.process_job(job, engine, &mut redis_conn, &config).await;
// For now, just verify the actor was created successfully
assert_eq!(actor.actor_type(), "Async");
}
}
}