baobab/core/actor/src/lib.rs
2025-08-07 10:26:11 +02:00

265 lines
11 KiB
Rust

use hero_job::{Job, JobStatus};
use log::{debug, error, info};
use redis::AsyncCommands;
use rhai::{Dynamic, Engine};
use tokio::sync::mpsc; // For shutdown signal
use tokio::task::JoinHandle;
/// Actor trait abstraction for unified actor interface
pub mod actor_trait;
/// Terminal UI module for actor monitoring and job dispatch
pub mod terminal_ui;
const NAMESPACE_PREFIX: &str = "hero:job:";
const BLPOP_TIMEOUT_SECONDS: usize = 5;
/// Initialize Redis connection for the actor
pub(crate) async fn initialize_redis_connection(
actor_id: &str,
redis_url: &str,
) -> Result<redis::aio::MultiplexedConnection, Box<dyn std::error::Error + Send + Sync>> {
let redis_client = redis::Client::open(redis_url)
.map_err(|e| {
error!("Actor for Actor ID '{}': Failed to open Redis client: {}", actor_id, e);
e
})?;
let redis_conn = redis_client.get_multiplexed_async_connection().await
.map_err(|e| {
error!("Actor for Actor ID '{}': Failed to get Redis connection: {}", actor_id, e);
e
})?;
info!("Actor for Actor ID '{}' successfully connected to Redis.", actor_id);
Ok(redis_conn)
}
/// Load job from Redis using Job struct
pub(crate) async fn load_job_from_redis(
redis_conn: &mut redis::aio::MultiplexedConnection,
job_id: &str,
actor_id: &str,
) -> Result<Job, Box<dyn std::error::Error + Send + Sync>> {
debug!("Actor '{}', Job {}: Loading job from Redis", actor_id, job_id);
match Job::load_from_redis(redis_conn, job_id).await {
Ok(job) => {
debug!("Actor '{}', Job {}: Successfully loaded job", actor_id, job_id);
Ok(job)
}
Err(e) => {
error!("Actor '{}', Job {}: Failed to load job from Redis: {}", actor_id, job_id, e);
Err(Box::new(e))
}
}
}
/// Execute the Rhai script and update job status in Redis
async fn execute_script_and_update_status(
redis_conn: &mut redis::aio::MultiplexedConnection,
engine: &mut Engine,
job: &Job,
db_path: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut db_config = rhai::Map::new();
db_config.insert("DB_PATH".into(), db_path.to_string().into());
db_config.insert("CALLER_ID".into(), job.caller_id.clone().into());
db_config.insert("CONTEXT_ID".into(), job.context_id.clone().into());
engine.set_default_tag(Dynamic::from(db_config));
debug!("Actor for Context ID '{}': Evaluating script with Rhai engine.", job.context_id);
match engine.eval::<rhai::Dynamic>(&job.script) {
Ok(result) => {
let output_str = if result.is::<String>() {
result.into_string().unwrap()
} else {
result.to_string()
};
info!("Actor for Context ID '{}' job {} completed. Output: {}", job.context_id, job.id, output_str);
// Update job status to finished and set result
Job::update_status(redis_conn, &job.id, JobStatus::Finished).await
.map_err(|e| {
error!("Failed to update job {} status to finished: {}", job.id, e);
e
})?;
Job::set_result(redis_conn, &job.id, &output_str).await
.map_err(|e| {
error!("Failed to set job {} result: {}", job.id, e);
e
})?;
Ok(())
}
Err(e) => {
let error_str = format!("{:?}", *e);
error!("Actor for Context ID '{}' job {} script evaluation failed. Error: {}", job.context_id, job.id, error_str);
// Update job status to error and set error message
Job::update_status(redis_conn, &job.id, JobStatus::Error).await
.map_err(|e| {
error!("Failed to update job {} status to error: {}", job.id, e);
e
})?;
Job::set_error(redis_conn, &job.id, &error_str).await
.map_err(|e| {
error!("Failed to set job {} error: {}", job.id, e);
e
})?;
Ok(())
}
}
}
/// Execute a job with the given engine, setting proper job context
///
/// This function sets up the engine with job context (DB_PATH, CALLER_ID, CONTEXT_ID)
/// and evaluates the script. It returns the result or error without updating Redis.
/// This allows actors to handle Redis updates according to their own patterns.
pub async fn execute_job_with_engine(
engine: &mut Engine,
job: &Job,
db_path: &str,
) -> Result<Dynamic, Box<rhai::EvalAltResult>> {
// Set up job context in the engine
let mut db_config = rhai::Map::new();
db_config.insert("DB_PATH".into(), db_path.to_string().into());
db_config.insert("CALLER_ID".into(), job.caller_id.clone().into());
db_config.insert("CONTEXT_ID".into(), job.context_id.clone().into());
engine.set_default_tag(Dynamic::from(db_config));
debug!("Actor for Context ID '{}': Evaluating script with Rhai engine (job context set).", job.context_id);
// Execute the script with the configured engine
engine.eval::<Dynamic>(&job.script)
}
/// Clean up job from Redis if preserve_tasks is false
async fn cleanup_job(
redis_conn: &mut redis::aio::MultiplexedConnection,
job_id: &str,
context_id: &str,
preserve_tasks: bool,
) {
if !preserve_tasks {
if let Err(e) = Job::delete_from_redis(redis_conn, job_id).await {
error!("Actor for Context ID '{}', Job {}: Failed to delete job: {}", context_id, job_id, e);
} else {
debug!("Actor for Context ID '{}', Job {}: Cleaned up job.", context_id, job_id);
}
} else {
debug!("Actor for Context ID '{}', Job {}: Preserving job (preserve_tasks=true)", context_id, job_id);
}
}
/// Process a single job from the queue
async fn process_job(
redis_conn: &mut redis::aio::MultiplexedConnection,
job_id: &str,
actor_id: &str,
db_path: &str,
engine: &mut Engine,
preserve_tasks: bool,
) {
debug!("Actor '{}', Job {}: Processing started.", actor_id, job_id);
// Load job from Redis
match load_job_from_redis(redis_conn, job_id, actor_id).await {
Ok(job) => {
info!("Actor '{}' processing job_id: {}. Script: {:.50}...", job.context_id, job_id, job.script);
// Update status to started
debug!("Actor for Context ID '{}', Job {}: Attempting to update status to 'started'.", job.context_id, job_id);
if let Err(e) = Job::update_status(redis_conn, job_id, JobStatus::Started).await {
error!("Actor for Context ID '{}', Job {}: Failed to update status to 'started': {}", job.context_id, job_id, e);
} else {
debug!("Actor for Context ID '{}', Job {}: Status updated to 'started'.", job.context_id, job_id);
}
// Execute the script and update status
if let Err(e) = execute_script_and_update_status(redis_conn, engine, &job, db_path).await {
error!("Actor for Context ID '{}', Job {}: Script execution failed: {}", job.context_id, job_id, e);
// Ensure job status is set to error if execution failed
if let Err(status_err) = Job::update_status(redis_conn, job_id, JobStatus::Error).await {
error!("Actor for Context ID '{}', Job {}: Failed to update status to error after execution failure: {}", job.context_id, job_id, status_err);
}
}
// Clean up job if needed
cleanup_job(redis_conn, job_id, &job.context_id, preserve_tasks).await;
}
Err(e) => {
error!("Actor '{}', Job {}: Failed to load job: {}", actor_id, job_id, e);
// Clean up invalid job if needed
if !preserve_tasks {
if let Err(del_err) = Job::delete_from_redis(redis_conn, job_id).await {
error!("Actor '{}', Job {}: Failed to delete invalid job: {}", actor_id, job_id, del_err);
}
} else {
debug!("Actor '{}', Job {}: Preserving invalid job (preserve_tasks=true)", actor_id, job_id);
}
}
}
}
pub fn spawn_rhai_actor(
actor_id: String,
db_path: String,
mut engine: Engine,
redis_url: String,
mut shutdown_rx: mpsc::Receiver<()>,
preserve_tasks: bool,
) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
tokio::spawn(async move {
let queue_key = format!("{}{}", NAMESPACE_PREFIX, actor_id);
info!(
"Rhai Actor for Actor ID '{}' starting. Connecting to Redis at {}. Listening on queue: {}. Waiting for tasks or shutdown signal.",
actor_id, redis_url, queue_key
);
let mut redis_conn = initialize_redis_connection(&actor_id, &redis_url).await?;
loop {
let blpop_keys = vec![queue_key.clone()];
tokio::select! {
// Listen for shutdown signal
_ = shutdown_rx.recv() => {
info!("Actor for Actor ID '{}': Shutdown signal received. Terminating loop.", actor_id);
break;
}
// Listen for tasks from Redis
blpop_result = redis_conn.blpop(&blpop_keys, BLPOP_TIMEOUT_SECONDS as f64) => {
debug!("Actor for Actor ID '{}': Attempting BLPOP on queue: {}", actor_id, queue_key);
let response: Option<(String, String)> = match blpop_result {
Ok(resp) => resp,
Err(e) => {
error!("Actor '{}': Redis BLPOP error on queue {}: {}. Actor for this circle might stop.", actor_id, queue_key, e);
return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
}
};
if let Some((_queue_name_recv, job_id)) = response {
info!("Actor '{}' received job_id: {} from queue: {}", actor_id, job_id, _queue_name_recv);
process_job(&mut redis_conn, &job_id, &actor_id, &db_path, &mut engine, preserve_tasks).await;
} else {
debug!("Actor '{}': BLPOP timed out on queue {}. No new tasks. Checking for shutdown signal again.", actor_id, queue_key);
}
}
}
}
info!("Actor '{}' has shut down.", actor_id);
Ok(())
})
}
// Re-export the main trait-based interface for convenience
pub use actor_trait::{Actor, ActorConfig, spawn_actor};