Add get_error method to client for standardized error retrieval

- Implemented get_error() method to fetch job error messages from Redis
- Mirrors get_result() pattern for consistency
- Used by supervisor to retrieve job errors without manual Redis queries
- Cleanup: removed old runner_osis directory
This commit is contained in:
Timur Gordon
2025-10-28 03:32:57 +01:00
parent 268128f7fd
commit 0c918a8f5f
9 changed files with 43 additions and 295 deletions

18
Cargo.lock generated
View File

@@ -447,7 +447,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c" checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c"
dependencies = [ dependencies = [
"lazy_static", "lazy_static",
"windows-sys 0.59.0", "windows-sys 0.48.0",
] ]
[[package]] [[package]]
@@ -898,7 +898,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad"
dependencies = [ dependencies = [
"libc", "libc",
"windows-sys 0.60.2", "windows-sys 0.52.0",
] ]
[[package]] [[package]]
@@ -1579,7 +1579,7 @@ dependencies = [
"libc", "libc",
"percent-encoding", "percent-encoding",
"pin-project-lite", "pin-project-lite",
"socket2 0.6.0", "socket2 0.5.10",
"system-configuration 0.6.1", "system-configuration 0.6.1",
"tokio", "tokio",
"tower-service", "tower-service",
@@ -1822,7 +1822,7 @@ checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9"
dependencies = [ dependencies = [
"hermit-abi", "hermit-abi",
"libc", "libc",
"windows-sys 0.59.0", "windows-sys 0.52.0",
] ]
[[package]] [[package]]
@@ -1858,7 +1858,7 @@ dependencies = [
"portable-atomic", "portable-atomic",
"portable-atomic-util", "portable-atomic-util",
"serde", "serde",
"windows-sys 0.59.0", "windows-sys 0.52.0",
] ]
[[package]] [[package]]
@@ -3246,7 +3246,7 @@ dependencies = [
"errno", "errno",
"libc", "libc",
"linux-raw-sys 0.4.15", "linux-raw-sys 0.4.15",
"windows-sys 0.59.0", "windows-sys 0.52.0",
] ]
[[package]] [[package]]
@@ -3259,7 +3259,7 @@ dependencies = [
"errno", "errno",
"libc", "libc",
"linux-raw-sys 0.9.4", "linux-raw-sys 0.9.4",
"windows-sys 0.60.2", "windows-sys 0.52.0",
] ]
[[package]] [[package]]
@@ -4103,7 +4103,7 @@ dependencies = [
"getrandom 0.3.3", "getrandom 0.3.3",
"once_cell", "once_cell",
"rustix 1.0.8", "rustix 1.0.8",
"windows-sys 0.59.0", "windows-sys 0.52.0",
] ]
[[package]] [[package]]
@@ -4957,7 +4957,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [ dependencies = [
"windows-sys 0.59.0", "windows-sys 0.48.0",
] ]
[[package]] [[package]]

View File

@@ -56,7 +56,7 @@ heromodels_core = { git = "https://git.ourworld.tf/herocode/db.git" }
heromodels-derive = { git = "https://git.ourworld.tf/herocode/db.git" } heromodels-derive = { git = "https://git.ourworld.tf/herocode/db.git" }
rhailib_dsl = { git = "https://git.ourworld.tf/herocode/rhailib.git" } rhailib_dsl = { git = "https://git.ourworld.tf/herocode/rhailib.git" }
hero_logger = { git = "https://git.ourworld.tf/herocode/baobab.git", branch = "logger" } hero_logger = { git = "https://git.ourworld.tf/herocode/baobab.git", branch = "logger" }
osiris = { git = "https://git.ourworld.tf/herocode/osiris.git" } osiris = { path = "../osiris" }
# SAL modules for system engine # SAL modules for system engine
sal-os = { git = "https://git.ourworld.tf/herocode/herolib_rust.git" } sal-os = { git = "https://git.ourworld.tf/herocode/herolib_rust.git" }
sal-redisclient = { git = "https://git.ourworld.tf/herocode/herolib_rust.git" } sal-redisclient = { git = "https://git.ourworld.tf/herocode/herolib_rust.git" }

View File

