feat: reorganize examples and add signature support to JobBuilder

- Reorganized examples into osiris/, sal/, and utils/ folders
- Moved hardcoded scripts to separate .rhai files
- Added signature() method to JobBuilder for job signing
- Updated OSIRIS context to use block_in_place instead of runtime
- Removed runtime field from OsirisContext
- Added typed save() methods for Note and Event objects
- Updated all examples to use new structure and APIs
This commit is contained in:
Timur Gordon
2025-10-27 13:49:39 +01:00
parent 90754cc4ac
commit 14ff6cae67
92 changed files with 904 additions and 268 deletions

117
src/bin/runner_osiris.rs Normal file
View File

@@ -0,0 +1,117 @@
use runner_rust::{spawn_sync_runner, script_mode::execute_script_mode};
use clap::Parser;
use log::{error, info};
use tokio::sync::mpsc;
use rhai::Engine;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Runner ID
runner_id: String,
/// Database path
#[arg(short, long, default_value = "/tmp/osis.db")]
db_path: String,
/// Redis URL
#[arg(short = 'r', long, default_value = "redis://localhost:6379")]
redis_url: String,
/// Preserve tasks after completion
#[arg(short, long, default_value_t = false)]
preserve_tasks: bool,
/// Script to execute in single-job mode (optional)
#[arg(short, long)]
script: Option<String>,
}
/// Create a new OSIRIS engine instance.
///
/// This creates an engine with dynamic context management via get_context():
/// - Registers all OSIRIS functions (Note, Event, etc.)
/// - Sets up get_context() for participant-based access control
/// - Configures the Rhai engine for OSIRIS scripts
fn create_osis_engine() -> Engine {
// Use the engine with manager for dynamic context creation
osiris::rhai::create_osiris_engine_with_manager("redis://localhost:6379", 1)
.expect("Failed to create OSIRIS engine")
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Initialize logging
env_logger::init();
let args = Args::parse();
// Check if we're in script mode
if let Some(script_content) = args.script {
info!("Running in script mode with runner ID: {}", args.runner_id);
let result = execute_script_mode(
&script_content,
&args.runner_id,
args.redis_url,
std::time::Duration::from_secs(300), // Default timeout for OSIS
create_osis_engine,
).await;
match result {
Ok(output) => {
println!("Script execution result:\n{}", output);
return Ok(());
}
Err(e) => {
error!("Script execution failed: {}", e);
return Err(e);
}
}
}
info!("Starting OSIS Sync Runner with ID: {}", args.runner_id);
info!("Database path: {}", args.db_path);
info!("Redis URL: {}", args.redis_url);
info!("Preserve tasks: {}", args.preserve_tasks);
// Create shutdown channel
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
// Setup signal handling for graceful shutdown
let shutdown_tx_clone = shutdown_tx.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c().await.expect("Failed to listen for ctrl+c");
info!("Received Ctrl+C, initiating shutdown...");
let _ = shutdown_tx_clone.send(()).await;
});
// Spawn the sync runner with engine factory
let runner_handle = spawn_sync_runner(
args.runner_id.clone(),
args.db_path,
args.redis_url,
shutdown_rx,
args.preserve_tasks,
create_osis_engine,
);
info!("OSIS Sync Runner '{}' started successfully", args.runner_id);
// Wait for the runner to complete
match runner_handle.await {
Ok(Ok(())) => {
info!("OSIS Sync Runner '{}' shut down successfully", args.runner_id);
}
Ok(Err(e)) => {
error!("OSIS Sync Runner '{}' encountered an error: {}", args.runner_id, e);
return Err(e);
}
Err(e) => {
error!("Failed to join OSIS Sync Runner '{}' task: {}", args.runner_id, e);
return Err(Box::new(e));
}
}
Ok(())
}

View File

