From 0c918a8f5fb871655bd2d345884cf55837a98f92 Mon Sep 17 00:00:00 2001 From: Timur Gordon <31495328+timurgordon@users.noreply.github.com> Date: Tue, 28 Oct 2025 03:32:57 +0100 Subject: [PATCH] Add get_error method to client for standardized error retrieval - Implemented get_error() method to fetch job error messages from Redis - Mirrors get_result() pattern for consistency - Used by supervisor to retrieve job errors without manual Redis queries - Cleanup: removed old runner_osis directory --- Cargo.lock | 18 +++--- Cargo.toml | 2 +- examples/osiris/main.rs | 8 +-- src/bin/runner_osiris.rs | 40 ++++-------- src/bin/runner_osis/README.md | 118 ---------------------------------- src/bin/runner_osis/engine.rs | 14 ---- src/bin/runner_osis/main.rs | 107 ------------------------------ src/client.rs | 15 +++++ src/sync_runner.rs | 16 +---- 9 files changed, 43 insertions(+), 295 deletions(-) delete mode 100644 src/bin/runner_osis/README.md delete mode 100644 src/bin/runner_osis/engine.rs delete mode 100644 src/bin/runner_osis/main.rs diff --git a/Cargo.lock b/Cargo.lock index 4ab7760..91a72aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -447,7 +447,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c" dependencies = [ "lazy_static", - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] @@ -898,7 +898,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" dependencies = [ "libc", - "windows-sys 0.60.2", + "windows-sys 0.52.0", ] [[package]] @@ -1579,7 +1579,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.6.0", + "socket2 0.5.10", "system-configuration 0.6.1", "tokio", "tower-service", @@ -1822,7 +1822,7 @@ checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -1858,7 +1858,7 @@ dependencies = [ "portable-atomic", "portable-atomic-util", "serde", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -3246,7 +3246,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.4.15", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -3259,7 +3259,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.9.4", - "windows-sys 0.60.2", + "windows-sys 0.52.0", ] [[package]] @@ -4103,7 +4103,7 @@ dependencies = [ "getrandom 0.3.3", "once_cell", "rustix 1.0.8", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -4957,7 +4957,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.48.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index ad1d1ff..f4dd04b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,7 +56,7 @@ heromodels_core = { git = "https://git.ourworld.tf/herocode/db.git" } heromodels-derive = { git = "https://git.ourworld.tf/herocode/db.git" } rhailib_dsl = { git = "https://git.ourworld.tf/herocode/rhailib.git" } hero_logger = { git = "https://git.ourworld.tf/herocode/baobab.git", branch = "logger" } -osiris = { git = "https://git.ourworld.tf/herocode/osiris.git" } +osiris = { path = "../osiris" } # SAL modules for system engine sal-os = { git = "https://git.ourworld.tf/herocode/herolib_rust.git" } sal-redisclient = { git = "https://git.ourworld.tf/herocode/herolib_rust.git" } diff --git a/examples/osiris/main.rs b/examples/osiris/main.rs index 186cc36..fb1874a 100644 --- a/examples/osiris/main.rs +++ b/examples/osiris/main.rs @@ -80,7 +80,7 @@ async fn main() -> Result<(), Box> { let job1 = JobBuilder::new() .caller_id("example_client") .context_id("demo_context") - .payload(create_note_script) + .payload(&create_note_script) .runner("demo_runner") .executor("rhai") .timeout(30) @@ -114,7 +114,7 @@ async fn main() -> Result<(), Box> { let job2 = JobBuilder::new() .caller_id("example_client") .context_id("demo_context") - .payload(create_event_script) + .payload(&create_event_script) .runner("demo_runner") .executor("rhai") .timeout(30) @@ -148,7 +148,7 @@ async fn main() -> Result<(), Box> { let job3 = JobBuilder::new() .caller_id("example_client") .context_id("demo_context") - .payload(query_script) + .payload(&query_script) .runner("demo_runner") .executor("rhai") .timeout(30) @@ -182,7 +182,7 @@ async fn main() -> Result<(), Box> { let job4 = JobBuilder::new() .caller_id("example_client") .context_id("demo_context") - .payload(access_denied_script) + .payload(&access_denied_script) .runner("demo_runner") .executor("rhai") .timeout(30) diff --git a/src/bin/runner_osiris.rs b/src/bin/runner_osiris.rs index 12b0ca8..9434758 100644 --- a/src/bin/runner_osiris.rs +++ b/src/bin/runner_osiris.rs @@ -2,7 +2,7 @@ use runner_rust::{spawn_sync_runner, script_mode::execute_script_mode}; use clap::Parser; use log::{error, info}; use tokio::sync::mpsc; -use rhai::Engine; +use osiris::rhai::create_osiris_engine; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] @@ -10,35 +10,19 @@ struct Args { /// Runner ID runner_id: String, - /// Database path - #[arg(short, long, default_value = "/tmp/osis.db")] - db_path: String, - - /// Redis URL + /// Redis URL (also used as HeroDB URL) #[arg(short = 'r', long, default_value = "redis://localhost:6379")] redis_url: String, - /// Preserve tasks after completion - #[arg(short, long, default_value_t = false)] - preserve_tasks: bool, + /// Base database ID for OSIRIS contexts + #[arg(long, default_value_t = 1)] + base_db_id: u16, /// Script to execute in single-job mode (optional) #[arg(short, long)] script: Option, } -/// Create a new OSIRIS engine instance. -/// -/// This creates an engine with dynamic context management via get_context(): -/// - Registers all OSIRIS functions (Note, Event, etc.) -/// - Sets up get_context() for participant-based access control -/// - Configures the Rhai engine for OSIRIS scripts -fn create_osis_engine() -> Engine { - // Create a basic Rhai engine - // TODO: Add OSIRIS-specific registrations when available - Engine::new() -} - #[tokio::main] async fn main() -> Result<(), Box> { // Initialize logging @@ -50,12 +34,15 @@ async fn main() -> Result<(), Box> { if let Some(script_content) = args.script { info!("Running in script mode with runner ID: {}", args.runner_id); + let redis_url = args.redis_url.clone(); + let base_db_id = args.base_db_id; let result = execute_script_mode( &script_content, &args.runner_id, args.redis_url, std::time::Duration::from_secs(300), // Default timeout for OSIS - create_osis_engine, + move || create_osiris_engine(&redis_url, base_db_id) + .expect("Failed to create OSIRIS engine"), ).await; match result { @@ -71,9 +58,7 @@ async fn main() -> Result<(), Box> { } info!("Starting OSIS Sync Runner with ID: {}", args.runner_id); - info!("Database path: {}", args.db_path); info!("Redis URL: {}", args.redis_url); - info!("Preserve tasks: {}", args.preserve_tasks); // Create shutdown channel let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); @@ -87,13 +72,14 @@ async fn main() -> Result<(), Box> { }); // Spawn the sync runner with engine factory + let redis_url = args.redis_url.clone(); + let base_db_id = args.base_db_id; let runner_handle = spawn_sync_runner( args.runner_id.clone(), - args.db_path, args.redis_url, shutdown_rx, - args.preserve_tasks, - create_osis_engine, + move || create_osiris_engine(&redis_url, base_db_id) + .expect("Failed to create OSIRIS engine"), ); info!("OSIS Sync Runner '{}' started successfully", args.runner_id); diff --git a/src/bin/runner_osis/README.md b/src/bin/runner_osis/README.md deleted file mode 100644 index 119274a..0000000 --- a/src/bin/runner_osis/README.md +++ /dev/null @@ -1,118 +0,0 @@ -# OSIS Runner - -The OSIS (Object Storage Information System) Runner is a synchronous job processing engine that executes Rhai scripts with access to OSIS-specific operations and data management capabilities. - -## Features - -- **Synchronous Processing**: Processes jobs sequentially, ensuring deterministic execution order -- **Redis Integration**: Uses Redis for job queue management and coordination -- **OSIS Operations**: Access to object storage, metadata management, and information system operations -- **Task Persistence**: Optional task preservation for debugging and audit purposes -- **Graceful Shutdown**: Responds to SIGINT (Ctrl+C) for clean termination -- **SQLite Database**: Local database storage for job state and metadata - -## Usage - -```bash -cargo run --bin runner_osis -- [OPTIONS] -``` - -### Arguments - -- ``: Unique identifier for this runner instance (required, positional) - -### Options - -- `-d, --db-path `: SQLite database file path (default: `/tmp/osis.db`) -- `-r, --redis-url `: Redis connection URL (default: `redis://localhost:6379`) -- `-p, --preserve-tasks`: Preserve completed tasks in database for debugging (default: `false`) - -### Examples - -```bash -# Basic usage with default settings -cargo run --bin runner_osis -- myrunner - -# Custom Redis URL and database path -cargo run --bin runner_osis -- osis-prod -r redis://prod-redis:6379 -d /var/lib/osis.db - -# Enable task preservation for debugging -cargo run --bin runner_osis -- debug-runner -p -``` - -## Available OSIS Modules - -The OSIS runner provides access to specialized modules for information system operations: - -- **Object Storage**: File and object management operations -- **Metadata Management**: Information indexing and retrieval -- **Data Processing**: Content analysis and transformation -- **System Integration**: Interface with external information systems -- **Audit and Logging**: Comprehensive operation tracking - -## Architecture - -The OSIS runner uses a synchronous architecture that: - -1. Connects to Redis for job queue management -2. Initializes SQLite database for local state management -3. Creates a Rhai engine with OSIS modules registered -4. Processes jobs sequentially in FIFO order -5. Optionally preserves task history for debugging -6. Handles graceful shutdown on SIGINT - -## Synchronous vs Asynchronous - -Unlike the SAL runner, the OSIS runner processes jobs synchronously: - -- **Sequential Processing**: Jobs are processed one at a time -- **Deterministic Order**: Ensures predictable execution sequence -- **Resource Safety**: Prevents resource conflicts in data operations -- **Debugging Friendly**: Easier to trace and debug job execution - -## Database Schema - -The runner maintains a SQLite database with the following structure: - -- **Jobs Table**: Active and completed job records -- **Task History**: Optional preservation of task execution details -- **Metadata**: Runner configuration and state information - -## Error Handling - -The runner provides detailed error messages for: - -- Redis connection failures -- Database initialization and access problems -- Script execution errors -- Resource cleanup issues -- Shutdown sequence problems - -## Logging - -Set the `RUST_LOG` environment variable to control logging levels: - -```bash -RUST_LOG=info cargo run --bin runner_osis -- myrunner -``` - -Available log levels: `error`, `warn`, `info`, `debug`, `trace` - -## Task Preservation - -When `--preserve-tasks` is enabled: - -- Completed tasks remain in the database -- Useful for debugging and audit trails -- May require periodic cleanup for long-running instances -- Increases database size over time - -## Use Cases - -The OSIS runner is ideal for: - -- Data processing pipelines requiring strict ordering -- Information system operations with dependencies -- Batch processing jobs that must complete sequentially -- Debugging scenarios where task history is important -- Operations requiring transactional consistency diff --git a/src/bin/runner_osis/engine.rs b/src/bin/runner_osis/engine.rs deleted file mode 100644 index dd4ede0..0000000 --- a/src/bin/runner_osis/engine.rs +++ /dev/null @@ -1,14 +0,0 @@ -use rhai::Engine; - -/// Create a new OSIRIS engine instance. -/// -/// This simply delegates to osiris::rhai::create_osiris_engine which: -/// - Registers all OSIRIS functions (Note, Event, etc.) -/// - Sets up HeroDB context management -/// - Configures the Rhai engine for OSIRIS scripts -pub fn create_osis_engine() -> Engine { - // Use the osiris engine creation - it handles everything - osiris::rhai::create_osiris_engine("default_owner", "redis://localhost:6379", 1) - .expect("Failed to create OSIRIS engine") - .0 // Return just the engine, not the scope -} diff --git a/src/bin/runner_osis/main.rs b/src/bin/runner_osis/main.rs deleted file mode 100644 index 1f03c61..0000000 --- a/src/bin/runner_osis/main.rs +++ /dev/null @@ -1,107 +0,0 @@ -use runner_rust::{spawn_sync_runner, script_mode::execute_script_mode}; -use clap::Parser; -use log::{error, info}; -use tokio::sync::mpsc; - -mod engine; -use engine::create_osis_engine; - -#[derive(Parser, Debug)] -#[command(author, version, about, long_about = None)] -struct Args { - /// Runner ID - runner_id: String, - - /// Database path - #[arg(short, long, default_value = "/tmp/osis.db")] - db_path: String, - - /// Redis URL - #[arg(short = 'r', long, default_value = "redis://localhost:6379")] - redis_url: String, - - /// Preserve tasks after completion - #[arg(short, long, default_value_t = false)] - preserve_tasks: bool, - - /// Script to execute in single-job mode (optional) - #[arg(short, long)] - script: Option, -} - -#[tokio::main] -async fn main() -> Result<(), Box> { - // Initialize logging - env_logger::init(); - - let args = Args::parse(); - - // Check if we're in script mode - if let Some(script_content) = args.script { - info!("Running in script mode with runner ID: {}", args.runner_id); - - let result = execute_script_mode( - &script_content, - &args.runner_id, - args.redis_url, - std::time::Duration::from_secs(300), // Default timeout for OSIS - create_osis_engine, - ).await; - - match result { - Ok(output) => { - println!("Script execution result:\n{}", output); - return Ok(()); - } - Err(e) => { - error!("Script execution failed: {}", e); - return Err(e); - } - } - } - - info!("Starting OSIS Sync Runner with ID: {}", args.runner_id); - info!("Database path: {}", args.db_path); - info!("Redis URL: {}", args.redis_url); - info!("Preserve tasks: {}", args.preserve_tasks); - - // Create shutdown channel - let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); - - // Setup signal handling for graceful shutdown - let shutdown_tx_clone = shutdown_tx.clone(); - tokio::spawn(async move { - tokio::signal::ctrl_c().await.expect("Failed to listen for ctrl+c"); - info!("Received Ctrl+C, initiating shutdown..."); - let _ = shutdown_tx_clone.send(()).await; - }); - - // Spawn the sync runner with engine factory - let runner_handle = spawn_sync_runner( - args.runner_id.clone(), - args.db_path, - args.redis_url, - shutdown_rx, - args.preserve_tasks, - create_osis_engine, - ); - - info!("OSIS Sync Runner '{}' started successfully", args.runner_id); - - // Wait for the runner to complete - match runner_handle.await { - Ok(Ok(())) => { - info!("OSIS Sync Runner '{}' shut down successfully", args.runner_id); - } - Ok(Err(e)) => { - error!("OSIS Sync Runner '{}' encountered an error: {}", args.runner_id, e); - return Err(e); - } - Err(e) => { - error!("Failed to join OSIS Sync Runner '{}' task: {}", args.runner_id, e); - return Err(Box::new(e)); - } - } - - Ok(()) -} diff --git a/src/client.rs b/src/client.rs index f4200c1..ecbf3c6 100644 --- a/src/client.rs +++ b/src/client.rs @@ -312,6 +312,21 @@ impl Client { Ok(result) } + /// Get job result from Redis + pub async fn get_error( + &self, + job_id: &str, + ) -> Result, JobError> { + let job_key = self.job_key(job_id); + let mut conn = self.redis_client + .get_multiplexed_async_connection() + .await + .map_err(|e| JobError::Redis(e))?; + let result: Option = conn.hget(&job_key, "error").await + .map_err(|e| JobError::Redis(e))?; + Ok(result) + } + /// Get a job ID from the work queue (blocking pop) pub async fn get_job_id(&self, queue_key: &str) -> Result, JobError> { let mut conn = self.redis_client diff --git a/src/sync_runner.rs b/src/sync_runner.rs index 99a8aa2..885f56c 100644 --- a/src/sync_runner.rs +++ b/src/sync_runner.rs @@ -9,9 +9,7 @@ use crate::runner_trait::Runner; #[derive(Debug, Clone)] pub struct SyncRunnerConfig { pub runner_id: String, - pub db_path: String, pub redis_url: String, - pub preserve_tasks: bool, } /// Synchronous runner that processes jobs sequentially @@ -39,11 +37,9 @@ impl SyncRunner { fn execute_job_with_engine( engine: &mut Engine, job: &Job, - db_path: &str, ) -> Result> { // Set up job context in the engine let mut db_config = rhai::Map::new(); - db_config.insert("DB_PATH".into(), db_path.to_string().into()); db_config.insert("CALLER_ID".into(), job.caller_id.clone().into()); db_config.insert("CONTEXT_ID".into(), job.context_id.clone().into()); @@ -53,12 +49,6 @@ impl SyncRunner { job.signatures.iter() .map(|sig| Dynamic::from(sig.public_key.clone())) .collect() - } else if let Some(sig_json) = job.env_vars.get("SIGNATORIES") { - // Fall back to SIGNATORIES from env_vars (for backward compatibility) - match serde_json::from_str::>(sig_json) { - Ok(sigs) => sigs.into_iter().map(Dynamic::from).collect(), - Err(_) => Vec::new(), - } } else { Vec::new() }; @@ -87,7 +77,7 @@ impl Runner for SyncRunner { let mut engine = (self.engine_factory)(); // Execute the script - match Self::execute_job_with_engine(&mut engine, &job, &self.config.db_path) { + match Self::execute_job_with_engine(&mut engine, &job) { Ok(result) => { let output_str = if result.is::() { result.into_string().unwrap() @@ -121,10 +111,8 @@ impl Runner for SyncRunner { /// Convenience function to spawn a synchronous runner using the trait interface pub fn spawn_sync_runner( runner_id: String, - db_path: String, redis_url: String, shutdown_rx: tokio::sync::mpsc::Receiver<()>, - preserve_tasks: bool, engine_factory: F, ) -> tokio::task::JoinHandle>> where @@ -132,9 +120,7 @@ where { let config = SyncRunnerConfig { runner_id, - db_path, redis_url, - preserve_tasks, }; let runner = Arc::new(SyncRunner::new(config, engine_factory));