//! # Rhai Client Library //! //! A Redis-based client library for submitting Rhai scripts to distributed worker services //! and awaiting their execution results. This crate implements a request-reply pattern //! using Redis as the message broker. //! //! ## Quick Start //! //! ```rust //! use rhai_dispatcher::{RhaiDispatcherBuilder, RhaiDispatcherError}; //! use std::time::Duration; //! //! #[tokio::main] //! async fn main() -> Result<(), Box> { //! // Build the client //! let client = RhaiDispatcherBuilder::new() //! .caller_id("my-app-instance-1") //! .redis_url("redis://127.0.0.1/") //! .build()?; //! //! // Submit a script and await the result //! let result = client //! .new_play_request() //! .worker_id("worker-1") //! .script(r#""Hello, World!""#) //! .timeout(Duration::from_secs(5)) //! .await_response() //! .await?; //! //! println!("Result: {:?}", result); //! Ok(()) //! } //! ``` use chrono::Utc; use log::{debug, error, info, warn}; // Added error use redis::AsyncCommands; use serde::{Deserialize, Serialize}; use std::time::Duration; // Duration is still used, Instant and sleep were removed use uuid::Uuid; /// Redis namespace prefix for all rhailib-related keys const NAMESPACE_PREFIX: &str = "rhailib:"; /// Represents the complete details and state of a Rhai task execution. /// /// This structure contains all information about a task throughout its lifecycle, /// from submission to completion. It's used for both storing task state in Redis /// and returning results to clients. /// /// # Fields /// /// * `task_id` - Unique identifier for the task (UUID) /// * `script` - The Rhai script content to execute /// * `status` - Current execution status: "pending", "processing", "completed", or "error" /// * `output` - Script execution output (if successful) /// * `error` - Error message (if execution failed) /// * `created_at` - Timestamp when the task was created /// * `updated_at` - Timestamp when the task was last modified /// * `caller_id` - Identifier of the client that submitted the task #[derive(Debug, Serialize, Deserialize, Clone)] pub struct RhaiTaskDetails { #[serde(rename = "taskId")] // Ensure consistent naming with other fields pub task_id: String, pub script: String, pub status: String, // "pending", "processing", "completed", "error" // client_rpc_id: Option is removed. // Worker responses should ideally not include it, or Serde will ignore unknown fields by default. pub output: Option, pub error: Option, // Renamed from error_message for consistency #[serde(rename = "createdAt")] pub created_at: chrono::DateTime, #[serde(rename = "updatedAt")] pub updated_at: chrono::DateTime, #[serde(rename = "callerId")] pub caller_id: String, #[serde(rename = "contextId")] pub context_id: String, #[serde(rename = "workerId")] pub worker_id: String, } /// Comprehensive error type for all possible failures in the Rhai client. /// /// This enum covers all error scenarios that can occur during client operations, /// from Redis connectivity issues to task execution timeouts. #[derive(Debug)] pub enum RhaiDispatcherError { /// Redis connection or operation error RedisError(redis::RedisError), /// JSON serialization/deserialization error SerializationError(serde_json::Error), /// Task execution timeout - contains the task_id that timed out Timeout(String), /// Task not found after submission - contains the task_id (rare occurrence) TaskNotFound(String), /// Context ID is missing ContextIdMissing, } impl From for RhaiDispatcherError { fn from(err: redis::RedisError) -> Self { RhaiDispatcherError::RedisError(err) } } impl From for RhaiDispatcherError { fn from(err: serde_json::Error) -> Self { RhaiDispatcherError::SerializationError(err) } } impl std::fmt::Display for RhaiDispatcherError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { RhaiDispatcherError::RedisError(e) => write!(f, "Redis error: {}", e), RhaiDispatcherError::SerializationError(e) => write!(f, "Serialization error: {}", e), RhaiDispatcherError::Timeout(task_id) => { write!(f, "Timeout waiting for task {} to complete", task_id) } RhaiDispatcherError::TaskNotFound(task_id) => { write!(f, "Task {} not found after submission", task_id) } RhaiDispatcherError::ContextIdMissing => { write!(f, "Context ID is missing") } } } } impl std::error::Error for RhaiDispatcherError {} /// The main client for interacting with the Rhai task execution system. /// /// This client manages Redis connections and provides factory methods for creating /// script execution requests. It maintains a caller ID for task attribution and /// handles all low-level Redis operations. /// /// # Example /// /// ```rust /// use rhai_dispatcher::RhaiDispatcherBuilder; /// /// let client = RhaiDispatcherBuilder::new() /// .caller_id("my-service") /// .redis_url("redis://localhost/") /// .build()?; /// ``` pub struct RhaiDispatcher { redis_client: redis::Client, caller_id: String, worker_id: String, context_id: String, } /// Builder for constructing `RhaiDispatcher` instances with proper configuration. /// /// This builder ensures that all required configuration is provided before /// creating a client instance. It validates the configuration and provides /// sensible defaults where appropriate. /// /// # Required Configuration /// /// - `caller_id`: A unique identifier for this client instance /// /// # Optional Configuration /// /// - `redis_url`: Redis connection URL (defaults to "redis://127.0.0.1/") pub struct RhaiDispatcherBuilder { redis_url: Option, caller_id: String, worker_id: String, context_id: String, } impl RhaiDispatcherBuilder { /// Creates a new `RhaiDispatcherBuilder` with default settings. /// /// The builder starts with no Redis URL (will default to "redis://127.0.0.1/") /// and an empty caller ID (which must be set before building). pub fn new() -> Self { Self { redis_url: None, caller_id: "".to_string(), worker_id: "".to_string(), context_id: "".to_string(), } } /// Sets the caller ID for this client instance. /// /// The caller ID is used to identify which client submitted a task and is /// included in task metadata. This is required and the build will fail if /// not provided. /// /// # Arguments /// /// * `caller_id` - A unique identifier for this client instance pub fn caller_id(mut self, caller_id: &str) -> Self { self.caller_id = caller_id.to_string(); self } /// Sets the circle ID for this client instance. /// /// The circle ID is used to identify which circle's context a task should be executed in. /// This is required at the time the client dispatches a script, but can be set on construction or on script dispatch. /// /// # Arguments /// /// * `context_id` - A unique identifier for this client instance pub fn context_id(mut self, context_id: &str) -> Self { self.context_id = context_id.to_string(); self } /// Sets the worker ID for this client instance. /// /// The worker ID is used to identify which worker a task should be executed on. /// This is required at the time the client dispatches a script, but can be set on construction or on script dispatch. /// /// # Arguments /// /// * `worker_id` - A unique identifier for this client instance pub fn worker_id(mut self, worker_id: &str) -> Self { self.worker_id = worker_id.to_string(); self } /// Sets the Redis connection URL. /// /// If not provided, defaults to "redis://127.0.0.1/". /// /// # Arguments /// /// * `url` - Redis connection URL (e.g., "redis://localhost:6379/0") pub fn redis_url(mut self, url: &str) -> Self { self.redis_url = Some(url.to_string()); self } /// Builds the final `RhaiDispatcher` instance. /// /// This method validates the configuration and creates the Redis client. /// It will return an error if the caller ID is empty or if the Redis /// connection cannot be established. /// /// # Returns /// /// * `Ok(RhaiDispatcher)` - Successfully configured client /// * `Err(RhaiDispatcherError)` - Configuration or connection error pub fn build(self) -> Result { let url = self .redis_url .unwrap_or_else(|| "redis://127.0.0.1/".to_string()); let client = redis::Client::open(url)?; Ok(RhaiDispatcher { redis_client: client, caller_id: self.caller_id, worker_id: self.worker_id, context_id: self.context_id, }) } } /// Representation of a script execution request. /// /// This structure contains all the information needed to execute a Rhai script /// on a worker service, including the script content, target worker, and timeout. #[derive(Debug, Clone)] pub struct PlayRequest { pub id: String, pub worker_id: String, pub context_id: String, pub script: String, pub timeout: Duration, } /// Builder for constructing and submitting script execution requests. /// /// This builder provides a fluent interface for configuring script execution /// parameters and offers two submission modes: fire-and-forget (`submit()`) /// and request-reply (`await_response()`). /// /// # Example /// /// ```rust /// use std::time::Duration; /// /// let result = client /// .new_play_request() /// .worker_id("worker-1") /// .script(r#"print("Hello, World!");"#) /// .timeout(Duration::from_secs(30)) /// .await_response() /// .await?; /// ``` pub struct PlayRequestBuilder<'a> { client: &'a RhaiDispatcher, request_id: String, worker_id: String, context_id: String, caller_id: String, script: String, timeout: Duration, retries: u32, } impl<'a> PlayRequestBuilder<'a> { pub fn new(client: &'a RhaiDispatcher) -> Self { Self { client, request_id: "".to_string(), worker_id: client.worker_id.clone(), context_id: client.context_id.clone(), caller_id: client.caller_id.clone(), script: "".to_string(), timeout: Duration::from_secs(5), retries: 0, } } pub fn request_id(mut self, request_id: &str) -> Self { self.request_id = request_id.to_string(); self } pub fn worker_id(mut self, worker_id: &str) -> Self { self.worker_id = worker_id.to_string(); self } pub fn context_id(mut self, context_id: &str) -> Self { self.context_id = context_id.to_string(); self } pub fn script(mut self, script: &str) -> Self { self.script = script.to_string(); self } pub fn script_path(mut self, script_path: &str) -> Self { self.script = std::fs::read_to_string(script_path).unwrap(); self } pub fn timeout(mut self, timeout: Duration) -> Self { self.timeout = timeout; self } pub fn build(self) -> Result { let request_id = if self.request_id.is_empty() { // Generate a UUID for the request_id Uuid::new_v4().to_string() } else { self.request_id.clone() }; if self.context_id.is_empty() { return Err(RhaiDispatcherError::ContextIdMissing); } if self.caller_id.is_empty() { return Err(RhaiDispatcherError::ContextIdMissing); } let play_request = PlayRequest { id: request_id, worker_id: self.worker_id.clone(), context_id: self.context_id.clone(), script: self.script.clone(), timeout: self.timeout, }; Ok(play_request) } pub async fn submit(self) -> Result<(), RhaiDispatcherError> { // Build the request and submit using self.client println!( "Submitting request {} with timeout {:?}", self.request_id, self.timeout ); self.client.submit_play_request(&self.build()?).await?; Ok(()) } pub async fn await_response(self) -> Result { // Build the request and submit using self.client let result = self .client .submit_play_request_and_await_result(&self.build()?) .await; result } } impl RhaiDispatcher { pub fn new_play_request(&self) -> PlayRequestBuilder { PlayRequestBuilder::new(self) } // Internal helper to submit script details and push to work queue async fn submit_play_request_using_connection( &self, conn: &mut redis::aio::MultiplexedConnection, play_request: &PlayRequest, ) -> Result<(), RhaiDispatcherError> { let now = Utc::now(); let task_key = format!("{}{}", NAMESPACE_PREFIX, play_request.id); let worker_queue_key = format!( "{}{}", NAMESPACE_PREFIX, play_request.worker_id.replace(" ", "_").to_lowercase() ); debug!( "Submitting play request: {} to worker: {} with namespace prefix: {}", play_request.id, play_request.worker_id, NAMESPACE_PREFIX ); let hset_args: Vec<(String, String)> = vec![ ("taskId".to_string(), play_request.id.to_string()), // Add taskId ("script".to_string(), play_request.script.clone()), // script is moved here ("callerId".to_string(), self.caller_id.clone()), // script is moved here ("contextId".to_string(), play_request.context_id.clone()), // script is moved here ("status".to_string(), "pending".to_string()), ("createdAt".to_string(), now.to_rfc3339()), ("updatedAt".to_string(), now.to_rfc3339()), ]; // Ensure hset_args is a slice of tuples (String, String) // The redis crate's hset_multiple expects &[(K, V)] // conn.hset_multiple::<_, String, String, ()>(&task_key, &hset_args).await?; // Simpler: // Explicitly type K, F, V for hset_multiple if inference is problematic. // RV (return value of the command itself) is typically () for HSET type commands. conn.hset_multiple::<_, _, _, ()>(&task_key, &hset_args) .await?; // lpush also infers its types, RV is typically i64 (length of list) or () depending on exact command variant // For `redis::AsyncCommands::lpush`, it's `RedisResult` where R: FromRedisValue // Often this is the length of the list. Let's allow inference or specify if needed. let _: redis::RedisResult = conn.lpush(&worker_queue_key, play_request.id.clone()).await; Ok(()) } // Internal helper to await response from worker async fn await_response_from_connection( &self, conn: &mut redis::aio::MultiplexedConnection, task_key: &String, reply_queue_key: &String, timeout: Duration, ) -> Result { // BLPOP on the reply queue // The timeout for BLPOP is in seconds (integer) let blpop_timeout_secs = timeout.as_secs().max(1); // Ensure at least 1 second for BLPOP timeout match conn .blpop::<&String, Option<(String, String)>>(reply_queue_key, blpop_timeout_secs as f64) .await { Ok(Some((_queue, result_message_str))) => { // Attempt to deserialize the result message into RhaiTaskDetails or a similar structure // For now, we assume the worker sends back a JSON string of RhaiTaskDetails // or at least status, output, error. // Let's refine what the worker sends. For now, assume it's a simplified result. // The worker should ideally send a JSON string that can be parsed into RhaiTaskDetails. // For this example, let's assume the worker sends a JSON string of a simplified result structure. // A more robust approach would be for the worker to send the full RhaiTaskDetails (or relevant parts) // and the client deserializes that. // For now, let's assume the worker sends a JSON string of RhaiTaskDetails. match serde_json::from_str::(&result_message_str) { Ok(details) => { info!( "Task {} finished with status: {}", details.task_id, details.status ); // Optionally, delete the reply queue let _: redis::RedisResult = conn.del(&reply_queue_key).await; Ok(details) } Err(e) => { error!( "Failed to deserialize result message from reply queue: {}", e ); // Optionally, delete the reply queue let _: redis::RedisResult = conn.del(&reply_queue_key).await; Err(RhaiDispatcherError::SerializationError(e)) } } } Ok(None) => { // BLPOP timed out warn!( "Timeout waiting for result on reply queue {} for task {}", reply_queue_key, task_key ); // Optionally, delete the reply queue let _: redis::RedisResult = conn.del(&reply_queue_key).await; Err(RhaiDispatcherError::Timeout(task_key.clone())) } Err(e) => { // Redis error error!( "Redis error on BLPOP for reply queue {}: {}", reply_queue_key, e ); // Optionally, delete the reply queue let _: redis::RedisResult = conn.del(&reply_queue_key).await; Err(RhaiDispatcherError::RedisError(e)) } } } // New method using dedicated reply queue pub async fn submit_play_request( &self, play_request: &PlayRequest, ) -> Result<(), RhaiDispatcherError> { let mut conn = self.redis_client.get_multiplexed_async_connection().await?; self.submit_play_request_using_connection( &mut conn, &play_request, // Pass the task_id parameter ) .await?; Ok(()) } // New method using dedicated reply queue pub async fn submit_play_request_and_await_result( &self, play_request: &PlayRequest, ) -> Result { let mut conn = self.redis_client.get_multiplexed_async_connection().await?; let reply_queue_key = format!("{}:reply:{}", NAMESPACE_PREFIX, play_request.id); // Derived from the passed task_id self.submit_play_request_using_connection( &mut conn, &play_request, // Pass the task_id parameter ) .await?; info!( "Task {} submitted. Waiting for result on queue {} with timeout {:?}...", play_request.id, // This is the UUID reply_queue_key, play_request.timeout ); self.await_response_from_connection( &mut conn, &play_request.id, &reply_queue_key, play_request.timeout, ) .await } // Method to get task status pub async fn get_task_status( &self, task_id: &str, ) -> Result, RhaiDispatcherError> { let mut conn = self.redis_client.get_multiplexed_async_connection().await?; let task_key = format!("{}{}", NAMESPACE_PREFIX, task_id); let result_map: Option> = conn.hgetall(&task_key).await?; match result_map { Some(map) => { // Reconstruct RhaiTaskDetails from HashMap let details = RhaiTaskDetails { task_id: task_id.to_string(), // Use the task_id parameter passed to the function script: map.get("script").cloned().unwrap_or_else(|| { warn!("Task {}: 'script' field missing from Redis hash, defaulting to empty.", task_id); String::new() }), status: map.get("status").cloned().unwrap_or_else(|| { warn!("Task {}: 'status' field missing from Redis hash, defaulting to empty.", task_id); String::new() }), // client_rpc_id is no longer a field in RhaiTaskDetails output: map.get("output").cloned(), error: map.get("error").cloned(), created_at: map.get("createdAt") .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) .map(|dt| dt.with_timezone(&Utc)) .unwrap_or_else(|| { warn!("Task {}: 'createdAt' field missing or invalid in Redis hash, defaulting to Utc::now().", task_id); Utc::now() }), updated_at: map.get("updatedAt") .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) .map(|dt| dt.with_timezone(&Utc)) .unwrap_or_else(|| { warn!("Task {}: 'updatedAt' field missing or invalid in Redis hash, defaulting to Utc::now().", task_id); Utc::now() }), caller_id: map.get("callerId").cloned().expect("callerId field missing from Redis hash"), worker_id: map.get("workerId").cloned().expect("workerId field missing from Redis hash"), context_id: map.get("contextId").cloned().expect("contextId field missing from Redis hash"), }; // It's important to also check if the 'taskId' field exists in the map and matches the input task_id // for data integrity, though the struct construction above uses the input task_id directly. if let Some(redis_task_id) = map.get("taskId") { if redis_task_id != task_id { warn!("Task {}: Mismatch between requested task_id and taskId found in Redis hash ('{}'). Proceeding with requested task_id.", task_id, redis_task_id); } } else { warn!("Task {}: 'taskId' field missing from Redis hash.", task_id); } Ok(Some(details)) } None => Ok(None), } } } #[cfg(test)] mod tests { // use super::*; // Basic tests can be added later, especially once examples are in place. // For now, ensuring it compiles is the priority. #[test] fn it_compiles() { assert_eq!(2 + 2, 4); } }