add tests and fix job impl
This commit is contained in:
98
Cargo.lock
generated
98
Cargo.lock
generated
@@ -2,49 +2,6 @@
|
|||||||
# It is not intended for manual editing.
|
# It is not intended for manual editing.
|
||||||
version = 4
|
version = 4
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "actor_system"
|
|
||||||
version = "0.1.0"
|
|
||||||
dependencies = [
|
|
||||||
"anyhow",
|
|
||||||
"async-trait",
|
|
||||||
"chrono",
|
|
||||||
"clap",
|
|
||||||
"crossterm",
|
|
||||||
"env_logger",
|
|
||||||
"hero-job",
|
|
||||||
"hero_logger",
|
|
||||||
"heromodels",
|
|
||||||
"heromodels-derive",
|
|
||||||
"heromodels_core",
|
|
||||||
"log",
|
|
||||||
"ratatui",
|
|
||||||
"redis 0.25.4",
|
|
||||||
"rhai",
|
|
||||||
"rhailib_dsl",
|
|
||||||
"sal-git",
|
|
||||||
"sal-hetzner",
|
|
||||||
"sal-kubernetes",
|
|
||||||
"sal-mycelium",
|
|
||||||
"sal-net",
|
|
||||||
"sal-os",
|
|
||||||
"sal-postgresclient",
|
|
||||||
"sal-process",
|
|
||||||
"sal-redisclient",
|
|
||||||
"sal-service-manager",
|
|
||||||
"sal-text",
|
|
||||||
"sal-vault",
|
|
||||||
"sal-virt",
|
|
||||||
"sal-zinit-client",
|
|
||||||
"serde",
|
|
||||||
"serde_json",
|
|
||||||
"thiserror 1.0.69",
|
|
||||||
"tokio",
|
|
||||||
"toml",
|
|
||||||
"tracing",
|
|
||||||
"uuid",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "addr2line"
|
name = "addr2line"
|
||||||
version = "0.24.2"
|
version = "0.24.2"
|
||||||
@@ -1304,19 +1261,6 @@ version = "0.5.2"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c"
|
checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "hero-job"
|
|
||||||
version = "0.1.0"
|
|
||||||
dependencies = [
|
|
||||||
"chrono",
|
|
||||||
"log",
|
|
||||||
"redis 0.25.4",
|
|
||||||
"serde",
|
|
||||||
"serde_json",
|
|
||||||
"thiserror 1.0.69",
|
|
||||||
"uuid",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hero_logger"
|
name = "hero_logger"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
@@ -3195,6 +3139,48 @@ dependencies = [
|
|||||||
"windows-sys 0.52.0",
|
"windows-sys 0.52.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "runner_rust"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"anyhow",
|
||||||
|
"async-trait",
|
||||||
|
"chrono",
|
||||||
|
"clap",
|
||||||
|
"crossterm",
|
||||||
|
"env_logger",
|
||||||
|
"hero_logger",
|
||||||
|
"heromodels",
|
||||||
|
"heromodels-derive",
|
||||||
|
"heromodels_core",
|
||||||
|
"log",
|
||||||
|
"ratatui",
|
||||||
|
"redis 0.25.4",
|
||||||
|
"rhai",
|
||||||
|
"rhailib_dsl",
|
||||||
|
"sal-git",
|
||||||
|
"sal-hetzner",
|
||||||
|
"sal-kubernetes",
|
||||||
|
"sal-mycelium",
|
||||||
|
"sal-net",
|
||||||
|
"sal-os",
|
||||||
|
"sal-postgresclient",
|
||||||
|
"sal-process",
|
||||||
|
"sal-redisclient",
|
||||||
|
"sal-service-manager",
|
||||||
|
"sal-text",
|
||||||
|
"sal-vault",
|
||||||
|
"sal-virt",
|
||||||
|
"sal-zinit-client",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
"thiserror 1.0.69",
|
||||||
|
"tokio",
|
||||||
|
"toml",
|
||||||
|
"tracing",
|
||||||
|
"uuid",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rust_decimal"
|
name = "rust_decimal"
|
||||||
version = "1.37.2"
|
version = "1.37.2"
|
||||||
|
@@ -1,10 +1,10 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "actor_system"
|
name = "runner_rust"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[lib]
|
[lib]
|
||||||
name = "actor_system" # Can be different from package name, or same
|
name = "runner_rust" # Can be different from package name, or same
|
||||||
path = "src/lib.rs"
|
path = "src/lib.rs"
|
||||||
|
|
||||||
[[bin]]
|
[[bin]]
|
||||||
@@ -23,9 +23,6 @@ path = "test_sync_runner.rs"
|
|||||||
name = "engine"
|
name = "engine"
|
||||||
path = "examples/engine.rs"
|
path = "examples/engine.rs"
|
||||||
|
|
||||||
[[example]]
|
|
||||||
name = "actor"
|
|
||||||
path = "examples/actor.rs"
|
|
||||||
|
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
@@ -45,8 +42,6 @@ toml = "0.8"
|
|||||||
thiserror = "1.0"
|
thiserror = "1.0"
|
||||||
async-trait = "0.1"
|
async-trait = "0.1"
|
||||||
# Core hero dependencies
|
# Core hero dependencies
|
||||||
hero-job = { path = "../job" }
|
|
||||||
#hero-job = { git = "https://git.ourworld.tf/herocode/job.git" }
|
|
||||||
heromodels = { git = "https://git.ourworld.tf/herocode/db.git" }
|
heromodels = { git = "https://git.ourworld.tf/herocode/db.git" }
|
||||||
heromodels_core = { git = "https://git.ourworld.tf/herocode/db.git" }
|
heromodels_core = { git = "https://git.ourworld.tf/herocode/db.git" }
|
||||||
heromodels-derive = { git = "https://git.ourworld.tf/herocode/db.git" }
|
heromodels-derive = { git = "https://git.ourworld.tf/herocode/db.git" }
|
||||||
|
60
cmd/actor.rs
60
cmd/actor.rs
@@ -1,60 +0,0 @@
|
|||||||
use actor_system::AsyncWorker;
|
|
||||||
use clap::Parser;
|
|
||||||
use log::info;
|
|
||||||
use std::sync::Arc;
|
|
||||||
use tokio::sync::mpsc;
|
|
||||||
|
|
||||||
#[derive(Parser, Debug)]
|
|
||||||
#[command(name = "actor_system")]
|
|
||||||
#[command(about = "System Actor - Asynchronous job processing actor")]
|
|
||||||
struct Args {
|
|
||||||
/// Database path
|
|
||||||
#[arg(short, long, default_value = "/tmp/system_db")]
|
|
||||||
db_path: String,
|
|
||||||
|
|
||||||
/// Redis URL
|
|
||||||
#[arg(short, long, default_value = "redis://localhost:6379")]
|
|
||||||
redis_url: String,
|
|
||||||
|
|
||||||
/// Preserve completed tasks in Redis
|
|
||||||
#[arg(short, long)]
|
|
||||||
preserve_tasks: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|
||||||
env_logger::init();
|
|
||||||
|
|
||||||
let args = Args::parse();
|
|
||||||
|
|
||||||
info!("Starting System Actor");
|
|
||||||
|
|
||||||
// Create shutdown channel
|
|
||||||
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
|
|
||||||
|
|
||||||
// Setup signal handler for graceful shutdown
|
|
||||||
let shutdown_tx_clone = shutdown_tx.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
tokio::signal::ctrl_c().await.expect("Failed to listen for Ctrl+C");
|
|
||||||
info!("Received Ctrl+C, initiating shutdown...");
|
|
||||||
let _ = shutdown_tx_clone.send(()).await;
|
|
||||||
});
|
|
||||||
|
|
||||||
// Create and start the actor
|
|
||||||
let actor = Arc::new(
|
|
||||||
AsyncWorker::builder()
|
|
||||||
.db_path(args.db_path)
|
|
||||||
.redis_url(args.redis_url)
|
|
||||||
.build()?
|
|
||||||
);
|
|
||||||
|
|
||||||
let handle = baobab_actor::spawn_actor(actor, shutdown_rx);
|
|
||||||
|
|
||||||
info!("System Actor started, waiting for jobs...");
|
|
||||||
|
|
||||||
// Wait for the actor to complete
|
|
||||||
handle.await??;
|
|
||||||
|
|
||||||
info!("System Actor shutdown complete");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
@@ -1,71 +0,0 @@
|
|||||||
use clap::Parser;
|
|
||||||
use log::info;
|
|
||||||
use std::time::Duration;
|
|
||||||
use tokio::sync::mpsc;
|
|
||||||
|
|
||||||
use actor_system::{spawn_async_actor};
|
|
||||||
|
|
||||||
#[derive(Parser, Debug)]
|
|
||||||
#[command(name = "async_actor")]
|
|
||||||
#[command(about = "Async Actor - processes jobs concurrently with SAL modules")]
|
|
||||||
struct Args {
|
|
||||||
/// Actor ID for this instance
|
|
||||||
#[arg(short, long)]
|
|
||||||
actor_id: String,
|
|
||||||
|
|
||||||
/// Database path
|
|
||||||
#[arg(short, long, default_value = "/tmp/actor_db")]
|
|
||||||
db_path: String,
|
|
||||||
|
|
||||||
/// Redis URL
|
|
||||||
#[arg(short, long, default_value = "redis://localhost:6379")]
|
|
||||||
redis_url: String,
|
|
||||||
|
|
||||||
/// Default timeout in seconds for job execution
|
|
||||||
#[arg(short, long, default_value = "300")]
|
|
||||||
timeout: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|
||||||
env_logger::init();
|
|
||||||
|
|
||||||
let args = Args::parse();
|
|
||||||
|
|
||||||
info!("Starting Async Actor with ID: {}", args.actor_id);
|
|
||||||
info!("Database path: {}", args.db_path);
|
|
||||||
info!("Redis URL: {}", args.redis_url);
|
|
||||||
info!("Default timeout: {}s", args.timeout);
|
|
||||||
|
|
||||||
// Create shutdown channel
|
|
||||||
let (_shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
|
|
||||||
|
|
||||||
// Spawn the async actor
|
|
||||||
let handle = spawn_async_actor(
|
|
||||||
args.actor_id,
|
|
||||||
args.db_path,
|
|
||||||
args.redis_url,
|
|
||||||
shutdown_rx,
|
|
||||||
Duration::from_secs(args.timeout),
|
|
||||||
);
|
|
||||||
|
|
||||||
// Wait for the actor to complete
|
|
||||||
match handle.await {
|
|
||||||
Ok(result) => {
|
|
||||||
match result {
|
|
||||||
Ok(()) => {
|
|
||||||
info!("Async Actor completed successfully");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
eprintln!("Async Actor error: {}", e);
|
|
||||||
Err(e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
eprintln!("Failed to join async actor task: {}", e);
|
|
||||||
Err(Box::new(e))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@@ -1,70 +0,0 @@
|
|||||||
use clap::Parser;
|
|
||||||
use log::info;
|
|
||||||
use tokio::sync::mpsc;
|
|
||||||
|
|
||||||
use actor_system::{spawn_sync_actor};
|
|
||||||
|
|
||||||
#[derive(Parser, Debug)]
|
|
||||||
#[command(name = "sync_actor")]
|
|
||||||
#[command(about = "Sync Actor - processes jobs sequentially with DSL modules")]
|
|
||||||
struct Args {
|
|
||||||
/// Actor ID for this instance
|
|
||||||
#[arg(short, long)]
|
|
||||||
actor_id: String,
|
|
||||||
|
|
||||||
/// Database path
|
|
||||||
#[arg(short, long, default_value = "/tmp/actor_db")]
|
|
||||||
db_path: String,
|
|
||||||
|
|
||||||
/// Redis URL
|
|
||||||
#[arg(short, long, default_value = "redis://localhost:6379")]
|
|
||||||
redis_url: String,
|
|
||||||
|
|
||||||
/// Preserve completed tasks in Redis (don't delete them)
|
|
||||||
#[arg(short, long, default_value = "false")]
|
|
||||||
preserve_tasks: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|
||||||
env_logger::init();
|
|
||||||
|
|
||||||
let args = Args::parse();
|
|
||||||
|
|
||||||
info!("Starting Sync Actor with ID: {}", args.actor_id);
|
|
||||||
info!("Database path: {}", args.db_path);
|
|
||||||
info!("Redis URL: {}", args.redis_url);
|
|
||||||
info!("Preserve tasks: {}", args.preserve_tasks);
|
|
||||||
|
|
||||||
// Create shutdown channel
|
|
||||||
let (_shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
|
|
||||||
|
|
||||||
// Spawn the sync actor
|
|
||||||
let handle = spawn_sync_actor(
|
|
||||||
args.actor_id,
|
|
||||||
args.db_path,
|
|
||||||
args.redis_url,
|
|
||||||
shutdown_rx,
|
|
||||||
args.preserve_tasks,
|
|
||||||
);
|
|
||||||
|
|
||||||
// Wait for the actor to complete
|
|
||||||
match handle.await {
|
|
||||||
Ok(result) => {
|
|
||||||
match result {
|
|
||||||
Ok(()) => {
|
|
||||||
info!("Sync Actor completed successfully");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
eprintln!("Sync Actor error: {}", e);
|
|
||||||
Err(e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
eprintln!("Failed to join sync actor task: {}", e);
|
|
||||||
Err(Box::new(e))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@@ -46,7 +46,7 @@ fn create_app(args: &Args) -> Result<App> {
|
|||||||
.unwrap_or_else(|_| ".".to_string());
|
.unwrap_or_else(|_| ".".to_string());
|
||||||
let crate_root = PathBuf::from(crate_root);
|
let crate_root = PathBuf::from(crate_root);
|
||||||
|
|
||||||
let actor_path = crate_root.join("target/debug/actor_system");
|
let actor_path = crate_root.join("target/debug/runner_rust");
|
||||||
let example_dir = Some(crate_root.join("examples/scripts"));
|
let example_dir = Some(crate_root.join("examples/scripts"));
|
||||||
|
|
||||||
let mut app = App::new(
|
let mut app = App::new(
|
||||||
@@ -68,7 +68,7 @@ fn spawn_actor_process(_args: &Args) -> Result<Child> {
|
|||||||
// Get the crate root directory
|
// Get the crate root directory
|
||||||
let crate_root = std::env::var("CARGO_MANIFEST_DIR")
|
let crate_root = std::env::var("CARGO_MANIFEST_DIR")
|
||||||
.unwrap_or_else(|_| ".".to_string());
|
.unwrap_or_else(|_| ".".to_string());
|
||||||
let actor_path = PathBuf::from(crate_root).join("target/debug/actor_system");
|
let actor_path = PathBuf::from(crate_root).join("target/debug/runner_rust");
|
||||||
info!("🎬 Spawning actor process: {}", actor_path.display());
|
info!("🎬 Spawning actor process: {}", actor_path.display());
|
||||||
|
|
||||||
let mut cmd = Command::new(&actor_path);
|
let mut cmd = Command::new(&actor_path);
|
||||||
@@ -123,7 +123,7 @@ async fn main() -> Result<()> {
|
|||||||
|
|
||||||
info!("🚀 Starting Baobab Actor TUI...");
|
info!("🚀 Starting Baobab Actor TUI...");
|
||||||
info!("Actor ID: sal (System Actor)");
|
info!("Actor ID: sal (System Actor)");
|
||||||
info!("Actor Path: {}/target/debug/actor_system", crate_root);
|
info!("Actor Path: {}/target/debug/runner_rust", crate_root);
|
||||||
info!("Redis URL: {}", args.redis_url);
|
info!("Redis URL: {}", args.redis_url);
|
||||||
info!("Script Type: SAL");
|
info!("Script Type: SAL");
|
||||||
info!("Example Directory: {}/examples/scripts", crate_root);
|
info!("Example Directory: {}/examples/scripts", crate_root);
|
||||||
|
@@ -4,7 +4,7 @@ use std::panic;
|
|||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use rhai::{Engine, Dynamic};
|
use rhai::{Engine, Dynamic};
|
||||||
|
|
||||||
use actor_system::AsyncWorker;
|
use runner_rust::AsyncRunner;
|
||||||
|
|
||||||
/// Recursively collect all .rhai files from a directory and its subdirectories
|
/// Recursively collect all .rhai files from a directory and its subdirectories
|
||||||
fn collect_rhai_files(dir: &Path, files: &mut Vec<std::path::PathBuf>) -> Result<(), Box<dyn std::error::Error>> {
|
fn collect_rhai_files(dir: &Path, files: &mut Vec<std::path::PathBuf>) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
@@ -113,7 +113,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Create a new engine instance and configure it with DSL modules
|
// Create a new engine instance and configure it with DSL modules
|
||||||
let mut engine_with_context = match create_configured_engine(db_path, i + 1, verbose) {
|
let engine_with_context = match create_configured_engine(db_path, i + 1, verbose) {
|
||||||
Ok(engine) => engine,
|
Ok(engine) => engine,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
println!("\x1b[31m✗\x1b[0m {} ... \x1b[31mFAILED\x1b[0m (engine setup: {})", script_name, e);
|
println!("\x1b[31m✗\x1b[0m {} ... \x1b[31mFAILED\x1b[0m (engine setup: {})", script_name, e);
|
||||||
@@ -185,7 +185,7 @@ fn create_configured_engine(db_path: &str, script_index: usize, verbose: bool) -
|
|||||||
let mut engine = Engine::new();
|
let mut engine = Engine::new();
|
||||||
|
|
||||||
// Register all DSL modules (same as OSIS engine configuration)
|
// Register all DSL modules (same as OSIS engine configuration)
|
||||||
actor_system::register_sal_modules(&mut engine);
|
runner_rust::register_sal_modules(&mut engine);
|
||||||
|
|
||||||
// Set up job context tags (similar to execute_job_with_engine)
|
// Set up job context tags (similar to execute_job_with_engine)
|
||||||
let mut db_config = rhai::Map::new();
|
let mut db_config = rhai::Map::new();
|
||||||
|
10
scripts/test.sh
Executable file
10
scripts/test.sh
Executable file
@@ -0,0 +1,10 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
set -e
|
||||||
|
|
||||||
|
SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||||
|
RUNNER_DIR=$(cd "$SCRIPT_DIR/.." && pwd)
|
||||||
|
|
||||||
|
pushd "$RUNNER_DIR"
|
||||||
|
cargo check --all
|
||||||
|
cargo test --all
|
||||||
|
popd
|
@@ -1,4 +1,4 @@
|
|||||||
use hero_job::Job;
|
use crate::job::Job;
|
||||||
use log::{debug, error, info};
|
use log::{debug, error, info};
|
||||||
use rhai::Engine;
|
use rhai::Engine;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
@@ -1,4 +1,4 @@
|
|||||||
use actor_system::spawn_sync_runner;
|
use runner_rust::{spawn_sync_runner, script_mode::execute_script_mode};
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
@@ -23,6 +23,10 @@ struct Args {
|
|||||||
/// Preserve tasks after completion
|
/// Preserve tasks after completion
|
||||||
#[arg(short, long, default_value_t = false)]
|
#[arg(short, long, default_value_t = false)]
|
||||||
preserve_tasks: bool,
|
preserve_tasks: bool,
|
||||||
|
|
||||||
|
/// Script to execute in single-job mode (optional)
|
||||||
|
#[arg(short, long)]
|
||||||
|
script: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
@@ -32,6 +36,30 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||||||
|
|
||||||
let args = Args::parse();
|
let args = Args::parse();
|
||||||
|
|
||||||
|
// Check if we're in script mode
|
||||||
|
if let Some(script_content) = args.script {
|
||||||
|
info!("Running in script mode with runner ID: {}", args.runner_id);
|
||||||
|
|
||||||
|
let result = execute_script_mode(
|
||||||
|
&script_content,
|
||||||
|
&args.runner_id,
|
||||||
|
args.redis_url,
|
||||||
|
std::time::Duration::from_secs(300), // Default timeout for OSIS
|
||||||
|
create_osis_engine,
|
||||||
|
).await;
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(output) => {
|
||||||
|
println!("Script execution result:\n{}", output);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("Script execution failed: {}", e);
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
info!("Starting OSIS Sync Runner with ID: {}", args.runner_id);
|
info!("Starting OSIS Sync Runner with ID: {}", args.runner_id);
|
||||||
info!("Database path: {}", args.db_path);
|
info!("Database path: {}", args.db_path);
|
||||||
info!("Redis URL: {}", args.redis_url);
|
info!("Redis URL: {}", args.redis_url);
|
||||||
|
@@ -102,18 +102,18 @@ impl EngineFactory {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn register_sal_modules(engine: &mut Engine) {
|
pub fn register_sal_modules(engine: &mut Engine) {
|
||||||
sal_os::rhai::register_os_module(engine);
|
let _ = sal_os::rhai::register_os_module(engine);
|
||||||
sal_redisclient::rhai::register_redisclient_module(engine);
|
let _ = sal_redisclient::rhai::register_redisclient_module(engine);
|
||||||
sal_postgresclient::rhai::register_postgresclient_module(engine);
|
let _ = sal_postgresclient::rhai::register_postgresclient_module(engine);
|
||||||
sal_process::rhai::register_process_module(engine);
|
let _ = sal_process::rhai::register_process_module(engine);
|
||||||
sal_virt::rhai::register_virt_module(engine);
|
let _ = sal_virt::rhai::register_virt_module(engine);
|
||||||
sal_git::rhai::register_git_module(engine);
|
let _ = sal_git::rhai::register_git_module(engine);
|
||||||
sal_zinit_client::rhai::register_zinit_module(engine);
|
let _ = sal_zinit_client::rhai::register_zinit_module(engine);
|
||||||
sal_mycelium::rhai::register_mycelium_module(engine);
|
let _ = sal_mycelium::rhai::register_mycelium_module(engine);
|
||||||
sal_text::rhai::register_text_module(engine);
|
let _ = sal_text::rhai::register_text_module(engine);
|
||||||
sal_net::rhai::register_net_module(engine);
|
let _ = sal_net::rhai::register_net_module(engine);
|
||||||
sal_kubernetes::rhai::register_kubernetes_module(engine);
|
let _ = sal_kubernetes::rhai::register_kubernetes_module(engine);
|
||||||
sal_hetzner::rhai::register_hetzner_module(engine);
|
let _ = sal_hetzner::rhai::register_hetzner_module(engine);
|
||||||
|
|
||||||
println!("SAL modules registered successfully.");
|
println!("SAL modules registered successfully.");
|
||||||
}
|
}
|
||||||
|
@@ -1,7 +1,6 @@
|
|||||||
use actor_system::{spawn_async_runner, AsyncRunner};
|
use runner_rust::{spawn_async_runner, script_mode::execute_script_mode};
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
use std::sync::Arc;
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
@@ -25,6 +24,10 @@ struct Args {
|
|||||||
/// Default timeout for jobs in seconds
|
/// Default timeout for jobs in seconds
|
||||||
#[arg(short, long, default_value_t = 300)]
|
#[arg(short, long, default_value_t = 300)]
|
||||||
timeout: u64,
|
timeout: u64,
|
||||||
|
|
||||||
|
/// Script to execute in single-job mode (optional)
|
||||||
|
#[arg(short, long)]
|
||||||
|
script: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
@@ -34,6 +37,30 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||||||
|
|
||||||
let args = Args::parse();
|
let args = Args::parse();
|
||||||
|
|
||||||
|
// Check if we're in script mode
|
||||||
|
if let Some(script_content) = args.script {
|
||||||
|
info!("Running in script mode with runner ID: {}", args.runner_id);
|
||||||
|
|
||||||
|
let result = execute_script_mode(
|
||||||
|
&script_content,
|
||||||
|
&args.runner_id,
|
||||||
|
args.redis_url,
|
||||||
|
Duration::from_secs(args.timeout),
|
||||||
|
create_sal_engine,
|
||||||
|
).await;
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(output) => {
|
||||||
|
println!("Script execution result:\n{}", output);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("Script execution failed: {}", e);
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
info!("Starting SAL Async Runner with ID: {}", args.runner_id);
|
info!("Starting SAL Async Runner with ID: {}", args.runner_id);
|
||||||
info!("Database path: {}", args.db_path);
|
info!("Database path: {}", args.db_path);
|
||||||
info!("Redis URL: {}", args.redis_url);
|
info!("Redis URL: {}", args.redis_url);
|
||||||
|
349
src/client.rs
Normal file
349
src/client.rs
Normal file
@@ -0,0 +1,349 @@
|
|||||||
|
//! Job client implementation for managing jobs in Redis
|
||||||
|
|
||||||
|
use chrono::Utc;
|
||||||
|
use redis::AsyncCommands;
|
||||||
|
use crate::job::{Job, JobStatus, JobError};
|
||||||
|
|
||||||
|
/// Client for managing jobs in Redis
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct Client {
|
||||||
|
redis_client: redis::Client,
|
||||||
|
namespace: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ClientBuilder {
|
||||||
|
/// Redis URL for connection
|
||||||
|
redis_url: String,
|
||||||
|
/// Namespace for queue keys
|
||||||
|
namespace: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ClientBuilder {
|
||||||
|
/// Create a new client builder
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
redis_url: "redis://localhost:6379".to_string(),
|
||||||
|
namespace: "".to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set the Redis URL
|
||||||
|
pub fn redis_url<S: Into<String>>(mut self, url: S) -> Self {
|
||||||
|
self.redis_url = url.into();
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set the namespace for queue keys
|
||||||
|
pub fn namespace<S: Into<String>>(mut self, namespace: S) -> Self {
|
||||||
|
self.namespace = namespace.into();
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build the client
|
||||||
|
pub async fn build(self) -> Result<Client, JobError> {
|
||||||
|
// Create Redis client
|
||||||
|
let redis_client = redis::Client::open(self.redis_url.as_str())
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
|
||||||
|
Ok(Client {
|
||||||
|
redis_client,
|
||||||
|
namespace: self.namespace,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for Client {
|
||||||
|
fn default() -> Self {
|
||||||
|
// Note: Default implementation creates an empty client
|
||||||
|
// Use Client::builder() for proper initialization
|
||||||
|
Self {
|
||||||
|
redis_client: redis::Client::open("redis://localhost:6379").unwrap(),
|
||||||
|
namespace: "".to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Client {
|
||||||
|
/// Create a new client builder
|
||||||
|
pub fn builder() -> ClientBuilder {
|
||||||
|
ClientBuilder::new()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// List all job IDs from Redis
|
||||||
|
pub async fn list_jobs(&self) -> Result<Vec<String>, JobError> {
|
||||||
|
let mut conn = self.redis_client
|
||||||
|
.get_multiplexed_async_connection()
|
||||||
|
.await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
|
||||||
|
let keys: Vec<String> = conn.keys(format!("{}:*", &self.jobs_key())).await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
let job_ids: Vec<String> = keys
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|key| {
|
||||||
|
if key.starts_with(&format!("{}:", self.jobs_key())) {
|
||||||
|
key.strip_prefix(&format!("{}:", self.jobs_key()))
|
||||||
|
.map(|s| s.to_string())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
Ok(job_ids)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn jobs_key(&self) -> String {
|
||||||
|
if self.namespace.is_empty() {
|
||||||
|
format!("job")
|
||||||
|
} else {
|
||||||
|
format!("{}:job", self.namespace)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn job_key(&self, job_id: &str) -> String {
|
||||||
|
if self.namespace.is_empty() {
|
||||||
|
format!("job:{}", job_id)
|
||||||
|
} else {
|
||||||
|
format!("{}:job:{}", self.namespace, job_id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn job_reply_key(&self, job_id: &str) -> String {
|
||||||
|
if self.namespace.is_empty() {
|
||||||
|
format!("reply:{}", job_id)
|
||||||
|
} else {
|
||||||
|
format!("{}:reply:{}", self.namespace, job_id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn runner_key(&self, runner_name: &str) -> String {
|
||||||
|
if self.namespace.is_empty() {
|
||||||
|
format!("runner:{}", runner_name)
|
||||||
|
} else {
|
||||||
|
format!("{}:runner:{}", self.namespace, runner_name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set job error in Redis
|
||||||
|
pub async fn set_error(&self,
|
||||||
|
job_id: &str,
|
||||||
|
error: &str,
|
||||||
|
) -> Result<(), JobError> {
|
||||||
|
let job_key = self.job_key(job_id);
|
||||||
|
let now = Utc::now();
|
||||||
|
|
||||||
|
let mut conn = self.redis_client
|
||||||
|
.get_multiplexed_async_connection()
|
||||||
|
.await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
|
||||||
|
let _: () = conn.hset_multiple(&job_key, &[
|
||||||
|
("error", error),
|
||||||
|
("status", JobStatus::Error.as_str()),
|
||||||
|
("updated_at", &now.to_rfc3339()),
|
||||||
|
]).await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set job status in Redis
|
||||||
|
pub async fn set_job_status(&self,
|
||||||
|
job_id: &str,
|
||||||
|
status: JobStatus,
|
||||||
|
) -> Result<(), JobError> {
|
||||||
|
let job_key = self.job_key(job_id);
|
||||||
|
let now = Utc::now();
|
||||||
|
|
||||||
|
let mut conn = self.redis_client
|
||||||
|
.get_multiplexed_async_connection()
|
||||||
|
.await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
|
||||||
|
let _: () = conn.hset_multiple(&job_key, &[
|
||||||
|
("status", status.as_str()),
|
||||||
|
("updated_at", &now.to_rfc3339()),
|
||||||
|
]).await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get job status from Redis
|
||||||
|
pub async fn get_status(
|
||||||
|
&self,
|
||||||
|
job_id: &str,
|
||||||
|
) -> Result<JobStatus, JobError> {
|
||||||
|
let mut conn = self.redis_client
|
||||||
|
.get_multiplexed_async_connection()
|
||||||
|
.await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
|
||||||
|
let status_str: Option<String> = conn.hget(&self.job_key(job_id), "status").await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
|
||||||
|
match status_str {
|
||||||
|
Some(s) => JobStatus::from_str(&s).ok_or_else(|| JobError::InvalidStatus(s)),
|
||||||
|
None => Err(JobError::NotFound(job_id.to_string())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Delete job from Redis
|
||||||
|
pub async fn delete_from_redis(
|
||||||
|
&self,
|
||||||
|
job_id: &str,
|
||||||
|
) -> Result<(), JobError> {
|
||||||
|
let mut conn = self.redis_client
|
||||||
|
.get_multiplexed_async_connection()
|
||||||
|
.await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
|
||||||
|
let job_key = self.job_key(job_id);
|
||||||
|
let _: () = conn.del(&job_key).await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Store this job in Redis
|
||||||
|
pub async fn store_job_in_redis(&self, job: &Job) -> Result<(), JobError> {
|
||||||
|
let mut conn = self.redis_client
|
||||||
|
.get_multiplexed_async_connection()
|
||||||
|
.await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
|
||||||
|
let job_key = self.job_key(&job.id);
|
||||||
|
|
||||||
|
// Serialize the job data
|
||||||
|
let job_data = serde_json::to_string(job)
|
||||||
|
.map_err(|e| JobError::Serialization(e))?;
|
||||||
|
|
||||||
|
// Store job data in Redis hash
|
||||||
|
let _: () = conn.hset_multiple(&job_key, &[
|
||||||
|
("data", job_data),
|
||||||
|
("status", JobStatus::Dispatched.as_str().to_string()),
|
||||||
|
("created_at", job.created_at.to_rfc3339()),
|
||||||
|
("updated_at", job.updated_at.to_rfc3339()),
|
||||||
|
]).await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
|
||||||
|
// Set TTL for the job (24 hours)
|
||||||
|
let _: () = conn.expire(&job_key, 86400).await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Load a job from Redis by ID
|
||||||
|
pub async fn load_job_from_redis(
|
||||||
|
&self,
|
||||||
|
job_id: &str,
|
||||||
|
) -> Result<Job, JobError> {
|
||||||
|
let job_key = self.job_key(job_id);
|
||||||
|
|
||||||
|
let mut conn = self.redis_client
|
||||||
|
.get_multiplexed_async_connection()
|
||||||
|
.await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
|
||||||
|
// Get job data from Redis
|
||||||
|
let job_data: Option<String> = conn.hget(&job_key, "data").await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
|
||||||
|
match job_data {
|
||||||
|
Some(data) => {
|
||||||
|
let job: Job = serde_json::from_str(&data)
|
||||||
|
.map_err(|e| JobError::Serialization(e))?;
|
||||||
|
Ok(job)
|
||||||
|
}
|
||||||
|
None => Err(JobError::NotFound(job_id.to_string())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Delete a job by ID
|
||||||
|
pub async fn delete_job(&mut self, job_id: &str) -> Result<(), JobError> {
|
||||||
|
let mut conn = self.redis_client.get_multiplexed_async_connection().await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
|
||||||
|
let job_key = self.job_key(job_id);
|
||||||
|
let deleted_count: i32 = conn.del(&job_key).await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
|
||||||
|
if deleted_count == 0 {
|
||||||
|
return Err(JobError::NotFound(job_id.to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set job result in Redis
|
||||||
|
pub async fn set_result(
|
||||||
|
&self,
|
||||||
|
job_id: &str,
|
||||||
|
result: &str,
|
||||||
|
) -> Result<(), JobError> {
|
||||||
|
let job_key = self.job_key(&job_id);
|
||||||
|
let now = Utc::now();
|
||||||
|
let mut conn = self.redis_client
|
||||||
|
.get_multiplexed_async_connection()
|
||||||
|
.await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
let _: () = conn.hset_multiple(&job_key, &[
|
||||||
|
("result", result),
|
||||||
|
("status", JobStatus::Finished.as_str()),
|
||||||
|
("updated_at", &now.to_rfc3339()),
|
||||||
|
]).await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get job result from Redis
|
||||||
|
pub async fn get_result(
|
||||||
|
&self,
|
||||||
|
job_id: &str,
|
||||||
|
) -> Result<Option<String>, JobError> {
|
||||||
|
let job_key = self.job_key(job_id);
|
||||||
|
let mut conn = self.redis_client
|
||||||
|
.get_multiplexed_async_connection()
|
||||||
|
.await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
let result: Option<String> = conn.hget(&job_key, "result").await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get a job ID from the work queue (blocking pop)
|
||||||
|
pub async fn get_job_id(&self, queue_key: &str) -> Result<Option<String>, JobError> {
|
||||||
|
let mut conn = self.redis_client
|
||||||
|
.get_multiplexed_async_connection()
|
||||||
|
.await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
|
||||||
|
// Use BRPOP with a short timeout to avoid blocking indefinitely
|
||||||
|
let result: Option<(String, String)> = conn.brpop(queue_key, 1.0).await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
|
||||||
|
Ok(result.map(|(_, job_id)| job_id))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get a job by ID (alias for load_job_from_redis)
|
||||||
|
pub async fn get_job(&self, job_id: &str) -> Result<Job, JobError> {
|
||||||
|
self.load_job_from_redis(job_id).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Dispatch a job to a runner's queue
|
||||||
|
pub async fn dispatch_job(&self, job_id: &str, runner_name: &str) -> Result<(), JobError> {
|
||||||
|
let mut conn = self.redis_client
|
||||||
|
.get_multiplexed_async_connection()
|
||||||
|
.await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
|
||||||
|
let queue_key = self.runner_key(runner_name);
|
||||||
|
|
||||||
|
// Push job ID to the runner's queue (LPUSH for FIFO with BRPOP)
|
||||||
|
let _: () = conn.lpush(&queue_key, job_id).await
|
||||||
|
.map_err(|e| JobError::Redis(e))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
@@ -79,7 +79,7 @@ fn register_object_functions(engine: &mut Engine) {
|
|||||||
///
|
///
|
||||||
/// ```rust
|
/// ```rust
|
||||||
/// use rhai::Engine;
|
/// use rhai::Engine;
|
||||||
/// use actor_system::engine::osis::register_dsl_modules;
|
/// use runner_rust::engine::osis::register_dsl_modules;
|
||||||
///
|
///
|
||||||
/// let mut engine = Engine::new();
|
/// let mut engine = Engine::new();
|
||||||
/// register_dsl_modules(&mut engine);
|
/// register_dsl_modules(&mut engine);
|
||||||
|
222
src/job.rs
Normal file
222
src/job.rs
Normal file
@@ -0,0 +1,222 @@
|
|||||||
|
use chrono::{Utc};
|
||||||
|
use redis::AsyncCommands;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use thiserror::Error;
|
||||||
|
use uuid::Uuid;
|
||||||
|
use log::{error};
|
||||||
|
|
||||||
|
pub use crate::client::Client;
|
||||||
|
|
||||||
|
/// Job status enumeration
|
||||||
|
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||||
|
pub enum JobStatus {
|
||||||
|
Dispatched,
|
||||||
|
WaitingForPrerequisites,
|
||||||
|
Started,
|
||||||
|
Error,
|
||||||
|
Stopping,
|
||||||
|
Finished,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl JobStatus {
|
||||||
|
pub fn as_str(&self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
JobStatus::Dispatched => "dispatched",
|
||||||
|
JobStatus::WaitingForPrerequisites => "waiting_for_prerequisites",
|
||||||
|
JobStatus::Started => "started",
|
||||||
|
JobStatus::Error => "error",
|
||||||
|
JobStatus::Stopping => "stopping",
|
||||||
|
JobStatus::Finished => "finished",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn from_str(s: &str) -> Option<Self> {
|
||||||
|
match s {
|
||||||
|
"dispatched" => Some(JobStatus::Dispatched),
|
||||||
|
"waiting_for_prerequisites" => Some(JobStatus::WaitingForPrerequisites),
|
||||||
|
"started" => Some(JobStatus::Started),
|
||||||
|
"error" => Some(JobStatus::Error),
|
||||||
|
"stopping" => Some(JobStatus::Stopping),
|
||||||
|
"finished" => Some(JobStatus::Finished),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Representation of a script execution request.
|
||||||
|
///
|
||||||
|
/// This structure contains all the information needed to execute a script
|
||||||
|
/// on a actor service, including the script content, dependencies, and metadata.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct Job {
|
||||||
|
pub id: String,
|
||||||
|
pub caller_id: String,
|
||||||
|
pub context_id: String,
|
||||||
|
pub payload: String,
|
||||||
|
pub runner: String, // name of the runner to execute this job
|
||||||
|
pub executor: String, // name of the executor the runner will use to execute this job
|
||||||
|
pub timeout: u64, // timeout in seconds
|
||||||
|
pub env_vars: HashMap<String, String>, // environment variables for script execution
|
||||||
|
pub created_at: chrono::DateTime<chrono::Utc>,
|
||||||
|
pub updated_at: chrono::DateTime<chrono::Utc>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Error types for job operations
|
||||||
|
#[derive(Error, Debug)]
|
||||||
|
pub enum JobError {
|
||||||
|
#[error("Redis error: {0}")]
|
||||||
|
Redis(#[from] redis::RedisError),
|
||||||
|
#[error("Serialization error: {0}")]
|
||||||
|
Serialization(#[from] serde_json::Error),
|
||||||
|
#[error("Job not found: {0}")]
|
||||||
|
NotFound(String),
|
||||||
|
#[error("Invalid job status: {0}")]
|
||||||
|
InvalidStatus(String),
|
||||||
|
#[error("Timeout error: {0}")]
|
||||||
|
Timeout(String),
|
||||||
|
#[error("Invalid job data: {0}")]
|
||||||
|
InvalidData(String),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Job {
|
||||||
|
/// Create a new job with the given parameters
|
||||||
|
pub fn new(
|
||||||
|
caller_id: String,
|
||||||
|
context_id: String,
|
||||||
|
payload: String,
|
||||||
|
runner: String,
|
||||||
|
executor: String,
|
||||||
|
) -> Self {
|
||||||
|
let now = Utc::now();
|
||||||
|
Self {
|
||||||
|
id: Uuid::new_v4().to_string(),
|
||||||
|
caller_id,
|
||||||
|
context_id,
|
||||||
|
payload,
|
||||||
|
runner,
|
||||||
|
executor,
|
||||||
|
timeout: 300, // 5 minutes default
|
||||||
|
env_vars: HashMap::new(),
|
||||||
|
created_at: now,
|
||||||
|
updated_at: now,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Builder for constructing job execution requests.
|
||||||
|
pub struct JobBuilder {
|
||||||
|
caller_id: String,
|
||||||
|
context_id: String,
|
||||||
|
payload: String,
|
||||||
|
runner: String,
|
||||||
|
executor: String,
|
||||||
|
timeout: u64, // timeout in seconds
|
||||||
|
env_vars: HashMap<String, String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl JobBuilder {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
caller_id: "".to_string(),
|
||||||
|
context_id: "".to_string(),
|
||||||
|
payload: "".to_string(),
|
||||||
|
runner: "".to_string(),
|
||||||
|
executor: "".to_string(),
|
||||||
|
timeout: 300, // 5 minutes default
|
||||||
|
env_vars: HashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set the caller ID for this job
|
||||||
|
pub fn caller_id(mut self, caller_id: &str) -> Self {
|
||||||
|
self.caller_id = caller_id.to_string();
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set the context ID for this job
|
||||||
|
pub fn context_id(mut self, context_id: &str) -> Self {
|
||||||
|
self.context_id = context_id.to_string();
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set the payload (script content) for this job
|
||||||
|
pub fn payload(mut self, payload: &str) -> Self {
|
||||||
|
self.payload = payload.to_string();
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set the runner name for this job
|
||||||
|
pub fn runner(mut self, runner: &str) -> Self {
|
||||||
|
self.runner = runner.to_string();
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set the executor for this job
|
||||||
|
pub fn executor(mut self, executor: &str) -> Self {
|
||||||
|
self.executor = executor.to_string();
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set the timeout for job execution (in seconds)
|
||||||
|
pub fn timeout(mut self, timeout: u64) -> Self {
|
||||||
|
self.timeout = timeout;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set a single environment variable
|
||||||
|
pub fn env_var(mut self, key: &str, value: &str) -> Self {
|
||||||
|
self.env_vars.insert(key.to_string(), value.to_string());
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set multiple environment variables from a HashMap
|
||||||
|
pub fn env_vars(mut self, env_vars: HashMap<String, String>) -> Self {
|
||||||
|
self.env_vars = env_vars;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Clear all environment variables
|
||||||
|
pub fn clear_env_vars(mut self) -> Self {
|
||||||
|
self.env_vars.clear();
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build the job
|
||||||
|
pub fn build(self) -> Result<Job, JobError> {
|
||||||
|
if self.caller_id.is_empty() {
|
||||||
|
return Err(JobError::InvalidData("caller_id is required".to_string()));
|
||||||
|
}
|
||||||
|
if self.context_id.is_empty() {
|
||||||
|
return Err(JobError::InvalidData("context_id is required".to_string()));
|
||||||
|
}
|
||||||
|
if self.payload.is_empty() {
|
||||||
|
return Err(JobError::InvalidData("payload is required".to_string()));
|
||||||
|
}
|
||||||
|
if self.runner.is_empty() {
|
||||||
|
return Err(JobError::InvalidData("runner is required".to_string()));
|
||||||
|
}
|
||||||
|
if self.executor.is_empty() {
|
||||||
|
return Err(JobError::InvalidData("executor is required".to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut job = Job::new(
|
||||||
|
self.caller_id,
|
||||||
|
self.context_id,
|
||||||
|
self.payload,
|
||||||
|
self.runner,
|
||||||
|
self.executor,
|
||||||
|
);
|
||||||
|
|
||||||
|
job.timeout = self.timeout;
|
||||||
|
job.env_vars = self.env_vars;
|
||||||
|
|
||||||
|
Ok(job)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for JobBuilder {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
85
src/lib.rs
85
src/lib.rs
@@ -1,22 +1,24 @@
|
|||||||
// Core modules
|
// Core modules
|
||||||
pub mod runner_trait;
|
pub mod engine;
|
||||||
pub mod async_runner;
|
pub mod async_runner;
|
||||||
pub mod sync_runner;
|
pub mod sync_runner;
|
||||||
pub mod engine;
|
pub mod runner_trait;
|
||||||
|
pub mod script_mode;
|
||||||
|
pub mod job;
|
||||||
|
pub mod client;
|
||||||
|
|
||||||
// Public exports for convenience
|
// Public exports for convenience
|
||||||
pub use runner_trait::{Runner, RunnerConfig, spawn_runner};
|
pub use runner_trait::{Runner, RunnerConfig, spawn_runner};
|
||||||
pub use async_runner::{AsyncRunner, spawn_async_runner};
|
pub use async_runner::{AsyncRunner, spawn_async_runner};
|
||||||
pub use sync_runner::{SyncRunner, spawn_sync_runner};
|
pub use sync_runner::{SyncRunner, SyncRunnerConfig, spawn_sync_runner};
|
||||||
pub use engine::system::{register_sal_modules, create_system_engine};
|
pub use engine::system::{register_sal_modules, create_system_engine};
|
||||||
pub use engine::osis::{register_dsl_modules, create_osis_engine, create_shared_osis_engine};
|
pub use engine::osis::{register_dsl_modules, create_osis_engine, create_shared_osis_engine};
|
||||||
|
|
||||||
// Re-export job types from hero-job crate
|
// Re-export job types from local job module
|
||||||
pub use hero_job::{Job, JobStatus, JobError, JobBuilder};
|
pub use job::{Job, JobStatus, JobError, JobBuilder, Client};
|
||||||
pub use redis::AsyncCommands;
|
pub use redis::AsyncCommands;
|
||||||
use log::{debug, error, info};
|
use log::{debug, error, info};
|
||||||
|
|
||||||
const NAMESPACE_PREFIX: &str = "hero:job:";
|
|
||||||
const BLPOP_TIMEOUT_SECONDS: usize = 5;
|
const BLPOP_TIMEOUT_SECONDS: usize = 5;
|
||||||
|
|
||||||
/// Initialize Redis connection for the runner
|
/// Initialize Redis connection for the runner
|
||||||
@@ -40,43 +42,42 @@ pub async fn initialize_redis_connection(
|
|||||||
Ok(redis_conn)
|
Ok(redis_conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Load job from Redis using the supervisor's Job API
|
// /// Load job from Redis using the supervisor's Job API
|
||||||
pub async fn load_job_from_redis(
|
// pub async fn load_job_from_redis(
|
||||||
redis_conn: &mut redis::aio::MultiplexedConnection,
|
// redis_conn: &mut redis::aio::MultiplexedConnection,
|
||||||
job_id: &str,
|
// job_id: &str,
|
||||||
runner_id: &str,
|
// runner_id: &str,
|
||||||
) -> Result<Job, JobError> {
|
// ) -> Result<Job, JobError> {
|
||||||
debug!("Runner '{}', Job {}: Loading job from Redis", runner_id, job_id);
|
// debug!("Runner '{}', Job {}: Loading job from Redis", runner_id, job_id);
|
||||||
|
|
||||||
// Load job data from Redis hash
|
// // Load job data from Redis hash
|
||||||
let job_key = format!("hero:job:{}", job_id);
|
// let job_data: std::collections::HashMap<String, String> = redis_conn.hgetall(&client.job_key(job_id)).await
|
||||||
let job_data: std::collections::HashMap<String, String> = redis_conn.hgetall(&job_key).await
|
// .map_err(JobError::Redis)?;
|
||||||
.map_err(JobError::Redis)?;
|
|
||||||
|
|
||||||
if job_data.is_empty() {
|
// if job_data.is_empty() {
|
||||||
return Err(JobError::NotFound(job_id.to_string()));
|
// return Err(JobError::NotFound(job_id.to_string()));
|
||||||
}
|
// }
|
||||||
|
|
||||||
// Parse job from hash data using the supervisor's Job struct
|
// // Parse job from hash data using the supervisor's Job struct
|
||||||
let job = Job {
|
// let job = Job {
|
||||||
id: job_id.to_string(),
|
// id: job_id.to_string(),
|
||||||
caller_id: job_data.get("caller_id").unwrap_or(&"".to_string()).clone(),
|
// caller_id: job_data.get("caller_id").unwrap_or(&"".to_string()).clone(),
|
||||||
context_id: job_data.get("context_id").unwrap_or(&"".to_string()).clone(),
|
// context_id: job_data.get("context_id").unwrap_or(&"".to_string()).clone(),
|
||||||
payload: job_data.get("payload").unwrap_or(&"".to_string()).clone(),
|
// payload: job_data.get("payload").unwrap_or(&"".to_string()).clone(),
|
||||||
runner: job_data.get("runner").unwrap_or(&"default".to_string()).clone(),
|
// runner: job_data.get("runner").unwrap_or(&"default".to_string()).clone(),
|
||||||
executor: job_data.get("executor").unwrap_or(&"rhai".to_string()).clone(),
|
// executor: job_data.get("executor").unwrap_or(&"rhai".to_string()).clone(),
|
||||||
timeout: job_data.get("timeout").and_then(|s| s.parse().ok()).unwrap_or(300),
|
// timeout: job_data.get("timeout").and_then(|s| s.parse().ok()).unwrap_or(300),
|
||||||
env_vars: serde_json::from_str(job_data.get("env_vars").unwrap_or(&"{}".to_string()))
|
// env_vars: serde_json::from_str(job_data.get("env_vars").unwrap_or(&"{}".to_string()))
|
||||||
.map_err(JobError::Serialization)?,
|
// .map_err(JobError::Serialization)?,
|
||||||
created_at: job_data.get("created_at")
|
// created_at: job_data.get("created_at")
|
||||||
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
|
// .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
|
||||||
.map(|dt| dt.with_timezone(&chrono::Utc))
|
// .map(|dt| dt.with_timezone(&chrono::Utc))
|
||||||
.unwrap_or_else(chrono::Utc::now),
|
// .unwrap_or_else(chrono::Utc::now),
|
||||||
updated_at: job_data.get("updated_at")
|
// updated_at: job_data.get("updated_at")
|
||||||
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
|
// .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
|
||||||
.map(|dt| dt.with_timezone(&chrono::Utc))
|
// .map(|dt| dt.with_timezone(&chrono::Utc))
|
||||||
.unwrap_or_else(chrono::Utc::now),
|
// .unwrap_or_else(chrono::Utc::now),
|
||||||
};
|
// };
|
||||||
|
|
||||||
Ok(job)
|
// Ok(job)
|
||||||
}
|
// }
|
@@ -27,7 +27,7 @@
|
|||||||
//! └───────────────┘
|
//! └───────────────┘
|
||||||
//! ```
|
//! ```
|
||||||
|
|
||||||
use hero_job::{Job, JobStatus};
|
use crate::job::{Job, JobStatus, Client};
|
||||||
use log::{debug, error, info};
|
use log::{debug, error, info};
|
||||||
use redis::AsyncCommands;
|
use redis::AsyncCommands;
|
||||||
|
|
||||||
@@ -116,8 +116,15 @@ pub trait Runner: Send + Sync + 'static {
|
|||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let runner_id = self.runner_id();
|
let runner_id = self.runner_id();
|
||||||
let redis_url = self.redis_url();
|
let redis_url = self.redis_url();
|
||||||
// Canonical work queue based on runner_id
|
|
||||||
let queue_key = format!("runner_queue:{}", runner_id);
|
// Create client to get the proper queue key
|
||||||
|
let client = Client::builder()
|
||||||
|
.redis_url(redis_url)
|
||||||
|
.build()
|
||||||
|
.await
|
||||||
|
.map_err(|e| format!("Failed to create client: {}", e))?;
|
||||||
|
|
||||||
|
let queue_key = client.runner_key(runner_id);
|
||||||
info!(
|
info!(
|
||||||
"{} Runner '{}' starting. Connecting to Redis at {}. Listening on queue: {}",
|
"{} Runner '{}' starting. Connecting to Redis at {}. Listening on queue: {}",
|
||||||
self.runner_type(),
|
self.runner_type(),
|
||||||
@@ -156,7 +163,7 @@ pub trait Runner: Send + Sync + 'static {
|
|||||||
self.runner_type(), runner_id, job_id, _queue_name_recv);
|
self.runner_type(), runner_id, job_id, _queue_name_recv);
|
||||||
|
|
||||||
// Load the job from Redis
|
// Load the job from Redis
|
||||||
match crate::load_job_from_redis(&mut redis_conn, &job_id, runner_id).await {
|
match client.load_job_from_redis(&job_id).await {
|
||||||
Ok(job) => {
|
Ok(job) => {
|
||||||
// Check for ping job and handle it directly
|
// Check for ping job and handle it directly
|
||||||
if job.payload.trim() == "ping" {
|
if job.payload.trim() == "ping" {
|
||||||
@@ -164,18 +171,18 @@ pub trait Runner: Send + Sync + 'static {
|
|||||||
self.runner_type(), runner_id, job_id);
|
self.runner_type(), runner_id, job_id);
|
||||||
|
|
||||||
// Update job status to started
|
// Update job status to started
|
||||||
if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Started).await {
|
if let Err(e) = client.set_job_status(&job_id, JobStatus::Started).await {
|
||||||
error!("{} Runner '{}': Failed to update ping job '{}' status to Started: {}",
|
error!("{} Runner '{}': Failed to update ping job '{}' status to Started: {}",
|
||||||
self.runner_type(), runner_id, job_id, e);
|
self.runner_type(), runner_id, job_id, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set result to "pong" and mark as finished
|
// Set result to "pong" and mark as finished
|
||||||
if let Err(e) = Job::set_result(&mut redis_conn, &job_id, "pong").await {
|
if let Err(e) = client.set_result(&job_id, "pong").await {
|
||||||
error!("{} Runner '{}': Failed to set ping job '{}' result: {}",
|
error!("{} Runner '{}': Failed to set ping job '{}' result: {}",
|
||||||
self.runner_type(), runner_id, job_id, e);
|
self.runner_type(), runner_id, job_id, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Finished).await {
|
if let Err(e) = client.set_job_status(&job_id, JobStatus::Finished).await {
|
||||||
error!("{} Runner '{}': Failed to update ping job '{}' status to Finished: {}",
|
error!("{} Runner '{}': Failed to update ping job '{}' status to Finished: {}",
|
||||||
self.runner_type(), runner_id, job_id, e);
|
self.runner_type(), runner_id, job_id, e);
|
||||||
}
|
}
|
||||||
@@ -184,7 +191,7 @@ pub trait Runner: Send + Sync + 'static {
|
|||||||
self.runner_type(), runner_id, job_id);
|
self.runner_type(), runner_id, job_id);
|
||||||
} else {
|
} else {
|
||||||
// Update job status to started
|
// Update job status to started
|
||||||
if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Started).await {
|
if let Err(e) = client.set_job_status(&job_id, JobStatus::Started).await {
|
||||||
error!("{} Runner '{}': Failed to update job '{}' status to Started: {}",
|
error!("{} Runner '{}': Failed to update job '{}' status to Started: {}",
|
||||||
self.runner_type(), runner_id, job_id, e);
|
self.runner_type(), runner_id, job_id, e);
|
||||||
}
|
}
|
||||||
@@ -193,12 +200,12 @@ pub trait Runner: Send + Sync + 'static {
|
|||||||
match self.process_job(job) {
|
match self.process_job(job) {
|
||||||
Ok(result) => {
|
Ok(result) => {
|
||||||
// Set result and mark as finished
|
// Set result and mark as finished
|
||||||
if let Err(e) = Job::set_result(&mut redis_conn, &job_id, &result).await {
|
if let Err(e) = client.set_result(&job_id, &result).await {
|
||||||
error!("{} Runner '{}': Failed to set job '{}' result: {}",
|
error!("{} Runner '{}': Failed to set job '{}' result: {}",
|
||||||
self.runner_type(), runner_id, job_id, e);
|
self.runner_type(), runner_id, job_id, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Finished).await {
|
if let Err(e) = client.set_job_status(&job_id, JobStatus::Finished).await {
|
||||||
error!("{} Runner '{}': Failed to update job '{}' status to Finished: {}",
|
error!("{} Runner '{}': Failed to update job '{}' status to Finished: {}",
|
||||||
self.runner_type(), runner_id, job_id, e);
|
self.runner_type(), runner_id, job_id, e);
|
||||||
}
|
}
|
||||||
@@ -209,12 +216,12 @@ pub trait Runner: Send + Sync + 'static {
|
|||||||
self.runner_type(), runner_id, job_id, error_str);
|
self.runner_type(), runner_id, job_id, error_str);
|
||||||
|
|
||||||
// Set error and mark as error
|
// Set error and mark as error
|
||||||
if let Err(e) = Job::set_error(&mut redis_conn, &job_id, &error_str).await {
|
if let Err(e) = client.set_error(&job_id, &error_str).await {
|
||||||
error!("{} Runner '{}': Failed to set job '{}' error: {}",
|
error!("{} Runner '{}': Failed to set job '{}' error: {}",
|
||||||
self.runner_type(), runner_id, job_id, e);
|
self.runner_type(), runner_id, job_id, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Error).await {
|
if let Err(e) = client.set_job_status(&job_id, JobStatus::Error).await {
|
||||||
error!("{} Runner '{}': Failed to update job '{}' status to Error: {}",
|
error!("{} Runner '{}': Failed to update job '{}' status to Error: {}",
|
||||||
self.runner_type(), runner_id, job_id, e);
|
self.runner_type(), runner_id, job_id, e);
|
||||||
}
|
}
|
||||||
@@ -261,7 +268,5 @@ pub fn spawn_runner<W: Runner>(
|
|||||||
runner.spawn(shutdown_rx)
|
runner.spawn(shutdown_rx)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Helper to derive queue name from runner_id
|
|
||||||
fn derive_queue_from_runner_id(runner_id: &str) -> String {
|
|
||||||
format!("runner_queue:{}", runner_id)
|
|
||||||
}
|
|
||||||
|
169
src/script_mode.rs
Normal file
169
src/script_mode.rs
Normal file
@@ -0,0 +1,169 @@
|
|||||||
|
use std::time::Duration;
|
||||||
|
use tokio::time::{timeout, sleep};
|
||||||
|
use crate::job::{JobBuilder, Client, JobStatus};
|
||||||
|
use log::{info, error};
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use crate::async_runner::AsyncRunner;
|
||||||
|
use crate::runner_trait::{Runner, RunnerConfig};
|
||||||
|
|
||||||
|
/// Execute a script in single-job mode
|
||||||
|
/// Creates a job, submits it, waits for completion, and returns the result
|
||||||
|
pub async fn execute_script_mode<F>(
|
||||||
|
script_content: &str,
|
||||||
|
runner_id: &str,
|
||||||
|
redis_url: String,
|
||||||
|
job_timeout: Duration,
|
||||||
|
engine_factory: F,
|
||||||
|
) -> Result<String, Box<dyn std::error::Error + Send + Sync>>
|
||||||
|
where
|
||||||
|
F: Fn() -> rhai::Engine + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
info!("Executing script in single-job mode");
|
||||||
|
|
||||||
|
// Create job client
|
||||||
|
let job_client = Client::builder()
|
||||||
|
.redis_url(&redis_url)
|
||||||
|
.build()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Create the job using JobBuilder
|
||||||
|
let job = JobBuilder::new()
|
||||||
|
.caller_id("script_mode")
|
||||||
|
.context_id("single_job")
|
||||||
|
.payload(script_content)
|
||||||
|
.runner(runner_id)
|
||||||
|
.executor("rhai")
|
||||||
|
.timeout(job_timeout.as_secs())
|
||||||
|
.build()?;
|
||||||
|
|
||||||
|
let job_id = job.id.clone();
|
||||||
|
info!("Created job with ID: {}", job_id);
|
||||||
|
|
||||||
|
// Submit the job
|
||||||
|
job_client.store_job_in_redis(&job).await?;
|
||||||
|
info!("Job stored in Redis");
|
||||||
|
|
||||||
|
// Dispatch the job to the runner's queue
|
||||||
|
job_client.dispatch_job(&job_id, runner_id).await?;
|
||||||
|
info!("Job dispatched to runner queue: {}", runner_id);
|
||||||
|
|
||||||
|
// Create and spawn a temporary runner to process the job
|
||||||
|
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
|
||||||
|
|
||||||
|
let config = RunnerConfig {
|
||||||
|
runner_id: runner_id.to_string(),
|
||||||
|
db_path: "/tmp".to_string(), // Temporary path for script mode
|
||||||
|
redis_url: redis_url.clone(),
|
||||||
|
default_timeout: Some(job_timeout),
|
||||||
|
};
|
||||||
|
|
||||||
|
let runner = Arc::new(
|
||||||
|
AsyncRunner::builder()
|
||||||
|
.runner_id(&config.runner_id)
|
||||||
|
.db_path(&config.db_path)
|
||||||
|
.redis_url(&config.redis_url)
|
||||||
|
.default_timeout(config.default_timeout.unwrap_or(job_timeout))
|
||||||
|
.engine_factory(engine_factory)
|
||||||
|
.build()
|
||||||
|
.map_err(|e| format!("Failed to build runner: {}", e))?
|
||||||
|
);
|
||||||
|
let runner_handle = runner.spawn(shutdown_rx);
|
||||||
|
|
||||||
|
info!("Temporary runner spawned for job processing");
|
||||||
|
|
||||||
|
// Wait for job completion with timeout
|
||||||
|
let result = timeout(job_timeout, wait_for_job_completion(&job_client, &job_id)).await;
|
||||||
|
|
||||||
|
// Shutdown the temporary runner
|
||||||
|
let _ = shutdown_tx.send(()).await;
|
||||||
|
let _ = runner_handle.await;
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(job_result) => {
|
||||||
|
match job_result {
|
||||||
|
Ok(job_status) => {
|
||||||
|
match job_status {
|
||||||
|
JobStatus::Finished => {
|
||||||
|
info!("Job completed successfully");
|
||||||
|
// Get the job result from Redis
|
||||||
|
match job_client.get_result(&job_id).await {
|
||||||
|
Ok(Some(result)) => Ok(result),
|
||||||
|
Ok(None) => Ok("Job completed with no result".to_string()),
|
||||||
|
Err(e) => {
|
||||||
|
error!("Failed to get job result: {}", e);
|
||||||
|
Ok("Job completed but result unavailable".to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
JobStatus::Error => {
|
||||||
|
// Get the job error from Redis - for now just return a generic error
|
||||||
|
error!("Job failed with status: Error");
|
||||||
|
return Err("Job execution failed".into());
|
||||||
|
/*match job_client.get_job_error(&job_id).await {
|
||||||
|
Ok(Some(error_msg)) => {
|
||||||
|
error!("Job failed: {}", error_msg);
|
||||||
|
Err(format!("Job failed: {}", error_msg).into())
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
error!("Job failed with no error message");
|
||||||
|
Err("Job failed with no error message".into())
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("Failed to get job error: {}", e);
|
||||||
|
Err("Job failed but error details unavailable".into())
|
||||||
|
}
|
||||||
|
}*/
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
error!("Job ended in unexpected status: {:?}", job_status);
|
||||||
|
Err(format!("Job ended in unexpected status: {:?}", job_status).into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("Error waiting for job completion: {}", e);
|
||||||
|
Err(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
error!("Job execution timed out after {:?}", job_timeout);
|
||||||
|
// Try to cancel the job
|
||||||
|
let _ = job_client.set_job_status(&job_id, JobStatus::Error).await;
|
||||||
|
Err("Job execution timed out".into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wait for job completion by polling Redis
|
||||||
|
async fn wait_for_job_completion(
|
||||||
|
job_client: &Client,
|
||||||
|
job_id: &str,
|
||||||
|
) -> Result<JobStatus, Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
let poll_interval = Duration::from_millis(500);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match job_client.get_status(job_id).await {
|
||||||
|
Ok(status) => {
|
||||||
|
match status {
|
||||||
|
JobStatus::Finished | JobStatus::Error => {
|
||||||
|
return Ok(status);
|
||||||
|
}
|
||||||
|
JobStatus::Dispatched | JobStatus::WaitingForPrerequisites | JobStatus::Started => {
|
||||||
|
// Continue polling
|
||||||
|
tokio::time::sleep(poll_interval).await;
|
||||||
|
}
|
||||||
|
JobStatus::Stopping => {
|
||||||
|
// Job is being stopped, wait a bit more
|
||||||
|
tokio::time::sleep(poll_interval).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("Error polling job status: {}", e);
|
||||||
|
tokio::time::sleep(poll_interval).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@@ -1,4 +1,4 @@
|
|||||||
use hero_job::Job;
|
use crate::job::Job;
|
||||||
use log::{debug, error, info};
|
use log::{debug, error, info};
|
||||||
use rhai::{Dynamic, Engine};
|
use rhai::{Dynamic, Engine};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
@@ -1,47 +0,0 @@
|
|||||||
use actor_system::spawn_sync_runner;
|
|
||||||
use actor_system::engine::osis::create_osis_engine;
|
|
||||||
use tokio::sync::mpsc;
|
|
||||||
use tokio::time::{sleep, Duration};
|
|
||||||
|
|
||||||
#[tokio::main]
|
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|
||||||
// Initialize logging
|
|
||||||
env_logger::init();
|
|
||||||
|
|
||||||
// Create shutdown channel
|
|
||||||
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
|
|
||||||
|
|
||||||
println!("Starting sync runner test...");
|
|
||||||
|
|
||||||
// Spawn the sync runner with proper parameters
|
|
||||||
let runner_handle = spawn_sync_runner(
|
|
||||||
"test_runner".to_string(),
|
|
||||||
"/tmp/test_runner.db".to_string(),
|
|
||||||
"redis://localhost:6379".to_string(),
|
|
||||||
shutdown_rx,
|
|
||||||
false, // preserve_tasks
|
|
||||||
|| create_osis_engine(),
|
|
||||||
);
|
|
||||||
|
|
||||||
// Let it run for a few seconds
|
|
||||||
println!("Sync runner started, letting it run for 5 seconds...");
|
|
||||||
sleep(Duration::from_secs(5)).await;
|
|
||||||
|
|
||||||
// Send shutdown signal
|
|
||||||
println!("Sending shutdown signal...");
|
|
||||||
let _ = shutdown_tx.send(()).await;
|
|
||||||
|
|
||||||
// Wait for the runner to finish
|
|
||||||
match runner_handle.await {
|
|
||||||
Ok(result) => {
|
|
||||||
match result {
|
|
||||||
Ok(_) => println!("Sync runner completed successfully!"),
|
|
||||||
Err(e) => println!("Sync runner error: {}", e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => println!("Join error: {}", e),
|
|
||||||
}
|
|
||||||
|
|
||||||
println!("Test completed!");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
130
tests/e2e_tests.rs
Normal file
130
tests/e2e_tests.rs
Normal file
@@ -0,0 +1,130 @@
|
|||||||
|
use std::process::Command;
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::time::timeout;
|
||||||
|
|
||||||
|
/// Test the SAL runner in script mode with a simple ping script
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_sal_runner_script_mode_ping() {
|
||||||
|
let output = timeout(
|
||||||
|
Duration::from_secs(10),
|
||||||
|
run_sal_runner_script_mode("test_sal_ping")
|
||||||
|
).await;
|
||||||
|
|
||||||
|
match output {
|
||||||
|
Ok(result) => {
|
||||||
|
assert!(result.is_ok(), "SAL runner should execute successfully");
|
||||||
|
let stdout = result.unwrap();
|
||||||
|
assert!(stdout.contains("pong"),
|
||||||
|
"Output should contain 'pong' response: {}", stdout);
|
||||||
|
}
|
||||||
|
Err(_) => panic!("Test timed out after 10 seconds"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test the OSIS runner in script mode with a simple ping script
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_osis_runner_script_mode_ping() {
|
||||||
|
let output = timeout(
|
||||||
|
Duration::from_secs(10),
|
||||||
|
run_osis_runner_script_mode("test_osis_ping")
|
||||||
|
).await;
|
||||||
|
|
||||||
|
match output {
|
||||||
|
Ok(result) => {
|
||||||
|
assert!(result.is_ok(), "OSIS runner should execute successfully");
|
||||||
|
let stdout = result.unwrap();
|
||||||
|
assert!(stdout.contains("pong"),
|
||||||
|
"Output should contain 'pong' response: {}", stdout);
|
||||||
|
}
|
||||||
|
Err(_) => panic!("Test timed out after 10 seconds"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Helper function to run SAL runner in script mode
|
||||||
|
async fn run_sal_runner_script_mode(
|
||||||
|
runner_id: &str
|
||||||
|
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
let output = Command::new("cargo")
|
||||||
|
.args(&[
|
||||||
|
"run", "--bin", "runner_sal", "--",
|
||||||
|
runner_id,
|
||||||
|
"-s", "ping"
|
||||||
|
])
|
||||||
|
.output()?;
|
||||||
|
|
||||||
|
if output.status.success() {
|
||||||
|
Ok(String::from_utf8_lossy(&output.stdout).to_string())
|
||||||
|
} else {
|
||||||
|
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||||
|
Err(format!("Command failed: {}", stderr).into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Helper function to run OSIS runner in script mode
|
||||||
|
async fn run_osis_runner_script_mode(
|
||||||
|
runner_id: &str
|
||||||
|
) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
let output = Command::new("cargo")
|
||||||
|
.args(&[
|
||||||
|
"run", "--bin", "runner_osis", "--",
|
||||||
|
runner_id,
|
||||||
|
"-s", "ping"
|
||||||
|
])
|
||||||
|
.output()?;
|
||||||
|
|
||||||
|
if output.status.success() {
|
||||||
|
Ok(String::from_utf8_lossy(&output.stdout).to_string())
|
||||||
|
} else {
|
||||||
|
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||||
|
Err(format!("Command failed: {}", stderr).into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test basic compilation and help output
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_sal_runner_help() {
|
||||||
|
let output = Command::new("cargo")
|
||||||
|
.args(&["run", "--bin", "runner_sal", "--", "--help"])
|
||||||
|
.output()
|
||||||
|
.expect("Failed to execute command");
|
||||||
|
|
||||||
|
assert!(output.status.success(), "Help command should succeed");
|
||||||
|
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||||
|
assert!(stdout.contains("Usage") || stdout.contains("USAGE"),
|
||||||
|
"Help output should contain usage information");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test basic compilation and help output for OSIS runner
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_osis_runner_help() {
|
||||||
|
let output = Command::new("cargo")
|
||||||
|
.args(&["run", "--bin", "runner_osis", "--", "--help"])
|
||||||
|
.output()
|
||||||
|
.expect("Failed to execute command");
|
||||||
|
|
||||||
|
assert!(output.status.success(), "Help command should succeed");
|
||||||
|
let stdout = String::from_utf8_lossy(&output.stdout);
|
||||||
|
assert!(stdout.contains("Usage") || stdout.contains("USAGE"),
|
||||||
|
"Help output should contain usage information");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Test library functionality - job creation and basic operations
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_job_creation_and_serialization() {
|
||||||
|
use runner_rust::JobBuilder;
|
||||||
|
|
||||||
|
let job = JobBuilder::new()
|
||||||
|
.caller_id("test_caller")
|
||||||
|
.context_id("test_context")
|
||||||
|
.payload("ping")
|
||||||
|
.runner("default")
|
||||||
|
.executor("rhai")
|
||||||
|
.build()
|
||||||
|
.expect("Job creation should succeed");
|
||||||
|
|
||||||
|
assert_eq!(job.caller_id, "test_caller");
|
||||||
|
assert_eq!(job.context_id, "test_context");
|
||||||
|
assert_eq!(job.payload, "ping");
|
||||||
|
assert_eq!(job.runner, "default");
|
||||||
|
assert_eq!(job.executor, "rhai");
|
||||||
|
}
|
Reference in New Issue
Block a user