@@ -1,123 +1,14 @@
use rhai::Engine;
use rhailib_dsl;
use std::sync::{Arc, OnceLock};
/// Engine factory for creating and sharing Rhai engines with DSL modules.
pub struct EngineFactory {
engine: Arc<Engine>,
}
impl EngineFactory {
/// Create a new engine factory with a configured Rhai engine.
pub fn new() -> Self {
let mut engine = Engine::new();
register_dsl_modules(&mut engine);
// Logger
hero_logger::rhai_integration::configure_rhai_logging(&mut engine, "osis_runner");
Self {
engine: Arc::new(engine),
}
}
/// Get a shared reference to the engine.
pub fn get_engine(&self) -> Arc<Engine> {
Arc::clone(&self.engine)
}
/// Get the global singleton engine factory.
pub fn global() -> &'static EngineFactory {
static FACTORY: OnceLock<EngineFactory> = OnceLock::new();
FACTORY.get_or_init(|| EngineFactory::new())
}
}
/// Register basic object functions directly in the engine.
/// This provides object functionality without relying on the problematic rhailib_dsl object module.
fn register_object_functions(engine: &mut Engine) {
use heromodels::models::object::Object;
// Register the Object type
engine.register_type_with_name::<Object>("Object");
// Register constructor function
engine.register_fn("new_object", || Object::new());
// Register setter functions
engine.register_fn("object_title", |obj: &mut Object, title: String| {
obj.title = title;
obj.clone()
});
engine.register_fn(
"object_description",
|obj: &mut Object, description: String| {
obj.description = description;
obj.clone()
},
);
// Register getter functions
engine.register_fn("get_object_id", |obj: &mut Object| obj.id() as i64);
engine.register_fn("get_object_title", |obj: &mut Object| obj.title.clone());
engine.register_fn("get_object_description", |obj: &mut Object| {
obj.description.clone()
});
}
/// Registers all DSL modules with the provided Rhai engine.
///
/// This function is the main entry point for integrating the rhailib DSL with a Rhai engine.
/// It registers all business domain modules, making their functions available to Rhai scripts.
///
/// # Arguments
///
/// * `engine` - A mutable reference to the Rhai engine to register modules with
///
/// # Registered Modules
///
/// This function registers the following domain modules:
/// - Access control functions
/// - Business operation functions (companies, products, sales, shareholders)
/// - Calendar and scheduling functions
/// - Circle and community management functions
/// - Company management functions
/// - Contact management functions
/// - Core utility functions
/// - Financial operation functions (accounts, assets, marketplace)
/// - Workflow management functions (flows, steps, signatures)
/// - Library and content management functions
/// - Generic object manipulation functions (custom implementation)
pub fn register_dsl_modules(engine: &mut Engine) {
rhailib_dsl::access::register_access_rhai_module(engine);
rhailib_dsl::biz::register_biz_rhai_module(engine);
rhailib_dsl::calendar::register_calendar_rhai_module(engine);
rhailib_dsl::circle::register_circle_rhai_module(engine);
rhailib_dsl::company::register_company_rhai_module(engine);
rhailib_dsl::contact::register_contact_rhai_module(engine);
rhailib_dsl::core::register_core_rhai_module(engine);
rhailib_dsl::finance::register_finance_rhai_modules(engine);
// rhailib_dsl::flow::register_flow_rhai_modules(engine);
rhailib_dsl::library::register_library_rhai_module(engine);
// Skip problematic object module for now - can be implemented separately if needed
// rhailib_dsl::object::register_object_fns(engine);
rhailib_dsl::payment::register_payment_rhai_module(engine);
// Register basic object functionality directly
register_object_functions(engine);
println!("Rhailib Domain Specific Language modules registered successfully.");
}
/// Create a new osis engine instance.
/// Create a new OSIRIS engine instance.
///
/// This simply delegates to osiris::rhai::create_osiris_engine which:
/// - Registers all OSIRIS functions (Note, Event, etc.)
/// - Sets up HeroDB context management
/// - Configures the Rhai engine for OSIRIS scripts
pub fn create_osis_engine() -> Engine {
let mut engine = Engine::new();
register_dsl_modules(&mut engine);
hero_logger::rhai_integration::configure_rhai_logging(&mut engine, "osis_runner");
engine
}
/// Create a shared osis engine using the factory.
pub fn create_shared_osis_engine() -> Arc<Engine> {
EngineFactory::global().get_engine()
// Use the osiris engine creation - it handles everything
osiris::rhai::create_osiris_engine("default_owner", "redis://localhost:6379", 1)
.expect("Failed to create OSIRIS engine")
.0 // Return just the engine, not the scope
}

View File