@@ -80,7 +80,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let job1 = JobBuilder::new() let job1 = JobBuilder::new()
.caller_id("example_client") .caller_id("example_client")
.context_id("demo_context") .context_id("demo_context")
.payload(create_note_script) .payload(&create_note_script)
.runner("demo_runner") .runner("demo_runner")
.executor("rhai") .executor("rhai")
.timeout(30) .timeout(30)
@@ -114,7 +114,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let job2 = JobBuilder::new() let job2 = JobBuilder::new()
.caller_id("example_client") .caller_id("example_client")
.context_id("demo_context") .context_id("demo_context")
.payload(create_event_script) .payload(&create_event_script)
.runner("demo_runner") .runner("demo_runner")
.executor("rhai") .executor("rhai")
.timeout(30) .timeout(30)
@@ -148,7 +148,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let job3 = JobBuilder::new() let job3 = JobBuilder::new()
.caller_id("example_client") .caller_id("example_client")
.context_id("demo_context") .context_id("demo_context")
.payload(query_script) .payload(&query_script)
.runner("demo_runner") .runner("demo_runner")
.executor("rhai") .executor("rhai")
.timeout(30) .timeout(30)
@@ -182,7 +182,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let job4 = JobBuilder::new() let job4 = JobBuilder::new()
.caller_id("example_client") .caller_id("example_client")
.context_id("demo_context") .context_id("demo_context")
.payload(access_denied_script) .payload(&access_denied_script)
.runner("demo_runner") .runner("demo_runner")
.executor("rhai") .executor("rhai")
.timeout(30) .timeout(30)

View File

@@ -2,7 +2,7 @@ use runner_rust::{spawn_sync_runner, script_mode::execute_script_mode};
use clap::Parser; use clap::Parser;
use log::{error, info}; use log::{error, info};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use rhai::Engine; use osiris::rhai::create_osiris_engine;
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)] #[command(author, version, about, long_about = None)]
@@ -10,35 +10,19 @@ struct Args {
/// Runner ID /// Runner ID
runner_id: String, runner_id: String,
/// Database path /// Redis URL (also used as HeroDB URL)
#[arg(short, long, default_value = "/tmp/osis.db")]
db_path: String,
/// Redis URL
#[arg(short = 'r', long, default_value = "redis://localhost:6379")] #[arg(short = 'r', long, default_value = "redis://localhost:6379")]
redis_url: String, redis_url: String,
/// Preserve tasks after completion /// Base database ID for OSIRIS contexts
#[arg(short, long, default_value_t = false)] #[arg(long, default_value_t = 1)]
preserve_tasks: bool, base_db_id: u16,
/// Script to execute in single-job mode (optional) /// Script to execute in single-job mode (optional)
#[arg(short, long)] #[arg(short, long)]
script: Option<String>, script: Option<String>,
} }
/// Create a new OSIRIS engine instance.
///
/// This creates an engine with dynamic context management via get_context():
/// - Registers all OSIRIS functions (Note, Event, etc.)
/// - Sets up get_context() for participant-based access control
/// - Configures the Rhai engine for OSIRIS scripts
fn create_osis_engine() -> Engine {
// Create a basic Rhai engine
// TODO: Add OSIRIS-specific registrations when available
Engine::new()
}
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Initialize logging // Initialize logging
@@ -50,12 +34,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
if let Some(script_content) = args.script { if let Some(script_content) = args.script {
info!("Running in script mode with runner ID: {}", args.runner_id); info!("Running in script mode with runner ID: {}", args.runner_id);
let redis_url = args.redis_url.clone();
let base_db_id = args.base_db_id;
let result = execute_script_mode( let result = execute_script_mode(
&script_content, &script_content,
&args.runner_id, &args.runner_id,
args.redis_url, args.redis_url,
std::time::Duration::from_secs(300), // Default timeout for OSIS std::time::Duration::from_secs(300), // Default timeout for OSIS
create_osis_engine, move || create_osiris_engine(&redis_url, base_db_id)
.expect("Failed to create OSIRIS engine"),
).await; ).await;
match result { match result {
@@ -71,9 +58,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
} }
info!("Starting OSIS Sync Runner with ID: {}", args.runner_id); info!("Starting OSIS Sync Runner with ID: {}", args.runner_id);
info!("Database path: {}", args.db_path);
info!("Redis URL: {}", args.redis_url); info!("Redis URL: {}", args.redis_url);
info!("Preserve tasks: {}", args.preserve_tasks);
// Create shutdown channel // Create shutdown channel
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
@@ -87,13 +72,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
}); });
// Spawn the sync runner with engine factory // Spawn the sync runner with engine factory
let redis_url = args.redis_url.clone();
let base_db_id = args.base_db_id;
let runner_handle = spawn_sync_runner( let runner_handle = spawn_sync_runner(
args.runner_id.clone(), args.runner_id.clone(),
args.db_path,
args.redis_url, args.redis_url,
shutdown_rx, shutdown_rx,
args.preserve_tasks, move || create_osiris_engine(&redis_url, base_db_id)
create_osis_engine, .expect("Failed to create OSIRIS engine"),
); );
info!("OSIS Sync Runner '{}' started successfully", args.runner_id); info!("OSIS Sync Runner '{}' started successfully", args.runner_id);

