linting and fmt

This commit is contained in:
timurgordon 2025-06-19 02:32:56 +03:00
parent 4e717bc054
commit 1a3fa6242d
24 changed files with 1024 additions and 620 deletions

View File

@ -1,9 +1,9 @@
use criterion::{criterion_group, criterion_main, Criterion};
use redis::{Client, Commands};
use std::process::{Command, Child, Stdio};
use std::time::Duration;
use std::thread;
use std::fs;
use std::process::{Child, Command, Stdio};
use std::thread;
use std::time::Duration;
const REDIS_URL: &str = "redis://127.0.0.1:6379";
const CIRCLE_NAME: &str = "bench_circle";
@ -25,11 +25,20 @@ fn cleanup_redis() -> Result<(), redis::RedisError> {
fn start_worker() -> Result<Child, std::io::Error> {
Command::new("cargo")
.args(&["run", "--release", "--bin", "worker", "--",
"--circle", CIRCLE_NAME,
"--redis-url", REDIS_URL,
"--worker-id", "bench_worker",
"--preserve-tasks"])
.args(&[
"run",
"--release",
"--bin",
"worker",
"--",
"--circle",
CIRCLE_NAME,
"--redis-url",
REDIS_URL,
"--worker-id",
"bench_worker",
"--preserve-tasks",
])
.current_dir("src/worker")
.stdout(Stdio::null())
.stderr(Stdio::null())
@ -82,7 +91,11 @@ fn wait_and_measure(task_key: &str) -> Result<f64, redis::RedisError> {
match status.as_deref() {
Some("completed") | Some("error") => {
println!("Task {} completed with status: {}", task_key, status.as_deref().unwrap_or("unknown"));
println!(
"Task {} completed with status: {}",
task_key,
status.as_deref().unwrap_or("unknown")
);
let created_at: u64 = conn.hget(task_key, "createdAt")?;
let updated_at: u64 = conn.hget(task_key, "updatedAt")?;
return Ok((updated_at - created_at) as f64 * 1000.0); // Convert to milliseconds
@ -99,7 +112,7 @@ fn wait_and_measure(task_key: &str) -> Result<f64, redis::RedisError> {
if start_time.elapsed() > timeout {
return Err(redis::RedisError::from((
redis::ErrorKind::IoError,
"Timeout waiting for task completion"
"Timeout waiting for task completion",
)));
}
}
@ -141,7 +154,12 @@ fn wait_for_batch_completion(task_keys: &[String]) -> Result<f64, Box<dyn std::e
// Check timeout
if start_time.elapsed() > timeout {
return Err(format!("Timeout waiting for batch completion. Completed: {}/{}", completed_count, task_keys.len()).into());
return Err(format!(
"Timeout waiting for batch completion. Completed: {}/{}",
completed_count,
task_keys.len()
)
.into());
}
thread::sleep(Duration::from_millis(100));
@ -183,7 +201,8 @@ fn bench_single_rhai_task(c: &mut Criterion) {
// Create 100 tasks and measure average latency using Redis timestamps
let task_keys = create_batch_tasks(5000).expect("Failed to create batch tasks");
let avg_latency_ms = wait_for_batch_completion(&task_keys).expect("Failed to measure batch completion");
let avg_latency_ms = wait_for_batch_completion(&task_keys)
.expect("Failed to measure batch completion");
// Convert average latency to duration
total_latency += Duration::from_millis(avg_latency_ms as u64);

View File

@ -27,7 +27,3 @@ path = "example_string_worker.rs"
[[bin]]
name = "dedicated_reply_queue_demo"
path = "dedicated_reply_queue_demo.rs"
[[bin]]
name = "lua_client_demo"
path = "lua_client_demo.rs"

View File

@ -1,4 +1,4 @@
use log::{info, error, debug};
use log::{debug, error, info};
use rhai::Engine;
use rhai_client::{RhaiClient, RhaiClientError}; // RhaiTaskDetails is now used for its fields
use rhailib_worker::spawn_rhai_worker;
@ -55,7 +55,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let task_timeout = Duration::from_secs(10);
let task_id = Uuid::new_v4().to_string(); // Generate a unique task_id
info!("Submitting script to circle '{}' with task_id '{}' and awaiting result...", circle_name, task_id);
info!(
"Submitting script to circle '{}' with task_id '{}' and awaiting result...",
circle_name, task_id
);
info!("Script: {}", script_to_run);
match client
@ -64,7 +67,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
task_id.clone(), // Pass the generated task_id
script_to_run.to_string(),
task_timeout,
None // public_key
None, // public_key
)
.await
{
@ -72,7 +75,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("Task {} completed successfully!", details.task_id);
debug!("Full Task Details: {:#?}", details);
// The task_id is now part of the returned RhaiTaskDetails struct.
info!("Received details for task_id: {}, script: {}", details.task_id, details.script);
info!(
"Received details for task_id: {}, script: {}",
details.task_id, details.script
);
info!("Status: {}", details.status);
if let Some(output) = details.output {
info!("Output: {}", output); // Expected: 42

View File

@ -57,7 +57,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
log::error!("Failed to read script file '{}': {}", script_path_str, e);
// Attempt to read from an alternative path if run via `cargo run --example`
// where current dir might be the crate root.
let alt_script_path = Path::new(file!()).parent().unwrap().join("auth_script.rhai");
let alt_script_path = Path::new(file!())
.parent()
.unwrap()
.join("auth_script.rhai");
log::info!("Attempting alternative script path: {:?}", alt_script_path);
fs::read_to_string(&alt_script_path)?
}
@ -106,9 +109,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
);
// Basic assertion for expected output
if caller_pk == "admin_pk" {
assert_eq!(details.output, Some("Access Granted: Welcome Admin!".to_string()));
assert_eq!(
details.output,
Some("Access Granted: Welcome Admin!".to_string())
);
} else if caller_pk == "user_pk" {
assert_eq!(details.output, Some("Limited Access: Welcome User!".to_string()));
assert_eq!(
details.output,
Some("Limited Access: Welcome User!".to_string())
);
}
}
Err(e) => {

View File

@ -2,9 +2,9 @@ use rhai::Engine;
use rhai_client::RhaiClient; // To submit tasks
use uuid::Uuid; // For generating task_id
use rhailib_worker::spawn_rhai_worker;
use std::time::Duration;
use tokio::time::sleep;
use rhailib_worker::spawn_rhai_worker;
// Custom function for Rhai
fn add(a: i64, b: i64) -> i64 {
@ -51,22 +51,32 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let timeout_duration = Duration::from_secs(10);
let task_id = Uuid::new_v4().to_string();
match client.submit_script_and_await_result(
"math_circle",
script_content.to_string(),
task_id, // Pass the generated task_id
timeout_duration,
None
).await {
match client
.submit_script_and_await_result(
"math_circle",
script_content.to_string(),
task_id, // Pass the generated task_id
timeout_duration,
None,
)
.await
{
Ok(details) => {
log::info!("Math Worker Example: Task finished. Status: {}, Output: {:?}, Error: {:?}",
details.status, details.output, details.error);
log::info!(
"Math Worker Example: Task finished. Status: {}, Output: {:?}, Error: {:?}",
details.status,
details.output,
details.error
);
if details.status == "completed" {
assert_eq!(details.output, Some("42".to_string()));
log::info!("Math Worker Example: Assertion for output 42 passed!");
Ok(())
} else {
log::error!("Math Worker Example: Task completed with error: {:?}", details.error);
log::error!(
"Math Worker Example: Task completed with error: {:?}",
details.error
);
Err(format!("Task failed with error: {:?}", details.error).into())
}
}

View File

@ -2,9 +2,9 @@ use rhai::Engine;
use rhai_client::RhaiClient; // To submit tasks
use uuid::Uuid; // For generating task_id
use rhailib_worker::spawn_rhai_worker;
use std::time::Duration;
use tokio::time::sleep;
use rhailib_worker::spawn_rhai_worker;
// Custom function for Rhai
fn reverse_string(s: String) -> String {
@ -51,22 +51,32 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let timeout_duration = Duration::from_secs(10);
let task_id = Uuid::new_v4().to_string();
match client.submit_script_and_await_result(
"string_circle",
script_content.to_string(),
task_id, // Pass the generated task_id
timeout_duration,
None
).await {
match client
.submit_script_and_await_result(
"string_circle",
script_content.to_string(),
task_id, // Pass the generated task_id
timeout_duration,
None,
)
.await
{
Ok(details) => {
log::info!("String Worker Example: Task finished. Status: {}, Output: {:?}, Error: {:?}",
details.status, details.output, details.error);
log::info!(
"String Worker Example: Task finished. Status: {}, Output: {:?}, Error: {:?}",
details.status,
details.output,
details.error
);
if details.status == "completed" {
assert_eq!(details.output, Some("dlrow olleh".to_string()));
log::info!("String Worker Example: Assertion for output \"dlrow olleh\" passed!");
Ok(())
} else {
log::error!("String Worker Example: Task completed with error: {:?}", details.error);
log::error!(
"String Worker Example: Task completed with error: {:?}",
details.error
);
Err(format!("Task failed with error: {:?}", details.error).into())
}
}

View File

@ -1,10 +1,12 @@
use rhai_client::{RhaiClient, RhaiTaskDetails}; // Assuming RhaiTaskDetails might be part of the success path, though we expect error
use std::time::Duration;
use log::info;
use rhai_client::RhaiClient; // Assuming RhaiTaskDetails might be part of the success path, though we expect error
use std::time::Duration;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::builder().filter_level(log::LevelFilter::Info).init();
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.init();
let client = RhaiClient::new("redis://127.0.0.1/")?;
info!("RhaiClient created.");
@ -53,8 +55,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("Timeout Example PASSED: Correctly received RhaiClientError::Timeout for task_id: {}", task_id);
// Ensure the elapsed time is close to the timeout duration
// Allow for some buffer for processing
assert!(elapsed >= very_short_timeout && elapsed < very_short_timeout + Duration::from_secs(1), "Elapsed time {:?} should be close to timeout {:?}", elapsed, very_short_timeout);
info!("Elapsed time {:?} is consistent with timeout duration {:?}.", elapsed, very_short_timeout);
assert!(
elapsed >= very_short_timeout
&& elapsed < very_short_timeout + Duration::from_secs(1),
"Elapsed time {:?} should be close to timeout {:?}",
elapsed,
very_short_timeout
);
info!(
"Elapsed time {:?} is consistent with timeout duration {:?}.",
elapsed, very_short_timeout
);
Ok(())
}
other_error => {
@ -62,7 +73,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
"Timeout Example FAILED: Expected RhaiClientError::Timeout, but got other error: {:?}",
other_error
);
Err(format!("Expected RhaiClientError::Timeout, got other error: {:?}", other_error).into())
Err(format!(
"Expected RhaiClientError::Timeout, got other error: {:?}",
other_error
)
.into())
}
}
}

View File

@ -1,8 +1,8 @@
use chrono::Utc;
use log::{debug, info, warn, error}; // Added error
use log::{debug, error, info, warn}; // Added error
use redis::AsyncCommands;
use std::time::Duration; // Duration is still used, Instant and sleep were removed
use serde::{Deserialize, Serialize};
use std::time::Duration; // Duration is still used, Instant and sleep were removed
use uuid::Uuid;
const REDIS_TASK_DETAILS_PREFIX: &str = "rhai_task_details:";
@ -33,7 +33,7 @@ pub struct RhaiTaskDetails {
pub enum RhaiClientError {
RedisError(redis::RedisError),
SerializationError(serde_json::Error),
Timeout(String), // task_id that timed out
Timeout(String), // task_id that timed out
TaskNotFound(String), // task_id not found after submission (should be rare)
}
@ -54,8 +54,12 @@ impl std::fmt::Display for RhaiClientError {
match self {
RhaiClientError::RedisError(e) => write!(f, "Redis error: {}", e),
RhaiClientError::SerializationError(e) => write!(f, "Serialization error: {}", e),
RhaiClientError::Timeout(task_id) => write!(f, "Timeout waiting for task {} to complete", task_id),
RhaiClientError::TaskNotFound(task_id) => write!(f, "Task {} not found after submission", task_id),
RhaiClientError::Timeout(task_id) => {
write!(f, "Timeout waiting for task {} to complete", task_id)
}
RhaiClientError::TaskNotFound(task_id) => {
write!(f, "Task {} not found after submission", task_id)
}
}
}
}
@ -69,7 +73,9 @@ pub struct RhaiClient {
impl RhaiClient {
pub fn new(redis_url: &str) -> Result<Self, RhaiClientError> {
let client = redis::Client::open(redis_url)?;
Ok(Self { redis_client: client })
Ok(Self {
redis_client: client,
})
}
// Internal helper to submit script details and push to work queue
@ -86,7 +92,11 @@ impl RhaiClient {
let now = Utc::now();
let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id);
let worker_queue_key = format!("{}{}", REDIS_QUEUE_PREFIX, circle_name.replace(" ", "_").to_lowercase());
let worker_queue_key = format!(
"{}{}",
REDIS_QUEUE_PREFIX,
circle_name.replace(" ", "_").to_lowercase()
);
debug!(
"Preparing task_id: {} for circle: {} to worker_queue: {}. Script: {}, replyToQueue: {:?}, publicKey: {:?}",
@ -95,7 +105,7 @@ impl RhaiClient {
let mut hset_args: Vec<(String, String)> = vec![
("taskId".to_string(), task_id.to_string()), // Add taskId
("script".to_string(), script), // script is moved here
("script".to_string(), script), // script is moved here
("status".to_string(), "pending".to_string()),
("createdAt".to_string(), now.to_rfc3339()),
("updatedAt".to_string(), now.to_rfc3339()),
@ -103,10 +113,12 @@ impl RhaiClient {
// clientRpcId field and its corresponding hset_args logic are removed.
if let Some(queue_name) = &reply_to_queue_name { // Use the passed parameter
if let Some(queue_name) = &reply_to_queue_name {
// Use the passed parameter
hset_args.push(("replyToQueue".to_string(), queue_name.clone()));
}
if let Some(pk) = &public_key { // Use the passed parameter
if let Some(pk) = &public_key {
// Use the passed parameter
hset_args.push(("publicKey".to_string(), pk.clone()));
}
@ -116,15 +128,14 @@ impl RhaiClient {
// Simpler:
// Explicitly type K, F, V for hset_multiple if inference is problematic.
// RV (return value of the command itself) is typically () for HSET type commands.
conn.hset_multiple::<_, _, _, ()>(&task_key, &hset_args).await?;
conn.hset_multiple::<_, _, _, ()>(&task_key, &hset_args)
.await?;
// lpush also infers its types, RV is typically i64 (length of list) or () depending on exact command variant
// For `redis::AsyncCommands::lpush`, it's `RedisResult<R>` where R: FromRedisValue
// Often this is the length of the list. Let's allow inference or specify if needed.
let _: redis::RedisResult<i64> = conn.lpush(&worker_queue_key, task_id).await;
Ok(())
}
@ -133,13 +144,15 @@ impl RhaiClient {
&self,
circle_name: &str,
script: String,
// client_rpc_id: Option<Value> is removed
public_key: Option<String>,
) -> Result<String, RhaiClientError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
let task_id = Uuid::new_v4().to_string(); // task_id is generated here for fire-and-forget
debug!("Client submitting script (fire-and-forget) with new task_id: {} to circle: {}", task_id, circle_name);
debug!(
"Client submitting script (fire-and-forget) with new task_id: {} to circle: {}",
task_id, circle_name
);
self.submit_script_to_worker_queue(
&mut conn,
@ -157,11 +170,15 @@ impl RhaiClient {
// Optional: A method to check task status, similar to what circle_server_ws polling does.
// This could be useful for a client that wants to poll for results itself.
pub async fn get_task_status(&self, task_id: &str) -> Result<Option<RhaiTaskDetails>, RhaiClientError> {
pub async fn get_task_status(
&self,
task_id: &str,
) -> Result<Option<RhaiTaskDetails>, RhaiClientError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id);
let result_map: Option<std::collections::HashMap<String, String>> = conn.hgetall(&task_key).await?;
let result_map: Option<std::collections::HashMap<String, String>> =
conn.hgetall(&task_key).await?;
match result_map {
Some(map) => {
@ -222,8 +239,7 @@ impl RhaiClient {
) -> Result<RhaiTaskDetails, RhaiClientError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
// let task_id = Uuid::new_v4().to_string(); // Removed, task_id is a parameter
let reply_to_queue_name =
format!("{}{}", REDIS_REPLY_QUEUE_PREFIX, task_id); // Derived from the passed task_id
let reply_to_queue_name = format!("{}{}", REDIS_REPLY_QUEUE_PREFIX, task_id); // Derived from the passed task_id
self.submit_script_to_worker_queue(
&mut conn,
@ -247,7 +263,13 @@ impl RhaiClient {
// The timeout for BLPOP is in seconds (integer)
let blpop_timeout_secs = timeout.as_secs().max(1); // Ensure at least 1 second for BLPOP timeout
match conn.blpop::<&String, Option<(String, String)>>(&reply_to_queue_name, blpop_timeout_secs as f64).await {
match conn
.blpop::<&String, Option<(String, String)>>(
&reply_to_queue_name,
blpop_timeout_secs as f64,
)
.await
{
Ok(Some((_queue, result_message_str))) => {
// Attempt to deserialize the result message into RhaiTaskDetails or a similar structure
// For now, we assume the worker sends back a JSON string of RhaiTaskDetails
@ -267,21 +289,29 @@ impl RhaiClient {
}
Err(e) => {
error!("Task {}: Failed to deserialize result message from reply queue: {}. Message: {}", task_id, e, result_message_str);
// Optionally, delete the reply queue
// Optionally, delete the reply queue
let _: redis::RedisResult<i32> = conn.del(&reply_to_queue_name).await;
Err(RhaiClientError::SerializationError(e))
}
}
}
Ok(None) => { // BLPOP timed out
warn!("Timeout waiting for result on reply queue {} for task {}", reply_to_queue_name, task_id);
// Optionally, delete the reply queue
Ok(None) => {
// BLPOP timed out
warn!(
"Timeout waiting for result on reply queue {} for task {}",
reply_to_queue_name, task_id
);
// Optionally, delete the reply queue
let _: redis::RedisResult<i32> = conn.del(&reply_to_queue_name).await;
Err(RhaiClientError::Timeout(task_id))
}
Err(e) => { // Redis error
error!("Redis error on BLPOP for reply queue {}: {}", reply_to_queue_name, e);
// Optionally, delete the reply queue
Err(e) => {
// Redis error
error!(
"Redis error on BLPOP for reply queue {}: {}",
reply_to_queue_name, e
);
// Optionally, delete the reply queue
let _: redis::RedisResult<i32> = conn.del(&reply_to_queue_name).await;
Err(RhaiClientError::RedisError(e))
}

View File

@ -1,9 +1,6 @@
use std::sync::Arc;
use std::path::Path;
use rhai::{Engine, Scope};
use heromodels::models::calendar::{Calendar, Event, Attendee, AttendanceStatus};
use engine::{create_heromodels_engine, eval_file};
use engine::mock_db::{create_mock_db, seed_mock_db};
use engine::{create_heromodels_engine, eval_file};
use rhai::Engine;
fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Calendar Rhai Example");
@ -39,26 +36,29 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}
println!("\nScript executed successfully!");
Ok(())
},
}
Err(err) => {
eprintln!("\nError running script: {}", err);
Err(Box::new(std::io::Error::new(std::io::ErrorKind::Other, err.to_string())))
Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
err.to_string(),
)))
}
}
}
// Register timestamp helper functions with the engine
fn register_timestamp_helpers(engine: &mut Engine) {
use chrono::{DateTime, Utc, TimeZone, NaiveDateTime};
use chrono::{TimeZone, Utc};
// Function to get current timestamp
engine.register_fn("timestamp_now", || {
Utc::now().timestamp() as i64
});
engine.register_fn("timestamp_now", || Utc::now().timestamp() as i64);
// Function to format a timestamp
engine.register_fn("format_timestamp", |ts: i64| {
let dt = Utc.timestamp_opt(ts, 0).single()
let dt = Utc
.timestamp_opt(ts, 0)
.single()
.expect("Invalid timestamp");
dt.format("%Y-%m-%d %H:%M:%S UTC").to_string()
});

View File

@ -1,11 +1,7 @@
use std::sync::Arc;
use std::path::Path;
use rhai::{Engine, Scope};
use heromodels::models::finance::account::Account;
use heromodels::models::finance::asset::{Asset, AssetType};
use heromodels::models::finance::marketplace::{Listing, Bid, ListingStatus, ListingType, BidStatus};
use engine::{create_heromodels_engine, eval_file};
use engine::mock_db::{create_mock_db, seed_mock_db};
use engine::{create_heromodels_engine, eval_file};
use rhai::Engine;
use std::path::Path;
fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Finance Rhai Example");
@ -40,26 +36,29 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
}
println!("\nScript executed successfully!");
Ok(())
},
}
Err(err) => {
eprintln!("\nError running script: {}", err);
Err(Box::new(std::io::Error::new(std::io::ErrorKind::Other, err.to_string())))
Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
err.to_string(),
)))
}
}
}
// Register timestamp helper functions with the engine
fn register_timestamp_helpers(engine: &mut Engine) {
use chrono::{DateTime, Utc, TimeZone, NaiveDateTime};
use chrono::{TimeZone, Utc};
// Function to get current timestamp
engine.register_fn("timestamp_now", || {
Utc::now().timestamp() as i64
});
engine.register_fn("timestamp_now", || Utc::now().timestamp() as i64);
// Function to format a timestamp
engine.register_fn("format_timestamp", |ts: i64| {
let dt = Utc.timestamp_opt(ts, 0).single()
let dt = Utc
.timestamp_opt(ts, 0)
.single()
.expect("Invalid timestamp");
dt.format("%Y-%m-%d %H:%M:%S UTC").to_string()
});

View File

@ -1,9 +1,9 @@
use std::path::Path;
use rhai::{Scope};
use heromodels::models::flow::{Flow, FlowStep, SignatureRequirement};
use engine::{create_heromodels_engine, eval_file};
use engine::mock_db::{create_mock_db, seed_mock_db};
use engine::{create_heromodels_engine, eval_file};
use heromodels::models::flow::{Flow, FlowStep, SignatureRequirement};
use heromodels_core::Model;
use rhai::Scope;
use std::path::Path;
fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Flow Rhai Example");
@ -34,10 +34,13 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("\nScript returned: {:?}", result);
}
println!("\nScript executed successfully!");
},
}
Err(err) => {
eprintln!("\nError running script: {}", err);
return Err(Box::new(std::io::Error::new(std::io::ErrorKind::Other, err.to_string())));
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
err.to_string(),
)));
}
}
@ -52,7 +55,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let result = engine.eval::<Flow>("new_flow(0, \"Direct Rust Flow\")");
match result {
Ok(mut flow) => {
println!("Created flow from Rust: {} (ID: {})", flow.name, flow.get_id());
println!(
"Created flow from Rust: {} (ID: {})",
flow.name,
flow.get_id()
);
// Set flow status using the builder pattern
flow = flow.status("active".to_string());
@ -63,13 +70,20 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
match result {
Ok(mut step) => {
println!("Created flow step from Rust: Step Order {} (ID: {})",
step.step_order, step.get_id());
println!(
"Created flow step from Rust: Step Order {} (ID: {})",
step.step_order,
step.get_id()
);
// Set step description
step = step.description("Direct Rust Step".to_string());
println!("Set step description to: {}",
step.description.clone().unwrap_or_else(|| "None".to_string()));
println!(
"Set step description to: {}",
step.description
.clone()
.unwrap_or_else(|| "None".to_string())
);
// Create a signature requirement using the Rhai function
let result = engine.eval::<SignatureRequirement>(
@ -78,41 +92,66 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
match result {
Ok(req) => {
println!("Created signature requirement from Rust: Public Key {} (ID: {})",
req.public_key, req.get_id());
println!(
"Created signature requirement from Rust: Public Key {} (ID: {})",
req.public_key,
req.get_id()
);
// Add the step to the flow using the builder pattern
flow = flow.add_step(step);
println!("Added step to flow. Flow now has {} steps", flow.steps.len());
println!(
"Added step to flow. Flow now has {} steps",
flow.steps.len()
);
// Save the flow to the database using the Rhai function
let save_flow_script = "fn save_it(f) { return db::save_flow(f); }";
let save_flow_ast = engine.compile(save_flow_script).unwrap();
let result = engine.call_fn::<Flow>(&mut scope, &save_flow_ast, "save_it", (flow,));
let result = engine.call_fn::<Flow>(
&mut scope,
&save_flow_ast,
"save_it",
(flow,),
);
match result {
Ok(saved_flow) => {
println!("Saved flow to database with ID: {}", saved_flow.get_id());
},
println!(
"Saved flow to database with ID: {}",
saved_flow.get_id()
);
}
Err(err) => eprintln!("Error saving flow: {}", err),
}
// Save the signature requirement to the database using the Rhai function
let save_req_script = "fn save_it(r) { return db::save_signature_requirement(r); }";
let save_req_script =
"fn save_it(r) { return db::save_signature_requirement(r); }";
let save_req_ast = engine.compile(save_req_script).unwrap();
let result = engine.call_fn::<SignatureRequirement>(&mut scope, &save_req_ast, "save_it", (req,));
let result = engine.call_fn::<SignatureRequirement>(
&mut scope,
&save_req_ast,
"save_it",
(req,),
);
match result {
Ok(saved_req) => {
println!("Saved signature requirement to database with ID: {}", saved_req.get_id());
},
Err(err) => eprintln!("Error saving signature requirement: {}", err),
println!(
"Saved signature requirement to database with ID: {}",
saved_req.get_id()
);
}
Err(err) => {
eprintln!("Error saving signature requirement: {}", err)
}
}
},
}
Err(err) => eprintln!("Error creating signature requirement: {}", err),
}
},
}
Err(err) => eprintln!("Error creating flow step: {}", err),
}
},
}
Err(err) => eprintln!("Error creating flow: {}", err),
}

