rhai rpc queue worker and client

This commit is contained in:
timurgordon 2025-06-01 02:10:58 +03:00
parent ec4769a6b0
commit 061aee6f1d
9 changed files with 572 additions and 0 deletions

1
rhai_client/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

17
rhai_client/Cargo.toml Normal file
View File

@ -0,0 +1,17 @@
[package]
name = "rhai_client"
version = "0.1.0"
edition = "2021"
[dependencies]
redis = { version = "0.25.0", features = ["tokio-comp"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
uuid = { version = "1.6", features = ["v4", "serde"] }
chrono = { version = "0.4", features = ["serde"] }
log = "0.4"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] } # For async main in examples, and general async
[dev-dependencies] # For examples later
env_logger = "0.10"
rhai = "1.18.0" # For examples that might need to show engine setup

212
rhai_client/src/lib.rs Normal file
View File

@ -0,0 +1,212 @@
use chrono::Utc;
use log::{debug, info, warn, error}; // Added error
use redis::AsyncCommands;
use tokio::time::{sleep, Instant}; // For polling with timeout
use std::time::Duration;
use serde::{Deserialize, Serialize};
use serde_json::Value; // For client_rpc_id, though not directly used by this client's submit method
use uuid::Uuid;
const REDIS_TASK_DETAILS_PREFIX: &str = "rhai_task_details:";
const REDIS_QUEUE_PREFIX: &str = "rhai_tasks:";
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RhaiTaskDetails {
pub script: String,
pub status: String, // "pending", "processing", "completed", "error"
#[serde(rename = "clientRpcId")]
pub client_rpc_id: Option<Value>, // Kept for compatibility with worker/server, but optional for client
pub output: Option<String>,
pub error: Option<String>, // Renamed from error_message for consistency
#[serde(rename = "createdAt")]
pub created_at: chrono::DateTime<chrono::Utc>,
#[serde(rename = "updatedAt")]
pub updated_at: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug)]
pub enum RhaiClientError {
RedisError(redis::RedisError),
SerializationError(serde_json::Error),
Timeout(String), // task_id that timed out
TaskNotFound(String), // task_id not found after submission (should be rare)
}
impl From<redis::RedisError> for RhaiClientError {
fn from(err: redis::RedisError) -> Self {
RhaiClientError::RedisError(err)
}
}
impl From<serde_json::Error> for RhaiClientError {
fn from(err: serde_json::Error) -> Self {
RhaiClientError::SerializationError(err)
}
}
impl std::fmt::Display for RhaiClientError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RhaiClientError::RedisError(e) => write!(f, "Redis error: {}", e),
RhaiClientError::SerializationError(e) => write!(f, "Serialization error: {}", e),
RhaiClientError::Timeout(task_id) => write!(f, "Timeout waiting for task {} to complete", task_id),
RhaiClientError::TaskNotFound(task_id) => write!(f, "Task {} not found after submission", task_id),
}
}
}
impl std::error::Error for RhaiClientError {}
pub struct RhaiClient {
redis_client: redis::Client,
}
impl RhaiClient {
pub fn new(redis_url: &str) -> Result<Self, RhaiClientError> {
let client = redis::Client::open(redis_url)?;
Ok(Self { redis_client: client })
}
pub async fn submit_script(
&self,
circle_name: &str,
script: String,
client_rpc_id: Option<Value>, // Optional: if the caller has an RPC ID to associate
) -> Result<String, RhaiClientError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
let task_id = Uuid::new_v4().to_string();
let now = Utc::now();
let task_details = RhaiTaskDetails {
script,
status: "pending".to_string(),
client_rpc_id,
output: None,
error: None,
created_at: now,
updated_at: now,
};
let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id);
let queue_key = format!("{}{}", REDIS_QUEUE_PREFIX, circle_name.replace(" ", "_").to_lowercase());
debug!(
"Submitting task_id: {} for circle: {} to queue: {}. Details: {:?}",
task_id, circle_name, queue_key, task_details
);
// Using HSET_MULTIPLE for efficiency if redis-rs supports it directly for struct fields.
// Otherwise, individual HSETs are fine.
// For simplicity and directness with redis-rs async, individual HSETs are used here.
conn.hset::<_, _, _, ()>(&task_key, "script", &task_details.script).await?;
conn.hset::<_, _, _, ()>(&task_key, "status", &task_details.status).await?;
if let Some(rpc_id_val) = &task_details.client_rpc_id {
conn.hset::<_, _, _, ()>(&task_key, "clientRpcId", serde_json::to_string(rpc_id_val)?).await?;
} else {
// Ensure the field exists even if null, or decide if it should be omitted
conn.hset::<_, _, _, ()>(&task_key, "clientRpcId", Value::Null.to_string()).await?;
}
conn.hset::<_, _, _, ()>(&task_key, "createdAt", task_details.created_at.to_rfc3339()).await?;
conn.hset::<_, _, _, ()>(&task_key, "updatedAt", task_details.updated_at.to_rfc3339()).await?;
// output and error fields are initially None, so they might not be set here or set as empty strings/null
conn.lpush::<_, _, ()>(&queue_key, &task_id).await?;
Ok(task_id)
}
// Optional: A method to check task status, similar to what circle_server_ws polling does.
// This could be useful for a client that wants to poll for results itself.
pub async fn get_task_status(&self, task_id: &str) -> Result<Option<RhaiTaskDetails>, RhaiClientError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id);
let result_map: Option<std::collections::HashMap<String, String>> = conn.hgetall(&task_key).await?;
match result_map {
Some(map) => {
// Reconstruct RhaiTaskDetails from HashMap
// This is a simplified reconstruction; ensure all fields are handled robustly
let details = RhaiTaskDetails {
script: map.get("script").cloned().unwrap_or_default(),
status: map.get("status").cloned().unwrap_or_default(),
client_rpc_id: map.get("clientRpcId")
.and_then(|s| serde_json::from_str(s).ok())
.or(Some(Value::Null)), // Default to Value::Null if missing or parse error
output: map.get("output").cloned(),
error: map.get("error").cloned(),
created_at: map.get("createdAt")
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(Utc::now), // Provide a default
updated_at: map.get("updatedAt")
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(Utc::now), // Provide a default
};
Ok(Some(details))
}
None => Ok(None),
}
}
pub async fn submit_script_and_await_result(
&self,
circle_name: &str,
script: String,
client_rpc_id: Option<Value>,
timeout: Duration,
poll_interval: Duration,
) -> Result<RhaiTaskDetails, RhaiClientError> {
let task_id = self.submit_script(circle_name, script, client_rpc_id).await?;
info!("Task {} submitted. Polling for result with timeout {:?}...", task_id, timeout);
let start_time = Instant::now();
loop {
if start_time.elapsed() > timeout {
warn!("Timeout waiting for task {}", task_id);
return Err(RhaiClientError::Timeout(task_id.clone()));
}
match self.get_task_status(&task_id).await {
Ok(Some(details)) => {
debug!("Polled task {}: status = {}", task_id, details.status);
if details.status == "completed" || details.status == "error" {
info!("Task {} finished with status: {}", task_id, details.status);
return Ok(details);
}
// else status is "pending" or "processing", continue polling
}
Ok(None) => {
// This case should ideally not happen if submit_script succeeded and worker is running,
// unless the task details were manually deleted from Redis.
warn!("Task {} not found during polling. This might indicate an issue.", task_id);
// Depending on desired robustness, could retry a few times or return an error immediately.
// For now, let it continue polling up to timeout, or return a specific error.
// If it persists, it's effectively a timeout or a lost task.
// Let's consider it a lost task if it's not found after a short while post-submission.
if start_time.elapsed() > Duration::from_secs(5) { // Arbitrary short duration
return Err(RhaiClientError::TaskNotFound(task_id.clone()));
}
}
Err(e) => {
// Log error but continue polling unless it's a critical Redis error
error!("Error polling task {}: {}. Will retry.", task_id, e);
}
}
sleep(poll_interval).await;
}
}
}
#[cfg(test)]
mod tests {
// use super::*;
// Basic tests can be added later, especially once examples are in place.
// For now, ensuring it compiles is the priority.
#[test]
fn it_compiles() {
assert_eq!(2 + 2, 4);
}
}