View File

@@ -1,118 +0,0 @@
# OSIS Runner
The OSIS (Object Storage Information System) Runner is a synchronous job processing engine that executes Rhai scripts with access to OSIS-specific operations and data management capabilities.
## Features
- **Synchronous Processing**: Processes jobs sequentially, ensuring deterministic execution order
- **Redis Integration**: Uses Redis for job queue management and coordination
- **OSIS Operations**: Access to object storage, metadata management, and information system operations
- **Task Persistence**: Optional task preservation for debugging and audit purposes
- **Graceful Shutdown**: Responds to SIGINT (Ctrl+C) for clean termination
- **SQLite Database**: Local database storage for job state and metadata
## Usage
```bash
cargo run --bin runner_osis -- <RUNNER_ID> [OPTIONS]
```
### Arguments
- `<RUNNER_ID>`: Unique identifier for this runner instance (required, positional)
### Options
- `-d, --db-path <PATH>`: SQLite database file path (default: `/tmp/osis.db`)
- `-r, --redis-url <URL>`: Redis connection URL (default: `redis://localhost:6379`)
- `-p, --preserve-tasks`: Preserve completed tasks in database for debugging (default: `false`)
### Examples
```bash
# Basic usage with default settings
cargo run --bin runner_osis -- myrunner
# Custom Redis URL and database path
cargo run --bin runner_osis -- osis-prod -r redis://prod-redis:6379 -d /var/lib/osis.db
# Enable task preservation for debugging
cargo run --bin runner_osis -- debug-runner -p
```
## Available OSIS Modules
The OSIS runner provides access to specialized modules for information system operations:
- **Object Storage**: File and object management operations
- **Metadata Management**: Information indexing and retrieval
- **Data Processing**: Content analysis and transformation
- **System Integration**: Interface with external information systems
- **Audit and Logging**: Comprehensive operation tracking
## Architecture
The OSIS runner uses a synchronous architecture that:
1. Connects to Redis for job queue management
2. Initializes SQLite database for local state management
3. Creates a Rhai engine with OSIS modules registered
4. Processes jobs sequentially in FIFO order
5. Optionally preserves task history for debugging
6. Handles graceful shutdown on SIGINT
## Synchronous vs Asynchronous
Unlike the SAL runner, the OSIS runner processes jobs synchronously:
- **Sequential Processing**: Jobs are processed one at a time
- **Deterministic Order**: Ensures predictable execution sequence
- **Resource Safety**: Prevents resource conflicts in data operations
- **Debugging Friendly**: Easier to trace and debug job execution
## Database Schema
The runner maintains a SQLite database with the following structure:
- **Jobs Table**: Active and completed job records
- **Task History**: Optional preservation of task execution details
- **Metadata**: Runner configuration and state information
## Error Handling
The runner provides detailed error messages for:
- Redis connection failures
- Database initialization and access problems
- Script execution errors
- Resource cleanup issues
- Shutdown sequence problems
## Logging
Set the `RUST_LOG` environment variable to control logging levels:
```bash
RUST_LOG=info cargo run --bin runner_osis -- myrunner
```
Available log levels: `error`, `warn`, `info`, `debug`, `trace`
## Task Preservation
When `--preserve-tasks` is enabled:
- Completed tasks remain in the database
- Useful for debugging and audit trails
- May require periodic cleanup for long-running instances
- Increases database size over time
## Use Cases
The OSIS runner is ideal for:
- Data processing pipelines requiring strict ordering
- Information system operations with dependencies
- Batch processing jobs that must complete sequentially
- Debugging scenarios where task history is important
- Operations requiring transactional consistency

View File

@@ -1,14 +0,0 @@
use rhai::Engine;
/// Build the OSIRIS-flavored Rhai engine used by this runner.
///
/// Delegates to `osiris::rhai::create_osiris_engine`, which:
/// - Registers all OSIRIS functions (Note, Event, etc.)
/// - Sets up HeroDB context management
/// - Configures the Rhai engine for OSIRIS scripts
pub fn create_osis_engine() -> Engine {
    // The osiris helper returns a tuple; element 0 is the engine itself
    // (the accompanying scope is not needed here).
    let created = osiris::rhai::create_osiris_engine("default_owner", "redis://localhost:6379", 1)
        .expect("Failed to create OSIRIS engine");
    created.0
}

