merge runners into single project

Timur Gordon
2025-09-09 15:42:20 +02:00
parent 89a3abee63
commit 629d59f7db
20 changed files with 2033 additions and 894 deletions

src/async_runner.rs Normal file

@@ -0,0 +1,270 @@
use hero_job::Job;
use log::{debug, error, info};
use rhai::Engine;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, Mutex};
use tokio::task::JoinHandle;
use crate::runner_trait::Runner;
/// Represents a running job with its handle and metadata
struct RunningJob {
job_id: String,
handle: JoinHandle<Result<String, Box<dyn std::error::Error + Send + Sync>>>,
started_at: std::time::Instant,
}
/// Builder for AsyncRunner
#[derive(Default)]
pub struct AsyncRunnerBuilder {
runner_id: Option<String>,
db_path: Option<String>,
redis_url: Option<String>,
default_timeout: Option<Duration>,
engine: Option<Arc<dyn Fn() -> Engine + Send + Sync>>,
}
impl AsyncRunnerBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn runner_id<S: Into<String>>(mut self, runner_id: S) -> Self {
self.runner_id = Some(runner_id.into());
self
}
pub fn db_path<S: Into<String>>(mut self, db_path: S) -> Self {
self.db_path = Some(db_path.into());
self
}
pub fn redis_url<S: Into<String>>(mut self, redis_url: S) -> Self {
self.redis_url = Some(redis_url.into());
self
}
pub fn default_timeout(mut self, timeout: Duration) -> Self {
self.default_timeout = Some(timeout);
self
}
pub fn engine_factory<F>(mut self, factory: F) -> Self
where
F: Fn() -> Engine + Send + Sync + 'static,
{
self.engine = Some(Arc::new(factory));
self
}
pub fn build(self) -> Result<AsyncRunner, String> {
Ok(AsyncRunner {
runner_id: self.runner_id.ok_or("runner_id is required")?,
db_path: self.db_path.ok_or("db_path is required")?,
redis_url: self.redis_url.ok_or("redis_url is required")?,
default_timeout: self.default_timeout.unwrap_or(Duration::from_secs(300)),
engine_factory: self.engine.ok_or("engine factory is required")?,
running_jobs: Arc::new(Mutex::new(HashMap::new())),
})
}
}
/// Asynchronous runner that processes jobs concurrently
pub struct AsyncRunner {
pub runner_id: String,
pub db_path: String,
pub redis_url: String,
pub default_timeout: Duration,
pub engine_factory: Arc<dyn Fn() -> Engine + Send + Sync>,
running_jobs: Arc<Mutex<HashMap<String, RunningJob>>>,
}
impl AsyncRunner {
/// Create a new AsyncRunnerBuilder
pub fn builder() -> AsyncRunnerBuilder {
AsyncRunnerBuilder::new()
}
/// Add a running job to the tracking map
async fn add_running_job(&self, job_id: String, handle: JoinHandle<Result<String, Box<dyn std::error::Error + Send + Sync>>>) {
let running_job = RunningJob {
job_id: job_id.clone(),
handle,
started_at: std::time::Instant::now(),
};
let mut jobs = self.running_jobs.lock().await;
jobs.insert(job_id.clone(), running_job);
debug!("Async Runner: Added running job '{}'. Total running: {}",
job_id, jobs.len());
}
/// Remove a completed job from the tracking map
async fn remove_running_job(&self, job_id: &str) {
let mut jobs = self.running_jobs.lock().await;
if let Some(job) = jobs.remove(job_id) {
let duration = job.started_at.elapsed();
debug!("Async Runner: Removed completed job '{}' after {:?}. Remaining: {}",
job_id, duration, jobs.len());
}
}
/// Get the count of currently running jobs
pub async fn running_job_count(&self) -> usize {
let jobs = self.running_jobs.lock().await;
jobs.len()
}
/// Cleanup any finished jobs from the running jobs map
async fn cleanup_finished_jobs(&self) {
let mut jobs = self.running_jobs.lock().await;
let mut to_remove = Vec::new();
for (job_id, running_job) in jobs.iter() {
if running_job.handle.is_finished() {
to_remove.push(job_id.clone());
}
}
for job_id in to_remove {
if let Some(job) = jobs.remove(&job_id) {
let duration = job.started_at.elapsed();
debug!("Async Runner: Cleaned up finished job '{}' after {:?}",
job_id, duration);
}
}
}
}
impl Runner for AsyncRunner {
fn process_job(&self, job: Job) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
let job_id = job.id.clone();
let runner_id = &self.runner_id;
// Determine timeout (use job-specific timeout if available, otherwise default)
let job_timeout = if job.timeout > 0 {
Duration::from_secs(job.timeout)
} else {
self.default_timeout
};
info!("Async Runner '{}', Job {}: Spawning job execution task with timeout {:?}",
runner_id, job_id, job_timeout);
// Clone necessary data for the spawned task
let job_id_clone = job_id.clone();
let runner_id_clone = runner_id.clone();
let runner_id_debug = runner_id.clone();
let job_id_debug = job_id.clone();
let _redis_url_clone = self.redis_url.clone();
let running_jobs_clone = Arc::clone(&self.running_jobs);
let engine_factory = Arc::clone(&self.engine_factory);
let db_path_clone = self.db_path.clone();
// Spawn the job execution task
let job_handle = tokio::spawn(async move {
// Create a new engine instance for this job
let mut engine = engine_factory();
let mut db_config = rhai::Map::new();
db_config.insert("DB_PATH".into(), db_path_clone.into());
db_config.insert("CALLER_ID".into(), job.caller_id.clone().into());
db_config.insert("CONTEXT_ID".into(), job.context_id.clone().into());
engine.set_default_tag(rhai::Dynamic::from(db_config));
// Execute the Rhai script
let result = match engine.eval::<rhai::Dynamic>(&job.payload) {
Ok(result) => {
let result_str = if result.is::<String>() {
result.into_string().unwrap()
} else {
result.to_string()
};
info!("Async Runner '{}', Job {}: Script executed successfully. Result: {}",
runner_id_clone, job_id_clone, result_str);
Ok(result_str)
}
Err(e) => {
let error_msg = format!("Script execution error: {}", e);
error!("Async Runner '{}', Job {}: {}", runner_id_clone, job_id_clone, error_msg);
Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
}
};
// Remove this job from the running jobs map when it completes
let mut jobs = running_jobs_clone.lock().await;
if let Some(running_job) = jobs.remove(&job_id_clone) {
let duration = running_job.started_at.elapsed();
debug!("Async Runner '{}': Removed completed job '{}' after {:?}",
runner_id_debug, job_id_debug, duration);
}
result
});
// Add the job to the running jobs map
let running_job = RunningJob {
job_id: job_id.clone(),
handle: job_handle,
started_at: std::time::Instant::now(),
};
let running_jobs_clone = Arc::clone(&self.running_jobs);
let job_id_for_map = job_id.clone();
tokio::spawn(async move {
let mut jobs = running_jobs_clone.lock().await;
jobs.insert(job_id_for_map, running_job);
debug!("Async Runner: Added running job '{}'. Total running: {}",
job_id, jobs.len());
});
// For async runners, we return immediately with a placeholder
// The actual result will be handled by the spawned task
Ok("Job spawned for async processing".to_string())
}
fn runner_type(&self) -> &'static str {
"Async"
}
fn runner_id(&self) -> &str {
&self.runner_id
}
fn redis_url(&self) -> &str {
&self.redis_url
}
}
/// Convenience function to spawn an asynchronous runner using the trait interface
///
/// This function provides a clean interface for the new async runner implementation
/// with timeout support.
pub fn spawn_async_runner<F>(
runner_id: String,
db_path: String,
redis_url: String,
shutdown_rx: mpsc::Receiver<()>,
default_timeout: std::time::Duration,
engine_factory: F,
) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>>
where
F: Fn() -> Engine + Send + Sync + 'static,
{
use std::sync::Arc;
let runner = Arc::new(
AsyncRunner::builder()
.runner_id(runner_id)
.db_path(db_path)
.redis_url(redis_url)
.default_timeout(default_timeout)
.engine_factory(engine_factory)
.build()
.expect("Failed to build AsyncRunner")
);
crate::runner_trait::spawn_runner(runner, shutdown_rx)
}


@@ -0,0 +1,118 @@
# OSIS Runner
The OSIS (Object Storage Information System) Runner is a synchronous job processing engine that executes Rhai scripts with access to OSIS-specific operations and data management capabilities.
## Features
- **Synchronous Processing**: Processes jobs sequentially, ensuring deterministic execution order
- **Redis Integration**: Uses Redis for job queue management and coordination
- **OSIS Operations**: Access to object storage, metadata management, and information system operations
- **Task Persistence**: Optional task preservation for debugging and audit purposes
- **Graceful Shutdown**: Responds to SIGINT (Ctrl+C) for clean termination
- **SQLite Database**: Local database storage for job state and metadata
## Usage
```bash
cargo run --bin runner_osis -- <RUNNER_ID> [OPTIONS]
```
### Arguments
- `<RUNNER_ID>`: Unique identifier for this runner instance (required, positional)
### Options
- `-d, --db-path <PATH>`: SQLite database file path (default: `/tmp/osis.db`)
- `-r, --redis-url <URL>`: Redis connection URL (default: `redis://localhost:6379`)
- `-p, --preserve-tasks`: Preserve completed tasks in database for debugging (default: `false`)
### Examples
```bash
# Basic usage with default settings
cargo run --bin runner_osis -- myrunner
# Custom Redis URL and database path
cargo run --bin runner_osis -- osis-prod -r redis://prod-redis:6379 -d /var/lib/osis.db
# Enable task preservation for debugging
cargo run --bin runner_osis -- debug-runner -p
```
## Available OSIS Modules
The OSIS runner provides access to specialized modules for information system operations:
- **Object Storage**: File and object management operations
- **Metadata Management**: Information indexing and retrieval
- **Data Processing**: Content analysis and transformation
- **System Integration**: Interface with external information systems
- **Audit and Logging**: Comprehensive operation tracking
## Architecture
The OSIS runner uses a synchronous architecture that:
1. Connects to Redis for job queue management
2. Initializes SQLite database for local state management
3. Creates a Rhai engine with OSIS modules registered
4. Processes jobs sequentially in FIFO order
5. Optionally preserves task history for debugging
6. Handles graceful shutdown on SIGINT
## Synchronous vs Asynchronous
Unlike the SAL runner, the OSIS runner processes jobs synchronously:
- **Sequential Processing**: Jobs are processed one at a time
- **Deterministic Order**: Ensures predictable execution sequence
- **Resource Safety**: Prevents resource conflicts in data operations
- **Debugging Friendly**: Easier to trace and debug job execution
## Database Schema
The runner maintains a SQLite database with the following structure:
- **Jobs Table**: Active and completed job records
- **Task History**: Optional preservation of task execution details
- **Metadata**: Runner configuration and state information
## Error Handling
The runner provides detailed error messages for:
- Redis connection failures
- Database initialization and access problems
- Script execution errors
- Resource cleanup issues
- Shutdown sequence problems
## Logging
Set the `RUST_LOG` environment variable to control logging levels:
```bash
RUST_LOG=info cargo run --bin runner_osis -- myrunner
```
Available log levels: `error`, `warn`, `info`, `debug`, `trace`
## Task Preservation
When `--preserve-tasks` is enabled:
- Completed tasks remain in the database
- Useful for debugging and audit trails
- May require periodic cleanup for long-running instances
- Increases database size over time
## Use Cases
The OSIS runner is ideal for:
- Data processing pipelines requiring strict ordering
- Information system operations with dependencies
- Batch processing jobs that must complete sequentially
- Debugging scenarios where task history is important
- Operations requiring transactional consistency


@@ -0,0 +1,123 @@
use rhai::Engine;
use rhailib_dsl;
use std::sync::{Arc, OnceLock};
/// Engine factory for creating and sharing Rhai engines with DSL modules.
pub struct EngineFactory {
engine: Arc<Engine>,
}
impl EngineFactory {
/// Create a new engine factory with a configured Rhai engine.
pub fn new() -> Self {
let mut engine = Engine::new();
register_dsl_modules(&mut engine);
// Logger
hero_logger::rhai_integration::configure_rhai_logging(&mut engine, "osis_runner");
Self {
engine: Arc::new(engine),
}
}
/// Get a shared reference to the engine.
pub fn get_engine(&self) -> Arc<Engine> {
Arc::clone(&self.engine)
}
/// Get the global singleton engine factory.
pub fn global() -> &'static EngineFactory {
static FACTORY: OnceLock<EngineFactory> = OnceLock::new();
FACTORY.get_or_init(|| EngineFactory::new())
}
}
/// Register basic object functions directly in the engine.
/// This provides object functionality without relying on the problematic rhailib_dsl object module.
fn register_object_functions(engine: &mut Engine) {
use heromodels::models::object::Object;
// Register the Object type
engine.register_type_with_name::<Object>("Object");
// Register constructor function
engine.register_fn("new_object", || Object::new());
// Register setter functions
engine.register_fn("object_title", |obj: &mut Object, title: String| {
obj.title = title;
obj.clone()
});
engine.register_fn(
"object_description",
|obj: &mut Object, description: String| {
obj.description = description;
obj.clone()
},
);
// Register getter functions
engine.register_fn("get_object_id", |obj: &mut Object| obj.id() as i64);
engine.register_fn("get_object_title", |obj: &mut Object| obj.title.clone());
engine.register_fn("get_object_description", |obj: &mut Object| {
obj.description.clone()
});
}
/// Registers all DSL modules with the provided Rhai engine.
///
/// This function is the main entry point for integrating the rhailib DSL with a Rhai engine.
/// It registers all business domain modules, making their functions available to Rhai scripts.
///
/// # Arguments
///
/// * `engine` - A mutable reference to the Rhai engine to register modules with
///
/// # Registered Modules
///
/// This function registers the following domain modules:
/// - Access control functions
/// - Business operation functions (companies, products, sales, shareholders)
/// - Calendar and scheduling functions
/// - Circle and community management functions
/// - Company management functions
/// - Contact management functions
/// - Core utility functions
/// - Financial operation functions (accounts, assets, marketplace)
/// - Workflow management functions (flows, steps, signatures)
/// - Library and content management functions
/// - Generic object manipulation functions (custom implementation)
pub fn register_dsl_modules(engine: &mut Engine) {
rhailib_dsl::access::register_access_rhai_module(engine);
rhailib_dsl::biz::register_biz_rhai_module(engine);
rhailib_dsl::calendar::register_calendar_rhai_module(engine);
rhailib_dsl::circle::register_circle_rhai_module(engine);
rhailib_dsl::company::register_company_rhai_module(engine);
rhailib_dsl::contact::register_contact_rhai_module(engine);
rhailib_dsl::core::register_core_rhai_module(engine);
rhailib_dsl::finance::register_finance_rhai_modules(engine);
// rhailib_dsl::flow::register_flow_rhai_modules(engine);
rhailib_dsl::library::register_library_rhai_module(engine);
// Skip problematic object module for now - can be implemented separately if needed
// rhailib_dsl::object::register_object_fns(engine);
rhailib_dsl::payment::register_payment_rhai_module(engine);
// Register basic object functionality directly
register_object_functions(engine);
println!("Rhailib Domain Specific Language modules registered successfully.");
}
/// Create a new osis engine instance.
pub fn create_osis_engine() -> Engine {
let mut engine = Engine::new();
register_dsl_modules(&mut engine);
hero_logger::rhai_integration::configure_rhai_logging(&mut engine, "osis_runner");
engine
}
/// Create a shared osis engine using the factory.
pub fn create_shared_osis_engine() -> Arc<Engine> {
EngineFactory::global().get_engine()
}
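A hedged sketch of exercising the object helpers registered above from within this module; the script and assertion are illustrative only:

```rust
fn object_demo() -> Result<(), Box<rhai::EvalAltResult>> {
    // create_osis_engine() is defined just above.
    let engine = create_osis_engine();
    let title: String = engine.eval(
        r#"
            let obj = new_object();
            object_title(obj, "Hello OSIS");
            get_object_title(obj)
        "#,
    )?;
    assert_eq!(title, "Hello OSIS");
    Ok(())
}
```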


@@ -0,0 +1,79 @@
use actor_system::spawn_sync_runner;
use clap::Parser;
use log::{error, info};
use tokio::sync::mpsc;
mod engine;
use engine::create_osis_engine;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Runner ID
runner_id: String,
/// Database path
#[arg(short, long, default_value = "/tmp/osis.db")]
db_path: String,
/// Redis URL
#[arg(short = 'r', long, default_value = "redis://localhost:6379")]
redis_url: String,
/// Preserve tasks after completion
#[arg(short, long, default_value_t = false)]
preserve_tasks: bool,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Initialize logging
env_logger::init();
let args = Args::parse();
info!("Starting OSIS Sync Runner with ID: {}", args.runner_id);
info!("Database path: {}", args.db_path);
info!("Redis URL: {}", args.redis_url);
info!("Preserve tasks: {}", args.preserve_tasks);
// Create shutdown channel
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
// Setup signal handling for graceful shutdown
let shutdown_tx_clone = shutdown_tx.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c().await.expect("Failed to listen for ctrl+c");
info!("Received Ctrl+C, initiating shutdown...");
let _ = shutdown_tx_clone.send(()).await;
});
// Spawn the sync runner with engine factory
let runner_handle = spawn_sync_runner(
args.runner_id.clone(),
args.db_path,
args.redis_url,
shutdown_rx,
args.preserve_tasks,
create_osis_engine,
);
info!("OSIS Sync Runner '{}' started successfully", args.runner_id);
// Wait for the runner to complete
match runner_handle.await {
Ok(Ok(())) => {
info!("OSIS Sync Runner '{}' shut down successfully", args.runner_id);
}
Ok(Err(e)) => {
error!("OSIS Sync Runner '{}' encountered an error: {}", args.runner_id, e);
return Err(e);
}
Err(e) => {
error!("Failed to join OSIS Sync Runner '{}' task: {}", args.runner_id, e);
return Err(Box::new(e));
}
}
Ok(())
}


@@ -0,0 +1,87 @@
# SAL Runner
The SAL (System Abstraction Layer) Runner is an asynchronous job processing engine that executes Rhai scripts with access to system-level operations and infrastructure management capabilities.
## Features
- **Asynchronous Processing**: Handles multiple jobs concurrently with configurable timeouts
- **Redis Integration**: Uses Redis for job queue management and coordination
- **System Operations**: Full access to SAL modules including OS, networking, containers, and cloud services
- **Graceful Shutdown**: Responds to SIGINT (Ctrl+C) for clean termination
- **Comprehensive Logging**: Detailed logging for monitoring and debugging
## Usage
```bash
cargo run --bin runner_sal -- <RUNNER_ID> [OPTIONS]
```
### Arguments
- `<RUNNER_ID>`: Unique identifier for this runner instance (required, positional)
### Options
- `-d, --db-path <PATH>`: Database file path (default: `/tmp/sal.db`)
- `-r, --redis-url <URL>`: Redis connection URL (default: `redis://localhost:6379`)
- `-t, --timeout <SECONDS>`: Default job timeout in seconds (default: `300`)
### Examples
```bash
# Basic usage with default settings
cargo run --bin runner_sal -- myrunner
# Custom Redis URL and database path
cargo run --bin runner_sal -- production-runner -r redis://prod-redis:6379 -d /var/lib/sal.db
# Custom timeout for long-running jobs
cargo run --bin runner_sal -- batch-runner -t 3600
```
## Available SAL Modules
The SAL runner provides access to the following system modules through Rhai scripts:
- **OS Operations**: File system, process management, system information
- **Redis Client**: Redis database operations and caching
- **PostgreSQL Client**: Database connectivity and queries
- **Process Management**: System process control and monitoring
- **Virtualization**: Container and VM management
- **Git Operations**: Version control system integration
- **Zinit Client**: Service management and initialization
- **Mycelium**: Networking and mesh connectivity
- **Text Processing**: String manipulation and text utilities
- **Network Operations**: HTTP requests, network utilities
- **Kubernetes**: Container orchestration and cluster management
- **Hetzner Cloud**: Cloud infrastructure management
## Architecture
The SAL runner uses an asynchronous architecture that:
1. Connects to Redis for job queue management
2. Creates a Rhai engine with all SAL modules registered
3. Processes jobs concurrently with configurable timeouts
4. Handles graceful shutdown on SIGINT
5. Provides comprehensive error handling and logging
## Error Handling
The runner provides detailed error messages for common issues:
- Redis connection failures
- Database access problems
- Script execution errors
- Timeout handling
- Resource cleanup on shutdown
## Logging
Set the `RUST_LOG` environment variable to control logging levels:
```bash
RUST_LOG=debug cargo run --bin runner_sal -- myrunner
```
Available log levels: `error`, `warn`, `info`, `debug`, `trace`


@@ -71,7 +71,7 @@ pub use sal_kubernetes::KubernetesManager;
pub use sal_os::rhai::copy as os_copy;
pub use sal_hetzner::rhai::register_hetzner_module;
/// Engine factory for creating and sharing Rhai engines.
/// Engine factory for creating and sharing Rhai engines with SAL modules.
pub struct EngineFactory {
engine: Arc<Engine>,
}
@@ -82,7 +82,7 @@ impl EngineFactory {
let mut engine = Engine::new();
register_sal_modules(&mut engine);
// Logger
hero_logger::rhai_integration::configure_rhai_logging(&mut engine, "osis_actor");
hero_logger::rhai_integration::configure_rhai_logging(&mut engine, "sal_runner");
Self {
engine: Arc::new(engine),
@@ -118,15 +118,15 @@ pub fn register_sal_modules(engine: &mut Engine) {
println!("SAL modules registered successfully.");
}
/// Create a shared heromodels engine using the factory.
pub fn create_system_engine() -> Arc<Engine> {
EngineFactory::global().get_engine()
/// Create a new SAL engine instance.
pub fn create_sal_engine() -> Engine {
let mut engine = Engine::new();
register_sal_modules(&mut engine);
hero_logger::rhai_integration::configure_rhai_logging(&mut engine, "sal_runner");
engine
}
/// Evaluate a Rhai script string.
pub fn eval_script(
engine: &Engine,
script: &str,
) -> Result<rhai::Dynamic, Box<rhai::EvalAltResult>> {
engine.eval(script)
}
/// Create a shared system engine using the factory.
pub fn create_shared_sal_engine() -> Arc<Engine> {
EngineFactory::global().get_engine()
}


@@ -0,0 +1,81 @@
use actor_system::{spawn_async_runner, AsyncRunner};
use clap::Parser;
use log::{error, info};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
mod engine;
use engine::create_sal_engine;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Runner ID
runner_id: String,
/// Database path
#[arg(short, long, default_value = "/tmp/sal.db")]
db_path: String,
/// Redis URL
#[arg(short = 'r', long, default_value = "redis://localhost:6379")]
redis_url: String,
/// Default timeout for jobs in seconds
#[arg(short, long, default_value_t = 300)]
timeout: u64,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Initialize logging
env_logger::init();
let args = Args::parse();
info!("Starting SAL Async Runner with ID: {}", args.runner_id);
info!("Database path: {}", args.db_path);
info!("Redis URL: {}", args.redis_url);
info!("Default timeout: {} seconds", args.timeout);
// Create shutdown channel
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
// Setup signal handling for graceful shutdown
let shutdown_tx_clone = shutdown_tx.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c().await.expect("Failed to listen for ctrl+c");
info!("Received Ctrl+C, initiating shutdown...");
let _ = shutdown_tx_clone.send(()).await;
});
// Spawn the async runner with engine factory
let runner_handle = spawn_async_runner(
args.runner_id.clone(),
args.db_path,
args.redis_url,
shutdown_rx,
Duration::from_secs(args.timeout),
create_sal_engine,
);
info!("SAL Async Runner '{}' started successfully", args.runner_id);
// Wait for the runner to complete
match runner_handle.await {
Ok(Ok(())) => {
info!("SAL Async Runner '{}' shut down successfully", args.runner_id);
}
Ok(Err(e)) => {
error!("SAL Async Runner '{}' encountered an error: {}", args.runner_id, e);
return Err(e);
}
Err(e) => {
error!("Failed to join SAL Async Runner '{}' task: {}", args.runner_id, e);
return Err(Box::new(e));
}
}
Ok(())
}

src/engine/mod.rs Normal file

@@ -0,0 +1,19 @@
/// Engine module for Rhai script execution
///
/// This module provides two different engine configurations:
/// - `system`: SAL modules for system operations (async runner)
/// - `osis`: DSL modules for business operations (sync runner)
pub mod system;
pub mod osis;
// Re-export common Rhai types for convenience
pub use rhai::{Array, Dynamic, Engine, EvalAltResult, Map};
/// Evaluate a Rhai script string with any engine
pub fn eval_script(
engine: &Engine,
script: &str,
) -> Result<rhai::Dynamic, Box<rhai::EvalAltResult>> {
engine.eval(script)
}
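A minimal sketch of the helper in use, assuming the crate-internal paths declared in this module:

```rust
use crate::engine::{eval_script, osis::create_osis_engine};

fn eval_demo() -> Result<(), Box<rhai::EvalAltResult>> {
    let engine = create_osis_engine();
    let result = eval_script(&engine, "40 + 2")?;
    assert_eq!(result.as_int().unwrap(), 42);
    Ok(())
}
```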

src/engine/osis.rs Normal file

@@ -0,0 +1,143 @@
// use heromodels::models::heroledger::rhai::register_heroledger_rhai_modules;
use rhai::Engine;
use rhailib_dsl;
use std::sync::{Arc, OnceLock};
/// Engine factory for creating and sharing Rhai engines with DSL modules.
pub struct EngineFactory {
engine: Arc<Engine>,
}
impl EngineFactory {
/// Create a new engine factory with a configured Rhai engine.
pub fn new() -> Self {
let mut engine = Engine::new();
register_dsl_modules(&mut engine);
// Logger
hero_logger::rhai_integration::configure_rhai_logging(&mut engine, "osis_actor");
Self {
engine: Arc::new(engine),
}
}
/// Get a shared reference to the engine.
pub fn get_engine(&self) -> Arc<Engine> {
Arc::clone(&self.engine)
}
/// Get the global singleton engine factory.
pub fn global() -> &'static EngineFactory {
static FACTORY: OnceLock<EngineFactory> = OnceLock::new();
FACTORY.get_or_init(|| EngineFactory::new())
}
}
/// Register basic object functions directly in the engine.
/// This provides object functionality without relying on the problematic rhailib_dsl object module.
fn register_object_functions(engine: &mut Engine) {
use heromodels::models::object::Object;
// Register the Object type
engine.register_type_with_name::<Object>("Object");
// Register constructor function
engine.register_fn("new_object", || Object::new());
// Register setter functions
engine.register_fn("object_title", |obj: &mut Object, title: String| {
obj.title = title;
obj.clone()
});
engine.register_fn(
"object_description",
|obj: &mut Object, description: String| {
obj.description = description;
obj.clone()
},
);
// Register getter functions
engine.register_fn("get_object_id", |obj: &mut Object| obj.id() as i64);
engine.register_fn("get_object_title", |obj: &mut Object| obj.title.clone());
engine.register_fn("get_object_description", |obj: &mut Object| {
obj.description.clone()
});
}
/// Registers all DSL modules with the provided Rhai engine.
///
/// This function is the main entry point for integrating the rhailib DSL with a Rhai engine.
/// It registers all business domain modules, making their functions available to Rhai scripts.
///
/// # Arguments
///
/// * `engine` - A mutable reference to the Rhai engine to register modules with
///
/// # Example
///
/// ```rust
/// use rhai::Engine;
/// use actor_system::engine::osis::register_dsl_modules;
///
/// let mut engine = Engine::new();
/// register_dsl_modules(&mut engine);
///
/// // Engine now has access to all DSL functions
/// let result = engine.eval::<String>(r#"
/// let company = new_company().name("Test Corp");
/// company.name
/// "#).unwrap();
/// assert_eq!(result, "Test Corp");
/// ```
///
/// # Registered Modules
///
/// This function registers the following domain modules:
/// - Access control functions
/// - Business operation functions (companies, products, sales, shareholders)
/// - Calendar and scheduling functions
/// - Circle and community management functions
/// - Company management functions
/// - Contact management functions
/// - Core utility functions
/// - Financial operation functions (accounts, assets, marketplace)
/// - Workflow management functions (flows, steps, signatures)
/// - Library and content management functions
/// - Generic object manipulation functions (custom implementation)
pub fn register_dsl_modules(engine: &mut Engine) {
rhailib_dsl::access::register_access_rhai_module(engine);
rhailib_dsl::biz::register_biz_rhai_module(engine);
rhailib_dsl::calendar::register_calendar_rhai_module(engine);
rhailib_dsl::circle::register_circle_rhai_module(engine);
rhailib_dsl::company::register_company_rhai_module(engine);
rhailib_dsl::contact::register_contact_rhai_module(engine);
rhailib_dsl::core::register_core_rhai_module(engine);
rhailib_dsl::finance::register_finance_rhai_modules(engine);
// rhailib_dsl::flow::register_flow_rhai_modules(engine);
rhailib_dsl::library::register_library_rhai_module(engine);
// Skip problematic object module for now - can be implemented separately if needed
// rhailib_dsl::object::register_object_fns(engine);
rhailib_dsl::payment::register_payment_rhai_module(engine);
// Register basic object functionality directly
register_object_functions(engine);
// heromodels::heroledger::rhai::register_heroledger_rhai_modules(&mut engine);
println!("Rhailib Domain Specific Language modules registered successfully.");
}
/// Create a new osis engine instance.
pub fn create_osis_engine() -> Engine {
let mut engine = Engine::new();
register_dsl_modules(&mut engine);
hero_logger::rhai_integration::configure_rhai_logging(&mut engine, "osis_actor");
engine
}
/// Create a shared osis engine using the factory.
pub fn create_shared_osis_engine() -> Arc<Engine> {
EngineFactory::global().get_engine()
}

src/engine/system.rs Normal file

@@ -0,0 +1,124 @@
use std::sync::{Arc, OnceLock};
// Re-export common Rhai types for convenience
pub use rhai::{Array, Dynamic, Engine, EvalAltResult, Map};
// Re-export specific functions from sal-os package
pub use sal_os::rhai::{
delete,
// Download functions
download,
download_install,
// File system functions
exist,
file_size,
find_dir,
find_dirs,
find_file,
find_files,
mkdir,
register_os_module,
rsync,
};
// Re-export Redis client module registration function
pub use sal_redisclient::rhai::register_redisclient_module;
// Re-export PostgreSQL client module registration function
pub use sal_postgresclient::rhai::register_postgresclient_module;
pub use sal_process::rhai::{
kill,
process_get,
process_list,
register_process_module,
// Run functions
// Process management functions
which,
};
// Re-export virt functions from sal-virt package
pub use sal_virt::rhai::nerdctl::{
nerdctl_copy,
nerdctl_exec,
nerdctl_image_build,
nerdctl_image_commit,
nerdctl_image_pull,
nerdctl_image_push,
nerdctl_image_remove,
nerdctl_image_tag,
// Image functions
nerdctl_images,
nerdctl_list,
nerdctl_remove,
// Container functions
nerdctl_run,
nerdctl_run_with_name,
nerdctl_run_with_port,
nerdctl_stop,
};
pub use sal_virt::rhai::{
bah_new, register_bah_module, register_nerdctl_module, register_rfs_module,
};
pub use sal_git::rhai::register_git_module;
pub use sal_git::{GitRepo, GitTree};
pub use sal_zinit_client::rhai::register_zinit_module;
pub use sal_mycelium::rhai::register_mycelium_module;
pub use sal_text::rhai::register_text_module;
pub use sal_net::rhai::register_net_module;
pub use sal_kubernetes::rhai::register_kubernetes_module;
pub use sal_kubernetes::KubernetesManager;
pub use sal_os::rhai::copy as os_copy;
pub use sal_hetzner::rhai::register_hetzner_module;
/// Engine factory for creating and sharing Rhai engines with SAL modules.
pub struct EngineFactory {
engine: Arc<Engine>,
}
impl EngineFactory {
/// Create a new engine factory with a configured Rhai engine.
pub fn new() -> Self {
let mut engine = Engine::new();
register_sal_modules(&mut engine);
// Logger
hero_logger::rhai_integration::configure_rhai_logging(&mut engine, "system_actor");
Self {
engine: Arc::new(engine),
}
}
/// Get a shared reference to the engine.
pub fn get_engine(&self) -> Arc<Engine> {
Arc::clone(&self.engine)
}
/// Get the global singleton engine factory.
pub fn global() -> &'static EngineFactory {
static FACTORY: OnceLock<EngineFactory> = OnceLock::new();
FACTORY.get_or_init(|| EngineFactory::new())
}
}
pub fn register_sal_modules(engine: &mut Engine) {
let _ = sal_os::rhai::register_os_module(engine);
let _ = sal_redisclient::rhai::register_redisclient_module(engine);
let _ = sal_postgresclient::rhai::register_postgresclient_module(engine);
let _ = sal_process::rhai::register_process_module(engine);
let _ = sal_virt::rhai::register_virt_module(engine);
let _ = sal_git::rhai::register_git_module(engine);
let _ = sal_zinit_client::rhai::register_zinit_module(engine);
let _ = sal_mycelium::rhai::register_mycelium_module(engine);
let _ = sal_text::rhai::register_text_module(engine);
let _ = sal_net::rhai::register_net_module(engine);
let _ = sal_kubernetes::rhai::register_kubernetes_module(engine);
let _ = sal_hetzner::rhai::register_hetzner_module(engine);
println!("SAL modules registered successfully.");
}
/// Create a shared system engine using the factory.
pub fn create_system_engine() -> Arc<Engine> {
EngineFactory::global().get_engine()
}
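A hedged sketch of both ways to obtain a system engine, the shared singleton and a locally owned instance:

```rust
use crate::engine::system::{create_system_engine, register_sal_modules};

fn system_engine_demo() -> Result<(), Box<rhai::EvalAltResult>> {
    // Shared, lazily initialized engine with all SAL modules registered.
    let shared = create_system_engine();
    let sum = shared.eval::<i64>("40 + 2")?;
    assert_eq!(sum, 42);

    // Alternatively, register the SAL modules on an engine you own.
    let mut engine = rhai::Engine::new();
    register_sal_modules(&mut engine);
    Ok(())
}
```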


@@ -1,340 +1,82 @@
mod engine;
// Core modules
pub mod runner_trait;
pub mod async_runner;
pub mod sync_runner;
pub mod engine;
// Public exports
pub use engine::register_sal_modules;
pub use engine::create_system_engine;
// Public exports for convenience
pub use runner_trait::{Runner, RunnerConfig, spawn_runner};
pub use async_runner::{AsyncRunner, spawn_async_runner};
pub use sync_runner::{SyncRunner, spawn_sync_runner};
pub use engine::system::{register_sal_modules, create_system_engine};
pub use engine::osis::{register_dsl_modules, create_osis_engine, create_shared_osis_engine};
use async_trait::async_trait;
use hero_job::{Job, JobStatus};
use log::{debug, error, info, warn};
use rhai::Engine;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, Mutex};
use tokio::task::JoinHandle;
use tokio::time::timeout;
use baobab_actor::{actor_trait::Actor, spawn_actor, initialize_redis_connection};
// Re-export job types from hero-job crate
pub use hero_job::{Job, JobStatus, JobError, JobBuilder};
pub use redis::AsyncCommands;
use log::{debug, error, info};
/// Represents a running job with its handle and metadata
#[derive(Debug)]
struct RunningJob {
job_id: String,
handle: JoinHandle<()>,
started_at: std::time::Instant,
const NAMESPACE_PREFIX: &str = "hero:job:";
const BLPOP_TIMEOUT_SECONDS: usize = 5;
/// Initialize Redis connection for the runner
pub async fn initialize_redis_connection(
runner_id: &str,
redis_url: &str,
) -> Result<redis::aio::MultiplexedConnection, Box<dyn std::error::Error + Send + Sync>> {
let redis_client = redis::Client::open(redis_url)
.map_err(|e| {
error!("Runner for Runner ID '{}': Failed to open Redis client: {}", runner_id, e);
e
})?;
let redis_conn = redis_client.get_multiplexed_async_connection().await
.map_err(|e| {
error!("Runner for Runner ID '{}': Failed to get Redis connection: {}", runner_id, e);
e
})?;
info!("Runner for Runner ID '{}' successfully connected to Redis.", runner_id);
Ok(redis_conn)
}
/// Builder for AsyncWorker
#[derive(Debug, Default)]
pub struct AsyncWorkerBuilder {
actor_id: Option<String>,
db_path: Option<String>,
redis_url: Option<String>,
default_timeout: Option<Duration>,
}
impl AsyncWorkerBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn actor_id<S: Into<String>>(mut self, actor_id: S) -> Self {
self.actor_id = Some(actor_id.into());
self
}
pub fn db_path<S: Into<String>>(mut self, db_path: S) -> Self {
self.db_path = Some(db_path.into());
self
}
pub fn redis_url<S: Into<String>>(mut self, redis_url: S) -> Self {
self.redis_url = Some(redis_url.into());
self
}
pub fn default_timeout(mut self, timeout: Duration) -> Self {
self.default_timeout = Some(timeout);
self
}
pub fn build(self) -> Result<AsyncWorker, String> {
Ok(AsyncWorker {
actor_id: self.actor_id.ok_or("actor_id is required")?,
db_path: self.db_path.ok_or("db_path is required")?,
redis_url: self.redis_url.ok_or("redis_url is required")?,
default_timeout: self.default_timeout.unwrap_or(Duration::from_secs(300)),
running_jobs: Arc::new(Mutex::new(HashMap::new())),
})
}
}
/// Asynchronous actor that processes jobs concurrently
#[derive(Debug, Clone)]
pub struct AsyncWorker {
pub actor_id: String,
pub db_path: String,
pub redis_url: String,
pub default_timeout: Duration,
running_jobs: Arc<Mutex<HashMap<String, RunningJob>>>,
}
impl AsyncWorker {
/// Create a new AsyncWorkerBuilder
pub fn builder() -> AsyncWorkerBuilder {
AsyncWorkerBuilder::new()
}
/// Add a running job to the tracking map
async fn add_running_job(&self, job_id: String, handle: JoinHandle<()>) {
let running_job = RunningJob {
job_id: job_id.clone(),
handle,
started_at: std::time::Instant::now(),
};
let mut jobs = self.running_jobs.lock().await;
jobs.insert(job_id.clone(), running_job);
debug!("Async Worker: Added running job '{}'. Total running: {}",
job_id, jobs.len());
}
/// Remove a completed job from the tracking map
async fn remove_running_job(&self, job_id: &str) {
let mut jobs = self.running_jobs.lock().await;
if let Some(job) = jobs.remove(job_id) {
let duration = job.started_at.elapsed();
debug!("Async Worker: Removed completed job '{}' after {:?}. Remaining: {}",
job_id, duration, jobs.len());
}
}
/// Get the count of currently running jobs
pub async fn running_job_count(&self) -> usize {
let jobs = self.running_jobs.lock().await;
jobs.len()
}
/// Cleanup any finished jobs from the running jobs map
async fn cleanup_finished_jobs(&self) {
let mut jobs = self.running_jobs.lock().await;
let mut to_remove = Vec::new();
for (job_id, running_job) in jobs.iter() {
if running_job.handle.is_finished() {
to_remove.push(job_id.clone());
}
}
for job_id in to_remove {
if let Some(job) = jobs.remove(&job_id) {
let duration = job.started_at.elapsed();
debug!("Async Worker: Cleaned up finished job '{}' after {:?}",
job_id, duration);
}
}
}
/// Execute a single job asynchronously with timeout support
async fn execute_job_with_timeout(
job: Job,
engine: Engine,
actor_id: String,
redis_url: String,
job_timeout: Duration,
) {
let job_id = job.id.clone();
info!("Async Worker '{}', Job {}: Starting execution with timeout {:?}",
actor_id, job_id, job_timeout);
// Create a new Redis connection for this job
let mut redis_conn = match initialize_redis_connection(&actor_id, &redis_url).await {
Ok(conn) => conn,
Err(e) => {
error!("Async Worker '{}', Job {}: Failed to initialize Redis connection: {}",
actor_id, job_id, e);
return;
}
};
// Update job status to Started
if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Started).await {
error!("Async Worker '{}', Job {}: Failed to update status to Started: {}",
actor_id, job_id, e);
return;
}
// Create the script execution task
let script_task = async {
// Execute the Rhai script
match engine.eval::<rhai::Dynamic>(&job.script) {
Ok(result) => {
let result_str = format!("{:?}", result);
info!("Async Worker '{}', Job {}: Script executed successfully. Result: {}",
actor_id, job_id, result_str);
// Update job with success result
if let Err(e) = Job::set_result(&mut redis_conn, &job_id, &result_str).await {
error!("Async Worker '{}', Job {}: Failed to set result: {}",
actor_id, job_id, e);
return;
}
if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Finished).await {
error!("Async Worker '{}', Job {}: Failed to update status to Finished: {}",
actor_id, job_id, e);
}
}
Err(e) => {
let error_msg = format!("Script execution error: {}", e);
error!("Async Worker '{}', Job {}: {}", actor_id, job_id, error_msg);
// Update job with error
if let Err(e) = Job::set_error(&mut redis_conn, &job_id, &error_msg).await {
error!("Async Worker '{}', Job {}: Failed to set error: {}",
actor_id, job_id, e);
return;
}
if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Error).await {
error!("Async Worker '{}', Job {}: Failed to update status to Error: {}",
actor_id, job_id, e);
}
}
}
};
// Execute the script with timeout
match timeout(job_timeout, script_task).await {
Ok(()) => {
info!("Async Worker '{}', Job {}: Completed within timeout", actor_id, job_id);
}
Err(_) => {
warn!("Async Worker '{}', Job {}: Timed out after {:?}, marking as error",
actor_id, job_id, job_timeout);
let timeout_msg = format!("Job timed out after {:?}", job_timeout);
if let Err(e) = Job::set_error(&mut redis_conn, &job_id, &timeout_msg).await {
error!("Async Worker '{}', Job {}: Failed to set timeout error: {}",
actor_id, job_id, e);
}
if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Error).await {
error!("Async Worker '{}', Job {}: Failed to update status to Error after timeout: {}",
actor_id, job_id, e);
}
}
}
info!("Async Worker '{}', Job {}: Job processing completed", actor_id, job_id);
}
}
impl Default for AsyncWorker {
fn default() -> Self {
// Default AsyncWorker with placeholder values
// In practice, use the builder pattern instead
Self {
actor_id: "default_async_actor".to_string(),
db_path: "/tmp".to_string(),
redis_url: "redis://localhost:6379".to_string(),
default_timeout: Duration::from_secs(300),
running_jobs: Arc::new(Mutex::new(HashMap::new())),
}
}
}
#[async_trait]
impl Actor for AsyncWorker {
async fn process_job(&self, job: hero_job::Job, _redis_conn: &mut redis::aio::MultiplexedConnection) {
let job_id = job.id.clone();
let actor_id = &self.actor_id.clone();
// Determine timeout (use job-specific timeout if available, otherwise default)
let job_timeout = if job.timeout.as_secs() > 0 {
job.timeout
} else {
self.default_timeout // Use actor's default timeout
};
info!("Async Worker '{}', Job {}: Spawning job execution task with timeout {:?}",
actor_id, job_id, job_timeout);
// Clone necessary data for the spawned task
let job_id_clone = job_id.clone();
let actor_id_clone = actor_id.clone();
let actor_id_debug = actor_id.clone(); // Additional clone for debug statement
let job_id_debug = job_id.clone(); // Additional clone for debug statement
let redis_url_clone = self.redis_url.clone();
let running_jobs_clone = Arc::clone(&self.running_jobs);
// Spawn the job execution task
let job_handle = tokio::spawn(async move {
// Create engine for this job - we need to get it from somewhere
// For now, let's assume we need to create a new engine instance
let mut engine = rhai::Engine::new();
engine::register_sal_modules(&mut engine);
Self::execute_job_with_timeout(
job,
engine,
actor_id_clone,
redis_url_clone,
job_timeout,
).await;
// Remove this job from the running jobs map when it completes
let mut jobs = running_jobs_clone.lock().await;
if let Some(running_job) = jobs.remove(&job_id_clone) {
let duration = running_job.started_at.elapsed();
debug!("Async Worker '{}': Removed completed job '{}' after {:?}",
actor_id_debug, job_id_debug, duration);
}
});
// Add the job to the running jobs map
self.add_running_job(job_id, job_handle).await;
// Cleanup finished jobs periodically
self.cleanup_finished_jobs().await;
}
fn actor_type(&self) -> &'static str {
"Async"
/// Load job from Redis using the supervisor's Job API
pub async fn load_job_from_redis(
redis_conn: &mut redis::aio::MultiplexedConnection,
job_id: &str,
runner_id: &str,
) -> Result<Job, JobError> {
debug!("Runner '{}', Job {}: Loading job from Redis", runner_id, job_id);
// Load job data from Redis hash
let job_key = format!("hero:job:{}", job_id);
let job_data: std::collections::HashMap<String, String> = redis_conn.hgetall(&job_key).await
.map_err(JobError::Redis)?;
if job_data.is_empty() {
return Err(JobError::NotFound(job_id.to_string()));
}
fn actor_id(&self) -> &str {
&self.actor_id
}
// Parse job from hash data using the supervisor's Job struct
let job = Job {
id: job_id.to_string(),
caller_id: job_data.get("caller_id").unwrap_or(&"".to_string()).clone(),
context_id: job_data.get("context_id").unwrap_or(&"".to_string()).clone(),
payload: job_data.get("payload").unwrap_or(&"".to_string()).clone(),
runner: job_data.get("runner").unwrap_or(&"default".to_string()).clone(),
executor: job_data.get("executor").unwrap_or(&"rhai".to_string()).clone(),
timeout: job_data.get("timeout").and_then(|s| s.parse().ok()).unwrap_or(300),
env_vars: serde_json::from_str(job_data.get("env_vars").unwrap_or(&"{}".to_string()))
.map_err(JobError::Serialization)?,
created_at: job_data.get("created_at")
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&chrono::Utc))
.unwrap_or_else(chrono::Utc::now),
updated_at: job_data.get("updated_at")
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&chrono::Utc))
.unwrap_or_else(chrono::Utc::now),
};
fn redis_url(&self) -> &str {
&self.redis_url
}
}
/// Convenience function to spawn an asynchronous actor using the trait interface
///
/// This function provides a clean interface for the new async actor implementation
/// with timeout support.
pub fn spawn_async_actor(
actor_id: String,
db_path: String,
engine: rhai::Engine,
redis_url: String,
shutdown_rx: mpsc::Receiver<()>,
default_timeout: std::time::Duration,
) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
use std::sync::Arc;
let actor = Arc::new(
AsyncWorker::builder()
.actor_id(actor_id)
.db_path(db_path)
.redis_url(redis_url)
.default_timeout(default_timeout)
.build()
.expect("Failed to build AsyncActor")
);
spawn_actor(actor, shutdown_rx)
Ok(job)
}

src/runner_trait.rs Normal file

@@ -0,0 +1,267 @@
//! # Runner Trait Abstraction
//!
//! This module provides a trait-based abstraction for Rhai runners that eliminates
//! code duplication between synchronous and asynchronous runner implementations.
//!
//! The `Runner` trait defines the common interface and behavior, while specific
//! implementations handle job processing differently (sync vs async).
//!
//! ## Architecture
//!
//! ```text
//! ┌─────────────────┐     ┌─────────────────┐
//! │   SyncRunner    │     │   AsyncRunner   │
//! │                 │     │                 │
//! │  process_job()  │     │  process_job()  │
//! │  (sequential)   │     │  (concurrent)   │
//! └─────────────────┘     └─────────────────┘
//!          │                       │
//!          └───────────┬───────────┘
//!                      │
//!              ┌───────▼───────┐
//!              │ Runner Trait  │
//!              │               │
//!              │    spawn()    │
//!              │    config     │
//!              │  common loop  │
//!              └───────────────┘
//! ```
use hero_job::{Job, JobStatus};
use log::{debug, error, info};
use redis::AsyncCommands;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use crate::{initialize_redis_connection, BLPOP_TIMEOUT_SECONDS};
/// Configuration for runner instances
#[derive(Debug, Clone)]
pub struct RunnerConfig {
pub runner_id: String,
pub db_path: String,
pub redis_url: String,
pub default_timeout: Option<Duration>, // Only used by async runners
}
impl RunnerConfig {
/// Create a new runner configuration
pub fn new(
runner_id: String,
db_path: String,
redis_url: String,
) -> Self {
Self {
runner_id,
db_path,
redis_url,
default_timeout: None,
}
}
/// Set default timeout for async runners
pub fn with_default_timeout(mut self, timeout: Duration) -> Self {
self.default_timeout = Some(timeout);
self
}
}
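// Illustrative sketch (values are assumptions, not part of this file): building a
// config for an async runner with a ten-minute default timeout.
//
//     let config = RunnerConfig::new(
//         "example-runner".to_string(),
//         "/tmp/example.db".to_string(),
//         "redis://localhost:6379".to_string(),
//     )
//     .with_default_timeout(Duration::from_secs(600));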
/// Trait defining the common interface for Rhai runners
///
/// This trait abstracts the common functionality between synchronous and
/// asynchronous runners, allowing them to share the same spawn logic and
/// Redis polling loop while implementing different job processing strategies.
pub trait Runner: Send + Sync + 'static {
/// Process a single job
///
/// This is the core method that differentiates runner implementations:
/// - Sync runners process jobs sequentially, one at a time
/// - Async runners spawn concurrent tasks for each job
///
/// # Arguments
///
/// * `job` - The job to process
///
/// Note: The engine is now owned by the runner implementation as a field
/// For sync runners, this should be a blocking operation
/// For async runners, this can spawn tasks and return immediately
fn process_job(&self, job: Job) -> Result<String, Box<dyn std::error::Error + Send + Sync>>;
/// Get the runner type name for logging
fn runner_type(&self) -> &'static str;
/// Get runner ID for this runner instance
fn runner_id(&self) -> &str;
/// Get Redis URL for this runner instance
fn redis_url(&self) -> &str;
/// Spawn the runner
///
/// This method provides the common runner loop implementation that both
/// sync and async runners can use. It handles:
/// - Redis connection setup
/// - Job polling from Redis queue
/// - Shutdown signal handling
/// - Delegating job processing to the implementation
///
/// Note: The engine is now owned by the runner implementation as a field
fn spawn(
self: Arc<Self>,
mut shutdown_rx: mpsc::Receiver<()>,
) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
tokio::spawn(async move {
let runner_id = self.runner_id();
let redis_url = self.redis_url();
// Canonical work queue based on runner_id
let queue_key = format!("runner_queue:{}", runner_id);
info!(
"{} Runner '{}' starting. Connecting to Redis at {}. Listening on queue: {}",
self.runner_type(),
runner_id,
redis_url,
queue_key
);
let mut redis_conn = initialize_redis_connection(runner_id, redis_url).await?;
loop {
let blpop_keys = vec![queue_key.clone()];
tokio::select! {
// Listen for shutdown signal
_ = shutdown_rx.recv() => {
info!("{} Runner '{}': Shutdown signal received. Terminating loop.",
self.runner_type(), runner_id);
break;
}
// Listen for tasks from Redis
blpop_result = redis_conn.blpop(&blpop_keys, BLPOP_TIMEOUT_SECONDS as f64) => {
debug!("{} Runner '{}': Attempting BLPOP on queue: {}",
self.runner_type(), runner_id, queue_key);
let response: Option<(String, String)> = match blpop_result {
Ok(resp) => resp,
Err(e) => {
error!("{} Runner '{}': Redis BLPOP error on queue {}: {}. Runner for this circle might stop.",
self.runner_type(), runner_id, queue_key, e);
return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
}
};
if let Some((_queue_name_recv, job_id)) = response {
info!("{} Runner '{}' received job_id: {} from queue: {}",
self.runner_type(), runner_id, job_id, _queue_name_recv);
// Load the job from Redis
match crate::load_job_from_redis(&mut redis_conn, &job_id, runner_id).await {
Ok(job) => {
// Check for ping job and handle it directly
if job.payload.trim() == "ping" {
info!("{} Runner '{}': Received ping job '{}', responding with pong",
self.runner_type(), runner_id, job_id);
// Update job status to started
if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Started).await {
error!("{} Runner '{}': Failed to update ping job '{}' status to Started: {}",
self.runner_type(), runner_id, job_id, e);
}
// Set result to "pong" and mark as finished
if let Err(e) = Job::set_result(&mut redis_conn, &job_id, "pong").await {
error!("{} Runner '{}': Failed to set ping job '{}' result: {}",
self.runner_type(), runner_id, job_id, e);
}
if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Finished).await {
error!("{} Runner '{}': Failed to update ping job '{}' status to Finished: {}",
self.runner_type(), runner_id, job_id, e);
}
info!("{} Runner '{}': Successfully responded to ping job '{}' with pong",
self.runner_type(), runner_id, job_id);
} else {
// Update job status to started
if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Started).await {
error!("{} Runner '{}': Failed to update job '{}' status to Started: {}",
self.runner_type(), runner_id, job_id, e);
}
// Delegate job processing to the implementation
match self.process_job(job) {
Ok(result) => {
// Set result and mark as finished
if let Err(e) = Job::set_result(&mut redis_conn, &job_id, &result).await {
error!("{} Runner '{}': Failed to set job '{}' result: {}",
self.runner_type(), runner_id, job_id, e);
}
if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Finished).await {
error!("{} Runner '{}': Failed to update job '{}' status to Finished: {}",
self.runner_type(), runner_id, job_id, e);
}
}
Err(e) => {
let error_str = format!("{:?}", e);
error!("{} Runner '{}': Job '{}' processing failed: {}",
self.runner_type(), runner_id, job_id, error_str);
// Set error and mark as error
if let Err(e) = Job::set_error(&mut redis_conn, &job_id, &error_str).await {
error!("{} Runner '{}': Failed to set job '{}' error: {}",
self.runner_type(), runner_id, job_id, e);
}
if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Error).await {
error!("{} Runner '{}': Failed to update job '{}' status to Error: {}",
self.runner_type(), runner_id, job_id, e);
}
}
}
}
}
Err(e) => {
error!("{} Runner '{}': Failed to load job '{}': {}",
self.runner_type(), runner_id, job_id, e);
}
}
} else {
debug!("{} Runner '{}': BLPOP timed out on queue {}. No new tasks.",
self.runner_type(), runner_id, queue_key);
}
}
}
}
info!("{} Runner '{}' has shut down.", self.runner_type(), runner_id);
Ok(())
})
}
}
/// Convenience function to spawn a runner with the trait-based interface
///
/// This function provides a unified interface for spawning any runner implementation
/// that implements the Runner trait.
///
/// # Arguments
///
/// * `runner` - The runner implementation to spawn
/// * `shutdown_rx` - Channel receiver for shutdown signals
///
/// # Returns
///
/// Returns a `JoinHandle` that can be awaited to wait for runner shutdown.
pub fn spawn_runner<W: Runner>(
runner: Arc<W>,
shutdown_rx: mpsc::Receiver<()>,
) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
runner.spawn(shutdown_rx)
}
/// Helper to derive queue name from runner_id
fn derive_queue_from_runner_id(runner_id: &str) -> String {
format!("runner_queue:{}", runner_id)
}
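The loop above pops job IDs from `runner_queue:<runner_id>` and loads each job hash from `hero:job:<job_id>`. A hedged sketch of what a dispatcher might do to feed it, with field names taken from `load_job_from_redis` (in practice the supervisor's `JobBuilder` would produce the hash):

```rust
use redis::AsyncCommands;

async fn enqueue_ping(
    conn: &mut redis::aio::MultiplexedConnection,
    runner_id: &str,
    job_id: &str,
) -> redis::RedisResult<()> {
    // Write the job hash that the runner will load via load_job_from_redis().
    let now = chrono::Utc::now().to_rfc3339();
    let _: () = conn
        .hset_multiple(
            format!("hero:job:{}", job_id),
            &[
                ("caller_id", "example-caller"),
                ("context_id", "example-context"),
                ("payload", "ping"), // the runner answers ping jobs with "pong"
                ("runner", runner_id),
                ("executor", "rhai"),
                ("timeout", "300"),
                ("env_vars", "{}"),
                ("created_at", now.as_str()),
                ("updated_at", now.as_str()),
            ],
        )
        .await?;
    // Hand the job ID to the runner's canonical work queue.
    let _: () = conn
        .lpush(format!("runner_queue:{}", runner_id), job_id)
        .await?;
    Ok(())
}
```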

src/sync_runner.rs Normal file

@@ -0,0 +1,124 @@
use hero_job::Job;
use log::{debug, error, info};
use rhai::{Dynamic, Engine};
use std::sync::Arc;
use crate::runner_trait::Runner;
/// Configuration for sync runner instances
#[derive(Debug, Clone)]
pub struct SyncRunnerConfig {
pub runner_id: String,
pub db_path: String,
pub redis_url: String,
pub preserve_tasks: bool,
}
/// Synchronous runner that processes jobs sequentially
pub struct SyncRunner {
pub config: SyncRunnerConfig,
pub engine_factory: Arc<dyn Fn() -> Engine + Send + Sync>,
}
impl SyncRunner {
/// Create a new SyncRunner with the provided engine factory
pub fn new<F>(config: SyncRunnerConfig, engine_factory: F) -> Self
where
F: Fn() -> Engine + Send + Sync + 'static,
{
Self {
config,
engine_factory: Arc::new(engine_factory),
}
}
/// Execute a job with the given engine, setting proper job context
///
/// This function sets up the engine with job context (DB_PATH, CALLER_ID, CONTEXT_ID)
/// and evaluates the script. It returns the result or error.
fn execute_job_with_engine(
engine: &mut Engine,
job: &Job,
db_path: &str,
) -> Result<Dynamic, Box<rhai::EvalAltResult>> {
// Set up job context in the engine
let mut db_config = rhai::Map::new();
db_config.insert("DB_PATH".into(), db_path.to_string().into());
db_config.insert("CALLER_ID".into(), job.caller_id.clone().into());
db_config.insert("CONTEXT_ID".into(), job.context_id.clone().into());
engine.set_default_tag(Dynamic::from(db_config));
debug!("Sync Runner for Context ID '{}': Evaluating script with Rhai engine (job context set).", job.context_id);
// Execute the script with the configured engine
engine.eval::<Dynamic>(&job.payload)
}
}
impl Runner for SyncRunner {
fn process_job(&self, job: Job) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
let job_id = &job.id;
let runner_id = &self.config.runner_id;
debug!("Sync Runner '{}', Job {}: Processing started.", runner_id, job_id);
info!("Sync Runner '{}' processing job_id: {}. Script: {:.50}...", job.context_id, job_id, job.payload);
// Create a new engine instance for this job execution
let mut engine = (self.engine_factory)();
// Execute the script
match Self::execute_job_with_engine(&mut engine, &job, &self.config.db_path) {
Ok(result) => {
let output_str = if result.is::<String>() {
result.into_string().unwrap()
} else {
result.to_string()
};
info!("Sync Runner for Context ID '{}' job {} completed. Output: {}", job.context_id, job.id, output_str);
Ok(output_str)
}
Err(e) => {
let error_str = format!("{:?}", *e);
error!("Sync Runner for Context ID '{}' job {} script evaluation failed. Error: {}", job.context_id, job.id, error_str);
Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
}
}
}
fn runner_type(&self) -> &'static str {
"Sync"
}
fn runner_id(&self) -> &str {
&self.config.runner_id
}
fn redis_url(&self) -> &str {
&self.config.redis_url
}
}
/// Convenience function to spawn a synchronous runner using the trait interface
pub fn spawn_sync_runner<F>(
runner_id: String,
db_path: String,
redis_url: String,
shutdown_rx: tokio::sync::mpsc::Receiver<()>,
preserve_tasks: bool,
engine_factory: F,
) -> tokio::task::JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>>
where
F: Fn() -> Engine + Send + Sync + 'static,
{
let config = SyncRunnerConfig {
runner_id,
db_path,
redis_url,
preserve_tasks,
};
let runner = Arc::new(SyncRunner::new(config, engine_factory));
crate::runner_trait::spawn_runner(runner, shutdown_rx)
}
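For completeness, a hedged sketch of wiring a `SyncRunner` directly instead of going through `spawn_sync_runner`; all values are placeholders:

```rust
use std::sync::Arc;
use tokio::sync::mpsc;

fn spawn_example() -> (
    tokio::task::JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>>,
    mpsc::Sender<()>,
) {
    let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
    let config = SyncRunnerConfig {
        runner_id: "example-runner".to_string(),
        db_path: "/tmp/example.db".to_string(),
        redis_url: "redis://localhost:6379".to_string(),
        preserve_tasks: false,
    };
    // A fresh engine per job; register DSL or SAL modules inside the closure.
    let runner = Arc::new(SyncRunner::new(config, || rhai::Engine::new()));
    let handle = crate::runner_trait::spawn_runner(runner, shutdown_rx);
    // Hold on to shutdown_tx; sending on it (or dropping it) stops the runner loop.
    (handle, shutdown_tx)
}
```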