From d744c2cd165d29d870cc8c59beecaa803b0193b8 Mon Sep 17 00:00:00 2001 From: Timur Gordon <31495328+timurgordon@users.noreply.github.com> Date: Tue, 9 Sep 2025 17:54:09 +0200 Subject: [PATCH] add tests and fix job impl --- Cargo.lock | 98 +++++----- Cargo.toml | 9 +- cmd/actor.rs | 60 ------ cmd/async_actor.rs | 71 ------- cmd/sync_actor.rs | 70 ------- cmd/terminal_ui.rs | 6 +- examples/engine.rs | 6 +- scripts/test.sh | 10 + src/async_runner.rs | 2 +- src/bin/runner_osis/main.rs | 30 ++- src/bin/runner_sal/engine.rs | 24 +-- src/bin/runner_sal/main.rs | 31 +++- src/client.rs | 349 +++++++++++++++++++++++++++++++++++ src/engine/osis.rs | 2 +- src/job.rs | 222 ++++++++++++++++++++++ src/lib.rs | 85 ++++----- src/runner_trait.rs | 37 ++-- src/script_mode.rs | 169 +++++++++++++++++ src/sync_runner.rs | 2 +- test_sync_runner.rs | 47 ----- tests/e2e_tests.rs | 130 +++++++++++++ 21 files changed, 1067 insertions(+), 393 deletions(-) delete mode 100644 cmd/actor.rs delete mode 100644 cmd/async_actor.rs delete mode 100644 cmd/sync_actor.rs create mode 100755 scripts/test.sh create mode 100644 src/client.rs create mode 100644 src/job.rs create mode 100644 src/script_mode.rs delete mode 100644 test_sync_runner.rs create mode 100644 tests/e2e_tests.rs diff --git a/Cargo.lock b/Cargo.lock index a81c407..df0335c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,49 +2,6 @@ # It is not intended for manual editing. version = 4 -[[package]] -name = "actor_system" -version = "0.1.0" -dependencies = [ - "anyhow", - "async-trait", - "chrono", - "clap", - "crossterm", - "env_logger", - "hero-job", - "hero_logger", - "heromodels", - "heromodels-derive", - "heromodels_core", - "log", - "ratatui", - "redis 0.25.4", - "rhai", - "rhailib_dsl", - "sal-git", - "sal-hetzner", - "sal-kubernetes", - "sal-mycelium", - "sal-net", - "sal-os", - "sal-postgresclient", - "sal-process", - "sal-redisclient", - "sal-service-manager", - "sal-text", - "sal-vault", - "sal-virt", - "sal-zinit-client", - "serde", - "serde_json", - "thiserror 1.0.69", - "tokio", - "toml", - "tracing", - "uuid", -] - [[package]] name = "addr2line" version = "0.24.2" @@ -1304,19 +1261,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" -[[package]] -name = "hero-job" -version = "0.1.0" -dependencies = [ - "chrono", - "log", - "redis 0.25.4", - "serde", - "serde_json", - "thiserror 1.0.69", - "uuid", -] - [[package]] name = "hero_logger" version = "0.1.0" @@ -3195,6 +3139,48 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "runner_rust" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "chrono", + "clap", + "crossterm", + "env_logger", + "hero_logger", + "heromodels", + "heromodels-derive", + "heromodels_core", + "log", + "ratatui", + "redis 0.25.4", + "rhai", + "rhailib_dsl", + "sal-git", + "sal-hetzner", + "sal-kubernetes", + "sal-mycelium", + "sal-net", + "sal-os", + "sal-postgresclient", + "sal-process", + "sal-redisclient", + "sal-service-manager", + "sal-text", + "sal-vault", + "sal-virt", + "sal-zinit-client", + "serde", + "serde_json", + "thiserror 1.0.69", + "tokio", + "toml", + "tracing", + "uuid", +] + [[package]] name = "rust_decimal" version = "1.37.2" diff --git a/Cargo.toml b/Cargo.toml index ec121ff..31b2375 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,10 @@ [package] -name = "actor_system" +name = "runner_rust" version = "0.1.0" edition = "2024" [lib] -name = "actor_system" # Can be different from package name, or same +name = "runner_rust" # Can be different from package name, or same path = "src/lib.rs" [[bin]] @@ -23,9 +23,6 @@ path = "test_sync_runner.rs" name = "engine" path = "examples/engine.rs" -[[example]] -name = "actor" -path = "examples/actor.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -45,8 +42,6 @@ toml = "0.8" thiserror = "1.0" async-trait = "0.1" # Core hero dependencies -hero-job = { path = "../job" } -#hero-job = { git = "https://git.ourworld.tf/herocode/job.git" } heromodels = { git = "https://git.ourworld.tf/herocode/db.git" } heromodels_core = { git = "https://git.ourworld.tf/herocode/db.git" } heromodels-derive = { git = "https://git.ourworld.tf/herocode/db.git" } diff --git a/cmd/actor.rs b/cmd/actor.rs deleted file mode 100644 index 51cfa94..0000000 --- a/cmd/actor.rs +++ /dev/null @@ -1,60 +0,0 @@ -use actor_system::AsyncWorker; -use clap::Parser; -use log::info; -use std::sync::Arc; -use tokio::sync::mpsc; - -#[derive(Parser, Debug)] -#[command(name = "actor_system")] -#[command(about = "System Actor - Asynchronous job processing actor")] -struct Args { - /// Database path - #[arg(short, long, default_value = "/tmp/system_db")] - db_path: String, - - /// Redis URL - #[arg(short, long, default_value = "redis://localhost:6379")] - redis_url: String, - - /// Preserve completed tasks in Redis - #[arg(short, long)] - preserve_tasks: bool, -} - -#[tokio::main] -async fn main() -> Result<(), Box> { - env_logger::init(); - - let args = Args::parse(); - - info!("Starting System Actor"); - - // Create shutdown channel - let (shutdown_tx, shutdown_rx) = mpsc::channel(1); - - // Setup signal handler for graceful shutdown - let shutdown_tx_clone = shutdown_tx.clone(); - tokio::spawn(async move { - tokio::signal::ctrl_c().await.expect("Failed to listen for Ctrl+C"); - info!("Received Ctrl+C, initiating shutdown..."); - let _ = shutdown_tx_clone.send(()).await; - }); - - // Create and start the actor - let actor = Arc::new( - AsyncWorker::builder() - .db_path(args.db_path) - .redis_url(args.redis_url) - .build()? - ); - - let handle = baobab_actor::spawn_actor(actor, shutdown_rx); - - info!("System Actor started, waiting for jobs..."); - - // Wait for the actor to complete - handle.await??; - - info!("System Actor shutdown complete"); - Ok(()) -} diff --git a/cmd/async_actor.rs b/cmd/async_actor.rs deleted file mode 100644 index cb6d70b..0000000 --- a/cmd/async_actor.rs +++ /dev/null @@ -1,71 +0,0 @@ -use clap::Parser; -use log::info; -use std::time::Duration; -use tokio::sync::mpsc; - -use actor_system::{spawn_async_actor}; - -#[derive(Parser, Debug)] -#[command(name = "async_actor")] -#[command(about = "Async Actor - processes jobs concurrently with SAL modules")] -struct Args { - /// Actor ID for this instance - #[arg(short, long)] - actor_id: String, - - /// Database path - #[arg(short, long, default_value = "/tmp/actor_db")] - db_path: String, - - /// Redis URL - #[arg(short, long, default_value = "redis://localhost:6379")] - redis_url: String, - - /// Default timeout in seconds for job execution - #[arg(short, long, default_value = "300")] - timeout: u64, -} - -#[tokio::main] -async fn main() -> Result<(), Box> { - env_logger::init(); - - let args = Args::parse(); - - info!("Starting Async Actor with ID: {}", args.actor_id); - info!("Database path: {}", args.db_path); - info!("Redis URL: {}", args.redis_url); - info!("Default timeout: {}s", args.timeout); - - // Create shutdown channel - let (_shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); - - // Spawn the async actor - let handle = spawn_async_actor( - args.actor_id, - args.db_path, - args.redis_url, - shutdown_rx, - Duration::from_secs(args.timeout), - ); - - // Wait for the actor to complete - match handle.await { - Ok(result) => { - match result { - Ok(()) => { - info!("Async Actor completed successfully"); - Ok(()) - } - Err(e) => { - eprintln!("Async Actor error: {}", e); - Err(e) - } - } - } - Err(e) => { - eprintln!("Failed to join async actor task: {}", e); - Err(Box::new(e)) - } - } -} diff --git a/cmd/sync_actor.rs b/cmd/sync_actor.rs deleted file mode 100644 index 1725e73..0000000 --- a/cmd/sync_actor.rs +++ /dev/null @@ -1,70 +0,0 @@ -use clap::Parser; -use log::info; -use tokio::sync::mpsc; - -use actor_system::{spawn_sync_actor}; - -#[derive(Parser, Debug)] -#[command(name = "sync_actor")] -#[command(about = "Sync Actor - processes jobs sequentially with DSL modules")] -struct Args { - /// Actor ID for this instance - #[arg(short, long)] - actor_id: String, - - /// Database path - #[arg(short, long, default_value = "/tmp/actor_db")] - db_path: String, - - /// Redis URL - #[arg(short, long, default_value = "redis://localhost:6379")] - redis_url: String, - - /// Preserve completed tasks in Redis (don't delete them) - #[arg(short, long, default_value = "false")] - preserve_tasks: bool, -} - -#[tokio::main] -async fn main() -> Result<(), Box> { - env_logger::init(); - - let args = Args::parse(); - - info!("Starting Sync Actor with ID: {}", args.actor_id); - info!("Database path: {}", args.db_path); - info!("Redis URL: {}", args.redis_url); - info!("Preserve tasks: {}", args.preserve_tasks); - - // Create shutdown channel - let (_shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); - - // Spawn the sync actor - let handle = spawn_sync_actor( - args.actor_id, - args.db_path, - args.redis_url, - shutdown_rx, - args.preserve_tasks, - ); - - // Wait for the actor to complete - match handle.await { - Ok(result) => { - match result { - Ok(()) => { - info!("Sync Actor completed successfully"); - Ok(()) - } - Err(e) => { - eprintln!("Sync Actor error: {}", e); - Err(e) - } - } - } - Err(e) => { - eprintln!("Failed to join sync actor task: {}", e); - Err(Box::new(e)) - } - } -} diff --git a/cmd/terminal_ui.rs b/cmd/terminal_ui.rs index 27ef531..3b0b201 100644 --- a/cmd/terminal_ui.rs +++ b/cmd/terminal_ui.rs @@ -46,7 +46,7 @@ fn create_app(args: &Args) -> Result { .unwrap_or_else(|_| ".".to_string()); let crate_root = PathBuf::from(crate_root); - let actor_path = crate_root.join("target/debug/actor_system"); + let actor_path = crate_root.join("target/debug/runner_rust"); let example_dir = Some(crate_root.join("examples/scripts")); let mut app = App::new( @@ -68,7 +68,7 @@ fn spawn_actor_process(_args: &Args) -> Result { // Get the crate root directory let crate_root = std::env::var("CARGO_MANIFEST_DIR") .unwrap_or_else(|_| ".".to_string()); - let actor_path = PathBuf::from(crate_root).join("target/debug/actor_system"); + let actor_path = PathBuf::from(crate_root).join("target/debug/runner_rust"); info!("🎬 Spawning actor process: {}", actor_path.display()); let mut cmd = Command::new(&actor_path); @@ -123,7 +123,7 @@ async fn main() -> Result<()> { info!("🚀 Starting Baobab Actor TUI..."); info!("Actor ID: sal (System Actor)"); - info!("Actor Path: {}/target/debug/actor_system", crate_root); + info!("Actor Path: {}/target/debug/runner_rust", crate_root); info!("Redis URL: {}", args.redis_url); info!("Script Type: SAL"); info!("Example Directory: {}/examples/scripts", crate_root); diff --git a/examples/engine.rs b/examples/engine.rs index d90fa94..11bfd56 100644 --- a/examples/engine.rs +++ b/examples/engine.rs @@ -4,7 +4,7 @@ use std::panic; use std::path::Path; use rhai::{Engine, Dynamic}; -use actor_system::AsyncWorker; +use runner_rust::AsyncRunner; /// Recursively collect all .rhai files from a directory and its subdirectories fn collect_rhai_files(dir: &Path, files: &mut Vec) -> Result<(), Box> { @@ -113,7 +113,7 @@ fn main() -> Result<(), Box> { }; // Create a new engine instance and configure it with DSL modules - let mut engine_with_context = match create_configured_engine(db_path, i + 1, verbose) { + let engine_with_context = match create_configured_engine(db_path, i + 1, verbose) { Ok(engine) => engine, Err(e) => { println!("\x1b[31m✗\x1b[0m {} ... \x1b[31mFAILED\x1b[0m (engine setup: {})", script_name, e); @@ -185,7 +185,7 @@ fn create_configured_engine(db_path: &str, script_index: usize, verbose: bool) - let mut engine = Engine::new(); // Register all DSL modules (same as OSIS engine configuration) - actor_system::register_sal_modules(&mut engine); + runner_rust::register_sal_modules(&mut engine); // Set up job context tags (similar to execute_job_with_engine) let mut db_config = rhai::Map::new(); diff --git a/scripts/test.sh b/scripts/test.sh new file mode 100755 index 0000000..a47a20b --- /dev/null +++ b/scripts/test.sh @@ -0,0 +1,10 @@ +#!/bin/bash +set -e + +SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +RUNNER_DIR=$(cd "$SCRIPT_DIR/.." && pwd) + +pushd "$RUNNER_DIR" +cargo check --all +cargo test --all +popd \ No newline at end of file diff --git a/src/async_runner.rs b/src/async_runner.rs index d77fb2d..09b2278 100644 --- a/src/async_runner.rs +++ b/src/async_runner.rs @@ -1,4 +1,4 @@ -use hero_job::Job; +use crate::job::Job; use log::{debug, error, info}; use rhai::Engine; use std::collections::HashMap; diff --git a/src/bin/runner_osis/main.rs b/src/bin/runner_osis/main.rs index e81b079..1f03c61 100644 --- a/src/bin/runner_osis/main.rs +++ b/src/bin/runner_osis/main.rs @@ -1,4 +1,4 @@ -use actor_system::spawn_sync_runner; +use runner_rust::{spawn_sync_runner, script_mode::execute_script_mode}; use clap::Parser; use log::{error, info}; use tokio::sync::mpsc; @@ -23,6 +23,10 @@ struct Args { /// Preserve tasks after completion #[arg(short, long, default_value_t = false)] preserve_tasks: bool, + + /// Script to execute in single-job mode (optional) + #[arg(short, long)] + script: Option, } #[tokio::main] @@ -32,6 +36,30 @@ async fn main() -> Result<(), Box> { let args = Args::parse(); + // Check if we're in script mode + if let Some(script_content) = args.script { + info!("Running in script mode with runner ID: {}", args.runner_id); + + let result = execute_script_mode( + &script_content, + &args.runner_id, + args.redis_url, + std::time::Duration::from_secs(300), // Default timeout for OSIS + create_osis_engine, + ).await; + + match result { + Ok(output) => { + println!("Script execution result:\n{}", output); + return Ok(()); + } + Err(e) => { + error!("Script execution failed: {}", e); + return Err(e); + } + } + } + info!("Starting OSIS Sync Runner with ID: {}", args.runner_id); info!("Database path: {}", args.db_path); info!("Redis URL: {}", args.redis_url); diff --git a/src/bin/runner_sal/engine.rs b/src/bin/runner_sal/engine.rs index 5e41ac7..7f00afc 100644 --- a/src/bin/runner_sal/engine.rs +++ b/src/bin/runner_sal/engine.rs @@ -102,18 +102,18 @@ impl EngineFactory { } pub fn register_sal_modules(engine: &mut Engine) { - sal_os::rhai::register_os_module(engine); - sal_redisclient::rhai::register_redisclient_module(engine); - sal_postgresclient::rhai::register_postgresclient_module(engine); - sal_process::rhai::register_process_module(engine); - sal_virt::rhai::register_virt_module(engine); - sal_git::rhai::register_git_module(engine); - sal_zinit_client::rhai::register_zinit_module(engine); - sal_mycelium::rhai::register_mycelium_module(engine); - sal_text::rhai::register_text_module(engine); - sal_net::rhai::register_net_module(engine); - sal_kubernetes::rhai::register_kubernetes_module(engine); - sal_hetzner::rhai::register_hetzner_module(engine); + let _ = sal_os::rhai::register_os_module(engine); + let _ = sal_redisclient::rhai::register_redisclient_module(engine); + let _ = sal_postgresclient::rhai::register_postgresclient_module(engine); + let _ = sal_process::rhai::register_process_module(engine); + let _ = sal_virt::rhai::register_virt_module(engine); + let _ = sal_git::rhai::register_git_module(engine); + let _ = sal_zinit_client::rhai::register_zinit_module(engine); + let _ = sal_mycelium::rhai::register_mycelium_module(engine); + let _ = sal_text::rhai::register_text_module(engine); + let _ = sal_net::rhai::register_net_module(engine); + let _ = sal_kubernetes::rhai::register_kubernetes_module(engine); + let _ = sal_hetzner::rhai::register_hetzner_module(engine); println!("SAL modules registered successfully."); } diff --git a/src/bin/runner_sal/main.rs b/src/bin/runner_sal/main.rs index 3f34ee5..12bb073 100644 --- a/src/bin/runner_sal/main.rs +++ b/src/bin/runner_sal/main.rs @@ -1,7 +1,6 @@ -use actor_system::{spawn_async_runner, AsyncRunner}; +use runner_rust::{spawn_async_runner, script_mode::execute_script_mode}; use clap::Parser; use log::{error, info}; -use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc; @@ -25,6 +24,10 @@ struct Args { /// Default timeout for jobs in seconds #[arg(short, long, default_value_t = 300)] timeout: u64, + + /// Script to execute in single-job mode (optional) + #[arg(short, long)] + script: Option, } #[tokio::main] @@ -34,6 +37,30 @@ async fn main() -> Result<(), Box> { let args = Args::parse(); + // Check if we're in script mode + if let Some(script_content) = args.script { + info!("Running in script mode with runner ID: {}", args.runner_id); + + let result = execute_script_mode( + &script_content, + &args.runner_id, + args.redis_url, + Duration::from_secs(args.timeout), + create_sal_engine, + ).await; + + match result { + Ok(output) => { + println!("Script execution result:\n{}", output); + return Ok(()); + } + Err(e) => { + error!("Script execution failed: {}", e); + return Err(e); + } + } + } + info!("Starting SAL Async Runner with ID: {}", args.runner_id); info!("Database path: {}", args.db_path); info!("Redis URL: {}", args.redis_url); diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..228a3bf --- /dev/null +++ b/src/client.rs @@ -0,0 +1,349 @@ +//! Job client implementation for managing jobs in Redis + +use chrono::Utc; +use redis::AsyncCommands; +use crate::job::{Job, JobStatus, JobError}; + +/// Client for managing jobs in Redis +#[derive(Debug, Clone)] +pub struct Client { + redis_client: redis::Client, + namespace: String, +} + +pub struct ClientBuilder { + /// Redis URL for connection + redis_url: String, + /// Namespace for queue keys + namespace: String, +} + +impl ClientBuilder { + /// Create a new client builder + pub fn new() -> Self { + Self { + redis_url: "redis://localhost:6379".to_string(), + namespace: "".to_string(), + } + } + + /// Set the Redis URL + pub fn redis_url>(mut self, url: S) -> Self { + self.redis_url = url.into(); + self + } + + /// Set the namespace for queue keys + pub fn namespace>(mut self, namespace: S) -> Self { + self.namespace = namespace.into(); + self + } + + /// Build the client + pub async fn build(self) -> Result { + // Create Redis client + let redis_client = redis::Client::open(self.redis_url.as_str()) + .map_err(|e| JobError::Redis(e))?; + + Ok(Client { + redis_client, + namespace: self.namespace, + }) + } +} + +impl Default for Client { + fn default() -> Self { + // Note: Default implementation creates an empty client + // Use Client::builder() for proper initialization + Self { + redis_client: redis::Client::open("redis://localhost:6379").unwrap(), + namespace: "".to_string(), + } + } +} + +impl Client { + /// Create a new client builder + pub fn builder() -> ClientBuilder { + ClientBuilder::new() + } + + /// List all job IDs from Redis + pub async fn list_jobs(&self) -> Result, JobError> { + let mut conn = self.redis_client + .get_multiplexed_async_connection() + .await + .map_err(|e| JobError::Redis(e))?; + + let keys: Vec = conn.keys(format!("{}:*", &self.jobs_key())).await + .map_err(|e| JobError::Redis(e))?; + let job_ids: Vec = keys + .into_iter() + .filter_map(|key| { + if key.starts_with(&format!("{}:", self.jobs_key())) { + key.strip_prefix(&format!("{}:", self.jobs_key())) + .map(|s| s.to_string()) + } else { + None + } + }) + .collect(); + + Ok(job_ids) + } + + fn jobs_key(&self) -> String { + if self.namespace.is_empty() { + format!("job") + } else { + format!("{}:job", self.namespace) + } + } + + pub fn job_key(&self, job_id: &str) -> String { + if self.namespace.is_empty() { + format!("job:{}", job_id) + } else { + format!("{}:job:{}", self.namespace, job_id) + } + } + + pub fn job_reply_key(&self, job_id: &str) -> String { + if self.namespace.is_empty() { + format!("reply:{}", job_id) + } else { + format!("{}:reply:{}", self.namespace, job_id) + } + } + + pub fn runner_key(&self, runner_name: &str) -> String { + if self.namespace.is_empty() { + format!("runner:{}", runner_name) + } else { + format!("{}:runner:{}", self.namespace, runner_name) + } + } + + /// Set job error in Redis + pub async fn set_error(&self, + job_id: &str, + error: &str, + ) -> Result<(), JobError> { + let job_key = self.job_key(job_id); + let now = Utc::now(); + + let mut conn = self.redis_client + .get_multiplexed_async_connection() + .await + .map_err(|e| JobError::Redis(e))?; + + let _: () = conn.hset_multiple(&job_key, &[ + ("error", error), + ("status", JobStatus::Error.as_str()), + ("updated_at", &now.to_rfc3339()), + ]).await + .map_err(|e| JobError::Redis(e))?; + + Ok(()) + } + + /// Set job status in Redis + pub async fn set_job_status(&self, + job_id: &str, + status: JobStatus, + ) -> Result<(), JobError> { + let job_key = self.job_key(job_id); + let now = Utc::now(); + + let mut conn = self.redis_client + .get_multiplexed_async_connection() + .await + .map_err(|e| JobError::Redis(e))?; + + let _: () = conn.hset_multiple(&job_key, &[ + ("status", status.as_str()), + ("updated_at", &now.to_rfc3339()), + ]).await + .map_err(|e| JobError::Redis(e))?; + Ok(()) + } + + /// Get job status from Redis + pub async fn get_status( + &self, + job_id: &str, + ) -> Result { + let mut conn = self.redis_client + .get_multiplexed_async_connection() + .await + .map_err(|e| JobError::Redis(e))?; + + let status_str: Option = conn.hget(&self.job_key(job_id), "status").await + .map_err(|e| JobError::Redis(e))?; + + match status_str { + Some(s) => JobStatus::from_str(&s).ok_or_else(|| JobError::InvalidStatus(s)), + None => Err(JobError::NotFound(job_id.to_string())), + } + } + + /// Delete job from Redis + pub async fn delete_from_redis( + &self, + job_id: &str, + ) -> Result<(), JobError> { + let mut conn = self.redis_client + .get_multiplexed_async_connection() + .await + .map_err(|e| JobError::Redis(e))?; + + let job_key = self.job_key(job_id); + let _: () = conn.del(&job_key).await + .map_err(|e| JobError::Redis(e))?; + Ok(()) + } + + /// Store this job in Redis + pub async fn store_job_in_redis(&self, job: &Job) -> Result<(), JobError> { + let mut conn = self.redis_client + .get_multiplexed_async_connection() + .await + .map_err(|e| JobError::Redis(e))?; + + let job_key = self.job_key(&job.id); + + // Serialize the job data + let job_data = serde_json::to_string(job) + .map_err(|e| JobError::Serialization(e))?; + + // Store job data in Redis hash + let _: () = conn.hset_multiple(&job_key, &[ + ("data", job_data), + ("status", JobStatus::Dispatched.as_str().to_string()), + ("created_at", job.created_at.to_rfc3339()), + ("updated_at", job.updated_at.to_rfc3339()), + ]).await + .map_err(|e| JobError::Redis(e))?; + + // Set TTL for the job (24 hours) + let _: () = conn.expire(&job_key, 86400).await + .map_err(|e| JobError::Redis(e))?; + + Ok(()) + } + + /// Load a job from Redis by ID + pub async fn load_job_from_redis( + &self, + job_id: &str, + ) -> Result { + let job_key = self.job_key(job_id); + + let mut conn = self.redis_client + .get_multiplexed_async_connection() + .await + .map_err(|e| JobError::Redis(e))?; + + // Get job data from Redis + let job_data: Option = conn.hget(&job_key, "data").await + .map_err(|e| JobError::Redis(e))?; + + match job_data { + Some(data) => { + let job: Job = serde_json::from_str(&data) + .map_err(|e| JobError::Serialization(e))?; + Ok(job) + } + None => Err(JobError::NotFound(job_id.to_string())), + } + } + + /// Delete a job by ID + pub async fn delete_job(&mut self, job_id: &str) -> Result<(), JobError> { + let mut conn = self.redis_client.get_multiplexed_async_connection().await + .map_err(|e| JobError::Redis(e))?; + + let job_key = self.job_key(job_id); + let deleted_count: i32 = conn.del(&job_key).await + .map_err(|e| JobError::Redis(e))?; + + if deleted_count == 0 { + return Err(JobError::NotFound(job_id.to_string())); + } + + Ok(()) + } + + /// Set job result in Redis + pub async fn set_result( + &self, + job_id: &str, + result: &str, + ) -> Result<(), JobError> { + let job_key = self.job_key(&job_id); + let now = Utc::now(); + let mut conn = self.redis_client + .get_multiplexed_async_connection() + .await + .map_err(|e| JobError::Redis(e))?; + let _: () = conn.hset_multiple(&job_key, &[ + ("result", result), + ("status", JobStatus::Finished.as_str()), + ("updated_at", &now.to_rfc3339()), + ]).await + .map_err(|e| JobError::Redis(e))?; + + Ok(()) + } + + /// Get job result from Redis + pub async fn get_result( + &self, + job_id: &str, + ) -> Result, JobError> { + let job_key = self.job_key(job_id); + let mut conn = self.redis_client + .get_multiplexed_async_connection() + .await + .map_err(|e| JobError::Redis(e))?; + let result: Option = conn.hget(&job_key, "result").await + .map_err(|e| JobError::Redis(e))?; + Ok(result) + } + + /// Get a job ID from the work queue (blocking pop) + pub async fn get_job_id(&self, queue_key: &str) -> Result, JobError> { + let mut conn = self.redis_client + .get_multiplexed_async_connection() + .await + .map_err(|e| JobError::Redis(e))?; + + // Use BRPOP with a short timeout to avoid blocking indefinitely + let result: Option<(String, String)> = conn.brpop(queue_key, 1.0).await + .map_err(|e| JobError::Redis(e))?; + + Ok(result.map(|(_, job_id)| job_id)) + } + + /// Get a job by ID (alias for load_job_from_redis) + pub async fn get_job(&self, job_id: &str) -> Result { + self.load_job_from_redis(job_id).await + } + + /// Dispatch a job to a runner's queue + pub async fn dispatch_job(&self, job_id: &str, runner_name: &str) -> Result<(), JobError> { + let mut conn = self.redis_client + .get_multiplexed_async_connection() + .await + .map_err(|e| JobError::Redis(e))?; + + let queue_key = self.runner_key(runner_name); + + // Push job ID to the runner's queue (LPUSH for FIFO with BRPOP) + let _: () = conn.lpush(&queue_key, job_id).await + .map_err(|e| JobError::Redis(e))?; + + Ok(()) + } +} diff --git a/src/engine/osis.rs b/src/engine/osis.rs index d407075..d82a2d3 100644 --- a/src/engine/osis.rs +++ b/src/engine/osis.rs @@ -79,7 +79,7 @@ fn register_object_functions(engine: &mut Engine) { /// /// ```rust /// use rhai::Engine; -/// use actor_system::engine::osis::register_dsl_modules; +/// use runner_rust::engine::osis::register_dsl_modules; /// /// let mut engine = Engine::new(); /// register_dsl_modules(&mut engine); diff --git a/src/job.rs b/src/job.rs new file mode 100644 index 0000000..fa551f3 --- /dev/null +++ b/src/job.rs @@ -0,0 +1,222 @@ +use chrono::{Utc}; +use redis::AsyncCommands; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use thiserror::Error; +use uuid::Uuid; +use log::{error}; + +pub use crate::client::Client; + +/// Job status enumeration +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum JobStatus { + Dispatched, + WaitingForPrerequisites, + Started, + Error, + Stopping, + Finished, +} + +impl JobStatus { + pub fn as_str(&self) -> &'static str { + match self { + JobStatus::Dispatched => "dispatched", + JobStatus::WaitingForPrerequisites => "waiting_for_prerequisites", + JobStatus::Started => "started", + JobStatus::Error => "error", + JobStatus::Stopping => "stopping", + JobStatus::Finished => "finished", + } + } + + pub fn from_str(s: &str) -> Option { + match s { + "dispatched" => Some(JobStatus::Dispatched), + "waiting_for_prerequisites" => Some(JobStatus::WaitingForPrerequisites), + "started" => Some(JobStatus::Started), + "error" => Some(JobStatus::Error), + "stopping" => Some(JobStatus::Stopping), + "finished" => Some(JobStatus::Finished), + _ => None, + } + } +} + +/// Representation of a script execution request. +/// +/// This structure contains all the information needed to execute a script +/// on a actor service, including the script content, dependencies, and metadata. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Job { + pub id: String, + pub caller_id: String, + pub context_id: String, + pub payload: String, + pub runner: String, // name of the runner to execute this job + pub executor: String, // name of the executor the runner will use to execute this job + pub timeout: u64, // timeout in seconds + pub env_vars: HashMap, // environment variables for script execution + pub created_at: chrono::DateTime, + pub updated_at: chrono::DateTime, +} + +/// Error types for job operations +#[derive(Error, Debug)] +pub enum JobError { + #[error("Redis error: {0}")] + Redis(#[from] redis::RedisError), + #[error("Serialization error: {0}")] + Serialization(#[from] serde_json::Error), + #[error("Job not found: {0}")] + NotFound(String), + #[error("Invalid job status: {0}")] + InvalidStatus(String), + #[error("Timeout error: {0}")] + Timeout(String), + #[error("Invalid job data: {0}")] + InvalidData(String), +} + +impl Job { + /// Create a new job with the given parameters + pub fn new( + caller_id: String, + context_id: String, + payload: String, + runner: String, + executor: String, + ) -> Self { + let now = Utc::now(); + Self { + id: Uuid::new_v4().to_string(), + caller_id, + context_id, + payload, + runner, + executor, + timeout: 300, // 5 minutes default + env_vars: HashMap::new(), + created_at: now, + updated_at: now, + } + } +} + +/// Builder for constructing job execution requests. +pub struct JobBuilder { + caller_id: String, + context_id: String, + payload: String, + runner: String, + executor: String, + timeout: u64, // timeout in seconds + env_vars: HashMap, +} + +impl JobBuilder { + pub fn new() -> Self { + Self { + caller_id: "".to_string(), + context_id: "".to_string(), + payload: "".to_string(), + runner: "".to_string(), + executor: "".to_string(), + timeout: 300, // 5 minutes default + env_vars: HashMap::new(), + } + } + + /// Set the caller ID for this job + pub fn caller_id(mut self, caller_id: &str) -> Self { + self.caller_id = caller_id.to_string(); + self + } + + /// Set the context ID for this job + pub fn context_id(mut self, context_id: &str) -> Self { + self.context_id = context_id.to_string(); + self + } + + /// Set the payload (script content) for this job + pub fn payload(mut self, payload: &str) -> Self { + self.payload = payload.to_string(); + self + } + + /// Set the runner name for this job + pub fn runner(mut self, runner: &str) -> Self { + self.runner = runner.to_string(); + self + } + + /// Set the executor for this job + pub fn executor(mut self, executor: &str) -> Self { + self.executor = executor.to_string(); + self + } + + /// Set the timeout for job execution (in seconds) + pub fn timeout(mut self, timeout: u64) -> Self { + self.timeout = timeout; + self + } + + /// Set a single environment variable + pub fn env_var(mut self, key: &str, value: &str) -> Self { + self.env_vars.insert(key.to_string(), value.to_string()); + self + } + + /// Set multiple environment variables from a HashMap + pub fn env_vars(mut self, env_vars: HashMap) -> Self { + self.env_vars = env_vars; + self + } + + /// Clear all environment variables + pub fn clear_env_vars(mut self) -> Self { + self.env_vars.clear(); + self + } + + /// Build the job + pub fn build(self) -> Result { + if self.caller_id.is_empty() { + return Err(JobError::InvalidData("caller_id is required".to_string())); + } + if self.context_id.is_empty() { + return Err(JobError::InvalidData("context_id is required".to_string())); + } + if self.payload.is_empty() { + return Err(JobError::InvalidData("payload is required".to_string())); + } + if self.runner.is_empty() { + return Err(JobError::InvalidData("runner is required".to_string())); + } + if self.executor.is_empty() { + return Err(JobError::InvalidData("executor is required".to_string())); + } + + let mut job = Job::new( + self.caller_id, + self.context_id, + self.payload, + self.runner, + self.executor, + ); + + job.timeout = self.timeout; + job.env_vars = self.env_vars; + + Ok(job) + } +} + +impl Default for JobBuilder { + fn default() -> Self { + Self::new() + } +} diff --git a/src/lib.rs b/src/lib.rs index 823100d..432975a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,22 +1,24 @@ // Core modules -pub mod runner_trait; +pub mod engine; pub mod async_runner; pub mod sync_runner; -pub mod engine; +pub mod runner_trait; +pub mod script_mode; +pub mod job; +pub mod client; // Public exports for convenience pub use runner_trait::{Runner, RunnerConfig, spawn_runner}; pub use async_runner::{AsyncRunner, spawn_async_runner}; -pub use sync_runner::{SyncRunner, spawn_sync_runner}; +pub use sync_runner::{SyncRunner, SyncRunnerConfig, spawn_sync_runner}; pub use engine::system::{register_sal_modules, create_system_engine}; pub use engine::osis::{register_dsl_modules, create_osis_engine, create_shared_osis_engine}; -// Re-export job types from hero-job crate -pub use hero_job::{Job, JobStatus, JobError, JobBuilder}; +// Re-export job types from local job module +pub use job::{Job, JobStatus, JobError, JobBuilder, Client}; pub use redis::AsyncCommands; use log::{debug, error, info}; -const NAMESPACE_PREFIX: &str = "hero:job:"; const BLPOP_TIMEOUT_SECONDS: usize = 5; /// Initialize Redis connection for the runner @@ -40,43 +42,42 @@ pub async fn initialize_redis_connection( Ok(redis_conn) } -/// Load job from Redis using the supervisor's Job API -pub async fn load_job_from_redis( - redis_conn: &mut redis::aio::MultiplexedConnection, - job_id: &str, - runner_id: &str, -) -> Result { - debug!("Runner '{}', Job {}: Loading job from Redis", runner_id, job_id); +// /// Load job from Redis using the supervisor's Job API +// pub async fn load_job_from_redis( +// redis_conn: &mut redis::aio::MultiplexedConnection, +// job_id: &str, +// runner_id: &str, +// ) -> Result { +// debug!("Runner '{}', Job {}: Loading job from Redis", runner_id, job_id); - // Load job data from Redis hash - let job_key = format!("hero:job:{}", job_id); - let job_data: std::collections::HashMap = redis_conn.hgetall(&job_key).await - .map_err(JobError::Redis)?; +// // Load job data from Redis hash +// let job_data: std::collections::HashMap = redis_conn.hgetall(&client.job_key(job_id)).await +// .map_err(JobError::Redis)?; - if job_data.is_empty() { - return Err(JobError::NotFound(job_id.to_string())); - } +// if job_data.is_empty() { +// return Err(JobError::NotFound(job_id.to_string())); +// } - // Parse job from hash data using the supervisor's Job struct - let job = Job { - id: job_id.to_string(), - caller_id: job_data.get("caller_id").unwrap_or(&"".to_string()).clone(), - context_id: job_data.get("context_id").unwrap_or(&"".to_string()).clone(), - payload: job_data.get("payload").unwrap_or(&"".to_string()).clone(), - runner: job_data.get("runner").unwrap_or(&"default".to_string()).clone(), - executor: job_data.get("executor").unwrap_or(&"rhai".to_string()).clone(), - timeout: job_data.get("timeout").and_then(|s| s.parse().ok()).unwrap_or(300), - env_vars: serde_json::from_str(job_data.get("env_vars").unwrap_or(&"{}".to_string())) - .map_err(JobError::Serialization)?, - created_at: job_data.get("created_at") - .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) - .map(|dt| dt.with_timezone(&chrono::Utc)) - .unwrap_or_else(chrono::Utc::now), - updated_at: job_data.get("updated_at") - .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) - .map(|dt| dt.with_timezone(&chrono::Utc)) - .unwrap_or_else(chrono::Utc::now), - }; +// // Parse job from hash data using the supervisor's Job struct +// let job = Job { +// id: job_id.to_string(), +// caller_id: job_data.get("caller_id").unwrap_or(&"".to_string()).clone(), +// context_id: job_data.get("context_id").unwrap_or(&"".to_string()).clone(), +// payload: job_data.get("payload").unwrap_or(&"".to_string()).clone(), +// runner: job_data.get("runner").unwrap_or(&"default".to_string()).clone(), +// executor: job_data.get("executor").unwrap_or(&"rhai".to_string()).clone(), +// timeout: job_data.get("timeout").and_then(|s| s.parse().ok()).unwrap_or(300), +// env_vars: serde_json::from_str(job_data.get("env_vars").unwrap_or(&"{}".to_string())) +// .map_err(JobError::Serialization)?, +// created_at: job_data.get("created_at") +// .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) +// .map(|dt| dt.with_timezone(&chrono::Utc)) +// .unwrap_or_else(chrono::Utc::now), +// updated_at: job_data.get("updated_at") +// .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) +// .map(|dt| dt.with_timezone(&chrono::Utc)) +// .unwrap_or_else(chrono::Utc::now), +// }; - Ok(job) -} \ No newline at end of file +// Ok(job) +// } \ No newline at end of file diff --git a/src/runner_trait.rs b/src/runner_trait.rs index f0be836..f813ef3 100644 --- a/src/runner_trait.rs +++ b/src/runner_trait.rs @@ -27,7 +27,7 @@ //! └───────────────┘ //! ``` -use hero_job::{Job, JobStatus}; +use crate::job::{Job, JobStatus, Client}; use log::{debug, error, info}; use redis::AsyncCommands; @@ -116,8 +116,15 @@ pub trait Runner: Send + Sync + 'static { tokio::spawn(async move { let runner_id = self.runner_id(); let redis_url = self.redis_url(); - // Canonical work queue based on runner_id - let queue_key = format!("runner_queue:{}", runner_id); + + // Create client to get the proper queue key + let client = Client::builder() + .redis_url(redis_url) + .build() + .await + .map_err(|e| format!("Failed to create client: {}", e))?; + + let queue_key = client.runner_key(runner_id); info!( "{} Runner '{}' starting. Connecting to Redis at {}. Listening on queue: {}", self.runner_type(), @@ -156,7 +163,7 @@ pub trait Runner: Send + Sync + 'static { self.runner_type(), runner_id, job_id, _queue_name_recv); // Load the job from Redis - match crate::load_job_from_redis(&mut redis_conn, &job_id, runner_id).await { + match client.load_job_from_redis(&job_id).await { Ok(job) => { // Check for ping job and handle it directly if job.payload.trim() == "ping" { @@ -164,18 +171,18 @@ pub trait Runner: Send + Sync + 'static { self.runner_type(), runner_id, job_id); // Update job status to started - if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Started).await { + if let Err(e) = client.set_job_status(&job_id, JobStatus::Started).await { error!("{} Runner '{}': Failed to update ping job '{}' status to Started: {}", self.runner_type(), runner_id, job_id, e); } // Set result to "pong" and mark as finished - if let Err(e) = Job::set_result(&mut redis_conn, &job_id, "pong").await { + if let Err(e) = client.set_result(&job_id, "pong").await { error!("{} Runner '{}': Failed to set ping job '{}' result: {}", self.runner_type(), runner_id, job_id, e); } - if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Finished).await { + if let Err(e) = client.set_job_status(&job_id, JobStatus::Finished).await { error!("{} Runner '{}': Failed to update ping job '{}' status to Finished: {}", self.runner_type(), runner_id, job_id, e); } @@ -184,7 +191,7 @@ pub trait Runner: Send + Sync + 'static { self.runner_type(), runner_id, job_id); } else { // Update job status to started - if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Started).await { + if let Err(e) = client.set_job_status(&job_id, JobStatus::Started).await { error!("{} Runner '{}': Failed to update job '{}' status to Started: {}", self.runner_type(), runner_id, job_id, e); } @@ -193,12 +200,12 @@ pub trait Runner: Send + Sync + 'static { match self.process_job(job) { Ok(result) => { // Set result and mark as finished - if let Err(e) = Job::set_result(&mut redis_conn, &job_id, &result).await { + if let Err(e) = client.set_result(&job_id, &result).await { error!("{} Runner '{}': Failed to set job '{}' result: {}", self.runner_type(), runner_id, job_id, e); } - if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Finished).await { + if let Err(e) = client.set_job_status(&job_id, JobStatus::Finished).await { error!("{} Runner '{}': Failed to update job '{}' status to Finished: {}", self.runner_type(), runner_id, job_id, e); } @@ -209,12 +216,12 @@ pub trait Runner: Send + Sync + 'static { self.runner_type(), runner_id, job_id, error_str); // Set error and mark as error - if let Err(e) = Job::set_error(&mut redis_conn, &job_id, &error_str).await { + if let Err(e) = client.set_error(&job_id, &error_str).await { error!("{} Runner '{}': Failed to set job '{}' error: {}", self.runner_type(), runner_id, job_id, e); } - if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Error).await { + if let Err(e) = client.set_job_status(&job_id, JobStatus::Error).await { error!("{} Runner '{}': Failed to update job '{}' status to Error: {}", self.runner_type(), runner_id, job_id, e); } @@ -261,7 +268,5 @@ pub fn spawn_runner( runner.spawn(shutdown_rx) } -/// Helper to derive queue name from runner_id -fn derive_queue_from_runner_id(runner_id: &str) -> String { - format!("runner_queue:{}", runner_id) -} + + diff --git a/src/script_mode.rs b/src/script_mode.rs new file mode 100644 index 0000000..f8d9813 --- /dev/null +++ b/src/script_mode.rs @@ -0,0 +1,169 @@ +use std::time::Duration; +use tokio::time::{timeout, sleep}; +use crate::job::{JobBuilder, Client, JobStatus}; +use log::{info, error}; +use tokio::sync::mpsc; +use std::sync::Arc; +use crate::async_runner::AsyncRunner; +use crate::runner_trait::{Runner, RunnerConfig}; + +/// Execute a script in single-job mode +/// Creates a job, submits it, waits for completion, and returns the result +pub async fn execute_script_mode( + script_content: &str, + runner_id: &str, + redis_url: String, + job_timeout: Duration, + engine_factory: F, +) -> Result> +where + F: Fn() -> rhai::Engine + Send + Sync + 'static, +{ + info!("Executing script in single-job mode"); + + // Create job client + let job_client = Client::builder() + .redis_url(&redis_url) + .build() + .await?; + + // Create the job using JobBuilder + let job = JobBuilder::new() + .caller_id("script_mode") + .context_id("single_job") + .payload(script_content) + .runner(runner_id) + .executor("rhai") + .timeout(job_timeout.as_secs()) + .build()?; + + let job_id = job.id.clone(); + info!("Created job with ID: {}", job_id); + + // Submit the job + job_client.store_job_in_redis(&job).await?; + info!("Job stored in Redis"); + + // Dispatch the job to the runner's queue + job_client.dispatch_job(&job_id, runner_id).await?; + info!("Job dispatched to runner queue: {}", runner_id); + + // Create and spawn a temporary runner to process the job + let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); + + let config = RunnerConfig { + runner_id: runner_id.to_string(), + db_path: "/tmp".to_string(), // Temporary path for script mode + redis_url: redis_url.clone(), + default_timeout: Some(job_timeout), + }; + + let runner = Arc::new( + AsyncRunner::builder() + .runner_id(&config.runner_id) + .db_path(&config.db_path) + .redis_url(&config.redis_url) + .default_timeout(config.default_timeout.unwrap_or(job_timeout)) + .engine_factory(engine_factory) + .build() + .map_err(|e| format!("Failed to build runner: {}", e))? + ); + let runner_handle = runner.spawn(shutdown_rx); + + info!("Temporary runner spawned for job processing"); + + // Wait for job completion with timeout + let result = timeout(job_timeout, wait_for_job_completion(&job_client, &job_id)).await; + + // Shutdown the temporary runner + let _ = shutdown_tx.send(()).await; + let _ = runner_handle.await; + + match result { + Ok(job_result) => { + match job_result { + Ok(job_status) => { + match job_status { + JobStatus::Finished => { + info!("Job completed successfully"); + // Get the job result from Redis + match job_client.get_result(&job_id).await { + Ok(Some(result)) => Ok(result), + Ok(None) => Ok("Job completed with no result".to_string()), + Err(e) => { + error!("Failed to get job result: {}", e); + Ok("Job completed but result unavailable".to_string()) + } + } + } + JobStatus::Error => { + // Get the job error from Redis - for now just return a generic error + error!("Job failed with status: Error"); + return Err("Job execution failed".into()); + /*match job_client.get_job_error(&job_id).await { + Ok(Some(error_msg)) => { + error!("Job failed: {}", error_msg); + Err(format!("Job failed: {}", error_msg).into()) + } + Ok(None) => { + error!("Job failed with no error message"); + Err("Job failed with no error message".into()) + } + Err(e) => { + error!("Failed to get job error: {}", e); + Err("Job failed but error details unavailable".into()) + } + }*/ + } + _ => { + error!("Job ended in unexpected status: {:?}", job_status); + Err(format!("Job ended in unexpected status: {:?}", job_status).into()) + } + } + } + Err(e) => { + error!("Error waiting for job completion: {}", e); + Err(e) + } + } + } + Err(_) => { + error!("Job execution timed out after {:?}", job_timeout); + // Try to cancel the job + let _ = job_client.set_job_status(&job_id, JobStatus::Error).await; + Err("Job execution timed out".into()) + } + } +} + +/// Wait for job completion by polling Redis +async fn wait_for_job_completion( + job_client: &Client, + job_id: &str, +) -> Result> { + let poll_interval = Duration::from_millis(500); + + loop { + match job_client.get_status(job_id).await { + Ok(status) => { + match status { + JobStatus::Finished | JobStatus::Error => { + return Ok(status); + } + JobStatus::Dispatched | JobStatus::WaitingForPrerequisites | JobStatus::Started => { + // Continue polling + tokio::time::sleep(poll_interval).await; + } + JobStatus::Stopping => { + // Job is being stopped, wait a bit more + tokio::time::sleep(poll_interval).await; + } + } + } + Err(e) => { + error!("Error polling job status: {}", e); + tokio::time::sleep(poll_interval).await; + } + } + } +} diff --git a/src/sync_runner.rs b/src/sync_runner.rs index 5c3972d..7e732cf 100644 --- a/src/sync_runner.rs +++ b/src/sync_runner.rs @@ -1,4 +1,4 @@ -use hero_job::Job; +use crate::job::Job; use log::{debug, error, info}; use rhai::{Dynamic, Engine}; use std::sync::Arc; diff --git a/test_sync_runner.rs b/test_sync_runner.rs deleted file mode 100644 index 075d720..0000000 --- a/test_sync_runner.rs +++ /dev/null @@ -1,47 +0,0 @@ -use actor_system::spawn_sync_runner; -use actor_system::engine::osis::create_osis_engine; -use tokio::sync::mpsc; -use tokio::time::{sleep, Duration}; - -#[tokio::main] -async fn main() -> Result<(), Box> { - // Initialize logging - env_logger::init(); - - // Create shutdown channel - let (shutdown_tx, shutdown_rx) = mpsc::channel(1); - - println!("Starting sync runner test..."); - - // Spawn the sync runner with proper parameters - let runner_handle = spawn_sync_runner( - "test_runner".to_string(), - "/tmp/test_runner.db".to_string(), - "redis://localhost:6379".to_string(), - shutdown_rx, - false, // preserve_tasks - || create_osis_engine(), - ); - - // Let it run for a few seconds - println!("Sync runner started, letting it run for 5 seconds..."); - sleep(Duration::from_secs(5)).await; - - // Send shutdown signal - println!("Sending shutdown signal..."); - let _ = shutdown_tx.send(()).await; - - // Wait for the runner to finish - match runner_handle.await { - Ok(result) => { - match result { - Ok(_) => println!("Sync runner completed successfully!"), - Err(e) => println!("Sync runner error: {}", e), - } - } - Err(e) => println!("Join error: {}", e), - } - - println!("Test completed!"); - Ok(()) -} diff --git a/tests/e2e_tests.rs b/tests/e2e_tests.rs new file mode 100644 index 0000000..22ecb75 --- /dev/null +++ b/tests/e2e_tests.rs @@ -0,0 +1,130 @@ +use std::process::Command; +use std::time::Duration; +use tokio::time::timeout; + +/// Test the SAL runner in script mode with a simple ping script +#[tokio::test] +async fn test_sal_runner_script_mode_ping() { + let output = timeout( + Duration::from_secs(10), + run_sal_runner_script_mode("test_sal_ping") + ).await; + + match output { + Ok(result) => { + assert!(result.is_ok(), "SAL runner should execute successfully"); + let stdout = result.unwrap(); + assert!(stdout.contains("pong"), + "Output should contain 'pong' response: {}", stdout); + } + Err(_) => panic!("Test timed out after 10 seconds"), + } +} + +/// Test the OSIS runner in script mode with a simple ping script +#[tokio::test] +async fn test_osis_runner_script_mode_ping() { + let output = timeout( + Duration::from_secs(10), + run_osis_runner_script_mode("test_osis_ping") + ).await; + + match output { + Ok(result) => { + assert!(result.is_ok(), "OSIS runner should execute successfully"); + let stdout = result.unwrap(); + assert!(stdout.contains("pong"), + "Output should contain 'pong' response: {}", stdout); + } + Err(_) => panic!("Test timed out after 10 seconds"), + } +} + +/// Helper function to run SAL runner in script mode +async fn run_sal_runner_script_mode( + runner_id: &str +) -> Result> { + let output = Command::new("cargo") + .args(&[ + "run", "--bin", "runner_sal", "--", + runner_id, + "-s", "ping" + ]) + .output()?; + + if output.status.success() { + Ok(String::from_utf8_lossy(&output.stdout).to_string()) + } else { + let stderr = String::from_utf8_lossy(&output.stderr); + Err(format!("Command failed: {}", stderr).into()) + } +} + +/// Helper function to run OSIS runner in script mode +async fn run_osis_runner_script_mode( + runner_id: &str +) -> Result> { + let output = Command::new("cargo") + .args(&[ + "run", "--bin", "runner_osis", "--", + runner_id, + "-s", "ping" + ]) + .output()?; + + if output.status.success() { + Ok(String::from_utf8_lossy(&output.stdout).to_string()) + } else { + let stderr = String::from_utf8_lossy(&output.stderr); + Err(format!("Command failed: {}", stderr).into()) + } +} + +/// Test basic compilation and help output +#[tokio::test] +async fn test_sal_runner_help() { + let output = Command::new("cargo") + .args(&["run", "--bin", "runner_sal", "--", "--help"]) + .output() + .expect("Failed to execute command"); + + assert!(output.status.success(), "Help command should succeed"); + let stdout = String::from_utf8_lossy(&output.stdout); + assert!(stdout.contains("Usage") || stdout.contains("USAGE"), + "Help output should contain usage information"); +} + +/// Test basic compilation and help output for OSIS runner +#[tokio::test] +async fn test_osis_runner_help() { + let output = Command::new("cargo") + .args(&["run", "--bin", "runner_osis", "--", "--help"]) + .output() + .expect("Failed to execute command"); + + assert!(output.status.success(), "Help command should succeed"); + let stdout = String::from_utf8_lossy(&output.stdout); + assert!(stdout.contains("Usage") || stdout.contains("USAGE"), + "Help output should contain usage information"); +} + +/// Test library functionality - job creation and basic operations +#[tokio::test] +async fn test_job_creation_and_serialization() { + use runner_rust::JobBuilder; + + let job = JobBuilder::new() + .caller_id("test_caller") + .context_id("test_context") + .payload("ping") + .runner("default") + .executor("rhai") + .build() + .expect("Job creation should succeed"); + + assert_eq!(job.caller_id, "test_caller"); + assert_eq!(job.context_id, "test_context"); + assert_eq!(job.payload, "ping"); + assert_eq!(job.runner, "default"); + assert_eq!(job.executor, "rhai"); +}