From 061aee6f1d347e78412bd3beea0d0bbec28bf88a Mon Sep 17 00:00:00 2001 From: timurgordon Date: Sun, 1 Jun 2025 02:10:58 +0300 Subject: [PATCH] rhai rpc queue worker and client --- rhai_client/.gitignore | 1 + rhai_client/Cargo.toml | 17 ++ rhai_client/src/lib.rs | 212 ++++++++++++++++++ rhai_worker/.gitignore | 1 + rhai_worker/Cargo.toml | 27 +++ rhai_worker/examples/example_math_worker.rs | 76 +++++++ rhai_worker/examples/example_string_worker.rs | 76 +++++++ rhai_worker/src/lib.rs | 144 ++++++++++++ rhai_worker/src/main.rs | 18 ++ 9 files changed, 572 insertions(+) create mode 100644 rhai_client/.gitignore create mode 100644 rhai_client/Cargo.toml create mode 100644 rhai_client/src/lib.rs create mode 100644 rhai_worker/.gitignore create mode 100644 rhai_worker/Cargo.toml create mode 100644 rhai_worker/examples/example_math_worker.rs create mode 100644 rhai_worker/examples/example_string_worker.rs create mode 100644 rhai_worker/src/lib.rs create mode 100644 rhai_worker/src/main.rs diff --git a/rhai_client/.gitignore b/rhai_client/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/rhai_client/.gitignore @@ -0,0 +1 @@ +/target diff --git a/rhai_client/Cargo.toml b/rhai_client/Cargo.toml new file mode 100644 index 0000000..ad24359 --- /dev/null +++ b/rhai_client/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "rhai_client" +version = "0.1.0" +edition = "2021" + +[dependencies] +redis = { version = "0.25.0", features = ["tokio-comp"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +uuid = { version = "1.6", features = ["v4", "serde"] } +chrono = { version = "0.4", features = ["serde"] } +log = "0.4" +tokio = { version = "1", features = ["macros", "rt-multi-thread"] } # For async main in examples, and general async + +[dev-dependencies] # For examples later +env_logger = "0.10" +rhai = "1.18.0" # For examples that might need to show engine setup diff --git a/rhai_client/src/lib.rs b/rhai_client/src/lib.rs new file mode 100644 index 0000000..31be671 --- /dev/null +++ b/rhai_client/src/lib.rs @@ -0,0 +1,212 @@ +use chrono::Utc; +use log::{debug, info, warn, error}; // Added error +use redis::AsyncCommands; +use tokio::time::{sleep, Instant}; // For polling with timeout +use std::time::Duration; +use serde::{Deserialize, Serialize}; +use serde_json::Value; // For client_rpc_id, though not directly used by this client's submit method +use uuid::Uuid; + +const REDIS_TASK_DETAILS_PREFIX: &str = "rhai_task_details:"; +const REDIS_QUEUE_PREFIX: &str = "rhai_tasks:"; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct RhaiTaskDetails { + pub script: String, + pub status: String, // "pending", "processing", "completed", "error" + #[serde(rename = "clientRpcId")] + pub client_rpc_id: Option, // Kept for compatibility with worker/server, but optional for client + pub output: Option, + pub error: Option, // Renamed from error_message for consistency + #[serde(rename = "createdAt")] + pub created_at: chrono::DateTime, + #[serde(rename = "updatedAt")] + pub updated_at: chrono::DateTime, +} + +#[derive(Debug)] +pub enum RhaiClientError { + RedisError(redis::RedisError), + SerializationError(serde_json::Error), + Timeout(String), // task_id that timed out + TaskNotFound(String), // task_id not found after submission (should be rare) +} + +impl From for RhaiClientError { + fn from(err: redis::RedisError) -> Self { + RhaiClientError::RedisError(err) + } +} + +impl From for RhaiClientError { + fn from(err: serde_json::Error) -> Self { + RhaiClientError::SerializationError(err) + } +} + +impl std::fmt::Display for RhaiClientError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RhaiClientError::RedisError(e) => write!(f, "Redis error: {}", e), + RhaiClientError::SerializationError(e) => write!(f, "Serialization error: {}", e), + RhaiClientError::Timeout(task_id) => write!(f, "Timeout waiting for task {} to complete", task_id), + RhaiClientError::TaskNotFound(task_id) => write!(f, "Task {} not found after submission", task_id), + } + } +} + +impl std::error::Error for RhaiClientError {} + +pub struct RhaiClient { + redis_client: redis::Client, +} + +impl RhaiClient { + pub fn new(redis_url: &str) -> Result { + let client = redis::Client::open(redis_url)?; + Ok(Self { redis_client: client }) + } + + pub async fn submit_script( + &self, + circle_name: &str, + script: String, + client_rpc_id: Option, // Optional: if the caller has an RPC ID to associate + ) -> Result { + let mut conn = self.redis_client.get_multiplexed_async_connection().await?; + + let task_id = Uuid::new_v4().to_string(); + let now = Utc::now(); + + let task_details = RhaiTaskDetails { + script, + status: "pending".to_string(), + client_rpc_id, + output: None, + error: None, + created_at: now, + updated_at: now, + }; + + let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id); + let queue_key = format!("{}{}", REDIS_QUEUE_PREFIX, circle_name.replace(" ", "_").to_lowercase()); + + debug!( + "Submitting task_id: {} for circle: {} to queue: {}. Details: {:?}", + task_id, circle_name, queue_key, task_details + ); + + // Using HSET_MULTIPLE for efficiency if redis-rs supports it directly for struct fields. + // Otherwise, individual HSETs are fine. + // For simplicity and directness with redis-rs async, individual HSETs are used here. + conn.hset::<_, _, _, ()>(&task_key, "script", &task_details.script).await?; + conn.hset::<_, _, _, ()>(&task_key, "status", &task_details.status).await?; + if let Some(rpc_id_val) = &task_details.client_rpc_id { + conn.hset::<_, _, _, ()>(&task_key, "clientRpcId", serde_json::to_string(rpc_id_val)?).await?; + } else { + // Ensure the field exists even if null, or decide if it should be omitted + conn.hset::<_, _, _, ()>(&task_key, "clientRpcId", Value::Null.to_string()).await?; + } + conn.hset::<_, _, _, ()>(&task_key, "createdAt", task_details.created_at.to_rfc3339()).await?; + conn.hset::<_, _, _, ()>(&task_key, "updatedAt", task_details.updated_at.to_rfc3339()).await?; + // output and error fields are initially None, so they might not be set here or set as empty strings/null + + conn.lpush::<_, _, ()>(&queue_key, &task_id).await?; + + Ok(task_id) + } + + // Optional: A method to check task status, similar to what circle_server_ws polling does. + // This could be useful for a client that wants to poll for results itself. + pub async fn get_task_status(&self, task_id: &str) -> Result, RhaiClientError> { + let mut conn = self.redis_client.get_multiplexed_async_connection().await?; + let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id); + + let result_map: Option> = conn.hgetall(&task_key).await?; + + match result_map { + Some(map) => { + // Reconstruct RhaiTaskDetails from HashMap + // This is a simplified reconstruction; ensure all fields are handled robustly + let details = RhaiTaskDetails { + script: map.get("script").cloned().unwrap_or_default(), + status: map.get("status").cloned().unwrap_or_default(), + client_rpc_id: map.get("clientRpcId") + .and_then(|s| serde_json::from_str(s).ok()) + .or(Some(Value::Null)), // Default to Value::Null if missing or parse error + output: map.get("output").cloned(), + error: map.get("error").cloned(), + created_at: map.get("createdAt") + .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) + .map(|dt| dt.with_timezone(&Utc)) + .unwrap_or_else(Utc::now), // Provide a default + updated_at: map.get("updatedAt") + .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) + .map(|dt| dt.with_timezone(&Utc)) + .unwrap_or_else(Utc::now), // Provide a default + }; + Ok(Some(details)) + } + None => Ok(None), + } + } + + pub async fn submit_script_and_await_result( + &self, + circle_name: &str, + script: String, + client_rpc_id: Option, + timeout: Duration, + poll_interval: Duration, + ) -> Result { + let task_id = self.submit_script(circle_name, script, client_rpc_id).await?; + info!("Task {} submitted. Polling for result with timeout {:?}...", task_id, timeout); + + let start_time = Instant::now(); + loop { + if start_time.elapsed() > timeout { + warn!("Timeout waiting for task {}", task_id); + return Err(RhaiClientError::Timeout(task_id.clone())); + } + + match self.get_task_status(&task_id).await { + Ok(Some(details)) => { + debug!("Polled task {}: status = {}", task_id, details.status); + if details.status == "completed" || details.status == "error" { + info!("Task {} finished with status: {}", task_id, details.status); + return Ok(details); + } + // else status is "pending" or "processing", continue polling + } + Ok(None) => { + // This case should ideally not happen if submit_script succeeded and worker is running, + // unless the task details were manually deleted from Redis. + warn!("Task {} not found during polling. This might indicate an issue.", task_id); + // Depending on desired robustness, could retry a few times or return an error immediately. + // For now, let it continue polling up to timeout, or return a specific error. + // If it persists, it's effectively a timeout or a lost task. + // Let's consider it a lost task if it's not found after a short while post-submission. + if start_time.elapsed() > Duration::from_secs(5) { // Arbitrary short duration + return Err(RhaiClientError::TaskNotFound(task_id.clone())); + } + } + Err(e) => { + // Log error but continue polling unless it's a critical Redis error + error!("Error polling task {}: {}. Will retry.", task_id, e); + } + } + sleep(poll_interval).await; + } + } +} + +#[cfg(test)] +mod tests { + // use super::*; + // Basic tests can be added later, especially once examples are in place. + // For now, ensuring it compiles is the priority. + #[test] + fn it_compiles() { + assert_eq!(2 + 2, 4); + } +} diff --git a/rhai_worker/.gitignore b/rhai_worker/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/rhai_worker/.gitignore @@ -0,0 +1 @@ +/target diff --git a/rhai_worker/Cargo.toml b/rhai_worker/Cargo.toml new file mode 100644 index 0000000..78e3982 --- /dev/null +++ b/rhai_worker/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "rhai_worker" +version = "0.1.0" +edition = "2021" + +[lib] +name = "rhai_worker_lib" # Can be different from package name, or same +path = "src/lib.rs" + +[[bin]] +name = "rhai_worker" +path = "src/main.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +redis = { version = "0.25.0", features = ["tokio-comp"] } +rhai = { version = "1.18.0", features = ["sync", "decimal"] } # Added "decimal" for broader script support +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] } +log = "0.4" +env_logger = "0.10" +clap = { version = "4.4", features = ["derive"] } +uuid = { version = "1.6", features = ["v4", "serde"] } # Though task_id is string, uuid might be useful +chrono = { version = "0.4", features = ["serde"] } +rhai_client = { path = "../rhai_client" } diff --git a/rhai_worker/examples/example_math_worker.rs b/rhai_worker/examples/example_math_worker.rs new file mode 100644 index 0000000..693d94c --- /dev/null +++ b/rhai_worker/examples/example_math_worker.rs @@ -0,0 +1,76 @@ +use rhai::Engine; +use rhai_client::RhaiClient; // To submit tasks +use rhai_worker_lib::{run_worker_loop, Args as WorkerArgs}; // To run the worker +use std::time::Duration; +use tokio::time::sleep; + +// Custom function for Rhai +fn add(a: i64, b: i64) -> i64 { + a + b +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + log::info!("Starting Math Worker Example..."); + + // 1. Configure and start the Rhai Worker with a custom engine + let mut math_engine = Engine::new(); + math_engine.register_fn("add", add); + log::info!("Custom 'add' function registered with Rhai engine for Math Worker."); + + let worker_args = WorkerArgs { + redis_url: "redis://127.0.0.1/".to_string(), + circles: vec!["math_circle".to_string()], // Worker listens on a specific queue + }; + let worker_args_clone = worker_args.clone(); // Clone for the worker task + + tokio::spawn(async move { + log::info!("Math Worker task starting..."); + if let Err(e) = run_worker_loop(math_engine, worker_args_clone).await { + log::error!("Math Worker loop failed: {}", e); + } + }); + + // Give the worker a moment to start and connect + sleep(Duration::from_secs(1)).await; + + // 2. Use RhaiClient to submit a script to the "math_circle" + let client = RhaiClient::new("redis://127.0.0.1/")?; + let script_content = r#" + let x = 10; + let y = add(x, 32); // Use the custom registered function + print("Math script: 10 + 32 = " + y); + y // Return the result + "#; + + log::info!("Submitting math script to 'math_circle' and awaiting result..."); + + let timeout_duration = Duration::from_secs(10); + let poll_interval = Duration::from_millis(500); + + match client.submit_script_and_await_result( + "math_circle", + script_content.to_string(), + None, + timeout_duration, + poll_interval + ).await { + Ok(details) => { + log::info!("Math Worker Example: Task finished. Status: {}, Output: {:?}, Error: {:?}", + details.status, details.output, details.error); + if details.status == "completed" { + assert_eq!(details.output, Some("42".to_string())); + log::info!("Math Worker Example: Assertion for output 42 passed!"); + Ok(()) + } else { + log::error!("Math Worker Example: Task completed with error: {:?}", details.error); + Err(format!("Task failed with error: {:?}", details.error).into()) + } + } + Err(e) => { + log::error!("Math Worker Example: Failed to get task result: {}", e); + Err(e.into()) + } + } +} \ No newline at end of file diff --git a/rhai_worker/examples/example_string_worker.rs b/rhai_worker/examples/example_string_worker.rs new file mode 100644 index 0000000..dbd18ce --- /dev/null +++ b/rhai_worker/examples/example_string_worker.rs @@ -0,0 +1,76 @@ +use rhai::Engine; +use rhai_client::RhaiClient; // To submit tasks +use rhai_worker_lib::{run_worker_loop, Args as WorkerArgs}; // To run the worker +use std::time::Duration; +use tokio::time::sleep; + +// Custom function for Rhai +fn reverse_string(s: String) -> String { + s.chars().rev().collect() +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + log::info!("Starting String Worker Example..."); + + // 1. Configure and start the Rhai Worker with a custom engine + let mut string_engine = Engine::new(); + string_engine.register_fn("reverse_it", reverse_string); + log::info!("Custom 'reverse_it' function registered with Rhai engine for String Worker."); + + let worker_args = WorkerArgs { + redis_url: "redis://127.0.0.1/".to_string(), + circles: vec!["string_circle".to_string()], // Worker listens on a specific queue + }; + let worker_args_clone = worker_args.clone(); + + tokio::spawn(async move { + log::info!("String Worker task starting..."); + if let Err(e) = run_worker_loop(string_engine, worker_args_clone).await { + log::error!("String Worker loop failed: {}", e); + } + }); + + // Give the worker a moment to start and connect + sleep(Duration::from_secs(1)).await; + + // 2. Use RhaiClient to submit a script to the "string_circle" + let client = RhaiClient::new("redis://127.0.0.1/")?; + let script_content = r#" + let original = "hello world"; + let reversed = reverse_it(original); + print("String script: original = '" + original + "', reversed = '" + reversed + "'"); + reversed // Return the result + "#; + + log::info!("Submitting string script to 'string_circle' and awaiting result..."); + + let timeout_duration = Duration::from_secs(10); + let poll_interval = Duration::from_millis(500); + + match client.submit_script_and_await_result( + "string_circle", + script_content.to_string(), + None, + timeout_duration, + poll_interval + ).await { + Ok(details) => { + log::info!("String Worker Example: Task finished. Status: {}, Output: {:?}, Error: {:?}", + details.status, details.output, details.error); + if details.status == "completed" { + assert_eq!(details.output, Some("\"dlrow olleh\"".to_string())); // Rhai strings include quotes in `debug` format + log::info!("String Worker Example: Assertion for output \"dlrow olleh\" passed!"); + Ok(()) + } else { + log::error!("String Worker Example: Task completed with error: {:?}", details.error); + Err(format!("Task failed with error: {:?}", details.error).into()) + } + } + Err(e) => { + log::error!("String Worker Example: Failed to get task result: {}", e); + Err(e.into()) + } + } +} \ No newline at end of file diff --git a/rhai_worker/src/lib.rs b/rhai_worker/src/lib.rs new file mode 100644 index 0000000..45d02c7 --- /dev/null +++ b/rhai_worker/src/lib.rs @@ -0,0 +1,144 @@ +use chrono::Utc; +use clap::Parser; +use log::{debug, error, info}; // Removed warn as it wasn't used in the loop +use redis::AsyncCommands; +use rhai::{Engine, Scope}; // EvalAltResult is not directly returned by the loop +use std::collections::HashMap; // For hgetall result + +// Re-export RhaiTaskDetails from rhai_client if needed by examples, +// or examples can depend on rhai_client directly. +// For now, the worker logic itself just interacts with the hash fields. + +const REDIS_TASK_DETAILS_PREFIX: &str = "rhai_task_details:"; +const REDIS_QUEUE_PREFIX: &str = "rhai_tasks:"; +const BLPOP_TIMEOUT_SECONDS: usize = 5; + +#[derive(Parser, Debug, Clone)] // Added Clone for potential use in examples +#[clap(author, version, about, long_about = None)] +pub struct Args { + #[clap(long, value_parser, default_value = "redis://127.0.0.1/")] + pub redis_url: String, + + #[clap(short, long, value_parser, required = true, num_args = 1..)] + pub circles: Vec, +} + +// This function updates specific fields in the Redis hash. +// It doesn't need to know the full RhaiTaskDetails struct, only the field names. +async fn update_task_status_in_redis( + conn: &mut redis::aio::MultiplexedConnection, + task_id: &str, + status: &str, + output: Option, + error_msg: Option, +) -> redis::RedisResult<()> { + let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id); + let mut updates: Vec<(&str, String)> = vec![ + ("status", status.to_string()), + ("updatedAt", Utc::now().to_rfc3339()), // Ensure this field name matches what rhai_client sets/expects + ]; + if let Some(out) = output { + updates.push(("output", out)); // Ensure this field name matches + } + if let Some(err) = error_msg { + updates.push(("error", err)); // Ensure this field name matches + } + debug!("Updating task {} in Redis with status: {}, updates: {:?}", task_id, status, updates); + conn.hset_multiple::<_, _, _, ()>(&task_key, &updates).await?; + Ok(()) +} + +pub async fn run_worker_loop(engine: Engine, args: Args) -> Result<(), Box> { + info!("Rhai Worker Loop starting. Connecting to Redis at {}", args.redis_url); + info!("Worker Loop will listen for tasks for circles: {:?}", args.circles); + + let redis_client = redis::Client::open(args.redis_url.as_str())?; + let mut redis_conn = redis_client.get_multiplexed_async_connection().await?; + info!("Worker Loop successfully connected to Redis."); + + let queue_keys: Vec = args + .circles + .iter() + .map(|name| format!("{}{}", REDIS_QUEUE_PREFIX, name.replace(" ", "_").to_lowercase())) + .collect(); + + info!("Worker Loop listening on Redis queues: {:?}", queue_keys); + + loop { + let response: Option<(String, String)> = redis_conn + .blpop(&queue_keys, BLPOP_TIMEOUT_SECONDS as f64) + .await?; + + if let Some((queue_name, task_id)) = response { + info!("Worker Loop received task_id: {} from queue: {}", task_id, queue_name); + + let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id); + + let task_details_map: Result, _> = + redis_conn.hgetall(&task_key).await; + + match task_details_map { + Ok(details_map) => { + let script_content_opt = details_map.get("script").cloned(); + + if let Some(script_content) = script_content_opt { + info!("Worker Loop processing task_id: {}. Script: {:.50}...", task_id, script_content); + update_task_status_in_redis(&mut redis_conn, &task_id, "processing", None, None).await?; + + let mut scope = Scope::new(); + // Examples can show how to pre-populate the scope via the engine or here + + match engine.eval_with_scope::(&mut scope, &script_content) { + Ok(result) => { + let output_str = format!("{:?}", result); + info!("Worker Loop task {} completed. Output: {}", task_id, output_str); + update_task_status_in_redis( + &mut redis_conn, + &task_id, + "completed", + Some(output_str), + None, + ) + .await?; + } + Err(e) => { + let error_str = format!("{:?}", *e); // Dereference EvalAltResult + error!("Worker Loop task {} failed. Error: {}", task_id, error_str); + update_task_status_in_redis( + &mut redis_conn, + &task_id, + "error", + None, + Some(error_str), + ) + .await?; + } + } + } else { + error!( + "Worker Loop: Could not find script content for task_id: {} in Redis hash: {}", + task_id, task_key + ); + update_task_status_in_redis( + &mut redis_conn, + &task_id, + "error", + None, + Some("Script content not found in Redis hash".to_string()), + ) + .await?; + } + } + Err(e) => { + error!( + "Worker Loop: Failed to fetch details for task_id: {} from Redis. Error: {:?}", + task_id, e + ); + } + } + } else { + debug!("Worker Loop: BLPOP timed out. No new tasks."); + } + } + // Loop is infinite, Ok(()) is effectively unreachable unless loop breaks +} \ No newline at end of file diff --git a/rhai_worker/src/main.rs b/rhai_worker/src/main.rs new file mode 100644 index 0000000..703caaa --- /dev/null +++ b/rhai_worker/src/main.rs @@ -0,0 +1,18 @@ +use rhai::Engine; +use rhai_worker_lib::{run_worker_loop, Args}; // Use the library name defined in Cargo.toml +use clap::Parser; // Required for Args::parse() to be in scope + +#[tokio::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + + let args = Args::parse(); + + log::info!("Rhai Worker (binary) starting with default engine."); + + let engine = Engine::new(); + // If specific default configurations are needed for the binary's engine, set them up here. + // For example: engine.set_max_operations(1_000_000); + + run_worker_loop(engine, args).await +}