View File

@@ -1,107 +0,0 @@
use runner_rust::{spawn_sync_runner, script_mode::execute_script_mode};
use clap::Parser;
use log::{error, info};
use tokio::sync::mpsc;
mod engine;
use engine::create_osis_engine;
// CLI arguments for the OSIS sync runner binary, parsed via clap's derive API.
// NOTE(review): the `///` doc comments on the fields double as clap help text
// (and the struct-level `about` is derived), so they are deliberately left
// untouched; commentary below uses plain `//` comments only.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    /// Runner ID
    // Required positional argument: unique identifier for this runner instance.
    runner_id: String,

    /// Database path
    // Path to the runner's local database file — the project README describes
    // this as a SQLite database; default is /tmp/osis.db.
    #[arg(short, long, default_value = "/tmp/osis.db")]
    db_path: String,

    /// Redis URL
    // Connection URL for the Redis instance backing the job queue.
    // Short flag is explicitly `-r` (`-r` would otherwise clash with nothing,
    // but `-d` is taken by db_path).
    #[arg(short = 'r', long, default_value = "redis://localhost:6379")]
    redis_url: String,

    /// Preserve tasks after completion
    // When set, completed tasks are kept (useful for debugging/audit trails).
    #[arg(short, long, default_value_t = false)]
    preserve_tasks: bool,

    /// Script to execute in single-job mode (optional)
    // If provided, the binary runs this script once and exits instead of
    // starting the long-running queue-processing loop.
    #[arg(short, long)]
    script: Option<String>,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Logging is configured through env_logger (level chosen via RUST_LOG).
    env_logger::init();

    let args = Args::parse();

    // Single-shot script mode: execute the provided script once, print the
    // outcome, and exit without starting the queue-processing loop.
    if let Some(script_content) = args.script {
        info!("Running in script mode with runner ID: {}", args.runner_id);

        match execute_script_mode(
            &script_content,
            &args.runner_id,
            args.redis_url,
            std::time::Duration::from_secs(300), // Default timeout for OSIS
            create_osis_engine,
        ).await {
            Ok(output) => {
                println!("Script execution result:\n{}", output);
                return Ok(());
            }
            Err(e) => {
                error!("Script execution failed: {}", e);
                return Err(e);
            }
        }
    }

    info!("Starting OSIS Sync Runner with ID: {}", args.runner_id);
    info!("Database path: {}", args.db_path);
    info!("Redis URL: {}", args.redis_url);
    info!("Preserve tasks: {}", args.preserve_tasks);

    // Channel used to ask the runner to stop gracefully.
    let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);

    // Translate Ctrl+C into a message on the shutdown channel.
    let ctrlc_tx = shutdown_tx.clone();
    tokio::spawn(async move {
        tokio::signal::ctrl_c().await.expect("Failed to listen for ctrl+c");
        info!("Received Ctrl+C, initiating shutdown...");
        let _ = ctrlc_tx.send(()).await;
    });

    // Launch the synchronous runner; `create_osis_engine` is the factory used
    // to build a Rhai engine for job execution.
    let runner_handle = spawn_sync_runner(
        args.runner_id.clone(),
        args.db_path,
        args.redis_url,
        shutdown_rx,
        args.preserve_tasks,
        create_osis_engine,
    );

    info!("OSIS Sync Runner '{}' started successfully", args.runner_id);

    // Block until the runner task completes, surfacing runner errors and
    // join failures as the process exit result.
    match runner_handle.await {
        Ok(Ok(())) => {
            info!("OSIS Sync Runner '{}' shut down successfully", args.runner_id);
            Ok(())
        }
        Ok(Err(e)) => {
            error!("OSIS Sync Runner '{}' encountered an error: {}", args.runner_id, e);
            Err(e)
        }
        Err(e) => {
            error!("Failed to join OSIS Sync Runner '{}' task: {}", args.runner_id, e);
            Err(Box::new(e))
        }
    }
}

View File