@@ -346,4 +346,97 @@ impl Client {
Ok(())
}
/// Run a job: dispatch it, wait for completion, and return the result
///
/// This is a convenience method that:
/// 1. Stores the job in Redis
/// 2. Dispatches it to the runner's queue
/// 3. Waits for the job to complete (polls status)
/// 4. Returns the result or error
///
/// # Arguments
/// * `job` - The job to run
/// * `runner_name` - The name of the runner to dispatch to
/// * `timeout_secs` - Maximum time to wait for job completion (in seconds)
///
/// # Returns
/// * `Ok(String)` - The job result if successful
/// * `Err(JobError)` - If the job fails, times out, or encounters an error
pub async fn run_job(
&self,
job: &crate::job::Job,
runner_name: &str,
timeout_secs: u64,
) -> Result<String, JobError> {
use tokio::time::{Duration, timeout};
// Store the job in Redis
self.store_job_in_redis(job).await?;
// Dispatch to runner queue
self.dispatch_job(&job.id, runner_name).await?;
// Wait for job to complete with timeout
let result = timeout(
Duration::from_secs(timeout_secs),
self.wait_for_job_completion(&job.id)
).await;
match result {
Ok(Ok(job_result)) => Ok(job_result),
Ok(Err(e)) => Err(e),
Err(_) => Err(JobError::Timeout(format!(
"Job {} did not complete within {} seconds",
job.id, timeout_secs
))),
}
}
/// Wait for a job to complete by polling its status
///
/// This polls the job status every 500ms until it reaches a terminal state
/// (Finished or Error), then returns the result or error.
async fn wait_for_job_completion(&self, job_id: &str) -> Result<String, JobError> {
use tokio::time::{sleep, Duration};
loop {
// Check job status
let status = self.get_status(job_id).await?;
match status {
JobStatus::Finished => {
// Job completed successfully, get the result
let result = self.get_result(job_id).await?;
return result.ok_or_else(|| {
JobError::InvalidData(format!("Job {} finished but has no result", job_id))
});
}
JobStatus::Error => {
// Job failed, get the error message
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| JobError::Redis(e))?;
let error_msg: Option<String> = conn
.hget(&self.job_key(job_id), "error")
.await
.map_err(|e| JobError::Redis(e))?;
return Err(JobError::InvalidData(
error_msg.unwrap_or_else(|| format!("Job {} failed with unknown error", job_id))
));
}
JobStatus::Stopping => {
return Err(JobError::InvalidData(format!("Job {} was stopped", job_id)));
}
// Job is still running (Dispatched, WaitingForPrerequisites, Started)
_ => {
// Wait before polling again
sleep(Duration::from_millis(500)).await;
}
}
}
}
}

View File

@@ -2,16 +2,13 @@
///
/// This module provides two different engine configurations:
/// - `system`: SAL modules for system operations (async worker)
/// - `osis`: DSL modules for business operations (sync worker)
/// - `osiris`: DSL modules for business operations (sync worker)
/// - `osis`: OSIRIS engine for business operations (sync worker)
pub mod system;
pub mod osis;
pub mod osiris;
pub use osis::create_osis_engine;
pub use system::create_system_engine;
pub use osiris::{create_osiris_engine, run_osiris_script};
// Re-export common Rhai types for convenience
pub use rhai::{Array, Dynamic, Engine, EvalAltResult, Map};

View File

@@ -208,6 +208,7 @@ pub struct JobBuilder {
executor: String,
timeout: u64, // timeout in seconds
env_vars: HashMap<String, String>,
signatures: Vec<JobSignature>,
}
impl JobBuilder {
@@ -220,6 +221,7 @@ impl JobBuilder {
executor: "".to_string(),
timeout: 300, // 5 minutes default
env_vars: HashMap::new(),
signatures: Vec::new(),
}
}
@@ -277,6 +279,27 @@ impl JobBuilder {
self
}
/// Add a signature (public key and signature)
pub fn signature(mut self, public_key: &str, signature: &str) -> Self {
self.signatures.push(JobSignature {
public_key: public_key.to_string(),
signature: signature.to_string(),
});
self
}
/// Set multiple signatures
pub fn signatures(mut self, signatures: Vec<JobSignature>) -> Self {
self.signatures = signatures;
self
}
/// Clear all signatures
pub fn clear_signatures(mut self) -> Self {
self.signatures.clear();
self
}
/// Build the job
pub fn build(self) -> Result<Job, JobError> {
if self.caller_id.is_empty() {
@@ -305,6 +328,7 @@ impl JobBuilder {
job.timeout = self.timeout;
job.env_vars = self.env_vars;
job.signatures = self.signatures;
Ok(job)
}

View File

@@ -30,7 +30,6 @@ where
// Create the job using JobBuilder
let job = JobBuilder::new()
.caller_id("script_mode")
.context_id("single_job")
.payload(script_content)
.runner(runner_id)
.executor("rhai")

View File

@@ -46,6 +46,24 @@ impl SyncRunner {
db_config.insert("DB_PATH".into(), db_path.to_string().into());
db_config.insert("CALLER_ID".into(), job.caller_id.clone().into());
db_config.insert("CONTEXT_ID".into(), job.context_id.clone().into());
// Extract signatories from job signatures, or fall back to env_vars
let signatories: Vec<Dynamic> = if !job.signatures.is_empty() {
// Use signatures from the job
job.signatures.iter()
.map(|sig| Dynamic::from(sig.public_key.clone()))
.collect()
} else if let Some(sig_json) = job.env_vars.get("SIGNATORIES") {
// Fall back to SIGNATORIES from env_vars (for backward compatibility)
match serde_json::from_str::<Vec<String>>(sig_json) {
Ok(sigs) => sigs.into_iter().map(Dynamic::from).collect(),
Err(_) => Vec::new(),
}
} else {
Vec::new()
};
db_config.insert("SIGNATORIES".into(), Dynamic::from(signatories));
engine.set_default_tag(Dynamic::from(db_config));
debug!("Sync Runner for Context ID '{}': Evaluating script with Rhai engine (job context set).", job.context_id);