diff --git a/Cargo.lock b/Cargo.lock index 4f36fbb..bc14e97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -583,6 +583,14 @@ dependencies = [ "tokio", ] +[[package]] +name = "derive" +version = "0.1.0" +dependencies = [ + "quote", + "syn 1.0.109", +] + [[package]] name = "dirs" version = "4.0.0" @@ -653,17 +661,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" -[[package]] -name = "engine" -version = "0.1.0" -dependencies = [ - "chrono", - "heromodels", - "heromodels-derive", - "heromodels_core", - "rhai", -] - [[package]] name = "env_logger" version = "0.10.2" @@ -2349,11 +2346,13 @@ dependencies = [ "anyhow", "chrono", "criterion", + "derive", "env_logger", "log", "redis", "rhai", "rhai_client", + "rhailib_engine", "rhailib_worker", "serde", "serde_json", @@ -2361,24 +2360,12 @@ dependencies = [ "uuid", ] -[[package]] -name = "rhailib-examples" -version = "0.1.0" -dependencies = [ - "chrono", - "env_logger", - "log", - "rhai", - "rhai_client", - "serde_json", - "tokio", -] - [[package]] name = "rhailib_dsl" version = "0.1.0" dependencies = [ "chrono", + "derive", "heromodels", "heromodels-derive", "heromodels_core", @@ -2389,19 +2376,31 @@ dependencies = [ "tempfile", ] +[[package]] +name = "rhailib_engine" +version = "0.1.0" +dependencies = [ + "chrono", + "heromodels", + "heromodels-derive", + "heromodels_core", + "rhai", + "rhailib_dsl", +] + [[package]] name = "rhailib_worker" version = "0.1.0" dependencies = [ "chrono", "clap", - "engine", "env_logger", "heromodels", "log", "redis", "rhai", "rhai_client", + "rhailib_engine", "serde", "serde_json", "tokio", @@ -2709,6 +2708,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" dependencies = [ "proc-macro2", + "quote", "unicode-ident", ] @@ -3060,11 +3060,11 @@ name = "ui_repl" version = "0.1.0" dependencies = [ "anyhow", - "engine", "heromodels", "log", "rhai", "rhai_client", + "rhailib_engine", "rhailib_worker", "rustyline", "tempfile", diff --git a/Cargo.toml b/Cargo.toml index e777d2d..bd4dc41 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,8 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread", "time", "sync" rhai = "1.21.0" rhailib_worker = { path = "src/worker" } rhai_client = { path = "src/client" } +rhailib_engine = { path = "src/engine" } +derive = { path = "src/derive" } [dev-dependencies] @@ -38,7 +40,6 @@ members = [ "src/worker", "src/monitor", # Added the new monitor package to workspace "src/repl", # Added the refactored REPL package - "examples", - "src/rhai_engine_ui", "src/macros", "src/dsl", + "src/rhai_engine_ui", "src/macros", "src/dsl", "src/derive", ] resolver = "2" # Recommended for new workspaces diff --git a/db/alice_pk/data/0.db b/db/alice_pk/data/0.db new file mode 100644 index 0000000..79888ad Binary files /dev/null and b/db/alice_pk/data/0.db differ diff --git a/db/alice_pk/data/lookup/.inc b/db/alice_pk/data/lookup/.inc new file mode 100644 index 0000000..0aeb548 --- /dev/null +++ b/db/alice_pk/data/lookup/.inc @@ -0,0 +1 @@ +74 \ No newline at end of file diff --git a/db/alice_pk/data/lookup/data b/db/alice_pk/data/lookup/data new file mode 100644 index 0000000..b183cfc Binary files /dev/null and b/db/alice_pk/data/lookup/data differ diff --git a/db/alice_pk/index/0.db b/db/alice_pk/index/0.db new file mode 100644 index 0000000..b1b6961 Binary files /dev/null and b/db/alice_pk/index/0.db differ diff --git a/db/alice_pk/index/lookup/.inc b/db/alice_pk/index/lookup/.inc new file mode 100644 index 0000000..8bc94cb --- /dev/null +++ b/db/alice_pk/index/lookup/.inc @@ -0,0 +1 @@ +276 \ No newline at end of file diff --git a/db/alice_pk/index/lookup/data b/db/alice_pk/index/lookup/data new file mode 100644 index 0000000..4d9572f Binary files /dev/null and b/db/alice_pk/index/lookup/data differ diff --git a/db/auth_worker_circle.db/auth_worker_circle/data/lookup/.inc b/db/auth_worker_circle.db/auth_worker_circle/data/lookup/.inc new file mode 100644 index 0000000..56a6051 --- /dev/null +++ b/db/auth_worker_circle.db/auth_worker_circle/data/lookup/.inc @@ -0,0 +1 @@ +1 \ No newline at end of file diff --git a/db/auth_worker_circle.db/auth_worker_circle/data/lookup/data b/db/auth_worker_circle.db/auth_worker_circle/data/lookup/data new file mode 100644 index 0000000..fe77ee9 Binary files /dev/null and b/db/auth_worker_circle.db/auth_worker_circle/data/lookup/data differ diff --git a/db/auth_worker_circle.db/auth_worker_circle/index/0.db b/db/auth_worker_circle.db/auth_worker_circle/index/0.db new file mode 100644 index 0000000..0ad682e Binary files /dev/null and b/db/auth_worker_circle.db/auth_worker_circle/index/0.db differ diff --git a/db/auth_worker_circle.db/auth_worker_circle/index/lookup/.inc b/db/auth_worker_circle.db/auth_worker_circle/index/lookup/.inc new file mode 100644 index 0000000..d8263ee --- /dev/null +++ b/db/auth_worker_circle.db/auth_worker_circle/index/lookup/.inc @@ -0,0 +1 @@ +2 \ No newline at end of file diff --git a/db/auth_worker_circle.db/auth_worker_circle/index/lookup/data b/db/auth_worker_circle.db/auth_worker_circle/index/lookup/data new file mode 100644 index 0000000..92d764d Binary files /dev/null and b/db/auth_worker_circle.db/auth_worker_circle/index/lookup/data differ diff --git a/examples/Cargo.toml b/examples/Cargo.toml deleted file mode 100644 index 48f6d5c..0000000 --- a/examples/Cargo.toml +++ /dev/null @@ -1,29 +0,0 @@ -[package] -name = "rhailib-examples" -version = "0.1.0" -edition = "2021" -publish = false # This is a package of examples, not meant to be published - -[dependencies] -# Local Rhailib crates -rhai_client = { path = "../src/client" } - -# External dependencies -rhai = "1.18.0" -tokio = { version = "1", features = ["full"] } -log = "0.4" -env_logger = "0.10" -serde_json = "1.0" -chrono = "0.4" - -[[bin]] -name = "example_math_worker" -path = "example_math_worker.rs" - -[[bin]] -name = "example_string_worker" -path = "example_string_worker.rs" - -[[bin]] -name = "dedicated_reply_queue_demo" -path = "dedicated_reply_queue_demo.rs" diff --git a/examples/end_to_end/alice.rhai b/examples/end_to_end/alice.rhai new file mode 100644 index 0000000..a59a65b --- /dev/null +++ b/examples/end_to_end/alice.rhai @@ -0,0 +1,45 @@ +let private_object = new_object() + .title("Alice's Private Object") + .description("This object can only be seen and modified by Alice") + .save_object(); + +let object_shared_with_bob = new_object() + .title("Alice's Shared Object") + .description("This object can be seen by Bob but modified only by Alice") + .save_object(); + +let new_access = new_access() + .object_id(object_shared_with_bob.id()) + .circle_public_key("bob_pk") + .save_access(); + +let book_private = new_book() + .title("Alice's private book") + .description("This book is prive to Alice") + .save_book(); + +let slides_shared = new_slides() + .title("Alice's shared slides") + .description("These slides, despite being in a private collection, are shared with Bob") + .save_slides(); + +let new_access = new_access() + .object_id(slides_shared.id) + .circle_public_key("bob_pk") + .save_access(); + +let collection_private = new_collection() + .title("Alice's private collection") + .description("This collection is only visible to Alice") + .add_book(book_private.id) + .add_slides(slides_shared.id) + .save_collection(); + + +let collection_shared = new_collection() + .title("Alice's shared collection") + .description("This collection is shared with Bob") + .save_collection(); + + + diff --git a/examples/end_to_end/auth_script.rhai b/examples/end_to_end/auth_script.rhai deleted file mode 100644 index e6facc1..0000000 --- a/examples/end_to_end/auth_script.rhai +++ /dev/null @@ -1,6 +0,0 @@ -// auth_script.rhai -// This script calls a custom registered function 'check_permission' -// and passes the CALLER_PUBLIC_KEY to it. -// CALLER_PUBLIC_KEY is injected into the script's scope by the rhailib_worker. - -check_permission(CALLER_PUBLIC_KEY) diff --git a/examples/end_to_end/bob.rhai b/examples/end_to_end/bob.rhai new file mode 100644 index 0000000..8df6954 --- /dev/null +++ b/examples/end_to_end/bob.rhai @@ -0,0 +1,16 @@ +let private_object = new_object() + .title("Alice's Private Object") + .description("This object can only be seen and modified by Alice") + .save_object(); + +let object_shared_with_bob = new_object() + .title("Alice's Shared Collection") + .description("This object can be seen by Bob but modified only by Alice") + .save_object(); + +let new_access = new_access() + .object_id(object_shared_with_bob.id()) + .circle_public_key("bob_pk") + .save_access(); + + diff --git a/examples/end_to_end/charlie.rhai b/examples/end_to_end/charlie.rhai new file mode 100644 index 0000000..8df6954 --- /dev/null +++ b/examples/end_to_end/charlie.rhai @@ -0,0 +1,16 @@ +let private_object = new_object() + .title("Alice's Private Object") + .description("This object can only be seen and modified by Alice") + .save_object(); + +let object_shared_with_bob = new_object() + .title("Alice's Shared Collection") + .description("This object can be seen by Bob but modified only by Alice") + .save_object(); + +let new_access = new_access() + .object_id(object_shared_with_bob.id()) + .circle_public_key("bob_pk") + .save_access(); + + diff --git a/examples/end_to_end/main.rs b/examples/end_to_end/main.rs index 222c80d..a58c3fb 100644 --- a/examples/end_to_end/main.rs +++ b/examples/end_to_end/main.rs @@ -1,137 +1,84 @@ use rhai::{Engine, EvalAltResult}; -use rhai_client::RhaiClient; +use rhai_client::RhaiClientBuilder; +use rhailib_engine::create_heromodels_engine; use rhailib_worker::spawn_rhai_worker; use std::{fs, path::Path, time::Duration}; use tokio::sync::mpsc; use uuid::Uuid; -// Custom Rhai function for authorization -// It takes the caller's public key as an argument. -fn check_permission(caller_pk: String) -> Result> { - log::info!("check_permission called with PK: {}", caller_pk); - if caller_pk == "admin_pk" { - Ok("Access Granted: Welcome Admin!".to_string()) - } else if caller_pk == "user_pk" { - Ok("Limited Access: Welcome User!".to_string()) - } else { - Ok(format!("Access Denied: Unknown public key '{}'", caller_pk)) - } -} +const ALICE_ID: &str = "alice_pk"; +const BOB_ID: &str = "bob_pk"; +const CHARLIE_ID: &str = "charlie_pk"; +const REDIS_URL: &str = "redis://127.0.0.1/"; +const DB_DIRECTORY: &str = "./db"; #[tokio::main] async fn main() -> Result<(), Box> { env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); - - let redis_url = "redis://127.0.0.1/"; - let worker_circle_pk = "auth_worker_circle".to_string(); - + // 1. Create a Rhai engine and register custom functionality - let mut engine = Engine::new(); - engine.register_fn("check_permission", check_permission); - log::info!("Custom 'check_permission' function registered with Rhai engine."); + let mut engine = rhailib_engine::create_heromodels_engine(); // 2. Spawn the Rhai worker let (shutdown_tx, shutdown_rx) = mpsc::channel(1); let worker_handle = tokio::spawn(spawn_rhai_worker( - 0, // worker_id - worker_circle_pk.clone(), + ALICE_ID.to_string(), + DB_DIRECTORY.to_string(), engine, - redis_url.to_string(), + REDIS_URL.to_string(), shutdown_rx, false, // use_sentinel )); - log::info!("Rhai worker spawned for circle: {}", worker_circle_pk); + + log::info!("Rhai worker spawned for circle: {}", ALICE_ID); // Give the worker a moment to start up tokio::time::sleep(Duration::from_secs(1)).await; - // 3. Create a Rhai client - let client = RhaiClient::new(redis_url)?; - log::info!("Rhai client created."); + // Alice populates her rhai worker + let client_alice = RhaiClientBuilder::new() + .redis_url(REDIS_URL) + .caller_id(ALICE_ID) + .build() + .unwrap(); - // 4. Load the Rhai script content - let script_path_str = "examples/end_to_end/auth_script.rhai"; // Relative to Cargo.toml / rhailib root - let script_content = match fs::read_to_string(script_path_str) { - Ok(content) => content, - Err(e) => { - log::error!("Failed to read script file '{}': {}", script_path_str, e); - // Attempt to read from an alternative path if run via `cargo run --example` - // where current dir might be the crate root. - let alt_script_path = Path::new(file!()) - .parent() - .unwrap() - .join("auth_script.rhai"); - log::info!("Attempting alternative script path: {:?}", alt_script_path); - fs::read_to_string(&alt_script_path)? - } - }; - log::info!("Loaded script content from '{}'", script_path_str); + client_alice.new_play_request() + .recipient_id(&ALICE_ID) + .script_path("examples/end_to_end/alice.rhai") + .timeout(Duration::from_secs(10)) + .await_response().await.unwrap(); + + log::info!("Alice's database populated."); - // Define different caller public keys - let admin_caller_pk = "admin_pk".to_string(); - let user_caller_pk = "user_pk".to_string(); - let unknown_caller_pk = "unknown_pk".to_string(); + // Bob queries Alice's rhai worker + let client_bob = RhaiClientBuilder::new() + .redis_url(REDIS_URL) + .caller_id(BOB_ID) + .build() + .unwrap(); + + client_bob.new_play_request() + .recipient_id(&ALICE_ID) + .script_path("examples/end_to_end/bob.rhai") + .timeout(Duration::from_secs(10)) + .await_response().await.unwrap(); + + log::info!("Bob's query to Alice's database completed."); - let callers = vec![ - ("Admin", admin_caller_pk), - ("User", user_caller_pk), - ("Unknown", unknown_caller_pk), - ]; - - for (caller_name, caller_pk) in callers { - let task_id = Uuid::new_v4().to_string(); - log::info!( - "Submitting script for caller '{}' (PK: {}) with task_id: {}", - caller_name, - caller_pk, - task_id - ); - - match client - .submit_script_and_await_result( - &worker_circle_pk, - task_id.clone(), // task_id (UUID) first - script_content.clone(), // script_content second - Duration::from_secs(10), - Some(caller_pk.clone()), // This is the CALLER_PUBLIC_KEY - ) - .await - { - Ok(details) => { - log::info!( - "Task {} for caller '{}' (PK: {}) completed. Status: {}, Output: {:?}, Error: {:?}", - task_id, - caller_name, - caller_pk, - details.status, - details.output, - details.error - ); - // Basic assertion for expected output - if caller_pk == "admin_pk" { - assert_eq!( - details.output, - Some("Access Granted: Welcome Admin!".to_string()) - ); - } else if caller_pk == "user_pk" { - assert_eq!( - details.output, - Some("Limited Access: Welcome User!".to_string()) - ); - } - } - Err(e) => { - log::error!( - "Task {} for caller '{}' (PK: {}) failed: {}", - task_id, - caller_name, - caller_pk, - e - ); - } - } - tokio::time::sleep(Duration::from_millis(100)).await; // Small delay between submissions - } + // Charlie queries Alice's rhai worker + let client_charlie = RhaiClientBuilder::new() + .redis_url(REDIS_URL) + .caller_id(CHARLIE_ID) + .build() + .unwrap(); + + client_charlie.new_play_request() + .recipient_id(&ALICE_ID) + .script_path("examples/end_to_end/charlie.rhai") + .timeout(Duration::from_secs(10)) + .await_response().await.unwrap(); + + log::info!("Charlie's query to Alice's database completed."); // 5. Shutdown the worker (optional, could also let it run until program exits) log::info!("Signaling worker to shutdown..."); diff --git a/examples/end_to_end/query.rhai b/examples/end_to_end/query.rhai new file mode 100644 index 0000000..8df6954 --- /dev/null +++ b/examples/end_to_end/query.rhai @@ -0,0 +1,16 @@ +let private_object = new_object() + .title("Alice's Private Object") + .description("This object can only be seen and modified by Alice") + .save_object(); + +let object_shared_with_bob = new_object() + .title("Alice's Shared Collection") + .description("This object can be seen by Bob but modified only by Alice") + .save_object(); + +let new_access = new_access() + .object_id(object_shared_with_bob.id()) + .circle_public_key("bob_pk") + .save_access(); + + diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs index 4ec2c5c..ff7af4b 100644 --- a/src/client/src/lib.rs +++ b/src/client/src/lib.rs @@ -2,12 +2,11 @@ use chrono::Utc; use log::{debug, error, info, warn}; // Added error use redis::AsyncCommands; use serde::{Deserialize, Serialize}; +use tokio::time::timeout; use std::time::Duration; // Duration is still used, Instant and sleep were removed use uuid::Uuid; -const REDIS_TASK_DETAILS_PREFIX: &str = "rhai_task_details:"; -const REDIS_QUEUE_PREFIX: &str = "rhai_tasks:"; -const REDIS_REPLY_QUEUE_PREFIX: &str = "rhai_reply:"; +const NAMESPACE_PREFIX: &str = "rhailib:"; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct RhaiTaskDetails { @@ -23,10 +22,8 @@ pub struct RhaiTaskDetails { pub created_at: chrono::DateTime, #[serde(rename = "updatedAt")] pub updated_at: chrono::DateTime, - // reply_to_queue: Option is removed from the struct. - // It's passed to submit_script_to_worker_queue if needed and stored in Redis directly. - #[serde(rename = "publicKey")] - pub public_key: Option, + #[serde(rename = "callerId")] + pub caller_id: String, } #[derive(Debug)] @@ -68,60 +65,172 @@ impl std::error::Error for RhaiClientError {} pub struct RhaiClient { redis_client: redis::Client, + caller_id: String, +} + +pub struct RhaiClientBuilder { + redis_url: Option, + caller_id: String, +} + +impl RhaiClientBuilder { + pub fn new() -> Self { + Self { redis_url: None, caller_id: "".to_string() } + } + + pub fn caller_id(mut self, caller_id: &str) -> Self { + self.caller_id = caller_id.to_string(); + self + } + + pub fn redis_url(mut self, url: &str) -> Self { + self.redis_url = Some(url.to_string()); + self + } + + pub fn build(self) -> Result { + let url = self.redis_url.unwrap_or_else(|| "redis://127.0.0.1/".to_string()); + let client = redis::Client::open(url)?; + if self.caller_id.is_empty() { + return Err(RhaiClientError::RedisError(redis::RedisError::from((redis::ErrorKind::InvalidClientConfig, "Caller ID is empty")))); + } + Ok(RhaiClient { redis_client: client, caller_id: self.caller_id }) + } +} + +pub struct PlayRequest { + id: String, + recipient_id: String, + script: String, + timeout: Duration +} + +pub struct PlayRequestBuilder<'a> { + client: &'a RhaiClient, + request_id: String, + recipient_id: String, + script: String, + timeout: Duration +} + +impl<'a> PlayRequestBuilder<'a> { + pub fn new(client: &'a RhaiClient) -> Self { + Self { + client, + request_id: "".to_string(), + recipient_id: "".to_string(), + script: "".to_string(), + timeout: Duration::from_secs(10), + } + } + + pub fn request_id(mut self, request_id: &str) -> Self { + self.request_id = request_id.to_string(); + self + } + + pub fn recipient_id(mut self, recipient_id: &str) -> Self { + self.recipient_id = recipient_id.to_string(); + self + } + + pub fn script(mut self, script: &str) -> Self { + self.script = script.to_string(); + self + } + + pub fn script_path(mut self, script_path: &str) -> Self { + self.script = std::fs::read_to_string(script_path).unwrap(); + self + } + + pub fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } + + pub fn submit(self) -> Result<(), RhaiClientError> { + let request_id = if self.request_id.is_empty() { + // Generate a UUID for the request_id + Uuid::new_v4().to_string() + } else { + self.request_id.clone() + }; + // Build the request and submit using self.client + println!("Submitting request {} with timeout {:?}", self.request_id, self.timeout); + self.client.submit_play_request( + &PlayRequest { + id: request_id, + recipient_id: self.recipient_id.clone(), + script: self.script.clone(), + timeout: self.timeout, + } + ); + Ok(()) + } + + pub async fn await_response(self) -> Result { + let request_id = if self.request_id.is_empty() { + // Generate a UUID for the request_id + Uuid::new_v4().to_string() + } else { + self.request_id.clone() + }; + // Build the request and submit using self.client + println!("Awaiting response for request {} with timeout {:?}", self.request_id, self.timeout); + let result = self.client.submit_play_request_and_await_result( + &PlayRequest { + id: request_id, + recipient_id: self.recipient_id.clone(), + script: self.script.clone(), + timeout: self.timeout, + } + ).await; + result + } } impl RhaiClient { - pub fn new(redis_url: &str) -> Result { - let client = redis::Client::open(redis_url)?; - Ok(Self { - redis_client: client, - }) + pub fn new_play_request(&self) -> PlayRequestBuilder { + PlayRequestBuilder::new(self) } // Internal helper to submit script details and push to work queue - async fn submit_script_to_worker_queue( + async fn submit_play_request_using_connection( &self, conn: &mut redis::aio::MultiplexedConnection, - circle_name: &str, - task_id: &str, // This is the main task_id - script: String, - // client_rpc_id: Option is removed - reply_to_queue_name: Option, // Still needed to tell the worker where to reply, if applicable - public_key: Option, + play_request: &PlayRequest, ) -> Result<(), RhaiClientError> { let now = Utc::now(); - - let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id); + + let task_key = format!( + "{}{}", + NAMESPACE_PREFIX, + play_request.id + ); + let worker_queue_key = format!( "{}{}", - REDIS_QUEUE_PREFIX, - circle_name.replace(" ", "_").to_lowercase() + NAMESPACE_PREFIX, + play_request.recipient_id.replace(" ", "_").to_lowercase() ); debug!( - "Preparing task_id: {} for circle: {} to worker_queue: {}. Script: {}, replyToQueue: {:?}, publicKey: {:?}", - task_id, circle_name, worker_queue_key, script, reply_to_queue_name, public_key + "Submitting play request: {} to worker: {} with namespace prefix: {}", + play_request.id, + play_request.recipient_id, + NAMESPACE_PREFIX ); let mut hset_args: Vec<(String, String)> = vec![ - ("taskId".to_string(), task_id.to_string()), // Add taskId - ("script".to_string(), script), // script is moved here + ("taskId".to_string(), play_request.id.to_string()), // Add taskId + ("script".to_string(), play_request.script.clone()), // script is moved here + ("callerId".to_string(), self.caller_id.clone()), // script is moved here ("status".to_string(), "pending".to_string()), ("createdAt".to_string(), now.to_rfc3339()), ("updatedAt".to_string(), now.to_rfc3339()), ]; - // clientRpcId field and its corresponding hset_args logic are removed. - - if let Some(queue_name) = &reply_to_queue_name { - // Use the passed parameter - hset_args.push(("replyToQueue".to_string(), queue_name.clone())); - } - if let Some(pk) = &public_key { - // Use the passed parameter - hset_args.push(("publicKey".to_string(), pk.clone())); - } - // Ensure hset_args is a slice of tuples (String, String) // The redis crate's hset_multiple expects &[(K, V)] // conn.hset_multiple::<_, String, String, ()>(&task_key, &hset_args).await?; @@ -134,38 +243,122 @@ impl RhaiClient { // lpush also infers its types, RV is typically i64 (length of list) or () depending on exact command variant // For `redis::AsyncCommands::lpush`, it's `RedisResult` where R: FromRedisValue // Often this is the length of the list. Let's allow inference or specify if needed. - let _: redis::RedisResult = conn.lpush(&worker_queue_key, task_id).await; + let _: redis::RedisResult = conn.lpush(&worker_queue_key, play_request.id.clone()).await; Ok(()) } - // Public method for fire-and-forget submission (doesn't wait for result) - pub async fn submit_script( + // Internal helper to await response from worker + async fn await_response_from_connection( + &self, + conn: &mut redis::aio::MultiplexedConnection, + task_key: &String, + reply_queue_key: &String, + timeout: Duration + ) -> Result { + // BLPOP on the reply queue + // The timeout for BLPOP is in seconds (integer) + let blpop_timeout_secs = timeout.as_secs().max(1); // Ensure at least 1 second for BLPOP timeout + + match conn + .blpop::<&String, Option<(String, String)>>( + reply_queue_key, + blpop_timeout_secs as f64, + ) + .await + { + Ok(Some((_queue, result_message_str))) => { + // Attempt to deserialize the result message into RhaiTaskDetails or a similar structure + // For now, we assume the worker sends back a JSON string of RhaiTaskDetails + // or at least status, output, error. + // Let's refine what the worker sends. For now, assume it's a simplified result. + // The worker should ideally send a JSON string that can be parsed into RhaiTaskDetails. + // For this example, let's assume the worker sends a JSON string of a simplified result structure. + // A more robust approach would be for the worker to send the full RhaiTaskDetails (or relevant parts) + // and the client deserializes that. + // For now, let's assume the worker sends a JSON string of RhaiTaskDetails. + match serde_json::from_str::(&result_message_str) { + Ok(details) => { + info!("Task {} finished with status: {}", details.task_id, details.status); + // Optionally, delete the reply queue + let _: redis::RedisResult = conn.del(&reply_queue_key).await; + Ok(details) + } + Err(e) => { + error!("Failed to deserialize result message from reply queue: {}", e); + // Optionally, delete the reply queue + let _: redis::RedisResult = conn.del(&reply_queue_key).await; + Err(RhaiClientError::SerializationError(e)) + } + } + } + Ok(None) => { + // BLPOP timed out + warn!( + "Timeout waiting for result on reply queue {} for task {}", + reply_queue_key, task_key + ); + // Optionally, delete the reply queue + let _: redis::RedisResult = conn.del(&reply_queue_key).await; + Err(RhaiClientError::Timeout(task_key.clone())) + } + Err(e) => { + // Redis error + error!( + "Redis error on BLPOP for reply queue {}: {}", + reply_queue_key, e + ); + // Optionally, delete the reply queue + let _: redis::RedisResult = conn.del(&reply_queue_key).await; + Err(RhaiClientError::RedisError(e)) + } + } + } + + // New method using dedicated reply queue + pub async fn submit_play_request( &self, - circle_name: &str, - script: String, - public_key: Option, - ) -> Result { + play_request: &PlayRequest, + ) -> Result<(), RhaiClientError> { let mut conn = self.redis_client.get_multiplexed_async_connection().await?; - let task_id = Uuid::new_v4().to_string(); // task_id is generated here for fire-and-forget - debug!( - "Client submitting script (fire-and-forget) with new task_id: {} to circle: {}", - task_id, circle_name - ); - - self.submit_script_to_worker_queue( + self.submit_play_request_using_connection( &mut conn, - circle_name, - &task_id, - script, - // client_rpc_id argument removed - None, // No dedicated reply queue for fire-and-forget - public_key, + &play_request // Pass the task_id parameter + ) + .await?; + Ok(()) + } + + // New method using dedicated reply queue + pub async fn submit_play_request_and_await_result( + &self, + play_request: &PlayRequest, + ) -> Result { + let mut conn = self.redis_client.get_multiplexed_async_connection().await?; + + let reply_queue_key = format!("{}:reply:{}", NAMESPACE_PREFIX, play_request.id); // Derived from the passed task_id + + self.submit_play_request_using_connection( + &mut conn, + &play_request // Pass the task_id parameter ) .await?; - Ok(task_id) + info!( + "Task {} submitted. Waiting for result on queue {} with timeout {:?}...", + play_request.id, // This is the UUID + reply_queue_key, + play_request.timeout + ); + + self.await_response_from_connection( + &mut conn, + &play_request.id, + &reply_queue_key, + play_request.timeout + ) + .await } // Optional: A method to check task status, similar to what circle_server_ws polling does. @@ -175,7 +368,7 @@ impl RhaiClient { task_id: &str, ) -> Result, RhaiClientError> { let mut conn = self.redis_client.get_multiplexed_async_connection().await?; - let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id); + let task_key = format!("{}{}", NAMESPACE_PREFIX, task_id); let result_map: Option> = conn.hgetall(&task_key).await?; @@ -211,7 +404,7 @@ impl RhaiClient { Utc::now() }), // reply_to_queue is no longer a field in RhaiTaskDetails (it's stored in Redis but not in this struct) - public_key: map.get("publicKey").cloned(), + caller_id: map.get("callerId").cloned().expect("callerId field missing from Redis hash"), }; // It's important to also check if the 'taskId' field exists in the map and matches the input task_id // for data integrity, though the struct construction above uses the input task_id directly. @@ -227,96 +420,6 @@ impl RhaiClient { None => Ok(None), } } - - // New method using dedicated reply queue - pub async fn submit_script_and_await_result( - &self, - circle_name: &str, - task_id: String, // task_id is now a mandatory parameter provided by the caller - script: String, - timeout: Duration, - public_key: Option, - ) -> Result { - let mut conn = self.redis_client.get_multiplexed_async_connection().await?; - // let task_id = Uuid::new_v4().to_string(); // Removed, task_id is a parameter - let reply_to_queue_name = format!("{}{}", REDIS_REPLY_QUEUE_PREFIX, task_id); // Derived from the passed task_id - - self.submit_script_to_worker_queue( - &mut conn, - circle_name, - &task_id, // Pass the task_id parameter - script, - // client_rpc_id argument removed - Some(reply_to_queue_name.clone()), // Pass the derived reply_to_queue_name - public_key, - ) - .await?; - - info!( - "Task {} submitted. Waiting for result on queue {} with timeout {:?}...", - task_id, // This is the UUID - reply_to_queue_name, - timeout - ); - - // BLPOP on the reply queue - // The timeout for BLPOP is in seconds (integer) - let blpop_timeout_secs = timeout.as_secs().max(1); // Ensure at least 1 second for BLPOP timeout - - match conn - .blpop::<&String, Option<(String, String)>>( - &reply_to_queue_name, - blpop_timeout_secs as f64, - ) - .await - { - Ok(Some((_queue, result_message_str))) => { - // Attempt to deserialize the result message into RhaiTaskDetails or a similar structure - // For now, we assume the worker sends back a JSON string of RhaiTaskDetails - // or at least status, output, error. - // Let's refine what the worker sends. For now, assume it's a simplified result. - // The worker should ideally send a JSON string that can be parsed into RhaiTaskDetails. - // For this example, let's assume the worker sends a JSON string of a simplified result structure. - // A more robust approach would be for the worker to send the full RhaiTaskDetails (or relevant parts) - // and the client deserializes that. - // For now, let's assume the worker sends a JSON string of RhaiTaskDetails. - match serde_json::from_str::(&result_message_str) { - Ok(details) => { - info!("Task {} finished with status: {}", task_id, details.status); - // Optionally, delete the reply queue - let _: redis::RedisResult = conn.del(&reply_to_queue_name).await; - Ok(details) - } - Err(e) => { - error!("Task {}: Failed to deserialize result message from reply queue: {}. Message: {}", task_id, e, result_message_str); - // Optionally, delete the reply queue - let _: redis::RedisResult = conn.del(&reply_to_queue_name).await; - Err(RhaiClientError::SerializationError(e)) - } - } - } - Ok(None) => { - // BLPOP timed out - warn!( - "Timeout waiting for result on reply queue {} for task {}", - reply_to_queue_name, task_id - ); - // Optionally, delete the reply queue - let _: redis::RedisResult = conn.del(&reply_to_queue_name).await; - Err(RhaiClientError::Timeout(task_id)) - } - Err(e) => { - // Redis error - error!( - "Redis error on BLPOP for reply queue {}: {}", - reply_to_queue_name, e - ); - // Optionally, delete the reply queue - let _: redis::RedisResult = conn.del(&reply_to_queue_name).await; - Err(RhaiClientError::RedisError(e)) - } - } - } } #[cfg(test)] diff --git a/src/derive/Cargo.toml b/src/derive/Cargo.toml new file mode 100644 index 0000000..6896610 --- /dev/null +++ b/src/derive/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "derive" +version = "0.1.0" +edition = "2024" + +[lib] +proc-macro = true + +[dependencies] +syn = { version = "1.0", features = ["full"] } +quote = "1.0" diff --git a/src/derive/src/lib.rs b/src/derive/src/lib.rs new file mode 100644 index 0000000..1bf969d --- /dev/null +++ b/src/derive/src/lib.rs @@ -0,0 +1,34 @@ +extern crate proc_macro; +use proc_macro::TokenStream; +use quote::quote; +use syn::{parse_macro_input, Data, DeriveInput, Fields}; + +#[proc_macro_derive(FromVec)] +pub fn from_vec_derive(input: TokenStream) -> TokenStream { + let input = parse_macro_input!(input as DeriveInput); + let name = input.ident; + + let inner_type = match input.data { + Data::Struct(s) => match s.fields { + Fields::Unnamed(mut fields) => { + if fields.unnamed.len() != 1 { + panic!("FromVec can only be derived for tuple structs with one field."); + } + let field = fields.unnamed.pop().unwrap().into_value(); + field.ty + } + _ => panic!("FromVec can only be derived for tuple structs."), + }, + _ => panic!("FromVec can only be derived for structs."), + }; + + let expanded = quote! { + impl From<#inner_type> for #name { + fn from(vec: #inner_type) -> Self { + #name(vec) + } + } + }; + + TokenStream::from(expanded) +} diff --git a/src/dsl/Cargo.toml b/src/dsl/Cargo.toml index 5757462..ddb01f3 100644 --- a/src/dsl/Cargo.toml +++ b/src/dsl/Cargo.toml @@ -11,6 +11,7 @@ heromodels_core = { path = "../../../db/heromodels_core" } chrono = "0.4" heromodels-derive = { path = "../../../db/heromodels-derive" } macros = { path = "../macros"} +derive = { path = "../derive"} serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/src/dsl/src/access.rs b/src/dsl/src/access.rs index c58f2bc..b7275b3 100644 --- a/src/dsl/src/access.rs +++ b/src/dsl/src/access.rs @@ -1,4 +1,5 @@ use heromodels::db::Db; +use macros::{register_authorized_create_by_id_fn, register_authorized_delete_by_id_fn, register_authorized_get_by_id_fn}; use rhai::plugin::*; use rhai::{Array, Dynamic, Engine, EvalAltResult, INT, Module, Position}; use std::mem; @@ -9,16 +10,6 @@ type RhaiAccess = Access; use heromodels::db::Collection; use heromodels::db::hero::OurDB; -// Helper to convert i64 from Rhai to u32 for IDs -fn id_from_i64_to_u32(id_i64: i64) -> Result> { - u32::try_from(id_i64).map_err(|_| { - Box::new(EvalAltResult::ErrorArithmetic( - format!("Failed to convert ID '{}' to u32", id_i64).into(), - Position::NONE, - )) - }) -} - #[export_module] mod rhai_access_module { // --- Access Functions --- @@ -29,19 +20,19 @@ mod rhai_access_module { } /// Sets the access name - #[rhai_fn(name = "object_id", return_raw, global, pure)] + #[rhai_fn(name = "object_id", return_raw)] pub fn set_object_id( access: &mut RhaiAccess, object_id: i64, ) -> Result> { - let id = id_from_i64_to_u32(object_id)?; + let id = macros::id_from_i64_to_u32(object_id)?; let owned_access = std::mem::take(access); *access = owned_access.object_id(id); Ok(access.clone()) } /// Sets the access name - #[rhai_fn(name = "circle_pk", return_raw, global, pure)] + #[rhai_fn(name = "circle_public_key", return_raw, global, pure)] pub fn set_circle_pk( access: &mut RhaiAccess, circle_pk: String, @@ -57,7 +48,7 @@ mod rhai_access_module { access: &mut RhaiAccess, group_id: i64, ) -> Result> { - let id = id_from_i64_to_u32(group_id)?; + let id = macros::id_from_i64_to_u32(group_id)?; let owned_access = std::mem::take(access); *access = owned_access.group_id(id); Ok(access.clone()) @@ -68,7 +59,7 @@ mod rhai_access_module { access: &mut RhaiAccess, contact_id: i64, ) -> Result> { - let id = id_from_i64_to_u32(contact_id)?; + let id = macros::id_from_i64_to_u32(contact_id)?; let owned_access = std::mem::take(access); *access = owned_access.contact_id(id); Ok(access.clone()) @@ -127,137 +118,33 @@ mod rhai_access_module { } } -// // A function that takes the call context and an integer argument. -// fn save_access(context: NativeCallContext, access: RhaiAccess) -> Result<(), Box> { -// let optional_tag_ref: Option<&Dynamic> = context.tag(); -// // Ensure the tag exists -// let tag_ref: &Dynamic = optional_tag_ref.ok_or_else(|| { -// Box::new(EvalAltResult::ErrorRuntime( -// "Custom tag not set for this evaluation run.".into(), -// context.position(), // Use context.position() if available and relevant -// )) -// })?; - -// // Initialize database with OurDB for the Rhai engine -// // Using a temporary/in-memory like database for the worker -// let tag_map = tag_ref.read_lock::().ok_or_else(|| { -// Box::new(EvalAltResult::ErrorRuntime( -// "Tag is not a Map or is locked".into(), -// Position::NONE, -// )) -// })?; - -// let db_path = tag_map.get("CIRCLE_DB_PATH").expect("CIRCLE_DB_PATH not found").as_str().to_string(); -// let db = Arc::new( -// OurDB::new(db_path, false) -// .expect("Failed to create temporary DB for Rhai engine"), -// ); - -// let result = db.set(&access).map_err(|e| { -// Box::new(EvalAltResult::ErrorRuntime( -// format!("DB Error set_access: {}", e).into(), -// Position::NONE, -// )) -// })?; - -// // Return the updated access with the correct ID -// Ok(result) -// } - - -pub fn register_access_rhai_module(engine: &mut Engine, db: Arc) { +pub fn register_access_rhai_module(engine: &mut Engine) { // Register the exported module globally - let module = exported_module!(rhai_access_module); + let mut module = exported_module!(rhai_access_module); + + register_authorized_create_by_id_fn!( + module: &mut module, + rhai_fn_name: "save_access", + resource_type_str: "Access", + rhai_return_rust_type: heromodels::models::access::Access + ); + + register_authorized_get_by_id_fn!( + module: &mut module, + rhai_fn_name: "get_access", + resource_type_str: "Access", + rhai_return_rust_type: heromodels::models::access::Access + ); + + register_authorized_delete_by_id_fn!( + module: &mut module, + rhai_fn_name: "delete_access", + resource_type_str: "Access", + rhai_return_rust_type: heromodels::models::access::Access + ); + engine.register_global_module(module.into()); - // Create a module for database functions - let mut db_module = Module::new(); - - // let db_clone_set_access = db.clone(); - // db_module.set_native_fn( - // "save_access", - // move |access: Access| -> Result> { - // // Use the Collection trait method directly - // let result = db_clone_set_access.set(&access).map_err(|e| { - // Box::new(EvalAltResult::ErrorRuntime( - // format!("DB Error set_access: {}", e).into(), - // Position::NONE, - // )) - // })?; - - // // Return the updated access with the correct ID - // Ok(result.1) - // }, - // ); - - // Manually register database functions as they need to capture 'db' - let db_clone_delete_access = db.clone(); - db_module.set_native_fn( - "delete_access", - move |access: Access| -> Result<(), Box> { - // Use the Collection trait method directly - let result = db_clone_delete_access - .collection::() - .expect("can open access collection") - .delete_by_id(access.base_data.id) - .expect("can delete event"); - - // Return the updated event with the correct ID - Ok(result) - }, - ); - - let db_clone_get_access = db.clone(); - db_module.set_native_fn( - "get_access_by_id", - move |id_i64: INT| -> Result> { - let id_u32 = id_from_i64_to_u32(id_i64)?; - // Use the Collection trait method directly - db_clone_get_access - .get_by_id(id_u32) - .map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("DB Error get_access_by_id: {}", e).into(), - Position::NONE, - )) - })? - .ok_or_else(|| { - Box::new(EvalAltResult::ErrorRuntime( - format!("Access with ID {} not found", id_u32).into(), - Position::NONE, - )) - }) - }, - ); - - // Add list_accesss function to get all accesss - let db_clone_list_accesss = db.clone(); - db_module.set_native_fn( - "list_accesss", - move || -> Result> { - let collection = db_clone_list_accesss.collection::().map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("Failed to get access collection: {:?}", e).into(), - Position::NONE, - )) - })?; - let accesss = collection.get_all().map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("Failed to get all accesss: {:?}", e).into(), - Position::NONE, - )) - })?; - let mut array = Array::new(); - for access in accesss { - array.push(Dynamic::from(access)); - } - Ok(Dynamic::from(array)) - }, - ); - - // Register the database module globally - engine.register_global_module(db_module.into()); - println!("Successfully registered access Rhai module using export_module approach."); } diff --git a/src/dsl/src/lib.rs b/src/dsl/src/lib.rs index bb74a08..23c258a 100644 --- a/src/dsl/src/lib.rs +++ b/src/dsl/src/lib.rs @@ -1,6 +1,16 @@ +use rhai::Engine; pub mod library; pub mod access; +pub mod object; pub use macros::register_authorized_get_by_id_fn; pub use macros::register_authorized_list_fn; -pub use macros::id_from_i64_to_u32; \ No newline at end of file +pub use macros::id_from_i64_to_u32; + +/// Register all Rhai modules with the engine +pub fn register_dsl_modules(engine: &mut Engine) { + access::register_access_rhai_module(engine); + library::register_library_rhai_module(engine); + object::register_object_rhai_module(engine); + println!("Rhailib Domain Specific Language modules registered successfully."); +} \ No newline at end of file diff --git a/src/dsl/src/library.rs b/src/dsl/src/library.rs index 57c8dc9..809d967 100644 --- a/src/dsl/src/library.rs +++ b/src/dsl/src/library.rs @@ -1,3 +1,5 @@ +use derive::FromVec; +use macros::{register_authorized_create_by_id_fn, register_authorized_delete_by_id_fn, register_authorized_get_by_id_fn, register_authorized_list_fn}; use rhai::plugin::*; use rhai::{CustomType, Dynamic, Engine, EvalAltResult, Module, Position, TypeBuilder}; use serde::Serialize; @@ -6,7 +8,6 @@ use serde_json; use std::mem; use std::sync::Arc; - use heromodels::models::library::collection::Collection as RhaiCollection; use heromodels::models::library::items::{ Book as RhaiBook, Image as RhaiImage, Markdown as RhaiMarkdown, Pdf as RhaiPdf, @@ -15,18 +16,6 @@ use heromodels::models::library::items::{ use heromodels::db::Collection as DbCollectionTrait; use heromodels::db::hero::OurDB; - -// Helper to convert i64 from Rhai to u32 for IDs -fn id_from_i64_to_u32(id_i64: i64) -> Result> { - u32::try_from(id_i64).map_err(|_| { - Box::new(EvalAltResult::ErrorMismatchDataType( - "u32".to_string(), // Expected type - format!("i64 value ({}) that cannot be represented as u32", id_i64), // Actual type/value description - Position::NONE, - )) - }) -} - /// Registers a `.json()` method for any type `T` that implements the required traits. fn register_json_method(engine: &mut Engine) where @@ -45,10 +34,34 @@ where } // Wrapper type for a list of collections to enable .json() method via register_json_method -#[derive(Debug, Clone, Serialize, CustomType)] +#[derive(Debug, Clone, Serialize, CustomType, FromVec)] #[rhai_type(name = "CollectionArray")] pub struct RhaiCollectionArray(pub Vec); +#[derive(Debug, Clone, Serialize, CustomType, FromVec)] +#[rhai_type(name = "ImageArray")] +pub struct RhaiImageArray(pub Vec); + +#[derive(Debug, Clone, Serialize, CustomType, FromVec)] +#[rhai_type(name = "PdfArray")] +pub struct RhaiPdfArray(pub Vec); + +#[derive(Debug, Clone, Serialize, CustomType, FromVec)] +#[rhai_type(name = "MarkdownArray")] +pub struct RhaiMarkdownArray(pub Vec); + +#[derive(Debug, Clone, Serialize, CustomType, FromVec)] +#[rhai_type(name = "BookArray")] +pub struct RhaiBookArray(pub Vec); + +#[derive(Debug, Clone, Serialize, CustomType, FromVec)] +#[rhai_type(name = "SlidesArray")] +pub struct RhaiSlidesArray(pub Vec); + +#[derive(Debug, Clone, Serialize, CustomType, FromVec)] +#[rhai_type(name = "TocEntryArray")] +pub struct RhaiTocEntryArray(pub Vec); + #[export_module] mod rhai_library_module { // --- Collection Functions --- @@ -82,7 +95,7 @@ mod rhai_library_module { collection: &mut RhaiCollection, image_id: i64, ) -> Result> { - let id = id_from_i64_to_u32(image_id)?; + let id = macros::id_from_i64_to_u32(image_id)?; let owned = mem::take(collection); *collection = owned.add_image(id); Ok(collection.clone()) @@ -93,7 +106,7 @@ mod rhai_library_module { collection: &mut RhaiCollection, pdf_id: i64, ) -> Result> { - let id = id_from_i64_to_u32(pdf_id)?; + let id = macros::id_from_i64_to_u32(pdf_id)?; let owned = mem::take(collection); *collection = owned.add_pdf(id); Ok(collection.clone()) @@ -104,7 +117,7 @@ mod rhai_library_module { collection: &mut RhaiCollection, markdown_id: i64, ) -> Result> { - let id = id_from_i64_to_u32(markdown_id)?; + let id = macros::id_from_i64_to_u32(markdown_id)?; let owned = mem::take(collection); *collection = owned.add_markdown(id); Ok(collection.clone()) @@ -115,7 +128,7 @@ mod rhai_library_module { collection: &mut RhaiCollection, book_id: i64, ) -> Result> { - let id = id_from_i64_to_u32(book_id)?; + let id = macros::id_from_i64_to_u32(book_id)?; let owned = mem::take(collection); *collection = owned.add_book(id); Ok(collection.clone()) @@ -126,7 +139,7 @@ mod rhai_library_module { collection: &mut RhaiCollection, slides_id: i64, ) -> Result> { - let id = id_from_i64_to_u32(slides_id)?; + let id = macros::id_from_i64_to_u32(slides_id)?; let owned = mem::take(collection); *collection = owned.add_slides(id); Ok(collection.clone()) @@ -573,6 +586,11 @@ mod rhai_library_module { RhaiSlides::new() } + #[rhai_fn(get = "id", pure)] + pub fn get_slides_id(slides: &mut RhaiSlides) -> i64 { + slides.base_data.id as i64 + } + #[rhai_fn(name = "title", return_raw, global, pure)] pub fn slides_title( slides: &mut RhaiSlides, @@ -615,11 +633,6 @@ mod rhai_library_module { Ok(slides.clone()) } - #[rhai_fn(get = "id", pure)] - pub fn get_slides_id(slides: &mut RhaiSlides) -> i64 { - slides.base_data.id as i64 - } - #[rhai_fn(get = "created_at", pure)] pub fn get_slides_created_at(slides: &mut RhaiSlides) -> i64 { slides.base_data.created_at @@ -651,11 +664,8 @@ mod rhai_library_module { } } -pub fn register_library_rhai_module(engine: &mut Engine, db: Arc) { - let module = exported_module!(rhai_library_module); - engine.register_global_module(module.into()); - - let mut db_module = Module::new(); +pub fn register_library_rhai_module(engine: &mut Engine) { + let mut module = exported_module!(rhai_library_module); register_json_method::(engine); register_json_method::(engine); @@ -664,368 +674,174 @@ pub fn register_library_rhai_module(engine: &mut Engine, db: Arc) { register_json_method::(engine); register_json_method::(engine); register_json_method::(engine); - - // Register .json() method for our custom CollectionArray type register_json_method::(engine); - // --- Collection DB Functions --- - let db_clone = db.clone(); - db_module.set_native_fn( - "save_collection", - move |collection: RhaiCollection| -> Result> { - let result = db_clone.set(&collection).map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("DB Error: {:?}", e).into(), - Position::NONE, - )) - })?; - Ok(result.1) - }, + register_authorized_create_by_id_fn!( + module: &mut module, + rhai_fn_name: "save_collection", + resource_type_str: "Collection", + rhai_return_rust_type: heromodels::models::library::collection::Collection ); - let db_clone = db.clone(); - db_module.set_native_fn( - "get_collection", - move |id: i64| -> Result> { - let collection_id = id_from_i64_to_u32(id)?; - db_clone - .get_by_id(collection_id) - .map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("DB Error: {:?}", e).into(), - Position::NONE, - )) - })? - .ok_or_else(|| { - Box::new(EvalAltResult::ErrorRuntime( - format!("Collection with ID {} not found", collection_id).into(), - Position::NONE, - )) - }) - }, + register_authorized_get_by_id_fn!( + module: &mut module, + rhai_fn_name: "get_collection", + resource_type_str: "Collection", + rhai_return_rust_type: heromodels::models::library::collection::Collection ); - let db_clone_list_collections = db.clone(); - db_module.set_native_fn( - "list_collections", - move || -> Result> { - let collections_vec: Vec = db_clone_list_collections - .collection::() - .map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("DB Error (list_collections - access): {:?}", e).into(), - Position::NONE, - )) - })? - .get_all() - .map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("DB Error (list_collections - get_all): {:?}", e).into(), - Position::NONE, - )) - })?; - Ok(RhaiCollectionArray(collections_vec)) // Wrap in RhaiCollectionArray - }, + register_authorized_delete_by_id_fn!( + module: &mut module, + rhai_fn_name: "delete_collection", + resource_type_str: "Collection", + rhai_return_rust_type: heromodels::models::library::collection::Collection ); - let db_clone = db.clone(); - db_module.set_native_fn( - "delete_collection", - move |id: i64| -> Result<(), Box> { - let collection_id = id_from_i64_to_u32(id)?; - db_clone - .collection::() - .unwrap() - .delete_by_id(collection_id) - .map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("DB Error: {:?}", e).into(), - Position::NONE, - )) - })?; - Ok(()) - }, + register_authorized_list_fn!( + module: &mut module, + rhai_fn_name: "list_collections", + resource_type_str: "Collection", + rhai_return_rust_type: heromodels::models::library::collection::Collection, + rhai_return_wrapper_type: RhaiCollectionArray ); - // --- Image DB Functions --- - let db_clone = db.clone(); - db_module.set_native_fn( - "save_image", - move |image: RhaiImage| -> Result> { - let result = db_clone.set(&image).map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("DB Error: {:?}", e).into(), - Position::NONE, - )) - })?; - Ok(result.1) - }, + register_authorized_create_by_id_fn!( + module: &mut module, + rhai_fn_name: "save_image", + resource_type_str: "Image", + rhai_return_rust_type: heromodels::models::library::items::Image ); - let db_clone = db.clone(); - db_module.set_native_fn( - "get_image", - move |id: i64| -> Result> { - let image_id = id_from_i64_to_u32(id)?; - db_clone - .get_by_id(image_id) - .map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("DB Error: {:?}", e).into(), - Position::NONE, - )) - })? - .ok_or_else(|| { - Box::new(EvalAltResult::ErrorRuntime( - format!("Image with ID {} not found", image_id).into(), - Position::NONE, - )) - }) - }, + register_authorized_get_by_id_fn!( + module: &mut module, + rhai_fn_name: "get_image", + resource_type_str: "Image", + rhai_return_rust_type: heromodels::models::library::items::Image ); - let db_clone = db.clone(); - db_module.set_native_fn( - "delete_image", - move |id: i64| -> Result<(), Box> { - let image_id = id_from_i64_to_u32(id)?; - db_clone - .collection::() - .unwrap() - .delete_by_id(image_id) - .map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("DB Error: {:?}", e).into(), - Position::NONE, - )) - })?; - Ok(()) - }, + register_authorized_delete_by_id_fn!( + module: &mut module, + rhai_fn_name: "delete_image", + resource_type_str: "Image", + rhai_return_rust_type: heromodels::models::library::items::Image ); - // --- Pdf DB Functions --- - let db_clone = db.clone(); - db_module.set_native_fn( - "save_pdf", - move |pdf: RhaiPdf| -> Result> { - let result = db_clone.set(&pdf).map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("DB Error: {:?}", e).into(), - Position::NONE, - )) - })?; - Ok(result.1) - }, + register_authorized_list_fn!( + module: &mut module, + rhai_fn_name: "list_images", + resource_type_str: "Image", + rhai_return_rust_type: heromodels::models::library::items::Image, + rhai_return_wrapper_type: RhaiImageArray ); - let db_clone = db.clone(); - db_module.set_native_fn( - "get_pdf", - move |id: i64| -> Result> { - let pdf_id = id_from_i64_to_u32(id)?; - db_clone - .get_by_id(pdf_id) - .map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("DB Error: {:?}", e).into(), - Position::NONE, - )) - })? - .ok_or_else(|| { - Box::new(EvalAltResult::ErrorRuntime( - format!("Pdf with ID {} not found", pdf_id).into(), - Position::NONE, - )) - }) - }, + register_authorized_create_by_id_fn!( + module: &mut module, + rhai_fn_name: "save_pdf", + resource_type_str: "Pdf", + rhai_return_rust_type: heromodels::models::library::items::Pdf ); - let db_clone = db.clone(); - db_module.set_native_fn( - "delete_pdf", - move |id: i64| -> Result<(), Box> { - let pdf_id = id_from_i64_to_u32(id)?; - db_clone - .collection::() - .unwrap() - .delete_by_id(pdf_id) - .map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("DB Error: {:?}", e).into(), - Position::NONE, - )) - })?; - Ok(()) - }, + register_authorized_get_by_id_fn!( + module: &mut module, + rhai_fn_name: "get_pdf", + resource_type_str: "Pdf", + rhai_return_rust_type: heromodels::models::library::items::Pdf ); - // --- Markdown DB Functions --- - let db_clone = db.clone(); - db_module.set_native_fn( - "save_markdown", - move |markdown: RhaiMarkdown| -> Result> { - let result = db_clone.set(&markdown).map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("DB Error: {:?}", e).into(), - Position::NONE, - )) - })?; - Ok(result.1) - }, + register_authorized_delete_by_id_fn!( + module: &mut module, + rhai_fn_name: "delete_pdf", + resource_type_str: "Pdf", + rhai_return_rust_type: heromodels::models::library::items::Pdf ); - let db_clone = db.clone(); - db_module.set_native_fn( - "get_markdown", - move |id: i64| -> Result> { - let markdown_id = id_from_i64_to_u32(id)?; - db_clone - .get_by_id(markdown_id) - .map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("DB Error: {:?}", e).into(), - Position::NONE, - )) - })? - .ok_or_else(|| { - Box::new(EvalAltResult::ErrorRuntime( - format!("Markdown with ID {} not found", markdown_id).into(), - Position::NONE, - )) - }) - }, + register_authorized_list_fn!( + module: &mut module, + rhai_fn_name: "list_pdfs", + resource_type_str: "Pdf", + rhai_return_rust_type: heromodels::models::library::items::Pdf, + rhai_return_wrapper_type: RhaiPdfArray + ); + + register_authorized_get_by_id_fn!( + module: &mut module, + rhai_fn_name: "get_markdown", + resource_type_str: "Markdown", + rhai_return_rust_type: heromodels::models::library::items::Markdown ); - let db_clone = db.clone(); - db_module.set_native_fn( - "delete_markdown", - move |id: i64| -> Result<(), Box> { - let markdown_id = id_from_i64_to_u32(id)?; - db_clone - .collection::() - .unwrap() - .delete_by_id(markdown_id) - .map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("DB Error: {:?}", e).into(), - Position::NONE, - )) - })?; - Ok(()) - }, + register_authorized_delete_by_id_fn!( + module: &mut module, + rhai_fn_name: "delete_markdown", + resource_type_str: "Markdown", + rhai_return_rust_type: heromodels::models::library::items::Markdown ); - // --- Book DB Functions --- - let db_clone = db.clone(); - db_module.set_native_fn( - "save_book", - move |book: RhaiBook| -> Result> { - let result = db_clone.set(&book).map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("DB Error: {:?}", e).into(), - Position::NONE, - )) - })?; - Ok(result.1) - }, + register_authorized_list_fn!( + module: &mut module, + rhai_fn_name: "list_markdowns", + resource_type_str: "Markdown", + rhai_return_rust_type: heromodels::models::library::items::Markdown, + rhai_return_wrapper_type: RhaiMarkdownArray ); - let db_clone = db.clone(); - db_module.set_native_fn( - "get_book", - move |id: i64| -> Result> { - let book_id = id_from_i64_to_u32(id)?; - db_clone - .get_by_id(book_id) - .map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("DB Error: {:?}", e).into(), - Position::NONE, - )) - })? - .ok_or_else(|| { - Box::new(EvalAltResult::ErrorRuntime( - format!("Book with ID {} not found", book_id).into(), - Position::NONE, - )) - }) - }, + register_authorized_create_by_id_fn!( + module: &mut module, + rhai_fn_name: "save_book", + resource_type_str: "Book", + rhai_return_rust_type: heromodels::models::library::items::Book ); - let db_clone = db.clone(); - db_module.set_native_fn( - "delete_book", - move |id: i64| -> Result<(), Box> { - let book_id = id_from_i64_to_u32(id)?; - db_clone - .collection::() - .unwrap() - .delete_by_id(book_id) - .map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("DB Error: {:?}", e).into(), - Position::NONE, - )) - })?; - Ok(()) - }, + register_authorized_get_by_id_fn!( + module: &mut module, + rhai_fn_name: "get_book", + resource_type_str: "Book", + rhai_return_rust_type: heromodels::models::library::items::Book ); - // --- Slides DB Functions --- - let db_clone = db.clone(); - db_module.set_native_fn( - "save_slides", - move |slides: RhaiSlides| -> Result> { - let result = db_clone.set(&slides).map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("DB Error: {:?}", e).into(), - Position::NONE, - )) - })?; - Ok(result.1) - }, + register_authorized_delete_by_id_fn!( + module: &mut module, + rhai_fn_name: "delete_book", + resource_type_str: "Book", + rhai_return_rust_type: heromodels::models::library::items::Book ); - let db_clone = db.clone(); - db_module.set_native_fn( - "get_slides", - move |id: i64| -> Result> { - let slides_id = id_from_i64_to_u32(id)?; - db_clone - .get_by_id(slides_id) - .map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("DB Error: {:?}", e).into(), - Position::NONE, - )) - })? - .ok_or_else(|| { - Box::new(EvalAltResult::ErrorRuntime( - format!("Slides with ID {} not found", slides_id).into(), - Position::NONE, - )) - }) - }, + register_authorized_list_fn!( + module: &mut module, + rhai_fn_name: "list_books", + resource_type_str: "Book", + rhai_return_rust_type: heromodels::models::library::items::Book, + rhai_return_wrapper_type: RhaiBookArray ); - let db_clone = db.clone(); - db_module.set_native_fn( - "delete_slides", - move |id: i64| -> Result<(), Box> { - let slides_id = id_from_i64_to_u32(id)?; - db_clone - .collection::() - .unwrap() - .delete_by_id(slides_id) - .map_err(|e| { - Box::new(EvalAltResult::ErrorRuntime( - format!("DB Error: {:?}", e).into(), - Position::NONE, - )) - })?; - Ok(()) - }, + register_authorized_create_by_id_fn!( + module: &mut module, + rhai_fn_name: "save_slides", + resource_type_str: "Slides", + rhai_return_rust_type: heromodels::models::library::items::Slides ); - engine.register_global_module(db_module.into()); + register_authorized_get_by_id_fn!( + module: &mut module, + rhai_fn_name: "get_slides", + resource_type_str: "Slides", + rhai_return_rust_type: heromodels::models::library::items::Slides + ); + + register_authorized_delete_by_id_fn!( + module: &mut module, + rhai_fn_name: "delete_slides", + resource_type_str: "Slides", + rhai_return_rust_type: heromodels::models::library::items::Slides + ); + + register_authorized_list_fn!( + module: &mut module, + rhai_fn_name: "list_slides", + resource_type_str: "Slides", + rhai_return_rust_type: heromodels::models::library::items::Slides, + rhai_return_wrapper_type: RhaiSlidesArray + ); + + engine.register_global_module(module.into()); } diff --git a/src/dsl/src/object.rs b/src/dsl/src/object.rs new file mode 100644 index 0000000..87a43d5 --- /dev/null +++ b/src/dsl/src/object.rs @@ -0,0 +1,67 @@ +use heromodels::db::Db; +use macros::{register_authorized_create_by_id_fn, register_authorized_get_by_id_fn}; +use rhai::plugin::*; +use rhai::{Array, Dynamic, Engine, EvalAltResult, INT, Module, Position}; +use std::mem; +use std::sync::Arc; + +use heromodels::db::Collection; +use heromodels::models::object::Object; +type RhaiObject = Object; +use heromodels::db::hero::OurDB; + +#[export_module] +mod rhai_access_module { + // --- Access Functions --- + #[rhai_fn(name = "new_object", return_raw)] + pub fn new_object() -> Result> { + let object = Object::new(); + Ok(object) + } + + #[rhai_fn(name = "id", return_raw, global, pure)] + pub fn object_id(object: &mut RhaiObject) -> Result> { + Ok(object.id() as i64) + } + + #[rhai_fn(name = "title", return_raw, global, pure)] + pub fn object_title( + object: &mut RhaiObject, + title: String, + ) -> Result> { + let owned_object = std::mem::take(object); + *object = owned_object.title(title); + Ok(object.clone()) + } + + /// Sets the access name + #[rhai_fn(name = "description", return_raw, global, pure)] + pub fn object_description( + object: &mut RhaiObject, + description: String, + ) -> Result> { + let owned_object = std::mem::take(object); + *object = owned_object.description(description); + Ok(object.clone()) + } +} + +pub fn register_object_rhai_module(engine: &mut Engine) { + let mut module = exported_module!(rhai_access_module); + + register_authorized_create_by_id_fn!( + module: &mut module, + rhai_fn_name: "save_object", + resource_type_str: "Object", + rhai_return_rust_type: heromodels::models::object::Object + ); + + register_authorized_get_by_id_fn!( + module: &mut module, + rhai_fn_name: "get_object", + resource_type_str: "Object", + rhai_return_rust_type: heromodels::models::object::Object + ); + + engine.register_global_module(module.into()); +} \ No newline at end of file diff --git a/src/engine/Cargo.toml b/src/engine/Cargo.toml index cbdf4de..363917e 100644 --- a/src/engine/Cargo.toml +++ b/src/engine/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "engine" +name = "rhailib_engine" version = "0.1.0" edition = "2021" description = "Central Rhai engine for heromodels" @@ -10,6 +10,7 @@ heromodels = { path = "../../../db/heromodels", features = ["rhai"] } heromodels_core = { path = "../../../db/heromodels_core" } chrono = "0.4" heromodels-derive = { path = "../../../db/heromodels-derive" } +rhailib_dsl = { path = "../dsl" } [features] default = ["calendar", "finance"] diff --git a/src/engine/src/lib.rs b/src/engine/src/lib.rs index 45f13f3..b71ae12 100644 --- a/src/engine/src/lib.rs +++ b/src/engine/src/lib.rs @@ -5,11 +5,11 @@ use std::sync::Arc; use heromodels::db::hero::OurDB; use std::fs; // For file operations use std::path::Path; // For path handling - +use rhailib_dsl; // Export the mock database module pub mod mock_db; -pub fn create_heromodels_engine(db: Arc) -> Engine { +pub fn create_heromodels_engine() -> Engine { let mut engine = Engine::new(); // Configure engine settings @@ -19,43 +19,43 @@ pub fn create_heromodels_engine(db: Arc) -> Engine { engine.set_max_map_size(10 * 1024); // 10K elements // Register all heromodels Rhai modules - register_all_modules(&mut engine, db); + rhailib_dsl::register_dsl_modules(&mut engine); engine } -/// Register all heromodels Rhai modules with the engine -pub fn register_all_modules(engine: &mut Engine, db: Arc) { - // Register the calendar module if the feature is enabled - heromodels::models::access::register_access_rhai_module(engine, db.clone()); - #[cfg(feature = "calendar")] - heromodels::models::calendar::register_calendar_rhai_module(engine, db.clone()); - heromodels::models::contact::register_contact_rhai_module(engine, db.clone()); - heromodels::models::library::register_library_rhai_module(engine, db.clone()); - heromodels::models::circle::register_circle_rhai_module(engine, db.clone()); +// /// Register all heromodels Rhai modules with the engine +// pub fn register_all_modules(engine: &mut Engine, db: Arc) { +// // Register the calendar module if the feature is enabled +// heromodels::models::access::register_access_rhai_module(engine, db.clone()); +// #[cfg(feature = "calendar")] +// heromodels::models::calendar::register_calendar_rhai_module(engine, db.clone()); +// heromodels::models::contact::register_contact_rhai_module(engine, db.clone()); +// heromodels::models::library::register_library_rhai_module(engine, db.clone()); +// heromodels::models::circle::register_circle_rhai_module(engine, db.clone()); - // Register the flow module if the feature is enabled - #[cfg(feature = "flow")] - heromodels::models::flow::register_flow_rhai_module(engine, db.clone()); +// // Register the flow module if the feature is enabled +// #[cfg(feature = "flow")] +// heromodels::models::flow::register_flow_rhai_module(engine, db.clone()); - // // Register the finance module if the feature is enabled - // #[cfg(feature = "finance")] - // heromodels::models::finance::register_finance_rhai_module(engine, db.clone()); +// // // Register the finance module if the feature is enabled +// // #[cfg(feature = "finance")] +// // heromodels::models::finance::register_finance_rhai_module(engine, db.clone()); - // Register the legal module if the feature is enabled - #[cfg(feature = "legal")] - heromodels::models::legal::register_legal_rhai_module(engine, db.clone()); +// // Register the legal module if the feature is enabled +// #[cfg(feature = "legal")] +// heromodels::models::legal::register_legal_rhai_module(engine, db.clone()); - // Register the projects module if the feature is enabled - #[cfg(feature = "projects")] - heromodels::models::projects::register_projects_rhai_module(engine, db.clone()); +// // Register the projects module if the feature is enabled +// #[cfg(feature = "projects")] +// heromodels::models::projects::register_projects_rhai_module(engine, db.clone()); - // Register the biz module if the feature is enabled - #[cfg(feature = "biz")] - heromodels::models::biz::register_biz_rhai_module(engine, db.clone()); +// // Register the biz module if the feature is enabled +// #[cfg(feature = "biz")] +// heromodels::models::biz::register_biz_rhai_module(engine, db.clone()); - println!("Heromodels Rhai modules registered successfully."); -} +// println!("Heromodels Rhai modules registered successfully."); +// } /// Evaluate a Rhai script string pub fn eval_script( diff --git a/src/macros/Cargo.toml b/src/macros/Cargo.toml index f3c4587..e2d69ff 100644 --- a/src/macros/Cargo.toml +++ b/src/macros/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2024" [dependencies] -rhai = { version = "=1.21.0", features = ["std", "sync", "decimal", "internals"] } +rhai = { version = "1.21.0", features = ["std", "sync", "decimal", "internals"] } heromodels = { path = "../../../db/heromodels" } heromodels_core = { path = "../../../db/heromodels_core" } serde = { version = "1.0", features = ["derive"] } diff --git a/src/macros/src/lib.rs b/src/macros/src/lib.rs index cb2f6af..e002afe 100644 --- a/src/macros/src/lib.rs +++ b/src/macros/src/lib.rs @@ -82,7 +82,7 @@ macro_rules! register_authorized_get_by_id_fn { ) => { FuncRegistration::new($rhai_fn_name).set_into_module( $module, - move |context: rhai::NativeCallContext, id_val: i64| -> Result, Box> { + move |context: rhai::NativeCallContext, id_val: i64| -> Result<$rhai_return_rust_type, Box> { let actual_id: u32 = $crate::id_from_i64_to_u32(id_val)?; // Inlined logic to get caller public key @@ -98,8 +98,6 @@ macro_rules! register_authorized_get_by_id_fn { .ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'DB_PATH' not found in context tag Map.".into(), context.position())))?; let db_path = db_path.clone().into_string()?; - - println!("DB Path: {}", db_path); let circle_pk = tag_map.get("CIRCLE_PUBLIC_KEY") .ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'CIRCLE_PUBLIC_KEY' not found in context tag Map.".into(), context.position())))?; @@ -122,37 +120,165 @@ macro_rules! register_authorized_get_by_id_fn { ); if !has_access { - return Ok(None); + return Err(Box::new(EvalAltResult::ErrorRuntime( + format!("Access denied for public key: {}", caller_pk_str).into(), + context.position(), + ))); } } - let all_items: Vec<$rhai_return_rust_type> = db - .collection::<$rhai_return_rust_type>() - .map_err(|e| Box::new(EvalAltResult::ErrorRuntime(format!("{:?}", e).into(), Position::NONE)))? - .get_all() - .map_err(|e| Box::new(EvalAltResult::ErrorRuntime(format!("{:?}", e).into(), Position::NONE)))?; - - for item in all_items { - println!("{} with ID: {}", $resource_type_str, item.id()); - } - println!("Fetching {} with ID: {}", $resource_type_str, actual_id); - - - - let result = db.get_by_id(actual_id).map_err(|e| { + let result = db + .collection::<$rhai_return_rust_type>() + .unwrap() + .get_by_id(actual_id) + .map_err(|e| { println!("Database error fetching {} with ID: {}", $resource_type_str, actual_id); Box::new(EvalAltResult::ErrorRuntime( format!("Database error fetching {}: {:?}", $resource_type_str, e).into(), context.position(), )) - })?; - println!("Database fetched"); + })? + .ok_or_else(|| { + Box::new(EvalAltResult::ErrorRuntime( + format!("Database error fetching {} with ID: {}", $resource_type_str, actual_id).into(), + context.position(), + )) + })?; Ok(result) }, ); }; } +// Macro to register a Rhai function that retrieves a single resource by its ID, with authorization. +#[macro_export] +macro_rules! register_authorized_create_by_id_fn { + ( + module: $module:expr, + rhai_fn_name: $rhai_fn_name:expr, // String literal for the Rhai function name (e.g., "get_collection") + resource_type_str: $resource_type_str:expr, // String literal for the resource type (e.g., "Collection") + rhai_return_rust_type: $rhai_return_rust_type:ty // Rust type of the resource returned (e.g., `RhaiCollection`) + ) => { + FuncRegistration::new($rhai_fn_name).set_into_module( + $module, + move |context: rhai::NativeCallContext, object: $rhai_return_rust_type| -> Result<$rhai_return_rust_type, Box> { + + // Inlined logic to get caller public key + let tag_map = context + .tag() + .and_then(|tag| tag.read_lock::()) + .ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("Context tag must be a Map.".into(), context.position())))?; + + let pk_dynamic = tag_map.get("CALLER_PUBLIC_KEY") + .ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'CALLER_PUBLIC_KEY' not found in context tag Map.".into(), context.position())))?; + + let db_path = tag_map.get("DB_PATH") + .ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'DB_PATH' not found in context tag Map.".into(), context.position())))?; + + let db_path = db_path.clone().into_string()?; + + let circle_pk = tag_map.get("CIRCLE_PUBLIC_KEY") + .ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'CIRCLE_PUBLIC_KEY' not found in context tag Map.".into(), context.position())))?; + + let circle_pk = circle_pk.clone().into_string()?; + + let db_path = format!("{}/{}", db_path, circle_pk); + let db = Arc::new(OurDB::new(db_path, false).expect("Failed to create DB")); + + let caller_pk_str = pk_dynamic.clone().into_string()?; + + if circle_pk != caller_pk_str { + // TODO: check if caller pk is member of circle + return Err(Box::new(EvalAltResult::ErrorRuntime( + format!("Insufficient authorization. Caller public key {} does not match circle public key {}", caller_pk_str, circle_pk).into(), + context.position(), + ))); + } + + let result = db.set(&object).map_err(|e| { + Box::new(EvalAltResult::ErrorRuntime( + format!("Database error creating {}: {:?}", $resource_type_str, e).into(), + context.position(), + )) + })?; + Ok(result.1) + }, + ); + }; +} + +// Macro to register a Rhai function that retrieves a single resource by its ID, with authorization. +#[macro_export] +macro_rules! register_authorized_delete_by_id_fn { + ( + module: $module:expr, + rhai_fn_name: $rhai_fn_name:expr, // String literal for the Rhai function name (e.g., "get_collection") + resource_type_str: $resource_type_str:expr, // String literal for the resource type (e.g., "Collection") + rhai_return_rust_type: $rhai_return_rust_type:ty // Rust type of the resource returned (e.g., `RhaiCollection`) + ) => { + FuncRegistration::new($rhai_fn_name).set_into_module( + $module, + move |context: rhai::NativeCallContext, id_val: i64| -> Result<(), Box> { + let actual_id: u32 = $crate::id_from_i64_to_u32(id_val)?; + + // Inlined logic to get caller public key + let tag_map = context + .tag() + .and_then(|tag| tag.read_lock::()) + .ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("Context tag must be a Map.".into(), context.position())))?; + + let pk_dynamic = tag_map.get("CALLER_PUBLIC_KEY") + .ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'CALLER_PUBLIC_KEY' not found in context tag Map.".into(), context.position())))?; + + let db_path = tag_map.get("DB_PATH") + .ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'DB_PATH' not found in context tag Map.".into(), context.position())))?; + + let db_path = db_path.clone().into_string()?; + + let circle_pk = tag_map.get("CIRCLE_PUBLIC_KEY") + .ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'CIRCLE_PUBLIC_KEY' not found in context tag Map.".into(), context.position())))?; + + let circle_pk = circle_pk.clone().into_string()?; + + let db_path = format!("{}/{}", db_path, circle_pk); + let db = Arc::new(OurDB::new(db_path, false).expect("Failed to create DB")); + + let caller_pk_str = pk_dynamic.clone().into_string()?; + + if circle_pk != caller_pk_str { + // Use the standalone can_access_resource function from heromodels + let has_access = heromodels::models::access::access::can_access_resource( + db.clone(), + &caller_pk_str, + actual_id, + $resource_type_str, + ); + + if !has_access { + return Err(Box::new(EvalAltResult::ErrorRuntime( + format!("Access denied for public key: {}", caller_pk_str).into(), + context.position(), + ))); + } + } + + let result = db + .collection::<$rhai_return_rust_type>() + .unwrap() + .delete_by_id(actual_id) + .map_err(|e| { + Box::new(EvalAltResult::ErrorRuntime( + format!("Database error deleting {}: {:?}", $resource_type_str, e).into(), + context.position(), + )) + })?; + Ok(()) + }, + ); + }; +} + + /// Macro to register a Rhai function that lists all resources of a certain type, with authorization. /// /// The macro handles: @@ -164,7 +290,6 @@ macro_rules! register_authorized_get_by_id_fn { /// /// # Arguments /// * `module`: Mutable reference to the Rhai `Module`. -/// * `db_clone`: Cloned `Arc` for database access. /// * `rhai_fn_name`: String literal for the Rhai function name (e.g., "list_collections"). /// * `resource_type_str`: String literal for the resource type (e.g., "Collection"), used in authorization checks. /// * `rhai_return_rust_type`: Rust type of the resource item (e.g., `RhaiCollection`). @@ -174,16 +299,11 @@ macro_rules! register_authorized_get_by_id_fn { macro_rules! register_authorized_list_fn { ( module: $module:expr, - db_clone: $db_instance:expr, rhai_fn_name: $rhai_fn_name:expr, resource_type_str: $resource_type_str:expr, rhai_return_rust_type: $rhai_return_rust_type:ty, - item_id_accessor: $item_id_accessor:ident, rhai_return_wrapper_type: $rhai_return_wrapper_type:ty ) => { - let db_instance_auth_outer = $db_instance.clone(); - let db_instance_fetch = $db_instance.clone(); - FuncRegistration::new($rhai_fn_name).set_into_module( $module, move |context: rhai::NativeCallContext| -> Result<$rhai_return_wrapper_type, Box> { @@ -198,7 +318,20 @@ macro_rules! register_authorized_list_fn { let caller_pk_str = pk_dynamic.clone().into_string()?; - let all_items: Vec<$rhai_return_rust_type> = db_instance_fetch + let db_path = tag_map.get("DB_PATH") + .ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'DB_PATH' not found in context tag Map.".into(), context.position())))?; + + let db_path = db_path.clone().into_string()?; + + let circle_pk = tag_map.get("CIRCLE_PUBLIC_KEY") + .ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'CIRCLE_PUBLIC_KEY' not found in context tag Map.".into(), context.position())))?; + + let circle_pk = circle_pk.clone().into_string()?; + + let db_path = format!("{}/{}", db_path, circle_pk); + let db = Arc::new(OurDB::new(db_path, false).expect("Failed to create DB")); + + let all_items: Vec<$rhai_return_rust_type> = db .collection::<$rhai_return_rust_type>() .map_err(|e| Box::new(EvalAltResult::ErrorRuntime(format!("{:?}", e).into(), Position::NONE)))? .get_all() @@ -207,9 +340,9 @@ macro_rules! register_authorized_list_fn { let authorized_items: Vec<$rhai_return_rust_type> = all_items .into_iter() .filter(|item| { - let resource_id = item.$item_id_accessor(); + let resource_id = item.id(); heromodels::models::access::access::can_access_resource( - db_instance_auth_outer.clone(), + db.clone(), &caller_pk_str, resource_id, $resource_type_str, @@ -221,4 +354,4 @@ macro_rules! register_authorized_list_fn { }, ); }; -} \ No newline at end of file +} diff --git a/src/repl/Cargo.toml b/src/repl/Cargo.toml index 22b1f6d..f23ff54 100644 --- a/src/repl/Cargo.toml +++ b/src/repl/Cargo.toml @@ -16,6 +16,6 @@ rhai_client = { path = "../client" } anyhow = "1.0" # For simpler error handling rhailib_worker = { path = "../worker", package = "rhailib_worker" } -engine = { path = "../engine" } +rhailib_engine = { path = "../engine" } heromodels = { path = "../../../db/heromodels", features = ["rhai"] } rhai = { version = "1.18.0" } # Match version used by worker/engine diff --git a/src/worker/Cargo.toml b/src/worker/Cargo.toml index f4d3c55..a111d24 100644 --- a/src/worker/Cargo.toml +++ b/src/worker/Cargo.toml @@ -25,5 +25,5 @@ clap = { version = "4.4", features = ["derive"] } uuid = { version = "1.6", features = ["v4", "serde"] } # Though task_id is string, uuid might be useful chrono = { version = "0.4", features = ["serde"] } rhai_client = { path = "../client" } -engine = { path = "../engine" } +rhailib_engine = { path = "../engine" } heromodels = { path = "../../../db/heromodels", features = ["rhai"] } diff --git a/src/worker/src/lib.rs b/src/worker/src/lib.rs index 9d1effb..b5a5772 100644 --- a/src/worker/src/lib.rs +++ b/src/worker/src/lib.rs @@ -1,15 +1,14 @@ use chrono::Utc; use log::{debug, error, info}; use redis::AsyncCommands; -use rhai::{Dynamic, Engine, Scope}; +use rhai::{Dynamic, Engine}; use rhai_client::RhaiTaskDetails; // Import for constructing the reply message use serde_json; use std::collections::HashMap; use tokio::sync::mpsc; // For shutdown signal use tokio::task::JoinHandle; // For serializing the reply message -const REDIS_TASK_DETAILS_PREFIX: &str = "rhai_task_details:"; -const REDIS_QUEUE_PREFIX: &str = "rhai_tasks:"; +const NAMESPACE_PREFIX: &str = "rhailib:"; const BLPOP_TIMEOUT_SECONDS: usize = 5; // This function updates specific fields in the Redis hash. @@ -21,7 +20,7 @@ async fn update_task_status_in_redis( output: Option, error_msg: Option, ) -> redis::RedisResult<()> { - let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id); + let task_key = format!("{}{}", NAMESPACE_PREFIX, task_id); let mut updates: Vec<(&str, String)> = vec![ ("status", status.to_string()), ("updatedAt", Utc::now().timestamp().to_string()), @@ -42,7 +41,6 @@ async fn update_task_status_in_redis( } pub fn spawn_rhai_worker( - _circle_id: u32, // For logging or specific logic if needed in the future circle_public_key: String, db_path: String, mut engine: Engine, @@ -51,7 +49,7 @@ pub fn spawn_rhai_worker( preserve_tasks: bool, // Flag to control task cleanup ) -> JoinHandle>> { tokio::spawn(async move { - let queue_key = format!("{}{}", REDIS_QUEUE_PREFIX, circle_public_key); + let queue_key = format!("{}{}", NAMESPACE_PREFIX, circle_public_key); info!( "Rhai Worker for Circle Public Key '{}' starting. Connecting to Redis at {}. Listening on queue: {}. Waiting for tasks or shutdown signal.", circle_public_key, redis_url, queue_key @@ -85,97 +83,96 @@ pub fn spawn_rhai_worker( loop { let blpop_keys = vec![queue_key.clone()]; tokio::select! { - // Listen for shutdown signal - _ = shutdown_rx.recv() => { - info!("Worker for Circle Public Key '{}': Shutdown signal received. Terminating loop.", circle_public_key.clone()); - break; - } - // Listen for tasks from Redis - blpop_result = redis_conn.blpop(&blpop_keys, BLPOP_TIMEOUT_SECONDS as f64) => { - debug!("Worker for Circle Public Key '{}': Attempting BLPOP on queue: {}", circle_public_key.clone(), queue_key); - let response: Option<(String, String)> = match blpop_result { - Ok(resp) => resp, - Err(e) => { - error!("Worker for Circle Public Key '{}': Redis BLPOP error on queue {}: {}. Worker for this circle might stop.", circle_public_key, queue_key, e); - return Err(Box::new(e) as Box); - } - }; + // Listen for shutdown signal + _ = shutdown_rx.recv() => { + info!("Worker for Circle Public Key '{}': Shutdown signal received. Terminating loop.", circle_public_key.clone()); + break; + } + // Listen for tasks from Redis + blpop_result = redis_conn.blpop(&blpop_keys, BLPOP_TIMEOUT_SECONDS as f64) => { + debug!("Worker for Circle Public Key '{}': Attempting BLPOP on queue: {}", circle_public_key.clone(), queue_key); + let response: Option<(String, String)> = match blpop_result { + Ok(resp) => resp, + Err(e) => { + error!("Worker for Circle Public Key '{}': Redis BLPOP error on queue {}: {}. Worker for this circle might stop.", circle_public_key, queue_key, e); + return Err(Box::new(e) as Box); + } + }; - if let Some((_queue_name_recv, task_id)) = response { - info!("Worker for Circle Public Key '{}' received task_id: {} from queue: {}", circle_public_key, task_id, _queue_name_recv); - debug!("Worker for Circle Public Key '{}', Task {}: Processing started.", circle_public_key, task_id); + if let Some((_queue_name_recv, task_id)) = response { + info!("Worker for Circle Public Key '{}' received task_id: {} from queue: {}", circle_public_key, task_id, _queue_name_recv); + debug!("Worker for Circle Public Key '{}', Task {}: Processing started.", circle_public_key, task_id); - let task_details_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id); - debug!("Worker for Circle Public Key '{}', Task {}: Attempting HGETALL from key: {}", circle_public_key, task_id, task_details_key); + let task_details_key = format!("{}{}", NAMESPACE_PREFIX, task_id); + debug!("Worker for Circle Public Key '{}', Task {}: Attempting HGETALL from key: {}", circle_public_key, task_id, task_details_key); - let task_details_map_result: Result, _> = - redis_conn.hgetall(&task_details_key).await; + let task_details_map_result: Result, _> = + redis_conn.hgetall(&task_details_key).await; - match task_details_map_result { - Ok(details_map) => { - debug!("Worker for Circle Public Key '{}', Task {}: HGETALL successful. Details: {:?}", circle_public_key, task_id, details_map); - let script_content_opt = details_map.get("script").cloned(); - let reply_to_queue_opt = details_map.get("replyToQueue").cloned(); - let created_at_str_opt = details_map.get("createdAt").cloned(); - let public_key_opt = details_map.get("publicKey").cloned(); + match task_details_map_result { + Ok(details_map) => { + debug!("Worker for Circle Public Key '{}', Task {}: HGETALL successful. Details: {:?}", circle_public_key, task_id, details_map); + let script_content_opt = details_map.get("script").cloned(); + let created_at_str_opt = details_map.get("createdAt").cloned(); + let caller_id = details_map.get("callerId").cloned().expect("callerId field missing from Redis hash"); - if let Some(script_content) = script_content_opt { - info!("Worker for Circle Public Key '{}' processing task_id: {}. Script: {:.50}...", circle_public_key, task_id, script_content); - debug!("Worker for Circle Public Key '{}', Task {}: Attempting to update status to 'processing'.", circle_public_key, task_id); - if let Err(e) = update_task_status_in_redis(&mut redis_conn, &task_id, "processing", None, None).await { - error!("Worker for Circle Public Key '{}', Task {}: Failed to update status to 'processing': {}", circle_public_key, task_id, e); - } else { - debug!("Worker for Circle Public Key '{}', Task {}: Status updated to 'processing'.", circle_public_key, task_id); - } + if let Some(script_content) = script_content_opt { + info!("Worker for Circle Public Key '{}' processing task_id: {}. Script: {:.50}...", circle_public_key, task_id, script_content); + debug!("Worker for Circle Public Key '{}', Task {}: Attempting to update status to 'processing'.", circle_public_key, task_id); + if let Err(e) = update_task_status_in_redis(&mut redis_conn, &task_id, "processing", None, None).await { + error!("Worker for Circle Public Key '{}', Task {}: Failed to update status to 'processing': {}", circle_public_key, task_id, e); + } else { + debug!("Worker for Circle Public Key '{}', Task {}: Status updated to 'processing'.", circle_public_key, task_id); + } - let mut db_config = rhai::Map::new(); - db_config.insert("DB_PATH".into(), db_path.clone().into()); - db_config.insert("CALLER_PUBLIC_KEY".into(), public_key_opt.unwrap_or_default().into()); - db_config.insert("CIRCLE_PUBLIC_KEY".into(), circle_public_key.clone().into()); - engine.set_default_tag(Dynamic::from(db_config)); // Or pass via CallFnOptions - - debug!("Worker for Circle Public Key '{}', Task {}: Evaluating script with Rhai engine.", circle_public_key, task_id); + let mut db_config = rhai::Map::new(); + db_config.insert("DB_PATH".into(), db_path.clone().into()); + db_config.insert("CALLER_PUBLIC_KEY".into(), caller_id.clone().into()); + db_config.insert("CIRCLE_PUBLIC_KEY".into(), circle_public_key.clone().into()); + engine.set_default_tag(Dynamic::from(db_config)); // Or pass via CallFnOptions + + debug!("Worker for Circle Public Key '{}', Task {}: Evaluating script with Rhai engine.", circle_public_key, task_id); - let mut final_status = "error".to_string(); // Default to error - let mut final_output: Option = None; - let mut final_error_msg: Option = None; + let mut final_status = "error".to_string(); // Default to error + let mut final_output: Option = None; + let mut final_error_msg: Option = None; - match engine.eval::(&script_content) { - Ok(result) => { - let output_str = if result.is::() { - // If the result is a string, we can unwrap it directly. - // This moves `result`, which is fine because it's the last time we use it in this branch. - result.into_string().unwrap() - } else { - result.to_string() - }; + match engine.eval::(&script_content) { + Ok(result) => { + let output_str = if result.is::() { + // If the result is a string, we can unwrap it directly. + // This moves `result`, which is fine because it's the last time we use it in this branch. + result.into_string().unwrap() + } else { + result.to_string() + }; info!("Worker for Circle Public Key '{}' task {} completed. Output: {}", circle_public_key, task_id, output_str); final_status = "completed".to_string(); final_output = Some(output_str); } - Err(e) => { - let error_str = format!("{:?}", *e); - error!("Worker for Circle Public Key '{}' task {} script evaluation failed. Error: {}", circle_public_key, task_id, error_str); - final_error_msg = Some(error_str); - // final_status remains "error" + Err(e) => { + let error_str = format!("{:?}", *e); + error!("Worker for Circle Public Key '{}' task {} script evaluation failed. Error: {}", circle_public_key, task_id, error_str); + final_error_msg = Some(error_str); + // final_status remains "error" + } } - } - debug!("Worker for Circle Public Key '{}', Task {}: Attempting to update status to '{}'.", circle_public_key, task_id, final_status); - if let Err(e) = update_task_status_in_redis( - &mut redis_conn, - &task_id, - &final_status, - final_output.clone(), // Clone for task hash update - final_error_msg.clone(), // Clone for task hash update - ).await { - error!("Worker for Circle Public Key '{}', Task {}: Failed to update final status to '{}': {}", circle_public_key, task_id, final_status, e); - } else { - debug!("Worker for Circle Public Key '{}', Task {}: Final status updated to '{}'.", circle_public_key, task_id, final_status); - } + debug!("Worker for Circle Public Key '{}', Task {}: Attempting to update status to '{}'.", circle_public_key, task_id, final_status); + if let Err(e) = update_task_status_in_redis( + &mut redis_conn, + &task_id, + &final_status, + final_output.clone(), // Clone for task hash update + final_error_msg.clone(), // Clone for task hash update + ).await { + error!("Worker for Circle Public Key '{}', Task {}: Failed to update final status to '{}': {}", circle_public_key, task_id, final_status, e); + } else { + debug!("Worker for Circle Public Key '{}', Task {}: Final status updated to '{}'.", circle_public_key, task_id, final_status); + } + + // Send to reply queue if specified - // Send to reply queue if specified - if let Some(reply_q) = reply_to_queue_opt { let created_at = created_at_str_opt .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok()) .map(|dt| dt.with_timezone(&Utc)) @@ -185,45 +182,43 @@ pub fn spawn_rhai_worker( task_id: task_id.to_string(), // Add the task_id script: script_content.clone(), // Include script for context in reply status: final_status, // The final status - // client_rpc_id is no longer a field output: final_output, // The final output error: final_error_msg, // The final error created_at, // Original creation time updated_at: Utc::now(), // Time of this final update/reply - // reply_to_queue is no longer a field - public_key: public_key_opt.clone(), + caller_id: caller_id.clone(), }; + let reply_queue_key = format!("{}:reply:{}", NAMESPACE_PREFIX, task_id); match serde_json::to_string(&reply_details) { Ok(reply_json) => { - let lpush_result: redis::RedisResult = redis_conn.lpush(&reply_q, &reply_json).await; + let lpush_result: redis::RedisResult = redis_conn.lpush(&reply_queue_key, &reply_json).await; match lpush_result { - Ok(_) => debug!("Worker for Circle Public Key '{}', Task {}: Successfully sent result to reply queue {}", circle_public_key, task_id, reply_q), - Err(e_lpush) => error!("Worker for Circle Public Key '{}', Task {}: Failed to LPUSH result to reply queue {}: {}", circle_public_key, task_id, reply_q, e_lpush), + Ok(_) => debug!("Worker for Circle Public Key '{}', Task {}: Successfully sent result to reply queue {}", circle_public_key, task_id, reply_queue_key), + Err(e_lpush) => error!("Worker for Circle Public Key '{}', Task {}: Failed to LPUSH result to reply queue {}: {}", circle_public_key, task_id, reply_queue_key, e_lpush), } } Err(e_json) => { - error!("Worker for Circle Public Key '{}', Task {}: Failed to serialize reply details for queue {}: {}", circle_public_key, task_id, reply_q, e_json); + error!("Worker for Circle Public Key '{}', Task {}: Failed to serialize reply details for queue {}: {}", circle_public_key, task_id, reply_queue_key, e_json); } } - } - // Clean up task details based on preserve_tasks flag - if !preserve_tasks { - // The worker is responsible for cleaning up the task details hash. - if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await { - error!("Worker for Circle Public Key '{}', Task {}: Failed to delete task details key '{}': {}", circle_public_key, task_id, task_details_key, e); + // Clean up task details based on preserve_tasks flag + if !preserve_tasks { + // The worker is responsible for cleaning up the task details hash. + if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await { + error!("Worker for Circle Public Key '{}', Task {}: Failed to delete task details key '{}': {}", circle_public_key, task_id, task_details_key, e); + } else { + debug!("Worker for Circle Public Key '{}', Task {}: Cleaned up task details key '{}'.", circle_public_key, task_id, task_details_key); + } } else { - debug!("Worker for Circle Public Key '{}', Task {}: Cleaned up task details key '{}'.", circle_public_key, task_id, task_details_key); + debug!("Worker for Circle Public Key '{}', Task {}: Preserving task details (preserve_tasks=true)", circle_public_key, task_id); } - } else { - debug!("Worker for Circle Public Key '{}', Task {}: Preserving task details (preserve_tasks=true)", circle_public_key, task_id); - } - } else { // Script content not found in hash - error!( - "Worker for Circle Public Key '{}', Task {}: Script content not found in Redis hash. Details map: {:?}", - circle_public_key, task_id, details_map - ); - // Clean up invalid task details based on preserve_tasks flag - if !preserve_tasks { + } else { // Script content not found in hash + error!( + "Worker for Circle Public Key '{}', Task {}: Script content not found in Redis hash. Details map: {:?}", + circle_public_key, task_id, details_map + ); + // Clean up invalid task details based on preserve_tasks flag + if !preserve_tasks { // Even if the script is not found, the worker should clean up the invalid task hash. if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await { error!("Worker for Circle Public Key '{}', Task {}: Failed to delete invalid task details key '{}': {}", circle_public_key, task_id, task_details_key, e); @@ -233,23 +228,23 @@ pub fn spawn_rhai_worker( } } } - Err(e) => { - error!( - "Worker for Circle Public Key '{}', Task {}: Failed to fetch details (HGETALL) from Redis for key {}. Error: {:?}", - circle_public_key, task_id, task_details_key, e - ); - } + Err(e) => { + error!( + "Worker for Circle Public Key '{}', Task {}: Failed to fetch details (HGETALL) from Redis for key {}. Error: {:?}", + circle_public_key, task_id, task_details_key, e + ); } - } else { - debug!("Worker for Circle Public Key '{}': BLPOP timed out on queue {}. No new tasks. Checking for shutdown signal again.", &circle_public_key, &queue_key); - } - } // End of blpop_result match - } // End of tokio::select! + } + } else { + debug!("Worker for Circle Public Key '{}': BLPOP timed out on queue {}. No new tasks. Checking for shutdown signal again.", &circle_public_key, &queue_key); + } + } // End of blpop_result match + } // End of tokio::select! } // End of loop info!( "Worker for Circle Public Key '{}' has shut down.", circle_public_key ); - Ok(()) + Ok(()) }) }