1
rhai_worker/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

27
rhai_worker/Cargo.toml Normal file
View File

@ -0,0 +1,27 @@
[package]
name = "rhai_worker"
version = "0.1.0"
edition = "2021"
[lib]
name = "rhai_worker_lib" # Can be different from package name, or same
path = "src/lib.rs"
[[bin]]
name = "rhai_worker"
path = "src/main.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
redis = { version = "0.25.0", features = ["tokio-comp"] }
rhai = { version = "1.18.0", features = ["sync", "decimal"] } # Added "decimal" for broader script support
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
log = "0.4"
env_logger = "0.10"
clap = { version = "4.4", features = ["derive"] }
uuid = { version = "1.6", features = ["v4", "serde"] } # Though task_id is string, uuid might be useful
chrono = { version = "0.4", features = ["serde"] }
rhai_client = { path = "../rhai_client" }

View File

@ -0,0 +1,76 @@
use rhai::Engine;
use rhai_client::RhaiClient; // To submit tasks
use rhai_worker_lib::{run_worker_loop, Args as WorkerArgs}; // To run the worker
use std::time::Duration;
use tokio::time::sleep;
// Custom function for Rhai
fn add(a: i64, b: i64) -> i64 {
a + b
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
log::info!("Starting Math Worker Example...");
// 1. Configure and start the Rhai Worker with a custom engine
let mut math_engine = Engine::new();
math_engine.register_fn("add", add);
log::info!("Custom 'add' function registered with Rhai engine for Math Worker.");
let worker_args = WorkerArgs {
redis_url: "redis://127.0.0.1/".to_string(),
circles: vec!["math_circle".to_string()], // Worker listens on a specific queue
};
let worker_args_clone = worker_args.clone(); // Clone for the worker task
tokio::spawn(async move {
log::info!("Math Worker task starting...");
if let Err(e) = run_worker_loop(math_engine, worker_args_clone).await {
log::error!("Math Worker loop failed: {}", e);
}
});
// Give the worker a moment to start and connect
sleep(Duration::from_secs(1)).await;
// 2. Use RhaiClient to submit a script to the "math_circle"
let client = RhaiClient::new("redis://127.0.0.1/")?;
let script_content = r#"
let x = 10;
let y = add(x, 32); // Use the custom registered function
print("Math script: 10 + 32 = " + y);
y // Return the result
"#;
log::info!("Submitting math script to 'math_circle' and awaiting result...");
let timeout_duration = Duration::from_secs(10);
let poll_interval = Duration::from_millis(500);
match client.submit_script_and_await_result(
"math_circle",
script_content.to_string(),
None,
timeout_duration,
poll_interval
).await {
Ok(details) => {
log::info!("Math Worker Example: Task finished. Status: {}, Output: {:?}, Error: {:?}",
details.status, details.output, details.error);
if details.status == "completed" {
assert_eq!(details.output, Some("42".to_string()));
log::info!("Math Worker Example: Assertion for output 42 passed!");
Ok(())
} else {
log::error!("Math Worker Example: Task completed with error: {:?}", details.error);
Err(format!("Task failed with error: {:?}", details.error).into())
}
}
Err(e) => {
log::error!("Math Worker Example: Failed to get task result: {}", e);
Err(e.into())
}
}
}

