add support for auth and other improvements

This commit is contained in:
timurgordon 2025-06-19 01:42:02 +03:00
parent de1740f0d1
commit 4e717bc054
33 changed files with 433 additions and 864 deletions

BIN
.DS_Store vendored Normal file

Binary file not shown.

4
.gitignore vendored
View File

@ -1 +1,3 @@
target target
worker_rhai_temp_db
dump.rdb

View File

@ -1,91 +0,0 @@
# Rhailib Benchmarking - SIMPLIFIED ✨
> **Note**: This document describes the old complex benchmarking system.
> **For the new minimal system, see [`bench/README.md`](bench/README.md)**
## 🎯 New Minimal Benchmark System
The benchmarking system has been **drastically simplified**:
- **85% Code Reduction**: From 800+ lines to ~113 lines
- **Single File**: All logic in [`bench/simple_bench.rs`](bench/simple_bench.rs)
- **Direct Timing**: Redis timestamps, no complex stats
- **Minimal Dependencies**: No criterion, no abstractions
### Quick Start
```bash
cd bench
cargo run --bin simple_bench
```
### Expected Output
```
🧹 Cleaning up Redis...
🚀 Starting worker...
📝 Creating single task...
⏱️ Waiting for completion...
✅ Task completed in 23.45ms
🧹 Cleaning up...
```
## 📁 New Structure
```
rhailib/
├── bench/ # NEW: Minimal benchmark system
│ ├── simple_bench.rs # Main benchmark (85 lines)
│ ├── batch_task.lua # Simple task creation (28 lines)
│ ├── Cargo.toml # Dependencies
│ └── README.md # Usage instructions
└── scripts/ # Cleaned up scripts
├── run_rhai_batch.lua # Original batch script (kept)
└── run_rhai.lua # Basic script (kept)
```
## 🗑️ What Was Removed
- `benches/` directory (complex criterion-based benchmarks)
- `src/benchmarks/` module (redis_stats.rs, worker_manager.rs)
- Complex Lua scripts (`run_rhai_with_wait.lua`, `run_rhai_blocking.sh`)
- Framework dependencies (criterion, complex stats)
## 🚀 Benefits of New System
1. **Minimalism**: Single file, linear flow
2. **Direct Timing**: `updated_at - created_at` from Redis
3. **Easy to Understand**: No abstractions or frameworks
4. **Fast to Modify**: 85 lines vs 800+ lines
5. **Reliable**: Simple Redis operations
6. **Extensible**: Easy to add features incrementally
## 📈 Iteration Plan
- **Current**: Single task (n=1) benchmarking
- **Next**: Small batches (n=5, n=10)
- **Future**: Larger batches and script complexity
---
## 📚 Old System Documentation (Archived)
The following describes the previous complex system that has been removed:
### Old Architecture (REMOVED)
- Complex Criterion-based benchmarking
- Multi-module statistics collection
- Abstract worker management
- Complex configuration systems
- Framework dependencies
### Old Files (REMOVED)
- `benches/rhai_performance_bench.rs` (237 lines)
- `src/benchmarks/redis_stats.rs` (285 lines)
- `src/benchmarks/worker_manager.rs` (~200 lines)
- `src/benchmarks/mod.rs` (10 lines)
**Total removed**: ~800+ lines of complex code
---
**For current benchmarking, use the new minimal system in [`bench/`](bench/)**

45
Cargo.lock generated
View File

@ -2342,9 +2342,13 @@ dependencies = [
"env_logger", "env_logger",
"log", "log",
"redis", "redis",
"rhai",
"rhai_client",
"rhailib_worker",
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
"uuid",
] ]
[[package]] [[package]]
@ -2358,7 +2362,25 @@ dependencies = [
"rhai_client", "rhai_client",
"serde_json", "serde_json",
"tokio", "tokio",
"worker", ]
[[package]]
name = "rhailib_worker"
version = "0.1.0"
dependencies = [
"chrono",
"clap",
"engine",
"env_logger",
"heromodels",
"log",
"redis",
"rhai",
"rhai_client",
"serde",
"serde_json",
"tokio",
"uuid",
] ]
[[package]] [[package]]
@ -3018,13 +3040,13 @@ dependencies = [
"log", "log",
"rhai", "rhai",
"rhai_client", "rhai_client",
"rhailib_worker",
"rustyline", "rustyline",
"tempfile", "tempfile",
"tokio", "tokio",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"url", "url",
"worker",
] ]
[[package]] [[package]]
@ -3429,25 +3451,6 @@ dependencies = [
"bitflags 2.9.1", "bitflags 2.9.1",
] ]
[[package]]
name = "worker"
version = "0.1.0"
dependencies = [
"chrono",
"clap",
"engine",
"env_logger",
"heromodels",
"log",
"redis",
"rhai",
"rhai_client",
"serde",
"serde_json",
"tokio",
"uuid",
]
[[package]] [[package]]
name = "writeable" name = "writeable"
version = "0.6.1" version = "0.6.1"

View File