View File

@ -1,4 +1,4 @@
use rhai::{Engine, AST, Scope, EvalAltResult}; // Added EvalAltResult
use rhai::{Engine, EvalAltResult, Scope, AST}; // Added EvalAltResult
use std::sync::Arc;
// use std::sync::Mutex; // Unused
// use std::collections::HashMap; // Unused
@ -15,8 +15,8 @@ pub fn create_heromodels_engine(db: Arc<OurDB>) -> Engine {
// Configure engine settings
engine.set_max_expr_depths(128, 128);
engine.set_max_string_size(10 * 1024 * 1024); // 10 MB
engine.set_max_array_size(10 * 1024); // 10K elements
engine.set_max_map_size(10 * 1024); // 10K elements
engine.set_max_array_size(10 * 1024); // 10K elements
engine.set_max_map_size(10 * 1024); // 10K elements
// Register all heromodels Rhai modules
register_all_modules(&mut engine, db);
@ -58,22 +58,24 @@ pub fn register_all_modules(engine: &mut Engine, db: Arc<OurDB>) {
}
/// Evaluate a Rhai script string
pub fn eval_script(engine: &Engine, script: &str) -> Result<rhai::Dynamic, Box<rhai::EvalAltResult>> {
pub fn eval_script(
engine: &Engine,
script: &str,
) -> Result<rhai::Dynamic, Box<rhai::EvalAltResult>> {
engine.eval::<rhai::Dynamic>(script)
}
/// Evaluate a Rhai script file
pub fn eval_file(engine: &Engine, file_path: &Path) -> Result<rhai::Dynamic, Box<rhai::EvalAltResult>> {
pub fn eval_file(
engine: &Engine,
file_path: &Path,
) -> Result<rhai::Dynamic, Box<rhai::EvalAltResult>> {
match fs::read_to_string(file_path) {
Ok(script_content) => {
engine.eval::<rhai::Dynamic>(&script_content)
}
Err(io_err) => {
Err(Box::new(EvalAltResult::ErrorSystem(
format!("Failed to read script file: {}", file_path.display()),
Box::new(io_err),
)))
}
Ok(script_content) => engine.eval::<rhai::Dynamic>(&script_content),
Err(io_err) => Err(Box::new(EvalAltResult::ErrorSystem(
format!("Failed to read script file: {}", file_path.display()),
Box::new(io_err),
))),
}
}
@ -83,6 +85,10 @@ pub fn compile_script(engine: &Engine, script: &str) -> Result<AST, Box<rhai::Ev
}
/// Run a compiled Rhai script AST
pub fn run_ast(engine: &Engine, ast: &AST, scope: &mut Scope) -> Result<rhai::Dynamic, Box<rhai::EvalAltResult>> {
pub fn run_ast(
engine: &Engine,
ast: &AST,
scope: &mut Scope,
) -> Result<rhai::Dynamic, Box<rhai::EvalAltResult>> {
engine.eval_ast_with_scope(scope, ast)
}

View File

@ -1,26 +1,27 @@
use std::sync::Arc;
use std::env;
use heromodels::db::hero::OurDB;
use heromodels::db::{Db, Collection}; // Import both Db and Collection traits
use heromodels::models::calendar::{Calendar, Event, Attendee, AttendanceStatus};
use heromodels_core::Model; // Import Model trait to use build method
use chrono::Utc;
use heromodels::models::userexample::User;
use heromodels::db::hero::OurDB;
use heromodels::db::{Collection, Db}; // Import both Db and Collection traits
use heromodels::models::calendar::{Calendar, Event};
use heromodels_core::Model; // Import Model trait to use build method
use std::env;
use std::sync::Arc;
// Import finance models
use heromodels::models::finance::account::Account;
use heromodels::models::finance::asset::{Asset, AssetType};
use heromodels::models::finance::marketplace::{Listing, Bid, ListingStatus, ListingType, BidStatus};
use heromodels::models::finance::marketplace::{Listing, ListingType};
// Conditionally import other modules based on features
#[cfg(feature = "flow")]
use heromodels::models::flow::{Flow, FlowStep, SignatureRequirement};
#[cfg(feature = "legal")]
use heromodels::models::legal::{Contract, ContractRevision, ContractSigner, ContractStatus, SignerStatus};
use heromodels::models::legal::{
Contract, ContractRevision, ContractSigner, ContractStatus, SignerStatus,
};
#[cfg(feature = "projects")]
use heromodels::models::projects::{Project, Status as ProjectStatus, Priority, ItemType};
use heromodels::models::projects::{ItemType, Priority, Project, Status as ProjectStatus};
/// Create a mock in-memory database for examples
pub fn create_mock_db() -> Arc<OurDB> {
@ -63,12 +64,12 @@ fn seed_calendar_data(db: Arc<OurDB>) {
let mut calendar = Calendar::new(None, "Work Calendar".to_string());
calendar.description = Some("My work schedule".to_string());
// Store the calendar in the database
let (calendar_id, updated_calendar) = db.collection::<Calendar>()
.expect("Failed to get Calendar collection")
.set(&calendar)
.expect("Failed to store calendar");
// Store the calendar in the database
let (calendar_id, updated_calendar) = db
.collection::<Calendar>()
.expect("Failed to get Calendar collection")
.set(&calendar)
.expect("Failed to store calendar");
// Create an event
let now = Utc::now().timestamp();
@ -96,7 +97,8 @@ fn seed_calendar_data(db: Arc<OurDB>) {
// let event = event.build();
// Store the event in the database first to get its ID
let (event_id, updated_event) = db.collection()
let (event_id, updated_event) = db
.collection()
.expect("Failed to get Event collection")
.set(&event)
.expect("Failed to store event");
@ -105,14 +107,21 @@ fn seed_calendar_data(db: Arc<OurDB>) {
calendar = calendar.add_event(event_id as i64);
// Store the calendar in the database
let (calendar_id, updated_calendar) = db.collection::<Calendar>()
let (calendar_id, updated_calendar) = db
.collection::<Calendar>()
.expect("Failed to get Calendar collection")
.set(&calendar)
.expect("Failed to store calendar");
println!("Mock database seeded with calendar data:");
println!(" - Added calendar: {} (ID: {})", updated_calendar.name, updated_calendar.base_data.id);
println!(" - Added event: {} (ID: {})", updated_event.title, updated_event.base_data.id);
println!(
" - Added calendar: {} (ID: {})",
updated_calendar.name, updated_calendar.base_data.id
);
println!(
" - Added event: {} (ID: {})",
updated_event.title, updated_event.base_data.id
);
}
/// Seed the mock database with flow data
@ -135,35 +144,53 @@ fn seed_flow_data(db: Arc<OurDB>) {
step2 = step2.status("pending".to_string());
// Add signature requirements
let mut req1 = SignatureRequirement::new(0, 1, "Legal Team".to_string(), "Please review this document".to_string());
let mut req2 = SignatureRequirement::new(0, 2, "Department Head".to_string(), "Please approve this document".to_string());
let mut req1 = SignatureRequirement::new(
0,
1,
"Legal Team".to_string(),
"Please review this document".to_string(),
);
let mut req2 = SignatureRequirement::new(
0,
2,
"Department Head".to_string(),
"Please approve this document".to_string(),
);
// Add steps to flow
flow = flow.add_step(step1);
flow = flow.add_step(step2);
// Store in the database
let (_, updated_flow) = db.collection::<Flow>()
let (_, updated_flow) = db
.collection::<Flow>()
.expect("Failed to get Flow collection")
.set(&flow)
.expect("Failed to store flow");
// Store signature requirements in the database
let (_, updated_req1) = db.collection::<SignatureRequirement>()
let (_, updated_req1) = db
.collection::<SignatureRequirement>()
.expect("Failed to get SignatureRequirement collection")
.set(&req1)
.expect("Failed to store signature requirement");
let (_, updated_req2) = db.collection::<SignatureRequirement>()
let (_, updated_req2) = db
.collection::<SignatureRequirement>()
.expect("Failed to get SignatureRequirement collection")
.set(&req2)
.expect("Failed to store signature requirement");
println!("Mock database seeded with flow data:");
println!(" - Added flow: {} (ID: {})", updated_flow.name, updated_flow.base_data.id);
println!(
" - Added flow: {} (ID: {})",
updated_flow.name, updated_flow.base_data.id
);
println!(" - Added {} steps", updated_flow.steps.len());
println!(" - Added signature requirements with IDs: {} and {}",
updated_req1.base_data.id, updated_req2.base_data.id);
println!(
" - Added signature requirements with IDs: {} and {}",
updated_req1.base_data.id, updated_req2.base_data.id
);
}
/// Seed the mock database with legal data
@ -191,16 +218,22 @@ fn seed_legal_data(db: Arc<OurDB>) {
contract.add_signer(signer2);
// Store in the database
let (_, updated_contract) = db.collection::<Contract>()
let (_, updated_contract) = db
.collection::<Contract>()
.expect("Failed to get Contract collection")
.set(&contract)
.expect("Failed to store contract");
println!("Mock database seeded with legal data:");
println!(" - Added contract: {} (ID: {})", updated_contract.name, updated_contract.base_data.id);
println!(" - Added {} revisions and {} signers",
updated_contract.revisions.len(),
updated_contract.signers.len());
println!(
" - Added contract: {} (ID: {})",
updated_contract.name, updated_contract.base_data.id
);
println!(
" - Added {} revisions and {} signers",
updated_contract.revisions.len(),
updated_contract.signers.len()
);
}
/// Seed the mock database with projects data
@ -219,17 +252,26 @@ fn seed_projects_data(db: Arc<OurDB>) {
project.add_tag("web".to_string());
// Store in the database
let (_, updated_project) = db.collection::<Project>()
let (_, updated_project) = db
.collection::<Project>()
.expect("Failed to get Project collection")
.set(&project)
.expect("Failed to store project");
println!("Mock database seeded with projects data:");
println!(" - Added project: {} (ID: {})", updated_project.name, updated_project.base_data.id);
println!(" - Status: {}, Priority: {}", updated_project.status, updated_project.priority);
println!(" - Added {} members and {} tags",
updated_project.member_ids.len(),
updated_project.tags.len());
println!(
" - Added project: {} (ID: {})",
updated_project.name, updated_project.base_data.id
);
println!(
" - Status: {}, Priority: {}",
updated_project.status, updated_project.priority
);
println!(
" - Added {} members and {} tags",
updated_project.member_ids.len(),
updated_project.tags.len()
);
}
/// Seed the mock database with finance data
fn seed_finance_data(db: Arc<OurDB>) {
@ -243,7 +285,8 @@ fn seed_finance_data(db: Arc<OurDB>) {
.pubkey("0xabcdef1234567890abcdef1234567890abcdef12");
// Store the account in the database
let (account_id, updated_account) = db.collection::<Account>()
let (account_id, updated_account) = db
.collection::<Account>()
.expect("Failed to get Account collection")
.set(&account)
.expect("Failed to store account");
@ -258,7 +301,8 @@ fn seed_finance_data(db: Arc<OurDB>) {
.decimals(18);
// Store the token asset in the database
let (token_id, updated_token) = db.collection::<Asset>()
let (token_id, updated_token) = db
.collection::<Asset>()
.expect("Failed to get Asset collection")
.set(&token_asset)
.expect("Failed to store token asset");
@ -273,7 +317,8 @@ fn seed_finance_data(db: Arc<OurDB>) {
.decimals(0);
// Store the NFT asset in the database
let (nft_id, updated_nft) = db.collection::<Asset>()
let (nft_id, updated_nft) = db
.collection::<Asset>()
.expect("Failed to get Asset collection")
.set(&nft_asset)
.expect("Failed to store NFT asset");
@ -283,7 +328,8 @@ fn seed_finance_data(db: Arc<OurDB>) {
account = account.add_asset(nft_id);
// Update the account in the database
let (_, updated_account) = db.collection::<Account>()
let (_, updated_account) = db
.collection::<Account>()
.expect("Failed to get Account collection")
.set(&account)
.expect("Failed to store updated account");
@ -302,14 +348,27 @@ fn seed_finance_data(db: Arc<OurDB>) {
.add_tag("collectible".to_string());
// Store the listing in the database
let (listing_id, updated_listing) = db.collection::<Listing>()
let (listing_id, updated_listing) = db
.collection::<Listing>()
.expect("Failed to get Listing collection")
.set(&listing)
.expect("Failed to store listing");
println!("Mock database seeded with finance data:");
println!(" - Added account: {} (ID: {})", updated_account.name, updated_account.base_data.id);
println!(" - Added token asset: {} (ID: {})", updated_token.name, updated_token.base_data.id);
println!(" - Added NFT asset: {} (ID: {})", updated_nft.name, updated_nft.base_data.id);
println!(" - Added listing: {} (ID: {})", updated_listing.title, updated_listing.base_data.id);
println!(
" - Added account: {} (ID: {})",
updated_account.name, updated_account.base_data.id
);
println!(
" - Added token asset: {} (ID: {})",
updated_token.name, updated_token.base_data.id
);
println!(
" - Added NFT asset: {} (ID: {})",
updated_nft.name, updated_nft.base_data.id
);
println!(
" - Added listing: {} (ID: {})",
updated_listing.title, updated_listing.base_data.id
);
}

View File

@ -7,8 +7,8 @@ use crate::plot;
use crate::tasks::{self, RhaiTask};
use redis::{AsyncCommands, Client as RedisClient};
use std::collections::HashMap;
use tokio::time::{sleep, Duration};
use tokio::signal;
use tokio::time::{sleep, Duration};
const REDIS_URL: &str = "redis://127.0.0.1/";
const POLLING_INTERVAL_MILLISECONDS: u64 = 10; // Increased polling interval for SCAN
@ -24,7 +24,10 @@ pub async fn start_monitoring(worker_names: &[String]) -> Result<()> {
let ping_result: String = redis::cmd("PING").query_async(&mut con).await?;
tracing::info!("Redis PING response: {}", ping_result);
tracing::info!("Starting live monitor. Configured workers: {:?}. Press Ctrl+C to exit.", worker_names);
tracing::info!(
"Starting live monitor. Configured workers: {:?}. Press Ctrl+C to exit.",
worker_names
);
loop {
tokio::select! {

View File

@ -11,7 +11,6 @@ struct Args {
/// List of worker names to monitor, comma-separated
#[clap(short, long, value_delimiter = ',', required = true, num_args = 1..)]
workers: Vec<String>,
// TODO: Add other options like Redis connection details if not using a config file or env vars.
}

View File

@ -1,6 +1,6 @@
// rhailib/monitor/src/tasks.rs
use anyhow::Result;
use prettytable::{Cell, Row, Table, format};
use prettytable::{format, Cell, Row, Table};
use std::collections::HashMap;
#[derive(Debug, Clone)]
@ -32,7 +32,10 @@ impl RhaiTask {
RhaiTask {
id: task_id,
script: get_opt_string("script"),
status: details.get("status").cloned().unwrap_or_else(|| "unknown".to_string()),
status: details
.get("status")
.cloned()
.unwrap_or_else(|| "unknown".to_string()),
created_at: get_opt_string("createdAt"),
updated_at: get_opt_string("updatedAt"),
client_rpc_id: get_opt_string("clientRpcId"),
@ -66,7 +69,7 @@ pub async fn display_task_table(tasks: &[RhaiTask]) -> Result<()> {
for task in tasks {
let details_str = match (&task.output, &task.error) {
(Some(out), None) => format!("Output: {:.50}", out), // Truncate for display
(None, Some(err)) => format!("Error: {:.50}", err), // Truncate for display
(None, Some(err)) => format!("Error: {:.50}", err), // Truncate for display
(Some(out), Some(err)) => format!("Output: {:.30}... Error: {:.30}...", out, err),
(None, None) => "N/A".to_string(),
};

View File

@ -1,20 +1,24 @@
use anyhow::Context;
use rhai_client::{RhaiClient, RhaiClientError, RhaiTaskDetails};
use std::env;
use std::time::Duration;
use std::sync::Arc;
use anyhow::Context;
use tracing_subscriber::EnvFilter;
use std::time::Duration;
use tokio::sync::mpsc;
use tracing_subscriber::EnvFilter;
use worker_lib::spawn_rhai_worker;
use engine::create_heromodels_engine;
use heromodels::db::hero::OurDB;
use std::path::PathBuf;
use worker_lib::spawn_rhai_worker;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env().add_directive("connect_and_play=info".parse().unwrap()).add_directive("rhai_client=info".parse().unwrap()))
.with_env_filter(
EnvFilter::from_default_env()
.add_directive("connect_and_play=info".parse().unwrap())
.add_directive("rhai_client=info".parse().unwrap()),
)
.init();
let args: Vec<String> = env::args().collect();
@ -41,13 +45,23 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let worker_circle_name_for_task = worker_name.clone();
let db_path_for_task = db_path_str.clone();
log::info!("[Main] Spawning worker for circle '{}' with DB path '{}'", worker_circle_name_for_task, db_path_for_task);
log::info!(
"[Main] Spawning worker for circle '{}' with DB path '{}'",
worker_circle_name_for_task,
db_path_for_task
);
let worker_join_handle = tokio::spawn(async move {
log::info!("[BG Worker] Starting for circle '{}' on Redis '{}'", worker_circle_name_for_task, worker_redis_url);
log::info!(
"[BG Worker] Starting for circle '{}' on Redis '{}'",
worker_circle_name_for_task,
worker_redis_url
);
// The `reset: true` in OurDB::new handles pre-cleanup if the directory exists.
let db = Arc::new(OurDB::new(&db_path_for_task, true)
.expect("Failed to create temp DB for example worker"));
let db = Arc::new(
OurDB::new(&db_path_for_task, true)
.expect("Failed to create temp DB for example worker"),
);
let mut engine = create_heromodels_engine(db);
engine.set_max_operations(0);
engine.set_max_expr_depths(0, 0);
@ -59,18 +73,30 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
engine,
worker_redis_url.clone(),
shutdown_rx, // Pass the receiver from main
false, // preserve_tasks
).await {
log::error!("[BG Worker] Failed to spawn or worker error for circle '{}': {}", worker_circle_name_for_task, e);
false, // preserve_tasks
)
.await
{
log::error!(
"[BG Worker] Failed to spawn or worker error for circle '{}': {}",
worker_circle_name_for_task,
e
);
} else {
log::info!("[BG Worker] Worker for circle '{}' shut down gracefully.", worker_circle_name_for_task);
log::info!(
"[BG Worker] Worker for circle '{}' shut down gracefully.",
worker_circle_name_for_task
);
}
});
// Give the worker a moment to start up
tokio::time::sleep(Duration::from_secs(1)).await;
println!("Initializing RhaiClient for Redis at {} to target worker '{}'...", redis_url, worker_name);
println!(
"Initializing RhaiClient for Redis at {} to target worker '{}'...",
redis_url, worker_name
);
let client = RhaiClient::new(&redis_url)
.with_context(|| format!("Failed to create RhaiClient for Redis URL: {}", redis_url))?;
println!("RhaiClient initialized.");
@ -79,7 +105,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("\nSending script:\n```rhai\n{}\n```", script);
let timeout = Duration::from_secs(30);
match client.submit_script_and_await_result(&worker_name, script.to_string(), None, timeout).await {
match client
.submit_script_and_await_result(&worker_name, script.to_string(), None, timeout)
.await
{
Ok(task_details) => {
println!("\nWorker response:");
if let Some(ref output) = task_details.output {
@ -89,24 +118,35 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
eprintln!("Error: {}", error_msg);
}
if task_details.output.is_none() && task_details.error.is_none() {
println!("Worker finished with no explicit output or error. Status: {}", task_details.status);
println!(
"Worker finished with no explicit output or error. Status: {}",
task_details.status
);
}
}
Err(e) => match e {
RhaiClientError::Timeout(task_id) => {
eprintln!("\nError: Script execution timed out for task_id: {}.", task_id);
eprintln!(
"\nError: Script execution timed out for task_id: {}.",
task_id
);
}
RhaiClientError::RedisError(redis_err) => {
eprintln!("\nError: Redis communication failed: {}. Check Redis connection and server status.", redis_err);
eprintln!(
"\nError: Redis communication failed: {}. Check Redis connection and server status.",
redis_err
);
}
RhaiClientError::SerializationError(serde_err) => {
eprintln!("\nError: Failed to serialize/deserialize task data: {}.", serde_err);
eprintln!(
"\nError: Failed to serialize/deserialize task data: {}.",
serde_err
);
}
RhaiClientError::TaskNotFound(task_id) => {
eprintln!("\nError: Task {} not found after submission.", task_id);
}
/* All RhaiClientError variants are handled, so _ arm is not strictly needed
unless RhaiClientError becomes non-exhaustive in the future. */
eprintln!("\nError: Task {} not found after submission.", task_id);
} /* All RhaiClientError variants are handled, so _ arm is not strictly needed
unless RhaiClientError becomes non-exhaustive in the future. */
},
}
@ -114,7 +154,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Send shutdown signal to the worker
if let Err(e) = shutdown_tx.send(()).await {
eprintln!("[Main] Failed to send shutdown signal to worker: {} (worker might have already exited or an error occurred)", e);
eprintln!(
"[Main] Failed to send shutdown signal to worker: {} (worker might have already exited or an error occurred)",
e
);
}
// Wait for the worker to finish
@ -126,15 +169,28 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
// Clean up the database directory
log::info!("[Main] Cleaning up database directory: {}", db_path.display());
log::info!(
"[Main] Cleaning up database directory: {}",
db_path.display()
);
if db_path.exists() {
if let Err(e) = std::fs::remove_dir_all(&db_path) {
eprintln!("[Main] Failed to remove database directory '{}': {}", db_path.display(), e);
eprintln!(
"[Main] Failed to remove database directory '{}': {}",
db_path.display(),
e
);
} else {
log::info!("[Main] Successfully removed database directory: {}", db_path.display());
log::info!(
"[Main] Successfully removed database directory: {}",
db_path.display()
);
}
} else {
log::info!("[Main] Database directory '{}' not found, no cleanup needed.", db_path.display());
log::info!(
"[Main] Database directory '{}' not found, no cleanup needed.",
db_path.display()
);
}
println!("Example fully completed and cleaned up.");

View File

@ -1,13 +1,13 @@
use tracing_subscriber::EnvFilter;
use anyhow::Context;
use rhai_client::{RhaiClient, RhaiClientError, RhaiTaskDetails};
use rustyline::error::ReadlineError;
use rustyline::{DefaultEditor, Config, EditMode};
use rustyline::{Config, DefaultEditor, EditMode};
use std::env;
use std::fs;
use std::process::Command;
use std::env;
use std::time::Duration;
use tempfile::Builder as TempFileBuilder;
use anyhow::Context;
use tracing_subscriber::EnvFilter;
// Default timeout for script execution
const DEFAULT_SCRIPT_TIMEOUT_SECONDS: u64 = 30;
@ -17,11 +17,17 @@ async fn execute_script(client: &RhaiClient, circle_name: &str, script_content:
println!("Script is empty, not sending.");
return;
}
println!("Sending script to worker '{}':\n---\n{}\n---", circle_name, script_content);
println!(
"Sending script to worker '{}':\n---\n{}\n---",
circle_name, script_content
);
let timeout = Duration::from_secs(DEFAULT_SCRIPT_TIMEOUT_SECONDS);
match client.submit_script_and_await_result(circle_name, script_content, None, timeout).await {
match client
.submit_script_and_await_result(circle_name, script_content, None, timeout)
.await
{
Ok(task_details) => {
if let Some(output) = &task_details.output {
println!("worker: {}", output);
@ -30,36 +36,58 @@ async fn execute_script(client: &RhaiClient, circle_name: &str, script_content:
eprintln!("Worker error: {}", error_msg);
}
if task_details.output.is_none() && task_details.error.is_none() {
println!("Worker finished with no explicit output or error. Status: {}", task_details.status);
println!(
"Worker finished with no explicit output or error. Status: {}",
task_details.status
);
}
}
Err(e) => match e {
RhaiClientError::Timeout(task_id) => {
eprintln!("Error: Script execution timed out for task_id: {}.", task_id);
eprintln!(
"Error: Script execution timed out for task_id: {}.",
task_id
);
}
RhaiClientError::RedisError(redis_err) => {
eprintln!("Error: Redis communication failed: {}. Check Redis connection and server status.", redis_err);
eprintln!(
"Error: Redis communication failed: {}. Check Redis connection and server status.",
redis_err
);
}
RhaiClientError::SerializationError(serde_err) => {
eprintln!("Error: Failed to serialize/deserialize task data: {}.", serde_err);
eprintln!(
"Error: Failed to serialize/deserialize task data: {}.",
serde_err
);
}
RhaiClientError::TaskNotFound(task_id) => {
eprintln!("Error: Task {} not found after submission (this should be rare).", task_id);
eprintln!(
"Error: Task {} not found after submission (this should be rare).",
task_id
);
}
},
}
}
async fn run_repl(redis_url: String, circle_name: String) -> anyhow::Result<()> {
println!("Initializing Rhai REPL for worker '{}' via Redis at {}...", circle_name, redis_url);
println!(
"Initializing Rhai REPL for worker '{}' via Redis at {}...",
circle_name, redis_url
);
let client = RhaiClient::new(&redis_url)
.with_context(|| format!("Failed to create RhaiClient for Redis URL: {}", redis_url))?;
// No explicit connect() needed for rhai_client, connection is handled per-operation or pooled.
println!("RhaiClient initialized. Ready to send scripts to worker '{}'.", circle_name);
println!("Type Rhai scripts, '.edit' to use $EDITOR, '.run <path>' to execute a file, or 'exit'/'quit'.");
println!(
"RhaiClient initialized. Ready to send scripts to worker '{}'.",
circle_name
);
println!(
"Type Rhai scripts, '.edit' to use $EDITOR, '.run <path>' to execute a file, or 'exit'/'quit'."
);
println!("Vi mode enabled for input line.");
let config = Config::builder()
@ -105,11 +133,14 @@ async fn run_repl(redis_url: String, circle_name: String) -> anyhow::Result<()>
let editor_executable = editor_parts.next().unwrap_or("vi"); // Default to vi if $EDITOR is empty string
let editor_args: Vec<&str> = editor_parts.collect();
println!("Launching editor: '{}' with args: {:?} for script editing. Save and exit editor to execute.", editor_executable, editor_args);
println!(
"Launching editor: '{}' with args: {:?} for script editing. Save and exit editor to execute.",
editor_executable, editor_args
);
let mut command = Command::new(editor_executable);
command.args(editor_args); // Add any arguments from $EDITOR (like -w)
command.arg(&temp_path); // Add the temp file path as the last argument
command.arg(&temp_path); // Add the temp file path as the last argument
let status = command.status();
@ -119,11 +150,19 @@ async fn run_repl(redis_url: String, circle_name: String) -> anyhow::Result<()>
Ok(script_content) => {
execute_script(&client, &circle_name, script_content).await;
}
Err(e) => eprintln!("Error reading temp file {:?}: {}", temp_path, e),
Err(e) => {
eprintln!("Error reading temp file {:?}: {}", temp_path, e)
}
}
}
Ok(exit_status) => eprintln!("Editor exited with status: {}. Script not executed.", exit_status),
Err(e) => eprintln!("Failed to launch editor '{}': {}. Ensure it's in your PATH.", editor_executable, e), // Changed 'editor' to 'editor_executable'
Ok(exit_status) => eprintln!(
"Editor exited with status: {}. Script not executed.",
exit_status
),
Err(e) => eprintln!(
"Failed to launch editor '{}': {}. Ensure it's in your PATH.",
editor_executable, e
), // Changed 'editor' to 'editor_executable'
}
// temp_file is automatically deleted when it goes out of scope
} else if input.starts_with(".run ") || input.starts_with("run ") {
@ -145,11 +184,13 @@ async fn run_repl(redis_url: String, circle_name: String) -> anyhow::Result<()>
}
// rl.add_history_entry(line.as_str()) is handled by auto_add_history(true)
}
Err(ReadlineError::Interrupted) => { // Ctrl-C
Err(ReadlineError::Interrupted) => {
// Ctrl-C
println!("Input interrupted. Type 'exit' or 'quit' to close.");
continue;
}
Err(ReadlineError::Eof) => { // Ctrl-D
Err(ReadlineError::Eof) => {
// Ctrl-D
println!("Exiting REPL (EOF).");
break;
}
@ -172,7 +213,11 @@ async fn run_repl(redis_url: String, circle_name: String) -> anyhow::Result<()>
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env().add_directive("ui_repl=info".parse()?).add_directive("rhai_client=info".parse()?))
.with_env_filter(
EnvFilter::from_default_env()
.add_directive("ui_repl=info".parse()?)
.add_directive("rhai_client=info".parse()?),
)
.init();
let args: Vec<String> = env::args().collect();
@ -189,18 +234,29 @@ async fn main() -> anyhow::Result<()> {
args[2].clone()
} else {
let default_circle = "default_worker".to_string();
println!("No worker/circle name provided. Defaulting to: {}", default_circle);
println!(
"No worker/circle name provided. Defaulting to: {}",
default_circle
);
default_circle
};
println!("Usage: {} [redis_url] [worker_name]", args.get(0).map_or("ui_repl", |s| s.as_str()));
println!("Example: {} redis://127.0.0.1/ my_rhai_worker", args.get(0).map_or("ui_repl", |s| s.as_str()));
println!(
"Usage: {} [redis_url] [worker_name]",
args.get(0).map_or("ui_repl", |s| s.as_str())
);
println!(
"Example: {} redis://127.0.0.1/ my_rhai_worker",
args.get(0).map_or("ui_repl", |s| s.as_str())
);
// Basic validation for Redis URL (scheme)
// A more robust validation might involve trying to parse it with redis::ConnectionInfo
if !redis_url_str.starts_with("redis://") {
eprintln!("Warning: Redis URL '{}' does not start with 'redis://'. Attempting to use it anyway.", redis_url_str);
eprintln!(
"Warning: Redis URL '{}' does not start with 'redis://'. Attempting to use it anyway.",
redis_url_str
);
}
if let Err(e) = run_repl(redis_url_str, circle_name_str).await {

View File

@ -1,9 +1,9 @@
use yew::prelude::*;
use gloo_net::http::Request;
use gloo_timers::callback::Interval;
use serde::{Deserialize, Serialize};
use wasm_bindgen_futures::spawn_local;
use web_sys::HtmlInputElement;
use yew::prelude::*;
use yew::{html, Component, Context, Html, TargetCast};
// --- Data Structures (placeholders, to be refined based on backend API) ---
@ -21,8 +21,6 @@ pub struct TaskSummary {
pub status: String,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct TaskDetails {
pub hash: String,
@ -33,8 +31,6 @@ pub struct TaskDetails {
pub error: Option<String>,
}
// Combined structure for initial fetch
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
pub struct WorkerDataResponse {
@ -112,19 +108,30 @@ impl Component for App {
if response.ok() {
match response.json::<WorkerDataResponse>().await {
Ok(data) => link.send_message(Msg::SetWorkerData(Ok(data))),
Err(e) => link.send_message(Msg::SetWorkerData(Err(format!("Failed to parse worker data: {}", e)))),
Err(e) => link.send_message(Msg::SetWorkerData(Err(format!(
"Failed to parse worker data: {}",
e
)))),
}
} else {
link.send_message(Msg::SetWorkerData(Err(format!("API error: {} {}", response.status(), response.status_text()))));
link.send_message(Msg::SetWorkerData(Err(format!(
"API error: {} {}",
response.status(),
response.status_text()
))));
}
}
Err(e) => link.send_message(Msg::SetWorkerData(Err(format!("Network error fetching worker data: {}", e)))),
Err(e) => link.send_message(Msg::SetWorkerData(Err(format!(
"Network error fetching worker data: {}",
e
)))),
}
});
// Set up polling for queue stats
let link_for_timer = ctx.link().clone();
let timer = Interval::new(5000, move || { // Poll every 5 seconds
let timer = Interval::new(5000, move || {
// Poll every 5 seconds
link_for_timer.send_message(Msg::IntervalTick);
});
if let Some(old_timer) = self.queue_poll_timer.take() {
@ -142,14 +149,25 @@ impl Component for App {
Ok(response) => {
if response.ok() {
match response.json::<QueueStats>().await {
Ok(stats) => link.send_message(Msg::SetQueueStats(Ok(stats))),
Err(e) => link.send_message(Msg::SetQueueStats(Err(format!("Failed to parse queue stats: {}", e)))),
Ok(stats) => {
link.send_message(Msg::SetQueueStats(Ok(stats)))
}
Err(e) => link.send_message(Msg::SetQueueStats(Err(
format!("Failed to parse queue stats: {}", e),
))),
}
} else {
link.send_message(Msg::SetQueueStats(Err(format!("API error (queue_stats): {} {}", response.status(), response.status_text()))));
link.send_message(Msg::SetQueueStats(Err(format!(
"API error (queue_stats): {} {}",
response.status(),
response.status_text()
))));
}
}
Err(e) => link.send_message(Msg::SetQueueStats(Err(format!("Network error fetching queue stats: {}", e)))),
Err(e) => link.send_message(Msg::SetQueueStats(Err(format!(
"Network error fetching queue stats: {}",
e
)))),
}
});
}
@ -191,14 +209,26 @@ impl Component for App {
Ok(response) => {
if response.ok() {
match response.json::<TaskDetails>().await {
Ok(details) => link.send_message(Msg::SetTaskDetails(Ok(details))),
Err(e) => link.send_message(Msg::SetTaskDetails(Err(format!("Failed to parse task details: {}", e)))),
Ok(details) => {
link.send_message(Msg::SetTaskDetails(Ok(details)))
}
Err(e) => link.send_message(Msg::SetTaskDetails(Err(format!(
"Failed to parse task details: {}",
e
)))),
}
} else {
link.send_message(Msg::SetTaskDetails(Err(format!("API error (task_details): {} {}", response.status(), response.status_text()))));
link.send_message(Msg::SetTaskDetails(Err(format!(
"API error (task_details): {} {}",
response.status(),
response.status_text()
))));
}
}
Err(e) => link.send_message(Msg::SetTaskDetails(Err(format!("Network error fetching task details: {}", e)))),
Err(e) => link.send_message(Msg::SetTaskDetails(Err(format!(
"Network error fetching task details: {}",
e
)))),
}
});
true
@ -281,7 +311,10 @@ impl Component for App {
impl App {
fn view_tasks_table(&self, ctx: &Context<Self>) -> Html {
if self.tasks_list.is_empty() && self.worker_name_to_monitor.is_some() && !self.is_loading_initial_data {
if self.tasks_list.is_empty()
&& self.worker_name_to_monitor.is_some()
&& !self.is_loading_initial_data
{
return html! { <p>{ "No tasks found for this worker, or worker not found." }</p> };
}
if !self.tasks_list.is_empty() {
@ -306,8 +339,10 @@ impl App {
fn view_task_row(&self, ctx: &Context<Self>, task: &TaskSummary) -> Html {
let task_hash_clone = task.hash.clone();
let created_at_str = chrono::DateTime::from_timestamp(task.created_at, 0)
.map_or_else(|| "Invalid date".to_string(), |dt| dt.format("%Y-%m-%d %H:%M:%S").to_string());
let created_at_str = chrono::DateTime::from_timestamp(task.created_at, 0).map_or_else(
|| "Invalid date".to_string(),
|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string(),
);
html! {
<tr onclick={ctx.link().callback(move |_| Msg::ViewTaskDetails(task_hash_clone.clone()))}
style="cursor: pointer;">
@ -324,7 +359,10 @@ impl App {
}
if let Some(details) = &self.selected_task_details {
let created_at_str = chrono::DateTime::from_timestamp(details.created_at, 0)
.map_or_else(|| "Invalid date".to_string(), |dt| dt.format("%Y-%m-%d %H:%M:%S UTC").to_string());
.map_or_else(
|| "Invalid date".to_string(),
|dt| dt.format("%Y-%m-%d %H:%M:%S UTC").to_string(),
);
html! {
<div class="task-details-modal">
<h4>{ format!("Task Details: {}", details.hash) }</h4>

View File

@ -9,8 +9,7 @@ mod server {
extract::{Path, State},
http::{Method, StatusCode},
routing::get,
Json,
Router,
Json, Router,
};
use deadpool_redis::{Config, Pool, Runtime};
use redis::{from_redis_value, AsyncCommands, FromRedisValue, Value};
@ -30,12 +29,19 @@ mod server {
pub async fn run() {
let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1/".to_string());
let cfg = Config::from_url(redis_url);
let pool = cfg.create_pool(Some(Runtime::Tokio1)).expect("Failed to create Redis pool");
let pool = cfg
.create_pool(Some(Runtime::Tokio1))
.expect("Failed to create Redis pool");
let cors = CorsLayer::new().allow_methods([Method::GET]).allow_origin(Any);
let cors = CorsLayer::new()
.allow_methods([Method::GET])
.allow_origin(Any);
let app = Router::new()
.route("/api/worker/:worker_name/tasks_and_stats", get(get_worker_data))
.route(
"/api/worker/:worker_name/tasks_and_stats",
get(get_worker_data),
)
.route("/api/worker/:worker_name/queue_stats", get(get_queue_stats))
.route("/api/task/:hash", get(get_task_details))
.nest_service("/", ServeDir::new("dist"))
@ -59,12 +65,16 @@ mod server {
let mut conn = pool.get().await.map_err(internal_error)?;
let queue_key = format!("{}{}", REDIS_QUEUE_PREFIX, worker_name);
let task_ids: Vec<String> = conn.lrange(&queue_key, 0, -1).await.map_err(internal_error)?;
let task_ids: Vec<String> = conn
.lrange(&queue_key, 0, -1)
.await
.map_err(internal_error)?;
let mut tasks = Vec::new();
for task_id in task_ids {
let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id);
let task_details: redis::Value = conn.hgetall(&task_key).await.map_err(internal_error)?;
let task_details: redis::Value =
conn.hgetall(&task_key).await.map_err(internal_error)?;
if let Ok(summary) = task_summary_from_redis_value(&task_details) {
tasks.push(summary);
}
@ -72,7 +82,10 @@ mod server {
let queue_stats = get_queue_stats_internal(&mut conn, &worker_name).await?;
Ok(Json(WorkerDataResponse { tasks, queue_stats: Some(queue_stats) }))
Ok(Json(WorkerDataResponse {
tasks,
queue_stats: Some(queue_stats),
}))
}
async fn get_queue_stats(
@ -107,8 +120,12 @@ mod server {
0..=10 => "green",
11..=50 => "yellow",
_ => "red",
}.to_string();
Ok(QueueStats { current_size: size, color_code })
}
.to_string();
Ok(QueueStats {
current_size: size,
color_code,
})
}
fn internal_error<E: std::error::Error>(err: E) -> (StatusCode, String) {
@ -119,8 +136,14 @@ mod server {
let map: HashMap<String, String> = from_redis_value(v)?;
Ok(TaskSummary {
hash: map.get("hash").cloned().unwrap_or_default(),
created_at: map.get("createdAt").and_then(|s| s.parse().ok()).unwrap_or_default(),
status: map.get("status").cloned().unwrap_or_else(|| "Unknown".to_string()),
created_at: map
.get("createdAt")
.and_then(|s| s.parse().ok())
.unwrap_or_default(),
status: map
.get("status")
.cloned()
.unwrap_or_else(|| "Unknown".to_string()),
})
}
@ -128,8 +151,14 @@ mod server {
let map: HashMap<String, String> = from_redis_value(v)?;
Ok(TaskDetails {
hash: map.get("hash").cloned().unwrap_or_default(),
created_at: map.get("createdAt").and_then(|s| s.parse().ok()).unwrap_or_default(),
status: map.get("status").cloned().unwrap_or_else(|| "Unknown".to_string()),
created_at: map
.get("createdAt")
.and_then(|s| s.parse().ok())
.unwrap_or_default(),
status: map
.get("status")
.cloned()
.unwrap_or_else(|| "Unknown".to_string()),
script_content: map.get("script").cloned().unwrap_or_default(),
result: map.get("output").cloned(),
error: map.get("error").cloned(),

View File

@ -1,8 +1,8 @@
use worker::spawn_rhai_worker;
use clap::Parser;
use engine::create_heromodels_engine;
use heromodels::db::hero::OurDB;
use rhailib_worker::spawn_rhai_worker;
use std::sync::Arc;
use clap::Parser;
use tokio::sync::mpsc;
#[derive(Parser, Debug)]
@ -32,11 +32,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let args = Args::parse();
log::info!("Rhai Worker (binary) starting with performance-optimized engine.");
log::info!("Worker ID: {}, Circle Public Key: {}, Redis: {}", args.worker_id, args.circle_public_key, args.redis_url);
log::info!(
"Worker ID: {}, Circle Public Key: {}, Redis: {}",
args.worker_id,
args.circle_public_key,
args.redis_url
);
// Initialize database with OurDB for the Rhai engine
// Using a temporary/in-memory like database for the worker
let db = Arc::new(OurDB::new("worker_rhai_temp_db", true).expect("Failed to create temporary DB for Rhai engine"));
let db = Arc::new(
OurDB::new("worker_rhai_temp_db", true)
.expect("Failed to create temporary DB for Rhai engine"),
);
let mut engine = create_heromodels_engine(db);
// Performance optimizations for benchmarking
@ -66,18 +74,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Wait for the worker to complete
match worker_handle.await {
Ok(result) => {
match result {
Ok(_) => {
log::info!("Worker completed successfully");
Ok(())
}
Err(e) => {
log::error!("Worker failed: {}", e);
Err(e)
}
Ok(result) => match result {
Ok(_) => {
log::info!("Worker completed successfully");
Ok(())
}
}
Err(e) => {
log::error!("Worker failed: {}", e);
Err(e)
}
},
Err(e) => {
log::error!("Worker task panicked: {}", e);
Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>)

View File

@ -2,11 +2,11 @@ use chrono::Utc;
use log::{debug, error, info};
use redis::AsyncCommands;
use rhai::{Engine, Scope};
use std::collections::HashMap;
use tokio::task::JoinHandle;
use tokio::sync::mpsc; // For shutdown signal
use rhai_client::RhaiTaskDetails; // Import for constructing the reply message
use serde_json; // For serializing the reply message
use serde_json;
use std::collections::HashMap;
use tokio::sync::mpsc; // For shutdown signal
use tokio::task::JoinHandle; // For serializing the reply message
const REDIS_TASK_DETAILS_PREFIX: &str = "rhai_task_details:";
const REDIS_QUEUE_PREFIX: &str = "rhai_tasks:";
@ -32,8 +32,12 @@ async fn update_task_status_in_redis(
if let Some(err) = error_msg {
updates.push(("error", err));
}
debug!("Updating task {} in Redis with status: {}, updates: {:?}", task_id, status, updates);
conn.hset_multiple::<_, _, _, ()>(&task_key, &updates).await?;
debug!(
"Updating task {} in Redis with status: {}, updates: {:?}",
task_id, status, updates
);
conn.hset_multiple::<_, _, _, ()>(&task_key, &updates)
.await?;
Ok(())
}
@ -43,7 +47,7 @@ pub fn spawn_rhai_worker(
engine: Engine,
redis_url: String,
mut shutdown_rx: mpsc::Receiver<()>, // Add shutdown receiver
preserve_tasks: bool, // Flag to control task cleanup
preserve_tasks: bool, // Flag to control task cleanup
) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
tokio::spawn(async move {
let queue_key = format!("{}{}", REDIS_QUEUE_PREFIX, circle_public_key);
@ -55,188 +59,200 @@ pub fn spawn_rhai_worker(
let redis_client = match redis::Client::open(redis_url.as_str()) {
Ok(client) => client,
Err(e) => {
error!("Worker for Circle Public Key '{}': Failed to open Redis client: {}", circle_public_key, e);
error!(
"Worker for Circle Public Key '{}': Failed to open Redis client: {}",
circle_public_key, e
);
return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
}
};
let mut redis_conn = match redis_client.get_multiplexed_async_connection().await {
Ok(conn) => conn,
Err(e) => {
error!("Worker for Circle Public Key '{}': Failed to get Redis connection: {}", circle_public_key, e);
error!(
"Worker for Circle Public Key '{}': Failed to get Redis connection: {}",
circle_public_key, e
);
return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
}
};
info!("Worker for Circle Public Key '{}' successfully connected to Redis.", circle_public_key);
info!(
"Worker for Circle Public Key '{}' successfully connected to Redis.",
circle_public_key
);
loop {
let blpop_keys = vec![queue_key.clone()];
tokio::select! {
// Listen for shutdown signal
_ = shutdown_rx.recv() => {
info!("Worker for Circle Public Key '{}': Shutdown signal received. Terminating loop.", circle_public_key);
break;
}
// Listen for tasks from Redis
blpop_result = redis_conn.blpop(&blpop_keys, BLPOP_TIMEOUT_SECONDS as f64) => {
debug!("Worker for Circle Public Key '{}': Attempting BLPOP on queue: {}", circle_public_key, queue_key);
let response: Option<(String, String)> = match blpop_result {
Ok(resp) => resp,
Err(e) => {
error!("Worker for Circle Public Key '{}': Redis BLPOP error on queue {}: {}. Worker for this circle might stop.", circle_public_key, queue_key, e);
return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
}
};
if let Some((_queue_name_recv, task_id)) = response {
info!("Worker for Circle Public Key '{}' received task_id: {} from queue: {}", circle_public_key, task_id, _queue_name_recv);
debug!("Worker for Circle Public Key '{}', Task {}: Processing started.", circle_public_key, task_id);
let task_details_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id);
debug!("Worker for Circle Public Key '{}', Task {}: Attempting HGETALL from key: {}", circle_public_key, task_id, task_details_key);
let task_details_map_result: Result<HashMap<String, String>, _> =
redis_conn.hgetall(&task_details_key).await;
match task_details_map_result {
Ok(details_map) => {
debug!("Worker for Circle Public Key '{}', Task {}: HGETALL successful. Details: {:?}", circle_public_key, task_id, details_map);
let script_content_opt = details_map.get("script").cloned();
let reply_to_queue_opt = details_map.get("replyToQueue").cloned();
let created_at_str_opt = details_map.get("createdAt").cloned();
let public_key_opt = details_map.get("publicKey").cloned();
if let Some(script_content) = script_content_opt {
info!("Worker for Circle Public Key '{}' processing task_id: {}. Script: {:.50}...", circle_public_key, task_id, script_content);
debug!("Worker for Circle Public Key '{}', Task {}: Attempting to update status to 'processing'.", circle_public_key, task_id);
if let Err(e) = update_task_status_in_redis(&mut redis_conn, &task_id, "processing", None, None).await {
error!("Worker for Circle Public Key '{}', Task {}: Failed to update status to 'processing': {}", circle_public_key, task_id, e);
} else {
debug!("Worker for Circle Public Key '{}', Task {}: Status updated to 'processing'.", circle_public_key, task_id);
// Listen for shutdown signal
_ = shutdown_rx.recv() => {
info!("Worker for Circle Public Key '{}': Shutdown signal received. Terminating loop.", circle_public_key);
break;
}
// Listen for tasks from Redis
blpop_result = redis_conn.blpop(&blpop_keys, BLPOP_TIMEOUT_SECONDS as f64) => {
debug!("Worker for Circle Public Key '{}': Attempting BLPOP on queue: {}", circle_public_key, queue_key);
let response: Option<(String, String)> = match blpop_result {
Ok(resp) => resp,
Err(e) => {
error!("Worker for Circle Public Key '{}': Redis BLPOP error on queue {}: {}. Worker for this circle might stop.", circle_public_key, queue_key, e);
return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
}
};
let mut scope = Scope::new();
scope.push_constant("CIRCLE_PUBLIC_KEY", circle_public_key.clone());
debug!("Worker for Circle Public Key '{}', Task {}: Injected CIRCLE_PUBLIC_KEY into scope.", circle_public_key, task_id);
if let Some((_queue_name_recv, task_id)) = response {
info!("Worker for Circle Public Key '{}' received task_id: {} from queue: {}", circle_public_key, task_id, _queue_name_recv);
debug!("Worker for Circle Public Key '{}', Task {}: Processing started.", circle_public_key, task_id);
if let Some(public_key) = public_key_opt.as_deref() {
if !public_key.is_empty() {
scope.push_constant("CALLER_PUBLIC_KEY", public_key.to_string());
debug!("Worker for Circle Public Key '{}', Task {}: Injected CALLER_PUBLIC_KEY into scope.", circle_public_key, task_id);
let task_details_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id);
debug!("Worker for Circle Public Key '{}', Task {}: Attempting HGETALL from key: {}", circle_public_key, task_id, task_details_key);
let task_details_map_result: Result<HashMap<String, String>, _> =
redis_conn.hgetall(&task_details_key).await;
match task_details_map_result {
Ok(details_map) => {
debug!("Worker for Circle Public Key '{}', Task {}: HGETALL successful. Details: {:?}", circle_public_key, task_id, details_map);
let script_content_opt = details_map.get("script").cloned();
let reply_to_queue_opt = details_map.get("replyToQueue").cloned();
let created_at_str_opt = details_map.get("createdAt").cloned();
let public_key_opt = details_map.get("publicKey").cloned();
if let Some(script_content) = script_content_opt {
info!("Worker for Circle Public Key '{}' processing task_id: {}. Script: {:.50}...", circle_public_key, task_id, script_content);
debug!("Worker for Circle Public Key '{}', Task {}: Attempting to update status to 'processing'.", circle_public_key, task_id);
if let Err(e) = update_task_status_in_redis(&mut redis_conn, &task_id, "processing", None, None).await {
error!("Worker for Circle Public Key '{}', Task {}: Failed to update status to 'processing': {}", circle_public_key, task_id, e);
} else {
debug!("Worker for Circle Public Key '{}', Task {}: Status updated to 'processing'.", circle_public_key, task_id);
}
}
debug!("Worker for Circle Public Key '{}', Task {}: Evaluating script with Rhai engine.", circle_public_key, task_id);
let mut final_status = "error".to_string(); // Default to error
let mut final_output: Option<String> = None;
let mut final_error_msg: Option<String> = None;
let mut scope = Scope::new();
scope.push_constant("CIRCLE_PUBLIC_KEY", circle_public_key.clone());
debug!("Worker for Circle Public Key '{}', Task {}: Injected CIRCLE_PUBLIC_KEY into scope.", circle_public_key, task_id);
match engine.eval_with_scope::<rhai::Dynamic>(&mut scope, &script_content) {
Ok(result) => {
let output_str = if result.is::<String>() {
// If the result is a string, we can unwrap it directly.
// This moves `result`, which is fine because it's the last time we use it in this branch.
result.into_string().unwrap()
} else {
result.to_string()
};
info!("Worker for Circle Public Key '{}' task {} completed. Output: {}", circle_public_key, task_id, output_str);
final_status = "completed".to_string();
final_output = Some(output_str);
if let Some(public_key) = public_key_opt.as_deref() {
if !public_key.is_empty() {
scope.push_constant("CALLER_PUBLIC_KEY", public_key.to_string());
debug!("Worker for Circle Public Key '{}', Task {}: Injected CALLER_PUBLIC_KEY into scope.", circle_public_key, task_id);
}
}
Err(e) => {
let error_str = format!("{:?}", *e);
error!("Worker for Circle Public Key '{}' task {} script evaluation failed. Error: {}", circle_public_key, task_id, error_str);
final_error_msg = Some(error_str);
// final_status remains "error"
debug!("Worker for Circle Public Key '{}', Task {}: Evaluating script with Rhai engine.", circle_public_key, task_id);
let mut final_status = "error".to_string(); // Default to error
let mut final_output: Option<String> = None;
let mut final_error_msg: Option<String> = None;
match engine.eval_with_scope::<rhai::Dynamic>(&mut scope, &script_content) {
Ok(result) => {
let output_str = if result.is::<String>() {
// If the result is a string, we can unwrap it directly.
// This moves `result`, which is fine because it's the last time we use it in this branch.
result.into_string().unwrap()
} else {
result.to_string()
};
info!("Worker for Circle Public Key '{}' task {} completed. Output: {}", circle_public_key, task_id, output_str);
final_status = "completed".to_string();
final_output = Some(output_str);
}
Err(e) => {
let error_str = format!("{:?}", *e);
error!("Worker for Circle Public Key '{}' task {} script evaluation failed. Error: {}", circle_public_key, task_id, error_str);
final_error_msg = Some(error_str);
// final_status remains "error"
}
}
}
debug!("Worker for Circle Public Key '{}', Task {}: Attempting to update status to '{}'.", circle_public_key, task_id, final_status);
if let Err(e) = update_task_status_in_redis(
&mut redis_conn,
&task_id,
&final_status,
final_output.clone(), // Clone for task hash update
final_error_msg.clone(), // Clone for task hash update
).await {
error!("Worker for Circle Public Key '{}', Task {}: Failed to update final status to '{}': {}", circle_public_key, task_id, final_status, e);
} else {
debug!("Worker for Circle Public Key '{}', Task {}: Final status updated to '{}'.", circle_public_key, task_id, final_status);
}
debug!("Worker for Circle Public Key '{}', Task {}: Attempting to update status to '{}'.", circle_public_key, task_id, final_status);
if let Err(e) = update_task_status_in_redis(
&mut redis_conn,
&task_id,
&final_status,
final_output.clone(), // Clone for task hash update
final_error_msg.clone(), // Clone for task hash update
).await {
error!("Worker for Circle Public Key '{}', Task {}: Failed to update final status to '{}': {}", circle_public_key, task_id, final_status, e);
} else {
debug!("Worker for Circle Public Key '{}', Task {}: Final status updated to '{}'.", circle_public_key, task_id, final_status);
}
// Send to reply queue if specified
if let Some(reply_q) = reply_to_queue_opt {
let created_at = created_at_str_opt
.and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(Utc::now); // Fallback, though createdAt should exist
// Send to reply queue if specified
if let Some(reply_q) = reply_to_queue_opt {
let created_at = created_at_str_opt
.and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(Utc::now); // Fallback, though createdAt should exist
let reply_details = RhaiTaskDetails {
task_id: task_id.to_string(), // Add the task_id
script: script_content.clone(), // Include script for context in reply
status: final_status, // The final status
// client_rpc_id is no longer a field
output: final_output, // The final output
error: final_error_msg, // The final error
created_at, // Original creation time
updated_at: Utc::now(), // Time of this final update/reply
// reply_to_queue is no longer a field
public_key: public_key_opt,
};
match serde_json::to_string(&reply_details) {
Ok(reply_json) => {
let lpush_result: redis::RedisResult<i64> = redis_conn.lpush(&reply_q, &reply_json).await;
match lpush_result {
Ok(_) => debug!("Worker for Circle Public Key '{}', Task {}: Successfully sent result to reply queue {}", circle_public_key, task_id, reply_q),
Err(e_lpush) => error!("Worker for Circle Public Key '{}', Task {}: Failed to LPUSH result to reply queue {}: {}", circle_public_key, task_id, reply_q, e_lpush),
let reply_details = RhaiTaskDetails {
task_id: task_id.to_string(), // Add the task_id
script: script_content.clone(), // Include script for context in reply
status: final_status, // The final status
// client_rpc_id is no longer a field
output: final_output, // The final output
error: final_error_msg, // The final error
created_at, // Original creation time
updated_at: Utc::now(), // Time of this final update/reply
// reply_to_queue is no longer a field
public_key: public_key_opt,
};
match serde_json::to_string(&reply_details) {
Ok(reply_json) => {
let lpush_result: redis::RedisResult<i64> = redis_conn.lpush(&reply_q, &reply_json).await;
match lpush_result {
Ok(_) => debug!("Worker for Circle Public Key '{}', Task {}: Successfully sent result to reply queue {}", circle_public_key, task_id, reply_q),
Err(e_lpush) => error!("Worker for Circle Public Key '{}', Task {}: Failed to LPUSH result to reply queue {}: {}", circle_public_key, task_id, reply_q, e_lpush),
}
}
Err(e_json) => {
error!("Worker for Circle Public Key '{}', Task {}: Failed to serialize reply details for queue {}: {}", circle_public_key, task_id, reply_q, e_json);
}
}
Err(e_json) => {
error!("Worker for Circle Public Key '{}', Task {}: Failed to serialize reply details for queue {}: {}", circle_public_key, task_id, reply_q, e_json);
}
// Clean up task details based on preserve_tasks flag
if !preserve_tasks {
// The worker is responsible for cleaning up the task details hash.
if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await {
error!("Worker for Circle Public Key '{}', Task {}: Failed to delete task details key '{}': {}", circle_public_key, task_id, task_details_key, e);
} else {
debug!("Worker for Circle Public Key '{}', Task {}: Cleaned up task details key '{}'.", circle_public_key, task_id, task_details_key);
}
}
}
// Clean up task details based on preserve_tasks flag
if !preserve_tasks {
// The worker is responsible for cleaning up the task details hash.
if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await {
error!("Worker for Circle Public Key '{}', Task {}: Failed to delete task details key '{}': {}", circle_public_key, task_id, task_details_key, e);
} else {
debug!("Worker for Circle Public Key '{}', Task {}: Cleaned up task details key '{}'.", circle_public_key, task_id, task_details_key);
debug!("Worker for Circle Public Key '{}', Task {}: Preserving task details (preserve_tasks=true)", circle_public_key, task_id);
}
} else {
debug!("Worker for Circle Public Key '{}', Task {}: Preserving task details (preserve_tasks=true)", circle_public_key, task_id);
}
} else { // Script content not found in hash
error!(
"Worker for Circle Public Key '{}', Task {}: Script content not found in Redis hash. Details map: {:?}",
circle_public_key, task_id, details_map
);
// Clean up invalid task details based on preserve_tasks flag
if !preserve_tasks {
// Even if the script is not found, the worker should clean up the invalid task hash.
if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await {
error!("Worker for Circle Public Key '{}', Task {}: Failed to delete invalid task details key '{}': {}", circle_public_key, task_id, task_details_key, e);
} else { // Script content not found in hash
error!(
"Worker for Circle Public Key '{}', Task {}: Script content not found in Redis hash. Details map: {:?}",
circle_public_key, task_id, details_map
);
// Clean up invalid task details based on preserve_tasks flag
if !preserve_tasks {
// Even if the script is not found, the worker should clean up the invalid task hash.
if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await {
error!("Worker for Circle Public Key '{}', Task {}: Failed to delete invalid task details key '{}': {}", circle_public_key, task_id, task_details_key, e);
}
} else {
debug!("Worker for Circle Public Key '{}', Task {}: Preserving invalid task details (preserve_tasks=true)", circle_public_key, task_id);
}
} else {
debug!("Worker for Circle Public Key '{}', Task {}: Preserving invalid task details (preserve_tasks=true)", circle_public_key, task_id);
}
}
Err(e) => {
error!(
"Worker for Circle Public Key '{}', Task {}: Failed to fetch details (HGETALL) from Redis for key {}. Error: {:?}",
circle_public_key, task_id, task_details_key, e
);
}
}
Err(e) => {
error!(
"Worker for Circle Public Key '{}', Task {}: Failed to fetch details (HGETALL) from Redis for key {}. Error: {:?}",
circle_public_key, task_id, task_details_key, e
);
}
}
} else {
debug!("Worker for Circle Public Key '{}': BLPOP timed out on queue {}. No new tasks. Checking for shutdown signal again.", &circle_public_key, &queue_key);
}
} // End of blpop_result match
} // End of tokio::select!
} else {
debug!("Worker for Circle Public Key '{}': BLPOP timed out on queue {}. No new tasks. Checking for shutdown signal again.", &circle_public_key, &queue_key);
}
} // End of blpop_result match
} // End of tokio::select!
} // End of loop
info!("Worker for Circle Public Key '{}' has shut down.", circle_public_key);
info!(
"Worker for Circle Public Key '{}' has shut down.",
circle_public_key
);
Ok(())
})
}