View File

@ -0,0 +1,76 @@
use rhai::Engine;
use rhai_client::RhaiClient; // To submit tasks
use rhai_worker_lib::{run_worker_loop, Args as WorkerArgs}; // To run the worker
use std::time::Duration;
use tokio::time::sleep;
// Custom function for Rhai
fn reverse_string(s: String) -> String {
s.chars().rev().collect()
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
log::info!("Starting String Worker Example...");
// 1. Configure and start the Rhai Worker with a custom engine
let mut string_engine = Engine::new();
string_engine.register_fn("reverse_it", reverse_string);
log::info!("Custom 'reverse_it' function registered with Rhai engine for String Worker.");
let worker_args = WorkerArgs {
redis_url: "redis://127.0.0.1/".to_string(),
circles: vec!["string_circle".to_string()], // Worker listens on a specific queue
};
let worker_args_clone = worker_args.clone();
tokio::spawn(async move {
log::info!("String Worker task starting...");
if let Err(e) = run_worker_loop(string_engine, worker_args_clone).await {
log::error!("String Worker loop failed: {}", e);
}
});
// Give the worker a moment to start and connect
sleep(Duration::from_secs(1)).await;
// 2. Use RhaiClient to submit a script to the "string_circle"
let client = RhaiClient::new("redis://127.0.0.1/")?;
let script_content = r#"
let original = "hello world";
let reversed = reverse_it(original);
print("String script: original = '" + original + "', reversed = '" + reversed + "'");
reversed // Return the result
"#;
log::info!("Submitting string script to 'string_circle' and awaiting result...");
let timeout_duration = Duration::from_secs(10);
let poll_interval = Duration::from_millis(500);
match client.submit_script_and_await_result(
"string_circle",
script_content.to_string(),
None,
timeout_duration,
poll_interval
).await {
Ok(details) => {
log::info!("String Worker Example: Task finished. Status: {}, Output: {:?}, Error: {:?}",
details.status, details.output, details.error);
if details.status == "completed" {
assert_eq!(details.output, Some("\"dlrow olleh\"".to_string())); // Rhai strings include quotes in `debug` format
log::info!("String Worker Example: Assertion for output \"dlrow olleh\" passed!");
Ok(())
} else {
log::error!("String Worker Example: Task completed with error: {:?}", details.error);
Err(format!("Task failed with error: {:?}", details.error).into())
}
}
Err(e) => {
log::error!("String Worker Example: Failed to get task result: {}", e);
Err(e.into())
}
}
}