@ -4,8 +4,6 @@ version = "0.1.0"
edition = "2021" # Changed to 2021 for consistency with other crates edition = "2021" # Changed to 2021 for consistency with other crates
[dependencies] [dependencies]
anyhow = "1.0" anyhow = "1.0"
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
env_logger = "0.10" env_logger = "0.10"
@ -13,15 +11,24 @@ log = "0.4"
redis = { version = "0.25.0", features = ["tokio-comp"] } redis = { version = "0.25.0", features = ["tokio-comp"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] } tokio = { version = "1", features = ["macros", "rt-multi-thread", "time", "sync", "signal"] }
rhai = "1.21.0"
rhailib_worker = { path = "src/worker" }
rhai_client = { path = "src/client" }
[dev-dependencies] [dev-dependencies]
criterion = { version = "0.5", features = ["html_reports"] } criterion = { version = "0.5", features = ["html_reports"] }
uuid = { version = "1.6", features = ["v4", "serde"] } # For examples like dedicated_reply_queue_demo
[[bench]] [[bench]]
name = "simple_rhai_bench" name = "simple_rhai_bench"
harness = false harness = false
[[example]]
name = "end_to_end_auth_demo"
path = "examples/end_to_end/main.rs"
[workspace] [workspace]
members = [ members = [

View File

@ -65,7 +65,6 @@ Where:
## Benefits ## Benefits
- **Minimal Code**: 85 lines vs previous 800+ lines
- **Easy to Understand**: Single file, linear flow - **Easy to Understand**: Single file, linear flow
- **Direct Timing**: Redis timestamps, no complex stats - **Direct Timing**: Redis timestamps, no complex stats
- **Fast to Modify**: No abstractions or frameworks - **Fast to Modify**: No abstractions or frameworks

View File

@ -21,6 +21,7 @@ The `rhailib` system is composed of the following main components, leveraging Re
* Listens to Redis task queues ("circles") for incoming task IDs. * Listens to Redis task queues ("circles") for incoming task IDs.
* Fetches task details, executes the script using the `rhai_engine`. * Fetches task details, executes the script using the `rhai_engine`.
* Updates task status and results in Redis. * Updates task status and results in Redis.
* Injects the caller's public key into the script's scope as `CALLER_PUBLIC_KEY` if available.
* Sends a notification/result to the client's dedicated reply queue. * Sends a notification/result to the client's dedicated reply queue.
4. **Redis:** 4. **Redis:**
@ -112,6 +113,7 @@ This architecture allows for:
* `created_at`: Timestamp of task creation. * `created_at`: Timestamp of task creation.
* `updated_at`: Timestamp of the last update to the task details. * `updated_at`: Timestamp of the last update to the task details.
* `reply_to_queue`: (New) The name of the dedicated Redis List the client is listening on for the result. * `reply_to_queue`: (New) The name of the dedicated Redis List the client is listening on for the result.
* `publicKey`: (Optional) The public key of the user who submitted the task.
* **Reply Queues:** * **Reply Queues:**
* Key Pattern: `rhai_reply:<unique_identifier>` (e.g., `rhai_reply:<uuid_generated_by_client>`) * Key Pattern: `rhai_reply:<unique_identifier>` (e.g., `rhai_reply:<uuid_generated_by_client>`)
* Type: List * Type: List

View File

@ -7,7 +7,6 @@ publish = false # This is a package of examples, not meant to be published
[dependencies] [dependencies]
# Local Rhailib crates # Local Rhailib crates
rhai_client = { path = "../src/client" } rhai_client = { path = "../src/client" }
worker = { path = "../src/worker" }
# External dependencies # External dependencies
rhai = "1.18.0" rhai = "1.18.0"

View File

@ -1,10 +1,10 @@
use log::{info, error, debug}; use log::{info, error, debug};
use rhai::Engine; use rhai::Engine;
use rhai_client::{RhaiClient, RhaiClientError}; // RhaiTaskDetails is not directly used use rhai_client::{RhaiClient, RhaiClientError}; // RhaiTaskDetails is now used for its fields
use worker_lib::spawn_rhai_worker; use rhailib_worker::spawn_rhai_worker;
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use serde_json::Value; use uuid::Uuid; // Added for generating task_id
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
@ -53,28 +53,26 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 5. Submit script and await result using the new mechanism // 5. Submit script and await result using the new mechanism
let task_timeout = Duration::from_secs(10); let task_timeout = Duration::from_secs(10);
let client_rpc_id: Option<Value> = Some(serde_json::json!({ "demo_request_id": "reply_queue_test_001" })); let task_id = Uuid::new_v4().to_string(); // Generate a unique task_id
info!("Submitting script to circle '{}' and awaiting result...", circle_name); info!("Submitting script to circle '{}' with task_id '{}' and awaiting result...", circle_name, task_id);
info!("Script: {}", script_to_run); info!("Script: {}", script_to_run);
match client match client
.submit_script_and_await_result( .submit_script_and_await_result(
circle_name, circle_name,
task_id.clone(), // Pass the generated task_id
script_to_run.to_string(), script_to_run.to_string(),
client_rpc_id,
task_timeout, task_timeout,
// poll_interval is no longer needed None // public_key
) )
.await .await
{ {
Ok(details) => { Ok(details) => {
info!("Task completed successfully!"); info!("Task {} completed successfully!", details.task_id);
debug!("Full Task Details: {:#?}", details); debug!("Full Task Details: {:#?}", details);
// The task_id is not part of the returned RhaiTaskDetails struct. // The task_id is now part of the returned RhaiTaskDetails struct.
// We could modify the client to return (task_id, details) if needed, info!("Received details for task_id: {}, script: {}", details.task_id, details.script);
// but for this demo, we'll just log the content of the returned details.
info!("Received details for script: {}", details.script);
info!("Status: {}", details.status); info!("Status: {}", details.status);
if let Some(output) = details.output { if let Some(output) = details.output {
info!("Output: {}", output); // Expected: 42 info!("Output: {}", output); // Expected: 42
@ -89,7 +87,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Err(e) => { Err(e) => {
error!("An error occurred while awaiting task result: {}", e); error!("An error occurred while awaiting task result: {}", e);
// The specific error can be inspected if needed, e.g., for timeout // The specific error can be inspected if needed, e.g., for timeout
if let RhaiClientError::Timeout(task_id) = e { if let RhaiClientError::Timeout(returned_task_id) = e {
// Note: 'task_id' here is the one from the error, which should match the one we sent.
info!("Task {} timed out.", returned_task_id);
info!("Task {} timed out.", task_id); info!("Task {} timed out.", task_id);
} }
} }

View File

@ -0,0 +1,24 @@
# End-to-End Authorization Demo
This example demonstrates an end-to-end scenario involving a custom Rhai engine, `rhailib_worker`, and `rhai_client` to showcase how authorization based on `CALLER_PUBLIC_KEY` can be implemented.
## Overview
1. **Custom Rhai Engine**: A Rhai engine is created, and a custom function `check_permission(caller_pk: String)` is registered. This function returns different messages based on the `caller_pk` provided.
2. **Rhai Worker (`rhailib_worker`)**: A worker is spawned with this custom engine. The worker is configured with its own `CIRCLE_PUBLIC_KEY` (e.g., "auth_worker_circle").
3. **Rhai Client (`rhai_client`)**: The client is used to submit a script (`auth_script.rhai`) to the worker.
4. **Authorization Script (`auth_script.rhai`)**: This script calls the `check_permission` function, passing the `CALLER_PUBLIC_KEY` (which is automatically injected into the script's scope by the worker based on the client's submission).
5. **Demonstration**: The `main.rs` program submits the script twice, using two different `CALLER_PUBLIC_KEY`s ("admin_pk" and "user_pk"), and shows that the script produces different results based on the authorization logic in `check_permission`.
This example illustrates how the `rhailib` components can work together to build systems where script execution is controlled and authorized based on the identity of the calling client.
## Running the Example
Assuming you have Redis running and accessible at `redis://127.0.0.1/`:
Run the example from the `rhailib` root directory:
```bash
cargo run --example end_to_end_auth_demo
```
You should see output indicating the results of the script execution for both the "admin_pk" and "user_pk" callers.

View File

@ -0,0 +1,6 @@
// auth_script.rhai
// This script calls a custom registered function 'check_permission'
// and passes the CALLER_PUBLIC_KEY to it.
// CALLER_PUBLIC_KEY is injected into the script's scope by the rhailib_worker.
check_permission(CALLER_PUBLIC_KEY)

136
examples/end_to_end/main.rs Normal file
View File

@ -0,0 +1,136 @@
use rhai::{Engine, EvalAltResult};
use rhai_client::RhaiClient;
use rhailib_worker::spawn_rhai_worker;
use std::{fs, path::Path, time::Duration};
use tokio::sync::mpsc;
use uuid::Uuid;
// Custom Rhai function for authorization
// It takes the caller's public key as an argument.
fn check_permission(caller_pk: String) -> Result<String, Box<EvalAltResult>> {
log::info!("check_permission called with PK: {}", caller_pk);
if caller_pk == "admin_pk" {
Ok("Access Granted: Welcome Admin!".to_string())
} else if caller_pk == "user_pk" {
Ok("Limited Access: Welcome User!".to_string())
} else {
Ok(format!("Access Denied: Unknown public key '{}'", caller_pk))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let redis_url = "redis://127.0.0.1/";
let worker_circle_pk = "auth_worker_circle".to_string();
// 1. Create a Rhai engine and register custom functionality
let mut engine = Engine::new();
engine.register_fn("check_permission", check_permission);
log::info!("Custom 'check_permission' function registered with Rhai engine.");
// 2. Spawn the Rhai worker
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
let worker_handle = tokio::spawn(spawn_rhai_worker(
0, // worker_id
worker_circle_pk.clone(),
engine,
redis_url.to_string(),
shutdown_rx,
false, // use_sentinel
));
log::info!("Rhai worker spawned for circle: {}", worker_circle_pk);
// Give the worker a moment to start up
tokio::time::sleep(Duration::from_secs(1)).await;
// 3. Create a Rhai client
let client = RhaiClient::new(redis_url)?;
log::info!("Rhai client created.");
// 4. Load the Rhai script content
let script_path_str = "examples/end_to_end/auth_script.rhai"; // Relative to Cargo.toml / rhailib root
let script_content = match fs::read_to_string(script_path_str) {
Ok(content) => content,
Err(e) => {
log::error!("Failed to read script file '{}': {}", script_path_str, e);
// Attempt to read from an alternative path if run via `cargo run --example`
// where current dir might be the crate root.
let alt_script_path = Path::new(file!()).parent().unwrap().join("auth_script.rhai");
log::info!("Attempting alternative script path: {:?}", alt_script_path);
fs::read_to_string(&alt_script_path)?
}
};
log::info!("Loaded script content from '{}'", script_path_str);
// Define different caller public keys
let admin_caller_pk = "admin_pk".to_string();
let user_caller_pk = "user_pk".to_string();
let unknown_caller_pk = "unknown_pk".to_string();
let callers = vec![
("Admin", admin_caller_pk),
("User", user_caller_pk),
("Unknown", unknown_caller_pk),
];
for (caller_name, caller_pk) in callers {
let task_id = Uuid::new_v4().to_string();
log::info!(
"Submitting script for caller '{}' (PK: {}) with task_id: {}",
caller_name,
caller_pk,
task_id
);
match client
.submit_script_and_await_result(
&worker_circle_pk,
task_id.clone(), // task_id (UUID) first
script_content.clone(), // script_content second
Duration::from_secs(10),
Some(caller_pk.clone()), // This is the CALLER_PUBLIC_KEY
)
.await
{
Ok(details) => {
log::info!(
"Task {} for caller '{}' (PK: {}) completed. Status: {}, Output: {:?}, Error: {:?}",
task_id,
caller_name,
caller_pk,
details.status,
details.output,
details.error
);
// Basic assertion for expected output
if caller_pk == "admin_pk" {
assert_eq!(details.output, Some("Access Granted: Welcome Admin!".to_string()));
} else if caller_pk == "user_pk" {
assert_eq!(details.output, Some("Limited Access: Welcome User!".to_string()));
}
}
Err(e) => {
log::error!(
"Task {} for caller '{}' (PK: {}) failed: {}",
task_id,
caller_name,
caller_pk,
e
);
}
}
tokio::time::sleep(Duration::from_millis(100)).await; // Small delay between submissions
}
// 5. Shutdown the worker (optional, could also let it run until program exits)
log::info!("Signaling worker to shutdown...");
let _ = shutdown_tx.send(()).await;
if let Err(e) = worker_handle.await {
log::error!("Worker task panicked or encountered an error: {:?}", e);
}
log::info!("Worker shutdown complete.");
Ok(())
}

View File

@ -1,9 +1,10 @@
use rhai::Engine; use rhai::Engine;
use rhai_client::RhaiClient; // To submit tasks use rhai_client::RhaiClient; // To submit tasks
use uuid::Uuid; // For generating task_id
use std::time::Duration; use std::time::Duration;
use tokio::time::sleep; use tokio::time::sleep;
use worker_lib::spawn_rhai_worker; use rhailib_worker::spawn_rhai_worker;
// Custom function for Rhai // Custom function for Rhai
fn add(a: i64, b: i64) -> i64 { fn add(a: i64, b: i64) -> i64 {
@ -48,13 +49,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
log::info!("Submitting math script to 'math_circle' and awaiting result..."); log::info!("Submitting math script to 'math_circle' and awaiting result...");
let timeout_duration = Duration::from_secs(10); let timeout_duration = Duration::from_secs(10);
let task_id = Uuid::new_v4().to_string();
match client.submit_script_and_await_result( match client.submit_script_and_await_result(
"math_circle", "math_circle",
script_content.to_string(), script_content.to_string(),
None, task_id, // Pass the generated task_id
timeout_duration timeout_duration,
None
).await { ).await {
Ok(details) => { Ok(details) => {
log::info!("Math Worker Example: Task finished. Status: {}, Output: {:?}, Error: {:?}", log::info!("Math Worker Example: Task finished. Status: {}, Output: {:?}, Error: {:?}",

View File

@ -1,9 +1,10 @@
use rhai::Engine; use rhai::Engine;
use rhai_client::RhaiClient; // To submit tasks use rhai_client::RhaiClient; // To submit tasks
use uuid::Uuid; // For generating task_id
use std::time::Duration; use std::time::Duration;
use tokio::time::sleep; use tokio::time::sleep;
use worker_lib::spawn_rhai_worker; use rhailib_worker::spawn_rhai_worker;
// Custom function for Rhai // Custom function for Rhai
fn reverse_string(s: String) -> String { fn reverse_string(s: String) -> String {
@ -48,13 +49,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
log::info!("Submitting string script to 'string_circle' and awaiting result..."); log::info!("Submitting string script to 'string_circle' and awaiting result...");
let timeout_duration = Duration::from_secs(10); let timeout_duration = Duration::from_secs(10);
let task_id = Uuid::new_v4().to_string();
match client.submit_script_and_await_result( match client.submit_script_and_await_result(
"string_circle", "string_circle",
script_content.to_string(), script_content.to_string(),
None, task_id, // Pass the generated task_id
timeout_duration timeout_duration,
None
).await { ).await {
Ok(details) => { Ok(details) => {
log::info!("String Worker Example: Task finished. Status: {}, Output: {:?}, Error: {:?}", log::info!("String Worker Example: Task finished. Status: {}, Output: {:?}, Error: {:?}",

View File

@ -1,52 +0,0 @@
use worker_lib::spawn_rhai_worker;
use rhai::Engine;
use tokio::sync::mpsc;
use tokio::signal;
use log::info;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize the logger
env_logger::init();
let redis_url = "redis://127.0.0.1/";
let circle_name = "default".to_string();
let mut engine = Engine::new(); // Create a new, simple Rhai engine
// Register a simple 'ping' function for the readiness check.
engine.register_fn("ping", || -> String {
"pong".to_string()
});
// Create a channel for the shutdown signal
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
info!("Spawning Rhai worker for circle: {}", circle_name);
// Spawn the worker
let worker_handle = spawn_rhai_worker(
1, // circle_id
circle_name.clone(),
engine,
redis_url.to_string(),
shutdown_rx,
false, // preserve_tasks
);
info!("Worker spawned. Press Ctrl+C to shut down.");
// Wait for Ctrl+C
signal::ctrl_c().await?;
info!("Ctrl+C received. Sending shutdown signal to worker.");
let _ = shutdown_tx.send(()).await;
// Wait for the worker to finish
if let Err(e) = worker_handle.await? {
eprintln!("Worker process finished with an error: {:?}", e);
}
info!("Worker has shut down gracefully.");
Ok(())
}

View File

@ -1,133 +0,0 @@
//! Demo script showing how to run the hybrid performance benchmark
//!
//! This example demonstrates:
//! 1. Starting workers programmatically
//! 2. Running the Lua batch script
//! 3. Collecting and displaying statistics
use rhailib::{RedisStatsCollector, WorkerManager, clear_redis_test_data, check_redis_connection};
use redis::{Client, Commands};
use std::fs;
use std::time::Duration;
const REDIS_URL: &str = "redis://localhost:6379";
const CIRCLE_NAME: &str = "demo_circle";
fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
println!("🚀 Rhailib Hybrid Performance Benchmark Demo");
println!("============================================");
// Check Redis connection
println!("📡 Checking Redis connection...");
check_redis_connection(REDIS_URL)?;
println!("✅ Redis connection successful");
// Clear any existing test data
println!("🧹 Clearing existing test data...");
clear_redis_test_data(REDIS_URL)?;
println!("✅ Test data cleared");
// Load Lua script
println!("📜 Loading Lua batch script...");
let lua_script = fs::read_to_string("scripts/run_rhai_batch.lua")?;
println!("✅ Lua script loaded ({} bytes)", lua_script.len());
// Start workers
println!("👷 Starting 2 worker processes...");
let mut worker_manager = WorkerManager::new();
worker_manager.start_workers(2, CIRCLE_NAME, REDIS_URL)?;
worker_manager.wait_for_workers_ready(Duration::from_secs(3))?;
println!("✅ Workers started and ready");
// Connect to Redis
let redis_client = Client::open(REDIS_URL)?;
let mut conn = redis_client.get_connection()?;
// Execute batch workload
println!("🎯 Submitting batch of 100 tasks...");
let batch_id = format!("demo_batch_{}", chrono::Utc::now().timestamp_millis());
let simple_script = "let x = 42; x * 2";
let start_time = std::time::Instant::now();
let result: redis::Value = redis::cmd("EVAL")
.arg(&lua_script)
.arg(0) // No keys
.arg(CIRCLE_NAME)
.arg(100) // task count
.arg(simple_script)
.arg(&batch_id)
.query(&mut conn)?;
let submission_time = start_time.elapsed();
println!("✅ Batch submitted in {:?}", submission_time);
// Parse result
if let redis::Value::Data(data) = result {
let response: serde_json::Value = serde_json::from_slice(&data)?;
println!("📊 Batch info: {}", serde_json::to_string_pretty(&response)?);
}
// Wait for completion and collect statistics
println!("⏳ Waiting for batch completion...");
let stats_collector = RedisStatsCollector::new(REDIS_URL)?;
let completed = stats_collector.wait_for_batch_completion(
&batch_id,
100,
Duration::from_secs(30),
)?;
if !completed {
println!("⚠️ Batch did not complete within timeout");
return Ok(());
}
println!("✅ Batch completed!");
// Collect and display statistics
println!("📈 Collecting performance statistics...");
let timings = stats_collector.collect_batch_timings(&batch_id)?;
let stats = stats_collector.calculate_stats(&timings);
println!("\n📊 PERFORMANCE RESULTS");
println!("======================");
println!("Total tasks: {}", stats.total_tasks);
println!("Completed tasks: {}", stats.completed_tasks);
println!("Failed tasks: {}", stats.failed_tasks);
println!("Error rate: {:.2}%", stats.error_rate);
println!("Throughput: {:.2} tasks/second", stats.throughput_tps);
println!("Batch duration: {:.2} ms", stats.batch_duration_ms);
println!("\nLatency Statistics:");
println!(" Min: {:.2} ms", stats.latency_stats.min_ms);
println!(" Max: {:.2} ms", stats.latency_stats.max_ms);
println!(" Mean: {:.2} ms", stats.latency_stats.mean_ms);
println!(" Median: {:.2} ms", stats.latency_stats.median_ms);
println!(" P95: {:.2} ms", stats.latency_stats.p95_ms);
println!(" P99: {:.2} ms", stats.latency_stats.p99_ms);
println!(" Std Dev: {:.2} ms", stats.latency_stats.std_dev_ms);
// Show some individual task timings
println!("\n🔍 Sample Task Timings (first 10):");
for (i, timing) in timings.iter().take(10).enumerate() {
println!(" Task {}: {} -> {} ({:.2}ms, status: {})",
i + 1,
timing.task_id,
timing.status,
timing.latency_ms,
timing.status
);
}
// Cleanup
println!("\n🧹 Cleaning up...");
stats_collector.cleanup_batch_data(&batch_id)?;
worker_manager.shutdown()?;
println!("✅ Cleanup complete");
println!("\n🎉 Demo completed successfully!");
Ok(())
}

View File

@ -1,117 +0,0 @@
#!/bin/bash
# Hybrid Performance Benchmark Runner for Rhailib
# This script sets up the environment and runs the benchmarks
set -e
echo "🚀 Rhailib Hybrid Performance Benchmark Runner"
echo "=============================================="
# Check if Redis is running
echo "📡 Checking Redis connection..."
if ! redis-cli ping > /dev/null 2>&1; then
echo "❌ Redis is not running. Please start Redis server:"
echo " redis-server"
exit 1
fi
echo "✅ Redis is running"
# Check if we're in the right directory
if [ ! -f "Cargo.toml" ] || [ ! -d "scripts" ]; then
echo "❌ Please run this script from the rhailib root directory"
exit 1
fi
# Build the worker binary in release mode for performance
echo "🔨 Building worker binary in release mode..."
cd src/worker
cargo build --release --bin worker
cd ../..
echo "✅ Worker binary built (release mode)"
# Clear any existing Redis data
echo "🧹 Clearing Redis test data..."
redis-cli FLUSHDB > /dev/null
echo "✅ Redis data cleared"
# Parse command line arguments
BENCHMARK_TYPE="full"
TASK_COUNT=""
WORKER_COUNT=""
while [[ $# -gt 0 ]]; do
case $1 in
--demo)
BENCHMARK_TYPE="demo"
shift
;;
--quick)
BENCHMARK_TYPE="quick"
shift
;;
--tasks)
TASK_COUNT="$2"
shift 2
;;
--workers)
WORKER_COUNT="$2"
shift 2
;;
--help|-h)
echo "Usage: $0 [OPTIONS]"
echo ""
echo "Options:"
echo " --demo Run demo script instead of full benchmarks"
echo " --quick Run quick benchmarks (fewer configurations)"
echo " --tasks N Override task count for demo"
echo " --workers N Override worker count for demo"
echo " --help, -h Show this help message"
echo ""
echo "Examples:"
echo " $0 # Run full benchmarks"
echo " $0 --demo # Run demo script"
echo " $0 --quick # Run quick benchmarks"
echo " $0 --demo --tasks 50 --workers 4 # Custom demo"
exit 0
;;
*)
echo "❌ Unknown option: $1"
echo "Use --help for usage information"
exit 1
;;
esac
done
case $BENCHMARK_TYPE in
"demo")
echo "🎯 Running benchmark demo..."
if [ -n "$TASK_COUNT" ] || [ -n "$WORKER_COUNT" ]; then
echo "⚠️ Custom task/worker counts not yet supported in demo"
echo " Using default values (100 tasks, 2 workers)"
fi
cargo run --example run_benchmark_demo
;;
"quick")
echo "⚡ Running quick benchmarks..."
echo " This will test basic configurations only"
cargo bench --bench rhai_performance_bench -- --quick
;;
"full")
echo "🏁 Running full benchmark suite..."
echo " This may take several minutes..."
cargo bench --bench rhai_performance_bench
echo ""
echo "📊 Benchmark results saved to: target/criterion/"
echo " Open target/criterion/report/index.html to view detailed results"
;;
esac
echo ""
echo "🎉 Benchmark run completed!"
echo ""
echo "📈 Next steps:"
echo " • View HTML reports in target/criterion/report/"
echo " • Run 'cargo bench' for full Criterion benchmarks"
echo " • Run '$0 --demo' for a quick demonstration"
echo " • Check BENCHMARK_README.md for detailed documentation"

BIN
src/.DS_Store vendored

Binary file not shown.

View File

@ -11,28 +11,28 @@ The `rhai_client` crate provides a client interface for submitting Rhai scripts
- Submit a script and get a `task_id` back immediately. - Submit a script and get a `task_id` back immediately.
- Poll for task status and results using the `task_id`. - Poll for task status and results using the `task_id`.
- Optionally, submit a script and await its completion (or error/timeout) with configurable timeout and polling intervals. - Optionally, submit a script and await its completion (or error/timeout) with configurable timeout and polling intervals.
- **Circle-based Task Routing**: Scripts are submitted to named "circles," allowing for different worker pools or configurations. - **Public Key-based Task Routing**: Scripts are submitted to a "circle" identified by its unique `secp256k1` public key. This ensures tasks are routed to the correct, isolated worker process.
## Core Components ## Core Components
- **`RhaiClient`**: The main struct for interacting with the Rhai task system. It's initialized with a Redis connection URL. - **`RhaiClient`**: The main struct for interacting with the Rhai task system.
- `new(redis_url: &str)`: Creates a new client. - `new(redis_url: &str)`: Creates a new client.
- `submit_script(...)`: Submits a script and returns a `task_id`. - `submit_script(...)`: Submits a script and returns a `task_id`. It requires the target circle's public key for routing.
- `get_task_status(task_id: &str)`: Retrieves the current status and details of a task. - `get_task_status(task_id: &str)`: Retrieves the current status and details of a task.
- `submit_script_and_await_result(...)`: Submits a script and polls until it completes, errors out, or the specified timeout is reached. - `submit_script_and_await_result(...)`: A convenient wrapper that submits a script and polls until it completes, errors out, or the specified timeout is reached.
- **`RhaiTaskDetails`**: A struct representing the details of a task, including its script, status (`pending`, `processing`, `completed`, `error`), output, error messages, and timestamps. - **`RhaiTaskDetails`**: A struct representing the details of a task, including its script, status (`pending`, `processing`, `completed`, `error`), output, error messages, and the public key of the caller.
- **`RhaiClientError`**: An enum for various errors that can occur, such as Redis errors, serialization issues, or task timeouts. - **`RhaiClientError`**: An enum for various errors that can occur, such as Redis errors, serialization issues, or task timeouts.
## How It Works ## How It Works
1. The `RhaiClient` is initialized with the Redis server URL. 1. The `RhaiClient` is initialized with the Redis server URL.
2. When a script is submitted via `submit_script` or `submit_script_and_await_result`: 2. When a script is submitted (e.g., via `submit_script_and_await_result`):
a. A unique `task_id` (UUID v4) is generated. a. A unique `task_id` (UUID v4) is generated.
b. `RhaiTaskDetails` are created with the script, initial status set to "pending", and other relevant metadata. b. `RhaiTaskDetails` are created, including the script, the caller's public key (if provided), and an initial status of "pending".
c. These details are stored in a Redis hash with a key like `rhai_task_details:<task_id>`. c. These details are stored in a Redis hash with a key like `rhai_task_details:<task_id>`.
d. The `task_id` is pushed onto a Redis list named `rhai_tasks:<circle_name>`, which acts as a queue for workers listening to that specific circle. d. The `task_id` is pushed onto a Redis list named `rhai_tasks:<circle_public_key>`, which acts as a queue for the worker assigned to that specific circle.
3. Workers (not part of this client crate) would pop `task_id`s from their respective circle queues, retrieve task details from Redis, execute the script, and update the task details (status, output/error) in Redis. 3. A dedicated `rhai_worker` process, which was spawned with the same `circle_public_key`, pops the `task_id` from its queue, retrieves the task details, executes the script, and updates the results in the Redis hash.
4. The `RhaiClient` can then use `get_task_status` to poll the Redis hash for updates or `submit_script_and_await_result` to automate this polling. 4. The `RhaiClient` can use `get_task_status` to poll the Redis hash for these updates, or `submit_script_and_await_result` to automate the polling.
## Prerequisites ## Prerequisites
@ -40,12 +40,12 @@ The `rhai_client` crate provides a client interface for submitting Rhai scripts
## Usage Example ## Usage Example
The following example demonstrates submitting a script and waiting for its result with a timeout. (This is a conceptual adaptation; see `examples/timeout_example.rs` for a runnable example focused on timeout behavior). The following example demonstrates submitting a script to a circle identified by its public key and waiting for the result.
```rust ```rust
use rhai_client::RhaiClient; use rhai_client::RhaiClient;
use std::time::Duration; use std::time::Duration;
use serde_json::json; // For client_rpc_id example use serde_json::json;
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
@ -56,31 +56,30 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
log::info!("RhaiClient created."); log::info!("RhaiClient created.");
let script_content = r#" let script_content = r#"
fn add(a, b) { a + b } // This script can access both CIRCLE_PUBLIC_KEY and CALLER_PUBLIC_KEY
add(10, 32) "Hello from circle: " + CIRCLE_PUBLIC_KEY + ", called by: " + CALLER_PUBLIC_KEY
"#; "#;
let circle_name = "general_compute"; // The target circle is identified by its public key
let circle_public_key = "02f...some_public_key_hex";
// The entity calling the script also has a public key
let caller_public_key = Some("03a...another_public_key_hex");
let timeout = Duration::from_secs(10); let timeout = Duration::from_secs(10);
let poll_interval = Duration::from_millis(500);
// Optional client-side RPC ID to associate with the task log::info!("Submitting script to circle '{}' and awaiting result...", circle_public_key);
let client_rpc_id = Some(json!({ "request_id": "user_request_abc123" }));
log::info!("Submitting script to circle '{}' and awaiting result...", circle_name);
match client match client
.submit_script_and_await_result( .submit_script_and_await_result(
circle_name, circle_public_key,
script_content.to_string(), script_content.to_string(),
client_rpc_id, None, // Optional client-side RPC ID
timeout, timeout,
poll_interval, caller_public_key,
) )
.await .await
{ {
Ok(details) => { Ok(details) => {
log::info!("Task completed successfully!"); log::info!("Task completed successfully!");
log::info!("Task ID: {}", details.script); // Note: This should likely be a dedicated task_id field if needed from details
log::info!("Status: {}", details.status); log::info!("Status: {}", details.status);
if let Some(output) = details.output { if let Some(output) = details.output {
log::info!("Output: {}", output); log::info!("Output: {}", output);
@ -95,10 +94,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
rhai_client::RhaiClientError::Timeout(task_id) => { rhai_client::RhaiClientError::Timeout(task_id) => {
log::warn!("Task {} timed out.", task_id); log::warn!("Task {} timed out.", task_id);
} }
rhai_client::RhaiClientError::TaskNotFound(task_id) => { _ => {
log::error!("Task {} was not found after submission.", task_id);
}
_ => { // Handle other errors like RedisError, SerializationError
log::error!("Unhandled client error: {:?}", e); log::error!("Unhandled client error: {:?}", e);
} }
} }

View File

@ -30,9 +30,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.submit_script_and_await_result( .submit_script_and_await_result(
non_existent_circle, non_existent_circle,
script_content.to_string(), script_content.to_string(),
None, // No specific task_id "some_task_id".to_string(), // No specific task_id
very_short_timeout, very_short_timeout,
poll_interval, None,
) )
.await .await
{ {

View File

@ -3,7 +3,6 @@ use log::{debug, info, warn, error}; // Added error
use redis::AsyncCommands; use redis::AsyncCommands;
use std::time::Duration; // Duration is still used, Instant and sleep were removed use std::time::Duration; // Duration is still used, Instant and sleep were removed
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; // For client_rpc_id, though not directly used by this client's submit method
use uuid::Uuid; use uuid::Uuid;
const REDIS_TASK_DETAILS_PREFIX: &str = "rhai_task_details:"; const REDIS_TASK_DETAILS_PREFIX: &str = "rhai_task_details:";
@ -12,18 +11,22 @@ const REDIS_REPLY_QUEUE_PREFIX: &str = "rhai_reply:";
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RhaiTaskDetails { pub struct RhaiTaskDetails {
#[serde(rename = "taskId")] // Ensure consistent naming with other fields
pub task_id: String,
pub script: String, pub script: String,
pub status: String, // "pending", "processing", "completed", "error" pub status: String, // "pending", "processing", "completed", "error"
#[serde(rename = "clientRpcId")] // client_rpc_id: Option<Value> is removed.
pub client_rpc_id: Option<Value>, // Kept for compatibility with worker/server, but optional for client // Worker responses should ideally not include it, or Serde will ignore unknown fields by default.
pub output: Option<String>, pub output: Option<String>,
pub error: Option<String>, // Renamed from error_message for consistency pub error: Option<String>, // Renamed from error_message for consistency
#[serde(rename = "createdAt")] #[serde(rename = "createdAt")]
pub created_at: chrono::DateTime<chrono::Utc>, pub created_at: chrono::DateTime<chrono::Utc>,
#[serde(rename = "updatedAt")] #[serde(rename = "updatedAt")]
pub updated_at: chrono::DateTime<chrono::Utc>, pub updated_at: chrono::DateTime<chrono::Utc>,
#[serde(rename = "replyToQueue")] // reply_to_queue: Option<String> is removed from the struct.
pub reply_to_queue: Option<String>, // New field for dedicated reply queue // It's passed to submit_script_to_worker_queue if needed and stored in Redis directly.
#[serde(rename = "publicKey")]
pub public_key: Option<String>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -74,46 +77,37 @@ impl RhaiClient {
&self, &self,
conn: &mut redis::aio::MultiplexedConnection, conn: &mut redis::aio::MultiplexedConnection,
circle_name: &str, circle_name: &str,
task_id: &str, task_id: &str, // This is the main task_id
script: String, script: String,
client_rpc_id: Option<Value>, // client_rpc_id: Option<Value> is removed
reply_to_queue_name: Option<String>, // Made this an Option reply_to_queue_name: Option<String>, // Still needed to tell the worker where to reply, if applicable
public_key: Option<String>,
) -> Result<(), RhaiClientError> { ) -> Result<(), RhaiClientError> {
let now = Utc::now(); let now = Utc::now();
let task_details = RhaiTaskDetails {
script,
status: "pending".to_string(),
client_rpc_id,
output: None,
error: None,
created_at: now,
updated_at: now,
reply_to_queue: reply_to_queue_name.clone(),
};
let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id); let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id);
let worker_queue_key = format!("{}{}", REDIS_QUEUE_PREFIX, circle_name.replace(" ", "_").to_lowercase()); let worker_queue_key = format!("{}{}", REDIS_QUEUE_PREFIX, circle_name.replace(" ", "_").to_lowercase());
debug!( debug!(
"Preparing task_id: {} for circle: {} to worker_queue: {}. Details: {:?}", "Preparing task_id: {} for circle: {} to worker_queue: {}. Script: {}, replyToQueue: {:?}, publicKey: {:?}",
task_id, circle_name, worker_queue_key, task_details task_id, circle_name, worker_queue_key, script, reply_to_queue_name, public_key
); );
let mut hset_args: Vec<(String, String)> = vec![ let mut hset_args: Vec<(String, String)> = vec![
("script".to_string(), task_details.script.clone()), ("taskId".to_string(), task_id.to_string()), // Add taskId
("status".to_string(), task_details.status.clone()), ("script".to_string(), script), // script is moved here
("createdAt".to_string(), task_details.created_at.to_rfc3339()), ("status".to_string(), "pending".to_string()),
("updatedAt".to_string(), task_details.updated_at.to_rfc3339()), ("createdAt".to_string(), now.to_rfc3339()),
("updatedAt".to_string(), now.to_rfc3339()),
]; ];
if let Some(rpc_id_val) = &task_details.client_rpc_id { // clientRpcId field and its corresponding hset_args logic are removed.
hset_args.push(("clientRpcId".to_string(), serde_json::to_string(rpc_id_val)?));
} else { if let Some(queue_name) = &reply_to_queue_name { // Use the passed parameter
hset_args.push(("clientRpcId".to_string(), Value::Null.to_string())); hset_args.push(("replyToQueue".to_string(), queue_name.clone()));
} }
if let Some(pk) = &public_key { // Use the passed parameter
if let Some(reply_q) = &task_details.reply_to_queue { hset_args.push(("publicKey".to_string(), pk.clone()));
hset_args.push(("replyToQueue".to_string(), reply_q.clone()));
} }
// Ensure hset_args is a slice of tuples (String, String) // Ensure hset_args is a slice of tuples (String, String)
@ -139,21 +133,25 @@ impl RhaiClient {
&self, &self,
circle_name: &str, circle_name: &str,
script: String, script: String,
client_rpc_id: Option<Value>, // client_rpc_id: Option<Value> is removed
public_key: Option<String>,
) -> Result<String, RhaiClientError> { ) -> Result<String, RhaiClientError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?; let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
let task_id = Uuid::new_v4().to_string(); let task_id = Uuid::new_v4().to_string(); // task_id is generated here for fire-and-forget
debug!("Client submitting script (fire-and-forget) with new task_id: {} to circle: {}", task_id, circle_name);
self.submit_script_to_worker_queue( self.submit_script_to_worker_queue(
&mut conn, &mut conn,
circle_name, circle_name,
&task_id, &task_id,
script, script,
client_rpc_id, // client_rpc_id argument removed
None, // No reply queue for fire-and-forget None, // No dedicated reply queue for fire-and-forget
public_key,
) )
.await?; .await?;
Ok(task_id) Ok(task_id)
} }
@ -168,25 +166,45 @@ impl RhaiClient {
match result_map { match result_map {
Some(map) => { Some(map) => {
// Reconstruct RhaiTaskDetails from HashMap // Reconstruct RhaiTaskDetails from HashMap
// This is a simplified reconstruction; ensure all fields are handled robustly
let details = RhaiTaskDetails { let details = RhaiTaskDetails {
script: map.get("script").cloned().unwrap_or_default(), task_id: task_id.to_string(), // Use the task_id parameter passed to the function
status: map.get("status").cloned().unwrap_or_default(), script: map.get("script").cloned().unwrap_or_else(|| {
client_rpc_id: map.get("clientRpcId") warn!("Task {}: 'script' field missing from Redis hash, defaulting to empty.", task_id);
.and_then(|s| serde_json::from_str(s).ok()) String::new()
.or(Some(Value::Null)), // Default to Value::Null if missing or parse error }),
status: map.get("status").cloned().unwrap_or_else(|| {
warn!("Task {}: 'status' field missing from Redis hash, defaulting to empty.", task_id);
String::new()
}),
// client_rpc_id is no longer a field in RhaiTaskDetails
output: map.get("output").cloned(), output: map.get("output").cloned(),
error: map.get("error").cloned(), error: map.get("error").cloned(),
created_at: map.get("createdAt") created_at: map.get("createdAt")
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&Utc)) .map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(Utc::now), // Provide a default .unwrap_or_else(|| {
warn!("Task {}: 'createdAt' field missing or invalid in Redis hash, defaulting to Utc::now().", task_id);
Utc::now()
}),
updated_at: map.get("updatedAt") updated_at: map.get("updatedAt")
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&Utc)) .map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(Utc::now), // Provide a default .unwrap_or_else(|| {
reply_to_queue: map.get("replyToQueue").cloned(), warn!("Task {}: 'updatedAt' field missing or invalid in Redis hash, defaulting to Utc::now().", task_id);
Utc::now()
}),
// reply_to_queue is no longer a field in RhaiTaskDetails (it's stored in Redis but not in this struct)
public_key: map.get("publicKey").cloned(),
}; };
// It's important to also check if the 'taskId' field exists in the map and matches the input task_id
// for data integrity, though the struct construction above uses the input task_id directly.
if let Some(redis_task_id) = map.get("taskId") {
if redis_task_id != task_id {
warn!("Task {}: Mismatch between requested task_id and taskId found in Redis hash ('{}'). Proceeding with requested task_id.", task_id, redis_task_id);
}
} else {
warn!("Task {}: 'taskId' field missing from Redis hash.", task_id);
}
Ok(Some(details)) Ok(Some(details))
} }
None => Ok(None), None => Ok(None),
@ -197,27 +215,32 @@ impl RhaiClient {
pub async fn submit_script_and_await_result( pub async fn submit_script_and_await_result(
&self, &self,
circle_name: &str, circle_name: &str,
task_id: String, // task_id is now a mandatory parameter provided by the caller
script: String, script: String,
client_rpc_id: Option<Value>,
timeout: Duration, timeout: Duration,
public_key: Option<String>,
) -> Result<RhaiTaskDetails, RhaiClientError> { ) -> Result<RhaiTaskDetails, RhaiClientError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?; let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
let task_id = Uuid::new_v4().to_string(); // let task_id = Uuid::new_v4().to_string(); // Removed, task_id is a parameter
let reply_to_queue_name = format!("{}{}", REDIS_REPLY_QUEUE_PREFIX, Uuid::new_v4().to_string()); let reply_to_queue_name =
format!("{}{}", REDIS_REPLY_QUEUE_PREFIX, task_id); // Derived from the passed task_id
self.submit_script_to_worker_queue( self.submit_script_to_worker_queue(
&mut conn, &mut conn,
circle_name, circle_name,
&task_id, &task_id, // Pass the task_id parameter
script, script,
client_rpc_id, // client_rpc_id argument removed
Some(reply_to_queue_name.clone()), Some(reply_to_queue_name.clone()), // Pass the derived reply_to_queue_name
public_key,
) )
.await?; .await?;
info!( info!(
"Task {} submitted. Waiting for result on queue {} with timeout {:?}...", "Task {} submitted. Waiting for result on queue {} with timeout {:?}...",
task_id, reply_to_queue_name, timeout task_id, // This is the UUID
reply_to_queue_name,
timeout
); );
// BLPOP on the reply queue // BLPOP on the reply queue

View File

@ -32,6 +32,6 @@ path = "examples/flow/example.rs"
required-features = ["flow"] required-features = ["flow"]
[[example]] [[example]]
name = "finance_example" name = "finance"
path = "examples/finance/example.rs" path = "examples/finance/example.rs"
required-features = ["finance"] required-features = ["finance"]

View File

@ -33,7 +33,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("---------------------"); println!("---------------------");
// Run the script // Run the script
match eval_file(&engine, &script_path.to_string_lossy()) { match eval_file(&engine, &script_path) {
Ok(result) => { Ok(result) => {
if !result.is_unit() { if !result.is_unit() {
println!("\nScript returned: {:?}", result); println!("\nScript returned: {:?}", result);

View File

@ -27,8 +27,10 @@ pub fn create_heromodels_engine(db: Arc<OurDB>) -> Engine {
/// Register all heromodels Rhai modules with the engine /// Register all heromodels Rhai modules with the engine
pub fn register_all_modules(engine: &mut Engine, db: Arc<OurDB>) { pub fn register_all_modules(engine: &mut Engine, db: Arc<OurDB>) {
// Register the calendar module if the feature is enabled // Register the calendar module if the feature is enabled
heromodels::models::access::register_access_rhai_module(engine, db.clone());
#[cfg(feature = "calendar")] #[cfg(feature = "calendar")]
heromodels::models::calendar::register_calendar_rhai_module(engine, db.clone()); heromodels::models::calendar::register_calendar_rhai_module(engine, db.clone());
heromodels::models::contact::register_contact_rhai_module(engine, db.clone());
heromodels::models::library::register_library_rhai_module(engine, db.clone()); heromodels::models::library::register_library_rhai_module(engine, db.clone());
heromodels::models::circle::register_circle_rhai_module(engine, db.clone()); heromodels::models::circle::register_circle_rhai_module(engine, db.clone());

View File

@ -1,82 +0,0 @@
/// Seed the mock database with finance data
fn seed_finance_data(db: Arc<OurDB>) {
// Create a user account
let mut account = Account::new()
.name("Demo Account")
.user_id(1)
.description("Demo trading account")
.ledger("ethereum")
.address("0x1234567890abcdef1234567890abcdef12345678")
.pubkey("0xabcdef1234567890abcdef1234567890abcdef12");
// Store the account in the database
let (account_id, updated_account) = db.collection::<Account>()
.expect("Failed to get Account collection")
.set(&account)
.expect("Failed to store account");
// Create an ERC20 token asset
let token_asset = Asset::new()
.name("HERO Token")
.description("Herocode governance token")
.amount(1000.0)
.address("0x9876543210abcdef9876543210abcdef98765432")
.asset_type(AssetType::Erc20)
.decimals(18);
// Store the token asset in the database
let (token_id, updated_token) = db.collection::<Asset>()
.expect("Failed to get Asset collection")
.set(&token_asset)
.expect("Failed to store token asset");
// Create an NFT asset
let nft_asset = Asset::new()
.name("Herocode #1")
.description("Unique digital collectible")
.amount(1.0)
.address("0xabcdef1234567890abcdef1234567890abcdef12")
.asset_type(AssetType::Erc721)
.decimals(0);
// Store the NFT asset in the database
let (nft_id, updated_nft) = db.collection::<Asset>()
.expect("Failed to get Asset collection")
.set(&nft_asset)
.expect("Failed to store NFT asset");
// Add assets to the account
account = updated_account.add_asset(token_id);
account = account.add_asset(nft_id);
// Update the account in the database
let (_, updated_account) = db.collection::<Account>()
.expect("Failed to get Account collection")
.set(&account)
.expect("Failed to store updated account");
// Create a listing for the NFT
let listing = Listing::new()
.seller_id(account_id)
.asset_id(nft_id)
.price(0.5)
.currency("ETH")
.listing_type(ListingType::Auction)
.title(Some("Rare Herocode NFT".to_string()))
.description(Some("One of a kind digital collectible".to_string()))
.image_url(Some("https://example.com/nft/1.png".to_string()))
.add_tag("rare".to_string())
.add_tag("collectible".to_string());
// Store the listing in the database
let (listing_id, updated_listing) = db.collection::<Listing>()
.expect("Failed to get Listing collection")
.set(&listing)
.expect("Failed to store listing");
println!("Mock database seeded with finance data:");
println!(" - Added account: {} (ID: {})", updated_account.name, updated_account.base_data.id);
println!(" - Added token asset: {} (ID: {})", updated_token.name, updated_token.base_data.id);
println!(" - Added NFT asset: {} (ID: {})", updated_nft.name, updated_nft.base_data.id);
println!(" - Added listing: {} (ID: {})", updated_listing.title.unwrap_or_default(), updated_listing.base_data.id);
}

BIN
src/repl/.DS_Store vendored Normal file

Binary file not shown.

View File

@ -15,7 +15,7 @@ tempfile = "3.8" # For creating temporary files for editing
rhai_client = { path = "../client" } rhai_client = { path = "../client" }
anyhow = "1.0" # For simpler error handling anyhow = "1.0" # For simpler error handling
worker_lib = { path = "../worker", package = "worker" } rhailib_worker = { path = "../worker", package = "rhailib_worker" }
engine = { path = "../engine" } engine = { path = "../engine" }
heromodels = { path = "../../../db/heromodels", features = ["rhai"] } heromodels = { path = "../../../db/heromodels", features = ["rhai"] }
rhai = { version = "1.18.0" } # Match version used by worker/engine rhai = { version = "1.18.0" } # Match version used by worker/engine

BIN
src/worker/.DS_Store vendored

Binary file not shown.

View File

@ -1,10 +1,10 @@
[package] [package]
name = "worker" name = "rhailib_worker"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"
[lib] [lib]
name = "worker_lib" # Can be different from package name, or same name = "rhailib_worker" # Can be different from package name, or same
path = "src/lib.rs" path = "src/lib.rs"
[[bin]] [[bin]]
@ -15,7 +15,7 @@ path = "cmd/worker.rs"
[dependencies] [dependencies]
redis = { version = "0.25.0", features = ["tokio-comp"] } redis = { version = "0.25.0", features = ["tokio-comp"] }
rhai = { version = "1.18.0", features = ["sync", "decimal"] } # Added "decimal" for broader script support rhai = { version = "1.18.0", default-features = false, features = ["sync", "decimal", "std"] } # Added "decimal" for broader script support
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] } tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }

View File

@ -1,76 +1,69 @@
# Rhai Worker # Rhai Worker
The `rhai_worker` crate implements a worker service that listens for Rhai script execution tasks from a Redis queue, executes them using the Rhai scripting engine, and posts results back to Redis. It is designed to work in conjunction with the `rhai_client` crate. The `rhai_worker` crate implements a standalone worker service that listens for Rhai script execution tasks from a Redis queue, executes them, and posts results back to Redis. It is designed to be spawned as a separate OS process by an orchestrator like the `launcher` crate.
## Features ## Features
- **Redis Queue Consumption**: Listens to one or more specified Redis lists (acting as task queues) for incoming task IDs. - **Redis Queue Consumption**: Listens to a specific Redis list (acting as a task queue) for incoming task IDs. The queue is determined by the `--circle-public-key` argument.
- **Rhai Script Execution**: Executes Rhai scripts retrieved based on task IDs. - **Rhai Script Execution**: Executes Rhai scripts retrieved from Redis based on task IDs.
- **Task State Management**: Updates task status (`processing`, `completed`, `error`) and stores results (output or error messages) in Redis hashes. - **Task State Management**: Updates task status (`processing`, `completed`, `error`) and stores results in Redis hashes.
- **Configurable**: - **Script Scope Injection**: Automatically injects two important constants into the Rhai script's scope:
- Redis URL can be specified via command-line arguments. - `CIRCLE_PUBLIC_KEY`: The public key of the worker's own circle.
- Listens to specific "circles" (task queues) provided as command-line arguments. - `CALLER_PUBLIC_KEY`: The public key of the entity that requested the script execution.
- **Asynchronous Operations**: Built with `tokio` for non-blocking Redis communication and script processing. - **Asynchronous Operations**: Built with `tokio` for non-blocking Redis communication.
- **Graceful Error Handling**: Captures errors during script execution and stores them for the client. - **Graceful Error Handling**: Captures errors during script execution and stores them for the client.
## Core Components ## Core Components
- **`worker_lib` (Library Crate)**: - **`worker_lib` (Library Crate)**:
- **`Args`**: A struct (using `clap`) for parsing command-line arguments like Redis URL and target circle names. - **`Args`**: A struct (using `clap`) for parsing command-line arguments: `--redis-url` and `--circle-public-key`.
- **`run_worker_loop(engine: Engine, args: Args)`**: The main asynchronous function that: - **`run_worker_loop(engine: Engine, args: Args)`**: The main asynchronous function that:
- Connects to Redis. - Connects to Redis.
- Continuously polls specified Redis queues (e.g., `rhai_tasks:<circle_name>`) using `BLPOP`. - Continuously polls the designated Redis queue (`rhai_tasks:<circle_public_key>`) using `BLPOP`.
- Upon receiving a `task_id`: - Upon receiving a `task_id`, it fetches the task details from a Redis hash.
- Fetches task details (including the script) from a Redis hash (e.g., `rhai_task_details:<task_id>`). - It injects `CALLER_PUBLIC_KEY` and `CIRCLE_PUBLIC_KEY` into the script's scope.
- Updates the task status to "processing". - It executes the script and updates the task status in Redis with the output or error.
- Executes the Rhai script using the provided `rhai::Engine`.
- Updates the task status to "completed" with the script's output or "error" with the error message.
- **`update_task_status_in_redis(...)`**: A helper function to update task details in Redis.
- **`worker` (Binary Crate - `cmd/worker.rs`)**: - **`worker` (Binary Crate - `cmd/worker.rs`)**:
- The main executable entry point. - The main executable entry point. It parses command-line arguments, initializes a Rhai engine, and invokes `run_worker_loop`.
- Parses command-line arguments.
- Initializes a default `rhai::Engine`.
- Invokes `run_worker_loop` from `worker_lib`.
## How It Works ## How It Works
1. The worker executable is launched, typically with command-line arguments specifying the Redis URL and the "circle(s)" (queues) to monitor. 1. The worker executable is launched by an external process (e.g., `launcher`), which passes the required command-line arguments.
```bash ```bash
./worker --redis-url redis://your-redis-host/ --circles circle_A circle_B # This is typically done programmatically by a parent process.
/path/to/worker --redis-url redis://127.0.0.1/ --circle-public-key 02...abc
``` ```
2. The `run_worker_loop` connects to Redis and starts listening to the designated task queues (e.g., `rhai_tasks:circle_a`, `rhai_tasks:circle_b`). 2. The `run_worker_loop` connects to Redis and starts listening to its designated task queue (e.g., `rhai_tasks:02...abc`).
3. When a `rhai_client` submits a task, it pushes a `task_id` to one of these queues and stores task details (script, initial status "pending") in a Redis hash. 3. A `rhai_client` submits a task by pushing a `task_id` to this queue and storing the script and other details in a Redis hash.
4. The worker's `BLPOP` command picks up a `task_id` from a queue. 4. The worker's `BLPOP` command picks up the `task_id`.
5. The worker retrieves the script from the corresponding `rhai_task_details:<task_id>` hash in Redis. 5. The worker retrieves the script from the corresponding `rhai_task_details:<task_id>` hash.
6. It updates the task's status to "processing" in the Redis hash. 6. It updates the task's status to "processing".
7. The Rhai script is executed. 7. The Rhai script is executed within a scope that contains both `CIRCLE_PUBLIC_KEY` and `CALLER_PUBLIC_KEY`.
8. After execution: 8. After execution, the status is updated to "completed" (with output) or "error" (with an error message).
- If successful, the status is updated to "completed", and the output is stored in the Redis hash.
- If an error occurs, the status is updated to "error", and the error message is stored.
9. The worker then goes back to listening for the next task. 9. The worker then goes back to listening for the next task.
## Prerequisites ## Prerequisites
- A running Redis instance accessible by the worker. - A running Redis instance accessible by the worker.
- The `rhai_client` (or another system) populating the Redis queues and task detail hashes. - An orchestrator process (like `launcher`) to spawn the worker.
- A `rhai_client` (or another system) to populate the Redis queues.
## Building and Running ## Building and Running
The worker is intended to be built as a dependency and run by another program.
1. **Build the worker:** 1. **Build the worker:**
```bash ```bash
# From the root of the rhailib project or within src/worker # From the root of the rhailib project
cargo build # Or cargo build --release for an optimized version cargo build --package worker
``` ```
The binary will typically be found in `target/debug/worker` or `target/release/worker`. The binary will be located at `target/debug/worker`.
2. **Run the worker:** 2. **Running the worker:**
The worker is not typically run manually. The `launcher` crate is responsible for spawning it with the correct arguments. If you need to run it manually for testing, you must provide the required arguments:
```bash ```bash
# Example: ./target/debug/worker --redis-url redis://127.0.0.1/ --circle-public-key <a_valid_hex_public_key>
./target/debug/worker --redis-url redis://127.0.0.1/ --circles my_circle_1 my_circle_2
``` ```
Replace `redis://127.0.0.1/` with your Redis server's URL and `my_circle_1 my_circle_2` with the names of the task queues you want this worker instance to process.
You can run multiple instances of the worker, potentially listening to the same or different circles, to scale out processing.
## Dependencies ## Dependencies
@ -80,7 +73,3 @@ Key dependencies include:
- `clap`: For command-line argument parsing. - `clap`: For command-line argument parsing.
- `tokio`: For the asynchronous runtime. - `tokio`: For the asynchronous runtime.
- `log`, `env_logger`: For logging. - `log`, `env_logger`: For logging.
- `rhai_client`: For shared definitions (potentially, though direct usage is minimal in current `lib.rs`).
## Note on Binary Path
The `Cargo.toml` for the worker specifies the binary path as `src/bin/worker.rs`. However, the actual file is located at `src/cmd/worker.rs`. This README assumes the latter is the correct and current location. If `Cargo.toml` is updated, this note might become obsolete.

View File

@ -1,4 +1,4 @@
use worker_lib::spawn_rhai_worker; use worker::spawn_rhai_worker;
use engine::create_heromodels_engine; use engine::create_heromodels_engine;
use heromodels::db::hero::OurDB; use heromodels::db::hero::OurDB;
use std::sync::Arc; use std::sync::Arc;
@ -8,9 +8,9 @@ use tokio::sync::mpsc;
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)] #[command(author, version, about, long_about = None)]
struct Args { struct Args {
/// Circle name to listen to /// Public key of the circle to listen to
#[arg(short, long, default_value = "default")] #[arg(short, long, default_value = "default_public_key")]
circle: String, circle_public_key: String,
/// Redis URL /// Redis URL
#[arg(short, long, default_value = "redis://localhost:6379")] #[arg(short, long, default_value = "redis://localhost:6379")]
@ -32,7 +32,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let args = Args::parse(); let args = Args::parse();
log::info!("Rhai Worker (binary) starting with performance-optimized engine."); log::info!("Rhai Worker (binary) starting with performance-optimized engine.");
log::info!("Worker ID: {}, Circle: {}, Redis: {}", args.worker_id, args.circle, args.redis_url); log::info!("Worker ID: {}, Circle Public Key: {}, Redis: {}", args.worker_id, args.circle_public_key, args.redis_url);
// Initialize database with OurDB for the Rhai engine // Initialize database with OurDB for the Rhai engine
// Using a temporary/in-memory like database for the worker // Using a temporary/in-memory like database for the worker
@ -57,7 +57,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Spawn the worker // Spawn the worker
let worker_handle = spawn_rhai_worker( let worker_handle = spawn_rhai_worker(
1, // circle_id (not used but required) 1, // circle_id (not used but required)
args.circle, args.circle_public_key,
engine, engine,
args.redis_url, args.redis_url,
shutdown_rx, shutdown_rx,

View File

@ -39,83 +39,92 @@ async fn update_task_status_in_redis(
pub fn spawn_rhai_worker( pub fn spawn_rhai_worker(
_circle_id: u32, // For logging or specific logic if needed in the future _circle_id: u32, // For logging or specific logic if needed in the future
circle_name: String, circle_public_key: String,
engine: Engine, engine: Engine,
redis_url: String, redis_url: String,
mut shutdown_rx: mpsc::Receiver<()>, // Add shutdown receiver mut shutdown_rx: mpsc::Receiver<()>, // Add shutdown receiver
preserve_tasks: bool, // Flag to control task cleanup preserve_tasks: bool, // Flag to control task cleanup
) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> { ) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
tokio::spawn(async move { tokio::spawn(async move {
let queue_key = format!("{}{}", REDIS_QUEUE_PREFIX, circle_name.replace(" ", "_").to_lowercase()); let queue_key = format!("{}{}", REDIS_QUEUE_PREFIX, circle_public_key);
info!( info!(
"Rhai Worker for Circle '{}' starting. Connecting to Redis at {}. Listening on queue: {}. Waiting for tasks or shutdown signal.", "Rhai Worker for Circle Public Key '{}' starting. Connecting to Redis at {}. Listening on queue: {}. Waiting for tasks or shutdown signal.",
circle_name, redis_url, queue_key circle_public_key, redis_url, queue_key
); );
let redis_client = match redis::Client::open(redis_url.as_str()) { let redis_client = match redis::Client::open(redis_url.as_str()) {
Ok(client) => client, Ok(client) => client,
Err(e) => { Err(e) => {
error!("Worker for Circle '{}': Failed to open Redis client: {}", circle_name, e); error!("Worker for Circle Public Key '{}': Failed to open Redis client: {}", circle_public_key, e);
return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>); return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
} }
}; };
let mut redis_conn = match redis_client.get_multiplexed_async_connection().await { let mut redis_conn = match redis_client.get_multiplexed_async_connection().await {
Ok(conn) => conn, Ok(conn) => conn,
Err(e) => { Err(e) => {
error!("Worker for Circle '{}': Failed to get Redis connection: {}", circle_name, e); error!("Worker for Circle Public Key '{}': Failed to get Redis connection: {}", circle_public_key, e);
return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>); return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
} }
}; };
info!("Worker for Circle '{}' successfully connected to Redis.", circle_name); info!("Worker for Circle Public Key '{}' successfully connected to Redis.", circle_public_key);
loop { loop {
let blpop_keys = vec![queue_key.clone()]; let blpop_keys = vec![queue_key.clone()];
tokio::select! { tokio::select! {
// Listen for shutdown signal // Listen for shutdown signal
_ = shutdown_rx.recv() => { _ = shutdown_rx.recv() => {
info!("Worker for Circle '{}': Shutdown signal received. Terminating loop.", circle_name); info!("Worker for Circle Public Key '{}': Shutdown signal received. Terminating loop.", circle_public_key);
break; break;
} }
// Listen for tasks from Redis // Listen for tasks from Redis
blpop_result = redis_conn.blpop(&blpop_keys, BLPOP_TIMEOUT_SECONDS as f64) => { blpop_result = redis_conn.blpop(&blpop_keys, BLPOP_TIMEOUT_SECONDS as f64) => {
debug!("Worker for Circle '{}': Attempting BLPOP on queue: {}", circle_name, queue_key); debug!("Worker for Circle Public Key '{}': Attempting BLPOP on queue: {}", circle_public_key, queue_key);
let response: Option<(String, String)> = match blpop_result { let response: Option<(String, String)> = match blpop_result {
Ok(resp) => resp, Ok(resp) => resp,
Err(e) => { Err(e) => {
error!("Worker for Circle '{}': Redis BLPOP error on queue {}: {}. Worker for this circle might stop.", circle_name, queue_key, e); error!("Worker for Circle Public Key '{}': Redis BLPOP error on queue {}: {}. Worker for this circle might stop.", circle_public_key, queue_key, e);
return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>); return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
} }
}; };
if let Some((_queue_name_recv, task_id)) = response { if let Some((_queue_name_recv, task_id)) = response {
info!("Worker for Circle '{}' received task_id: {} from queue: {}", circle_name, task_id, _queue_name_recv); info!("Worker for Circle Public Key '{}' received task_id: {} from queue: {}", circle_public_key, task_id, _queue_name_recv);
debug!("Worker for Circle '{}', Task {}: Processing started.", circle_name, task_id); debug!("Worker for Circle Public Key '{}', Task {}: Processing started.", circle_public_key, task_id);
let task_details_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id); let task_details_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id);
debug!("Worker for Circle '{}', Task {}: Attempting HGETALL from key: {}", circle_name, task_id, task_details_key); debug!("Worker for Circle Public Key '{}', Task {}: Attempting HGETALL from key: {}", circle_public_key, task_id, task_details_key);
let task_details_map_result: Result<HashMap<String, String>, _> = let task_details_map_result: Result<HashMap<String, String>, _> =
redis_conn.hgetall(&task_details_key).await; redis_conn.hgetall(&task_details_key).await;
match task_details_map_result { match task_details_map_result {
Ok(details_map) => { Ok(details_map) => {
debug!("Worker for Circle '{}', Task {}: HGETALL successful. Details: {:?}", circle_name, task_id, details_map); debug!("Worker for Circle Public Key '{}', Task {}: HGETALL successful. Details: {:?}", circle_public_key, task_id, details_map);
let script_content_opt = details_map.get("script").cloned(); let script_content_opt = details_map.get("script").cloned();
let reply_to_queue_opt = details_map.get("replyToQueue").cloned(); let reply_to_queue_opt = details_map.get("replyToQueue").cloned();
let client_rpc_id_str_opt = details_map.get("clientRpcId").cloned();
let created_at_str_opt = details_map.get("createdAt").cloned(); let created_at_str_opt = details_map.get("createdAt").cloned();
let public_key_opt = details_map.get("publicKey").cloned();
if let Some(script_content) = script_content_opt { if let Some(script_content) = script_content_opt {
info!("Worker for Circle '{}' processing task_id: {}. Script: {:.50}...", circle_name, task_id, script_content); info!("Worker for Circle Public Key '{}' processing task_id: {}. Script: {:.50}...", circle_public_key, task_id, script_content);
debug!("Worker for Circle '{}', Task {}: Attempting to update status to 'processing'.", circle_name, task_id); debug!("Worker for Circle Public Key '{}', Task {}: Attempting to update status to 'processing'.", circle_public_key, task_id);
if let Err(e) = update_task_status_in_redis(&mut redis_conn, &task_id, "processing", None, None).await { if let Err(e) = update_task_status_in_redis(&mut redis_conn, &task_id, "processing", None, None).await {
error!("Worker for Circle '{}', Task {}: Failed to update status to 'processing': {}", circle_name, task_id, e); error!("Worker for Circle Public Key '{}', Task {}: Failed to update status to 'processing': {}", circle_public_key, task_id, e);
} else { } else {
debug!("Worker for Circle '{}', Task {}: Status updated to 'processing'.", circle_name, task_id); debug!("Worker for Circle Public Key '{}', Task {}: Status updated to 'processing'.", circle_public_key, task_id);
} }
let mut scope = Scope::new(); let mut scope = Scope::new();
debug!("Worker for Circle '{}', Task {}: Evaluating script with Rhai engine.", circle_name, task_id); scope.push_constant("CIRCLE_PUBLIC_KEY", circle_public_key.clone());
debug!("Worker for Circle Public Key '{}', Task {}: Injected CIRCLE_PUBLIC_KEY into scope.", circle_public_key, task_id);
if let Some(public_key) = public_key_opt.as_deref() {
if !public_key.is_empty() {
scope.push_constant("CALLER_PUBLIC_KEY", public_key.to_string());
debug!("Worker for Circle Public Key '{}', Task {}: Injected CALLER_PUBLIC_KEY into scope.", circle_public_key, task_id);
}
}
debug!("Worker for Circle Public Key '{}', Task {}: Evaluating script with Rhai engine.", circle_public_key, task_id);
let mut final_status = "error".to_string(); // Default to error let mut final_status = "error".to_string(); // Default to error
let mut final_output: Option<String> = None; let mut final_output: Option<String> = None;
@ -130,19 +139,19 @@ pub fn spawn_rhai_worker(
} else { } else {
result.to_string() result.to_string()
}; };
info!("Worker for Circle '{}' task {} completed. Output: {}", circle_name, task_id, output_str); info!("Worker for Circle Public Key '{}' task {} completed. Output: {}", circle_public_key, task_id, output_str);
final_status = "completed".to_string(); final_status = "completed".to_string();
final_output = Some(output_str); final_output = Some(output_str);
} }
Err(e) => { Err(e) => {
let error_str = format!("{:?}", *e); let error_str = format!("{:?}", *e);
error!("Worker for Circle '{}' task {} script evaluation failed. Error: {}", circle_name, task_id, error_str); error!("Worker for Circle Public Key '{}' task {} script evaluation failed. Error: {}", circle_public_key, task_id, error_str);
final_error_msg = Some(error_str); final_error_msg = Some(error_str);
// final_status remains "error" // final_status remains "error"
} }
} }
debug!("Worker for Circle '{}', Task {}: Attempting to update status to '{}'.", circle_name, task_id, final_status); debug!("Worker for Circle Public Key '{}', Task {}: Attempting to update status to '{}'.", circle_public_key, task_id, final_status);
if let Err(e) = update_task_status_in_redis( if let Err(e) = update_task_status_in_redis(
&mut redis_conn, &mut redis_conn,
&task_id, &task_id,
@ -150,39 +159,40 @@ pub fn spawn_rhai_worker(
final_output.clone(), // Clone for task hash update final_output.clone(), // Clone for task hash update
final_error_msg.clone(), // Clone for task hash update final_error_msg.clone(), // Clone for task hash update
).await { ).await {
error!("Worker for Circle '{}', Task {}: Failed to update final status to '{}': {}", circle_name, task_id, final_status, e); error!("Worker for Circle Public Key '{}', Task {}: Failed to update final status to '{}': {}", circle_public_key, task_id, final_status, e);
} else { } else {
debug!("Worker for Circle '{}', Task {}: Final status updated to '{}'.", circle_name, task_id, final_status); debug!("Worker for Circle Public Key '{}', Task {}: Final status updated to '{}'.", circle_public_key, task_id, final_status);
} }
// Send to reply queue if specified // Send to reply queue if specified
if let Some(reply_q) = reply_to_queue_opt { if let Some(reply_q) = reply_to_queue_opt {
let client_rpc_id = client_rpc_id_str_opt.and_then(|s| serde_json::from_str(&s).ok());
let created_at = created_at_str_opt let created_at = created_at_str_opt
.and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok()) .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
.map(|dt| dt.with_timezone(&Utc)) .map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(Utc::now); // Fallback, though createdAt should exist .unwrap_or_else(Utc::now); // Fallback, though createdAt should exist
let reply_details = RhaiTaskDetails { let reply_details = RhaiTaskDetails {
task_id: task_id.to_string(), // Add the task_id
script: script_content.clone(), // Include script for context in reply script: script_content.clone(), // Include script for context in reply
status: final_status, // The final status status: final_status, // The final status
client_rpc_id, // client_rpc_id is no longer a field
output: final_output, // The final output output: final_output, // The final output
error: final_error_msg, // The final error error: final_error_msg, // The final error
created_at, // Original creation time created_at, // Original creation time
updated_at: Utc::now(), // Time of this final update/reply updated_at: Utc::now(), // Time of this final update/reply
reply_to_queue: None, // This field is not relevant for the message content itself // reply_to_queue is no longer a field
public_key: public_key_opt,
}; };
match serde_json::to_string(&reply_details) { match serde_json::to_string(&reply_details) {
Ok(reply_json) => { Ok(reply_json) => {
let lpush_result: redis::RedisResult<i64> = redis_conn.lpush(&reply_q, &reply_json).await; let lpush_result: redis::RedisResult<i64> = redis_conn.lpush(&reply_q, &reply_json).await;
match lpush_result { match lpush_result {
Ok(_) => debug!("Worker for Circle '{}', Task {}: Successfully sent result to reply queue {}", circle_name, task_id, reply_q), Ok(_) => debug!("Worker for Circle Public Key '{}', Task {}: Successfully sent result to reply queue {}", circle_public_key, task_id, reply_q),
Err(e_lpush) => error!("Worker for Circle '{}', Task {}: Failed to LPUSH result to reply queue {}: {}", circle_name, task_id, reply_q, e_lpush), Err(e_lpush) => error!("Worker for Circle Public Key '{}', Task {}: Failed to LPUSH result to reply queue {}: {}", circle_public_key, task_id, reply_q, e_lpush),
} }
} }
Err(e_json) => { Err(e_json) => {
error!("Worker for Circle '{}', Task {}: Failed to serialize reply details for queue {}: {}", circle_name, task_id, reply_q, e_json); error!("Worker for Circle Public Key '{}', Task {}: Failed to serialize reply details for queue {}: {}", circle_public_key, task_id, reply_q, e_json);
} }
} }
} }
@ -190,43 +200,43 @@ pub fn spawn_rhai_worker(
if !preserve_tasks { if !preserve_tasks {
// The worker is responsible for cleaning up the task details hash. // The worker is responsible for cleaning up the task details hash.
if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await { if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await {
error!("Worker for Circle '{}', Task {}: Failed to delete task details key '{}': {}", circle_name, task_id, task_details_key, e); error!("Worker for Circle Public Key '{}', Task {}: Failed to delete task details key '{}': {}", circle_public_key, task_id, task_details_key, e);
} else { } else {
debug!("Worker for Circle '{}', Task {}: Cleaned up task details key '{}'.", circle_name, task_id, task_details_key); debug!("Worker for Circle Public Key '{}', Task {}: Cleaned up task details key '{}'.", circle_public_key, task_id, task_details_key);
} }
} else { } else {
debug!("Worker for Circle '{}', Task {}: Preserving task details (preserve_tasks=true)", circle_name, task_id); debug!("Worker for Circle Public Key '{}', Task {}: Preserving task details (preserve_tasks=true)", circle_public_key, task_id);
} }
} else { // Script content not found in hash } else { // Script content not found in hash
error!( error!(
"Worker for Circle '{}', Task {}: Script content not found in Redis hash. Details map: {:?}", "Worker for Circle Public Key '{}', Task {}: Script content not found in Redis hash. Details map: {:?}",
circle_name, task_id, details_map circle_public_key, task_id, details_map
); );
// Clean up invalid task details based on preserve_tasks flag // Clean up invalid task details based on preserve_tasks flag
if !preserve_tasks { if !preserve_tasks {
// Even if the script is not found, the worker should clean up the invalid task hash. // Even if the script is not found, the worker should clean up the invalid task hash.
if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await { if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await {
error!("Worker for Circle '{}', Task {}: Failed to delete invalid task details key '{}': {}", circle_name, task_id, task_details_key, e); error!("Worker for Circle Public Key '{}', Task {}: Failed to delete invalid task details key '{}': {}", circle_public_key, task_id, task_details_key, e);
} }
} else { } else {
debug!("Worker for Circle '{}', Task {}: Preserving invalid task details (preserve_tasks=true)", circle_name, task_id); debug!("Worker for Circle Public Key '{}', Task {}: Preserving invalid task details (preserve_tasks=true)", circle_public_key, task_id);
} }
} }
} }
Err(e) => { Err(e) => {
error!( error!(
"Worker for Circle '{}', Task {}: Failed to fetch details (HGETALL) from Redis for key {}. Error: {:?}", "Worker for Circle Public Key '{}', Task {}: Failed to fetch details (HGETALL) from Redis for key {}. Error: {:?}",
circle_name, task_id, task_details_key, e circle_public_key, task_id, task_details_key, e
); );
} }
} }
} else { } else {
debug!("Worker for Circle '{}': BLPOP timed out on queue {}. No new tasks. Checking for shutdown signal again.", circle_name, queue_key); debug!("Worker for Circle Public Key '{}': BLPOP timed out on queue {}. No new tasks. Checking for shutdown signal again.", &circle_public_key, &queue_key);
} }
} // End of blpop_result match } // End of blpop_result match
} // End of tokio::select! } // End of tokio::select!
} // End of loop } // End of loop
info!("Worker for Circle '{}' has shut down.", circle_name); info!("Worker for Circle Public Key '{}' has shut down.", circle_public_key);
Ok(()) Ok(())
}) })
} }

View File

@ -1,158 +0,0 @@
#!/bin/bash
# Exit immediately if a command exits with a non-zero status.
# set -e # We will handle errors manually for cleanup
# Instead of set -e, we'll check command statuses and exit if needed after attempting cleanup.
# Default to not force killing processes
FORCE_KILL=false
# Parse command-line options
while getopts "f" opt; do
case ${opt} in
f )
FORCE_KILL=true
;;
\? )
echo "Invalid option: -$OPTARG" 1>&2
exit 1
;;
esac
done
shift $((OPTIND -1))
# Array to store PIDs of background processes
BG_PIDS=()
# Cleanup function
cleanup() {
echo "Caught signal, cleaning up background processes..."
for pid in "${BG_PIDS[@]}"; do
if ps -p "$pid" > /dev/null; then # Check if process exists
echo "Stopping process $pid..."
kill "$pid"
fi
done
# Wait for all background processes to terminate
for pid in "${BG_PIDS[@]}"; do
if ps -p "$pid" > /dev/null; then
wait "$pid" 2>/dev/null # Suppress "No such process" if already gone
fi
done
echo "All background processes stopped."
exit 0 # Exit script after cleanup
}
# Trap SIGINT (Ctrl+C) and SIGTERM
trap cleanup SIGINT SIGTERM
# Define circles and their base port
# The client will need to know these port assignments.
# Circle names should match what's in your mock data for consistency,
# but for the WS server, it's what the server identifies itself as.
# The client will use the lowercase_with_underscores version for the path.
# Define circles and their ports using indexed arrays
CIRCLE_NAMES=(
"OurWorld"
"My Personal Space"
"threefold"
"circles_app"
)
CIRCLE_PORTS=(
"9000"
"9001"
"9002"
"9003"
)
# Add more circles and their ports here if needed, ensuring arrays match
# Build the WebSocket server first
echo "Building circle_server_ws..."
cargo build --package circle_server_ws
if [ $? -ne 0 ]; then echo "Failed to build circle_server_ws"; cleanup; exit 1; fi
echo "Building rhai_worker..."
cargo build --package rhai_worker
if [ $? -ne 0 ]; then echo "Failed to build rhai_worker"; cleanup; exit 1; fi
# Paths to the compiled binaries
WS_SERVER_BINARY="./target/debug/circle_server_ws"
RHAI_WORKER_BINARY="./target/debug/rhai_worker"
if [ ! -f "$WS_SERVER_BINARY" ]; then
echo "Error: WebSocket server binary not found at $WS_SERVER_BINARY after build."
cleanup
exit 1
fi
if [ ! -f "$RHAI_WORKER_BINARY" ]; then
echo "Error: Rhai worker binary not found at $RHAI_WORKER_BINARY after build."
cleanup
exit 1
fi
echo "Starting WebSocket servers..."
for i in "${!CIRCLE_NAMES[@]}"; do
NAME="${CIRCLE_NAMES[i]}"
PORT="${CIRCLE_PORTS[i]}"
if [ "$FORCE_KILL" = true ]; then
echo "Checking if port $PORT is in use (force mode)..."
# lsof -i :<port> -t lists PIDs listening on the port
# The output might be empty or multiple PIDs.
# We'll kill any PID found.
PIDS_ON_PORT=$(lsof -i ":$PORT" -t 2>/dev/null || true) # Suppress lsof error if port not in use, || true ensures command doesn't fail script
if [ -n "$PIDS_ON_PORT" ]; then
for PID_TO_KILL in $PIDS_ON_PORT; do
echo "Port $PORT is in use by PID $PID_TO_KILL. Forcing kill..."
kill -9 "$PID_TO_KILL" # Force kill
# Add a small delay to give the OS time to free the port
sleep 0.5
done
else
echo "Port $PORT is free."
fi
fi
# The circle name passed to the server is the "identity" name.
# The client will still connect to ws://localhost:PORT/ws
echo "Starting server for '$NAME' on port $PORT..."
# Run in background
"$WS_SERVER_BINARY" --port "$PORT" --circle-name "$NAME" &
BG_PIDS+=($!) # Store PID of the last backgrounded process
done
echo "All WebSocket servers launched."
# Prepare circle names for rhai_worker
# It expects names like "OurWorld", "My Personal Space"
# We can directly use the CIRCLE_NAMES array
echo "Starting Rhai worker for circles: ${CIRCLE_NAMES[@]}..."
# Run rhai_worker in the background
# Assuming default Redis URL redis://127.0.0.1/
"$RHAI_WORKER_BINARY" --circles "${CIRCLE_NAMES[@]}" &
BG_PIDS+=($!) # Store PID of the Rhai worker
echo "Rhai worker launched."
echo "All processes launched. Press Ctrl+C to stop all servers and the worker."
# Wait for all background PIDs.
# If any of them exit prematurely, this script will also exit.
# The trap will handle cleanup if Ctrl+C is pressed.
for pid in "${BG_PIDS[@]}"; do
wait "$pid"
# If a process exits with an error, its exit code will be propagated by wait.
# The script will then exit due to `wait` itself exiting with that code.
# The trap should still run on SIGINT/SIGTERM.
# For other signals or unexpected exits, the trap might not run.
# More robust error handling for individual process failures could be added here.
done
# If all processes exited cleanly (e.g., were killed by the trap),
# the script will reach here. The trap's exit 0 will handle this.
# If they exited due to an error, `wait` would have exited the script.
cleanup # Call cleanup if all jobs finished normally (e.g. if they self-terminate)