@@ -312,6 +312,21 @@ impl Client {
Ok(result) Ok(result)
} }
/// Get a job's error message from Redis.
///
/// Reads the `error` field of the job's Redis hash. Returns `Ok(None)` when
/// no error has been recorded for the job (or the hash does not exist).
/// Mirrors the `get_result` accessor for consistency.
///
/// # Errors
/// Returns `JobError::Redis` if the Redis connection or the HGET call fails.
pub async fn get_error(
    &self,
    job_id: &str,
) -> Result<Option<String>, JobError> {
    let job_key = self.job_key(job_id);
    let mut conn = self.redis_client
        .get_multiplexed_async_connection()
        .await
        .map_err(JobError::Redis)?;
    // HGET returns None when the field (or the whole key) is absent.
    let result: Option<String> = conn.hget(&job_key, "error").await
        .map_err(JobError::Redis)?;
    Ok(result)
}
/// Get a job ID from the work queue (blocking pop) /// Get a job ID from the work queue (blocking pop)
pub async fn get_job_id(&self, queue_key: &str) -> Result<Option<String>, JobError> { pub async fn get_job_id(&self, queue_key: &str) -> Result<Option<String>, JobError> {
let mut conn = self.redis_client let mut conn = self.redis_client

View File

@@ -9,9 +9,7 @@ use crate::runner_trait::Runner;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct SyncRunnerConfig { pub struct SyncRunnerConfig {
pub runner_id: String, pub runner_id: String,
pub db_path: String,
pub redis_url: String, pub redis_url: String,
pub preserve_tasks: bool,
} }
/// Synchronous runner that processes jobs sequentially /// Synchronous runner that processes jobs sequentially
@@ -39,11 +37,9 @@ impl SyncRunner {
fn execute_job_with_engine( fn execute_job_with_engine(
engine: &mut Engine, engine: &mut Engine,
job: &Job, job: &Job,
db_path: &str,
) -> Result<Dynamic, Box<rhai::EvalAltResult>> { ) -> Result<Dynamic, Box<rhai::EvalAltResult>> {
// Set up job context in the engine // Set up job context in the engine
let mut db_config = rhai::Map::new(); let mut db_config = rhai::Map::new();
db_config.insert("DB_PATH".into(), db_path.to_string().into());
db_config.insert("CALLER_ID".into(), job.caller_id.clone().into()); db_config.insert("CALLER_ID".into(), job.caller_id.clone().into());
db_config.insert("CONTEXT_ID".into(), job.context_id.clone().into()); db_config.insert("CONTEXT_ID".into(), job.context_id.clone().into());
@@ -53,12 +49,6 @@ impl SyncRunner {
job.signatures.iter() job.signatures.iter()
.map(|sig| Dynamic::from(sig.public_key.clone())) .map(|sig| Dynamic::from(sig.public_key.clone()))
.collect() .collect()
} else if let Some(sig_json) = job.env_vars.get("SIGNATORIES") {
// Fall back to SIGNATORIES from env_vars (for backward compatibility)
match serde_json::from_str::<Vec<String>>(sig_json) {
Ok(sigs) => sigs.into_iter().map(Dynamic::from).collect(),
Err(_) => Vec::new(),
}
} else { } else {
Vec::new() Vec::new()
}; };
@@ -87,7 +77,7 @@ impl Runner for SyncRunner {
let mut engine = (self.engine_factory)(); let mut engine = (self.engine_factory)();
// Execute the script // Execute the script
match Self::execute_job_with_engine(&mut engine, &job, &self.config.db_path) { match Self::execute_job_with_engine(&mut engine, &job) {
Ok(result) => { Ok(result) => {
let output_str = if result.is::<String>() { let output_str = if result.is::<String>() {
result.into_string().unwrap() result.into_string().unwrap()
@@ -121,10 +111,8 @@ impl Runner for SyncRunner {
/// Convenience function to spawn a synchronous runner using the trait interface /// Convenience function to spawn a synchronous runner using the trait interface
pub fn spawn_sync_runner<F>( pub fn spawn_sync_runner<F>(
runner_id: String, runner_id: String,
db_path: String,
redis_url: String, redis_url: String,
shutdown_rx: tokio::sync::mpsc::Receiver<()>, shutdown_rx: tokio::sync::mpsc::Receiver<()>,
preserve_tasks: bool,
engine_factory: F, engine_factory: F,
) -> tokio::task::JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> ) -> tokio::task::JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>>
where where
@@ -132,9 +120,7 @@ where
{ {
let config = SyncRunnerConfig { let config = SyncRunnerConfig {
runner_id, runner_id,
db_path,
redis_url, redis_url,
preserve_tasks,
}; };
let runner = Arc::new(SyncRunner::new(config, engine_factory)); let runner = Arc::new(SyncRunner::new(config, engine_factory));