144
rhai_worker/src/lib.rs Normal file
View File

@ -0,0 +1,144 @@
use chrono::Utc;
use clap::Parser;
use log::{debug, error, info}; // Removed warn as it wasn't used in the loop
use redis::AsyncCommands;
use rhai::{Engine, Scope}; // EvalAltResult is not directly returned by the loop
use std::collections::HashMap; // For hgetall result
// Re-export RhaiTaskDetails from rhai_client if needed by examples,
// or examples can depend on rhai_client directly.
// For now, the worker logic itself just interacts with the hash fields.
const REDIS_TASK_DETAILS_PREFIX: &str = "rhai_task_details:";
const REDIS_QUEUE_PREFIX: &str = "rhai_tasks:";
const BLPOP_TIMEOUT_SECONDS: usize = 5;
#[derive(Parser, Debug, Clone)] // Added Clone for potential use in examples
#[clap(author, version, about, long_about = None)]
pub struct Args {
#[clap(long, value_parser, default_value = "redis://127.0.0.1/")]
pub redis_url: String,
#[clap(short, long, value_parser, required = true, num_args = 1..)]
pub circles: Vec<String>,
}
// This function updates specific fields in the Redis hash.
// It doesn't need to know the full RhaiTaskDetails struct, only the field names.
async fn update_task_status_in_redis(
conn: &mut redis::aio::MultiplexedConnection,
task_id: &str,
status: &str,
output: Option<String>,
error_msg: Option<String>,
) -> redis::RedisResult<()> {
let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id);
let mut updates: Vec<(&str, String)> = vec![
("status", status.to_string()),
("updatedAt", Utc::now().to_rfc3339()), // Ensure this field name matches what rhai_client sets/expects
];
if let Some(out) = output {
updates.push(("output", out)); // Ensure this field name matches
}
if let Some(err) = error_msg {
updates.push(("error", err)); // Ensure this field name matches
}
debug!("Updating task {} in Redis with status: {}, updates: {:?}", task_id, status, updates);
conn.hset_multiple::<_, _, _, ()>(&task_key, &updates).await?;
Ok(())
}
pub async fn run_worker_loop(engine: Engine, args: Args) -> Result<(), Box<dyn std::error::Error>> {
info!("Rhai Worker Loop starting. Connecting to Redis at {}", args.redis_url);
info!("Worker Loop will listen for tasks for circles: {:?}", args.circles);
let redis_client = redis::Client::open(args.redis_url.as_str())?;
let mut redis_conn = redis_client.get_multiplexed_async_connection().await?;
info!("Worker Loop successfully connected to Redis.");
let queue_keys: Vec<String> = args
.circles
.iter()
.map(|name| format!("{}{}", REDIS_QUEUE_PREFIX, name.replace(" ", "_").to_lowercase()))
.collect();
info!("Worker Loop listening on Redis queues: {:?}", queue_keys);
loop {
let response: Option<(String, String)> = redis_conn
.blpop(&queue_keys, BLPOP_TIMEOUT_SECONDS as f64)
.await?;
if let Some((queue_name, task_id)) = response {
info!("Worker Loop received task_id: {} from queue: {}", task_id, queue_name);
let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id);
let task_details_map: Result<HashMap<String, String>, _> =
redis_conn.hgetall(&task_key).await;
match task_details_map {
Ok(details_map) => {
let script_content_opt = details_map.get("script").cloned();
if let Some(script_content) = script_content_opt {
info!("Worker Loop processing task_id: {}. Script: {:.50}...", task_id, script_content);
update_task_status_in_redis(&mut redis_conn, &task_id, "processing", None, None).await?;
let mut scope = Scope::new();
// Examples can show how to pre-populate the scope via the engine or here
match engine.eval_with_scope::<rhai::Dynamic>(&mut scope, &script_content) {
Ok(result) => {
let output_str = format!("{:?}", result);
info!("Worker Loop task {} completed. Output: {}", task_id, output_str);
update_task_status_in_redis(
&mut redis_conn,
&task_id,
"completed",
Some(output_str),
None,
)
.await?;
}
Err(e) => {
let error_str = format!("{:?}", *e); // Dereference EvalAltResult
error!("Worker Loop task {} failed. Error: {}", task_id, error_str);
update_task_status_in_redis(
&mut redis_conn,
&task_id,
"error",
None,
Some(error_str),
)
.await?;
}
}
} else {
error!(
"Worker Loop: Could not find script content for task_id: {} in Redis hash: {}",
task_id, task_key
);
update_task_status_in_redis(
&mut redis_conn,
&task_id,
"error",
None,
Some("Script content not found in Redis hash".to_string()),
)
.await?;
}
}
Err(e) => {
error!(
"Worker Loop: Failed to fetch details for task_id: {} from Redis. Error: {:?}",
task_id, e
);
}
}
} else {
debug!("Worker Loop: BLPOP timed out. No new tasks.");
}
}
// Loop is infinite, Ok(()) is effectively unreachable unless loop breaks
}

18
rhai_worker/src/main.rs Normal file
View File

@ -0,0 +1,18 @@
use rhai::Engine;
use rhai_worker_lib::{run_worker_loop, Args}; // Use the library name defined in Cargo.toml
use clap::Parser; // Required for Args::parse() to be in scope
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let args = Args::parse();
log::info!("Rhai Worker (binary) starting with default engine.");
let engine = Engine::new();
// If specific default configurations are needed for the binary's engine, set them up here.
// For example: engine.set_max_operations(1_000_000);
run_worker_loop(engine, args).await
}