add benchmarking, more models and examples

This commit is contained in:
timurgordon 2025-06-12 05:21:52 +03:00
parent 79b37cf9ce
commit de1740f0d1
51 changed files with 7110 additions and 231 deletions

124
ARCHITECTURE.md Normal file
View File

@ -0,0 +1,124 @@
# Rhailib Architecture: Distributed Rhai Scripting
## 1. Overview
`rhailib` provides a robust infrastructure for executing Rhai scripts in a distributed manner, primarily designed to integrate with and extend the HeroModels ecosystem. It allows for dynamic scripting capabilities, offloading computation, and enabling flexible automation. This document describes the target architecture utilizing dedicated reply queues for efficient result notification.
## 2. Core Components
The `rhailib` system is composed of the following main components, leveraging Redis for task queuing, state management, and result notification:
1. **Rhai Engine (`src/engine`):**
* The core scripting capability. Provides a Rhai engine pre-configured with various modules.
* Utilized by the `rhai_worker` to process tasks.
2. **Rhai Client (`src/client`):**
* Offers an interface for applications to submit Rhai scripts as tasks.
* Submits tasks to named Redis queues ("circles").
* Waits for results on a dedicated reply queue, avoiding polling.
3. **Rhai Worker (`src/worker`):**
* Listens to Redis task queues ("circles") for incoming task IDs.
* Fetches task details, executes the script using the `rhai_engine`.
* Updates task status and results in Redis.
* Sends a notification/result to the client's dedicated reply queue.
4. **Redis:**
* Acts as the message broker and data store for task queues, detailed task information (including scripts, status, and results), and reply queues for notifications.
## 3. Architecture & Workflow (Dedicated Reply Queues)
The system employs a "Dedicated Reply Queue" pattern to notify clients of task completion, enhancing efficiency by eliminating client-side polling for results.
**Workflow:**
1. **Task Submission (Client):**
a. The client generates a unique `task_id` and a unique `reply_queue_name` (e.g., `rhai_reply:<uuid>`).
b. Task details, including the script, initial status ("pending"), and the `reply_queue_name`, are stored in a Redis Hash: `rhai_task_details:<task_id>`.
c. The `task_id` is pushed onto a Redis List acting as a task queue for a specific "circle": `rhai_tasks:<circle_name>`.
d. The client then performs a blocking pop (`BLPOP`) on its `reply_queue_name`, waiting for the result message.
2. **Task Consumption & Processing (Worker):**
a. A `rhai_worker` instance, listening to one or more `rhai_tasks:<circle_name>` queues, picks up a `task_id` using `BLPOP`.
b. The worker retrieves the full task details (including the script and `reply_queue_name`) from the `rhai_task_details:<task_id>` hash.
c. The worker updates the task's status in the hash to "processing".
d. The Rhai script is executed using an instance of the `rhai_engine`.
3. **Result Storage & Notification (Worker):**
a. Upon completion (or error), the worker updates the `rhai_task_details:<task_id>` hash with the final status ("completed" or "error") and the script's output or error message.
b. If a `reply_queue_name` was provided in the task details, the worker constructs a result message (e.g., JSON containing `task_id`, final `status`, `output`/`error`).
c. The worker pushes this result message onto the specified `reply_queue_name` using `LPUSH`.
4. **Result Reception (Client):**
a. The client's `BLPOP` on its `reply_queue_name` receives the result message.
b. The client processes the result or error.
c. Optionally, the client may `DEL`ete its temporary reply queue.
**Diagram:**
```mermaid
sequenceDiagram
participant Client
participant Redis
participant Worker
Client->>Client: 1. Generate task_id & reply_queue_name
Client->>Redis: 2. LPUSH task_id to rhai_tasks:<circle>
Client->>Redis: 3. HSET rhai_task_details:<task_id> (script, status:pending, reply_to:reply_queue_name)
Client->>Redis: 4. BLPOP from reply_queue_name (waits with timeout)
Worker->>Redis: 5. BLPOP from rhai_tasks:<circle>
Redis-->>Worker: task_id
Worker->>Redis: 6. HGETALL rhai_task_details:<task_id>
Redis-->>Worker: task_details (script, reply_to)
Worker->>Redis: 7. HSET rhai_task_details:<task_id> (status:processing)
Note over Worker: Executes Rhai script
alt Script Success
Worker->>Redis: 8a. HSET rhai_task_details:<task_id> (status:completed, output)
Worker->>Redis: 9a. LPUSH to reply_queue_name (message: {task_id, status:completed, output})
else Script Error
Worker->>Redis: 8b. HSET rhai_task_details:<task_id> (status:error, error_msg)
Worker->>Redis: 9b. LPUSH to reply_queue_name (message: {task_id, status:error, error_msg})
end
Redis-->>Client: Result message from reply_queue_name
Client->>Client: Process result/error
Client->>Redis: 10. DEL reply_queue_name (optional cleanup)
```
This architecture allows for:
* Asynchronous and non-blocking script execution for the client.
* Scalable processing of Rhai scripts by running multiple workers.
* Efficient, event-driven result notification to clients.
* Robust task state tracking and observability.
## 4. Redis Data Structures
* **Task Queues:**
* Key Pattern: `rhai_tasks:<circle_name>`
* Type: List
* Purpose: FIFO queue for `task_id`s waiting to be processed by workers assigned to a specific circle. Workers use `BLPOP`. Clients use `LPUSH`.
* **Task Details:**
* Key Pattern: `rhai_task_details:<task_id>`
* Type: Hash
* Purpose: Stores all information about a specific task.
* Key Fields:
* `script`: The Rhai script content.
* `status`: Current state of the task (e.g., "pending", "processing", "completed", "error").
* `client_rpc_id`: Optional client-provided identifier.
* `output`: The result of a successful script execution.
* `error`: Error message if script execution failed.
* `created_at`: Timestamp of task creation.
* `updated_at`: Timestamp of the last update to the task details.
* `reply_to_queue`: (New) The name of the dedicated Redis List the client is listening on for the result.
* **Reply Queues:**
* Key Pattern: `rhai_reply:<unique_identifier>` (e.g., `rhai_reply:<uuid_generated_by_client>`)
* Type: List
* Purpose: A temporary, client-specific queue where the worker pushes the final result/notification for a particular task. The client uses `BLPOP` to wait for a message on this queue.
## 5. Key Design Choices
* **Hashes for Task Details:** Storing comprehensive task details in a Redis Hash provides a persistent, inspectable, and easily updatable record of each task's lifecycle. This aids in monitoring, debugging, and potential recovery scenarios.
* **Dedicated Reply Queues:** Using a unique Redis List per client request for result notification offers reliable message delivery (compared to Pub/Sub's fire-and-forget) and allows the client to efficiently block until its specific result is ready, eliminating polling.
* **Status Tracking in Hash:** Maintaining the `status` field within the task details hash ("pending", "processing", "completed", "error") offers crucial observability into the system's state and the progress of individual tasks, independent of the client-worker notification flow.

91
BENCHMARK_README.md Normal file
View File

@ -0,0 +1,91 @@
# Rhailib Benchmarking - SIMPLIFIED ✨
> **Note**: This document describes the old complex benchmarking system.
> **For the new minimal system, see [`bench/README.md`](bench/README.md)**
## 🎯 New Minimal Benchmark System
The benchmarking system has been **drastically simplified**:
- **85% Code Reduction**: From 800+ lines to ~113 lines
- **Single File**: All logic in [`bench/simple_bench.rs`](bench/simple_bench.rs)
- **Direct Timing**: Redis timestamps, no complex stats
- **Minimal Dependencies**: No criterion, no abstractions
### Quick Start
```bash
cd bench
cargo run --bin simple_bench
```
### Expected Output
```
🧹 Cleaning up Redis...
🚀 Starting worker...
📝 Creating single task...
⏱️ Waiting for completion...
✅ Task completed in 23.45ms
🧹 Cleaning up...
```
## 📁 New Structure
```
rhailib/
├── bench/ # NEW: Minimal benchmark system
│ ├── simple_bench.rs # Main benchmark (85 lines)
│ ├── batch_task.lua # Simple task creation (28 lines)
│ ├── Cargo.toml # Dependencies
│ └── README.md # Usage instructions
└── scripts/ # Cleaned up scripts
├── run_rhai_batch.lua # Original batch script (kept)
└── run_rhai.lua # Basic script (kept)
```
## 🗑️ What Was Removed
- `benches/` directory (complex criterion-based benchmarks)
- `src/benchmarks/` module (redis_stats.rs, worker_manager.rs)
- Complex Lua scripts (`run_rhai_with_wait.lua`, `run_rhai_blocking.sh`)
- Framework dependencies (criterion, complex stats)
## 🚀 Benefits of New System
1. **Minimalism**: Single file, linear flow
2. **Direct Timing**: `updated_at - created_at` from Redis
3. **Easy to Understand**: No abstractions or frameworks
4. **Fast to Modify**: 85 lines vs 800+ lines
5. **Reliable**: Simple Redis operations
6. **Extensible**: Easy to add features incrementally
## 📈 Iteration Plan
- **Current**: Single task (n=1) benchmarking
- **Next**: Small batches (n=5, n=10)
- **Future**: Larger batches and script complexity
---
## 📚 Old System Documentation (Archived)
The following describes the previous complex system that has been removed:
### Old Architecture (REMOVED)
- Complex Criterion-based benchmarking
- Multi-module statistics collection
- Abstract worker management
- Complex configuration systems
- Framework dependencies
### Old Files (REMOVED)
- `benches/rhai_performance_bench.rs` (237 lines)
- `src/benchmarks/redis_stats.rs` (285 lines)
- `src/benchmarks/worker_manager.rs` (~200 lines)
- `src/benchmarks/mod.rs` (10 lines)
**Total removed**: ~800+ lines of complex code
---
**For current benchmarking, use the new minimal system in [`bench/`](bench/)**

2008
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -4,7 +4,24 @@ version = "0.1.0"
edition = "2021" # Changed to 2021 for consistency with other crates edition = "2021" # Changed to 2021 for consistency with other crates
[dependencies] [dependencies]
# Dependencies for rhailib itself would go here
anyhow = "1.0"
chrono = { version = "0.4", features = ["serde"] }
env_logger = "0.10"
log = "0.4"
redis = { version = "0.25.0", features = ["tokio-comp"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }
[dev-dependencies]
criterion = { version = "0.5", features = ["html_reports"] }
[[bench]]
name = "simple_rhai_bench"
harness = false
[workspace] [workspace]
members = [ members = [
@ -12,6 +29,9 @@ members = [
"src/client", "src/client",
"src/engine", "src/engine",
"src/worker", "src/worker",
"src/monitor", # Added the new monitor package to workspace
"src/repl", # Added the refactored REPL package
"examples", "examples",
"src/rhai_engine_ui",
] ]
resolver = "2" # Recommended for new workspaces resolver = "2" # Recommended for new workspaces

View File

@ -0,0 +1,72 @@
# Minimal Rhailib Benchmark
A simplified, minimal benchmarking tool for rhailib performance testing.
## Overview
This benchmark focuses on simplicity and direct timing measurements:
- Creates a single task (n=1) using Lua script
- Measures latency using Redis timestamps
- Uses existing worker binary
- ~85 lines of code total
## Usage
### Prerequisites
- Redis running on `127.0.0.1:6379`
- Worker binary built: `cd src/worker && cargo build --release`
### Run Benchmark
```bash
# From project root
cargo bench
```
### Expected Output
```
🧹 Cleaning up Redis...
🚀 Starting worker...
📝 Creating single task...
⏱️ Waiting for completion...
✅ Task completed in 23.45ms
🧹 Cleaning up...
```
## Files
- `simple_bench.rs` - Main benchmark binary (85 lines)
- `batch_task.lua` - Minimal Lua script for task creation (28 lines)
- `Cargo.toml` - Dependencies and binary configuration
- `README.md` - This file
## How It Works
1. **Cleanup**: Clear Redis queues and task details
2. **Start Worker**: Spawn single worker process
3. **Create Task**: Use Lua script to create one task with timestamp
4. **Wait & Measure**: Poll task until complete, calculate latency
5. **Cleanup**: Kill worker and clear Redis
## Latency Calculation
```
latency_ms = updated_at - created_at
```
Where:
- `created_at`: Timestamp when task was created (Lua script)
- `updated_at`: Timestamp when worker completed task
## Future Iterations
- **Iteration 2**: Small batches (n=5, n=10)
- **Iteration 3**: Larger batches and script complexity
- **Iteration 4**: Performance optimizations
## Benefits
- **Minimal Code**: 85 lines vs previous 800+ lines
- **Easy to Understand**: Single file, linear flow
- **Direct Timing**: Redis timestamps, no complex stats
- **Fast to Modify**: No abstractions or frameworks
- **Reliable**: Simple Redis operations

View File

@ -0,0 +1,46 @@
-- Minimal Lua script for single task creation (n=1)
-- Args: circle_name, rhai_script_content, task_count (optional, defaults to 1)
-- Returns: array of task keys for timing
if #ARGV < 2 then
return redis.error_reply("Usage: EVAL script 0 circle_name rhai_script_content [task_count]")
end
local circle_name = ARGV[1]
local rhai_script_content = ARGV[2]
local task_count = tonumber(ARGV[3]) or 1
-- Validate task_count
if task_count <= 0 or task_count > 10000 then
return redis.error_reply("task_count must be a positive integer between 1 and 10000")
end
-- Get current timestamp in Unix seconds (to match worker expectations)
local rhai_task_queue = 'rhai_tasks:' .. circle_name
local task_keys = {}
local current_time = redis.call('TIME')[1]
-- Create multiple tasks
for i = 1, task_count do
-- Generate unique task ID
local task_id = 'task_' .. redis.call('INCR', 'global_task_counter')
local task_details_key = 'rhai_task_details:' .. task_id
-- Create task details hash with creation timestamp
redis.call('HSET', task_details_key,
'script', rhai_script_content,
'status', 'pending',
'createdAt', current_time,
'updatedAt', current_time,
'task_sequence', tostring(i)
)
-- Queue the task for workers
redis.call('LPUSH', rhai_task_queue, task_id)
-- Add key to return array
table.insert(task_keys, task_details_key)
end
-- Return array of task keys for timing analysis
return task_keys

View File

@ -0,0 +1,204 @@
use criterion::{criterion_group, criterion_main, Criterion};
use redis::{Client, Commands};
use std::process::{Command, Child, Stdio};
use std::time::Duration;
use std::thread;
use std::fs;
const REDIS_URL: &str = "redis://127.0.0.1:6379";
const CIRCLE_NAME: &str = "bench_circle";
const SIMPLE_SCRIPT: &str = "new_event()\n .title(\"Weekly Sync\")\n .location(\"Conference Room A\")\n .description(\"Regular team sync meeting\")\n .save_event();";
fn cleanup_redis() -> Result<(), redis::RedisError> {
let client = Client::open(REDIS_URL)?;
let mut conn = client.get_connection()?;
// Clear task queue and any existing task details
let _: () = conn.del(format!("rhai_tasks:{}", CIRCLE_NAME))?;
let keys: Vec<String> = conn.scan_match("rhai_task_details:*")?.collect();
if !keys.is_empty() {
let _: () = conn.del(keys)?;
}
Ok(())
}
fn start_worker() -> Result<Child, std::io::Error> {
Command::new("cargo")
.args(&["run", "--release", "--bin", "worker", "--",
"--circle", CIRCLE_NAME,
"--redis-url", REDIS_URL,
"--worker-id", "bench_worker",
"--preserve-tasks"])
.current_dir("src/worker")
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()
}
fn create_batch_tasks(task_count: usize) -> Result<Vec<String>, Box<dyn std::error::Error>> {
let client = Client::open(REDIS_URL)?;
let mut conn = client.get_connection()?;
// Load and execute Lua script
let lua_script = fs::read_to_string("benches/simple_rhai_bench/batch_task.lua")?;
let result: redis::Value = redis::cmd("EVAL")
.arg(lua_script)
.arg(0)
.arg(CIRCLE_NAME)
.arg(SIMPLE_SCRIPT)
.arg(task_count)
.query(&mut conn)?;
// Parse the task keys from the response
let task_keys = match result {
redis::Value::Bulk(items) => {
let mut keys = Vec::new();
for item in items {
if let redis::Value::Data(key_data) = item {
keys.push(String::from_utf8_lossy(&key_data).to_string());
}
}
keys
}
_ => {
return Err(format!("Unexpected Redis response type: {:?}", result).into());
}
};
Ok(task_keys)
}
fn wait_and_measure(task_key: &str) -> Result<f64, redis::RedisError> {
let client = Client::open(REDIS_URL)?;
let mut conn = client.get_connection()?;
let start_time = std::time::Instant::now();
let timeout = Duration::from_secs(100);
// Poll until task is completed or timeout
loop {
let status: Option<String> = conn.hget(task_key, "status")?;
match status.as_deref() {
Some("completed") | Some("error") => {
println!("Task {} completed with status: {}", task_key, status.as_deref().unwrap_or("unknown"));
let created_at: u64 = conn.hget(task_key, "createdAt")?;
let updated_at: u64 = conn.hget(task_key, "updatedAt")?;
return Ok((updated_at - created_at) as f64 * 1000.0); // Convert to milliseconds
}
Some("pending") | Some("processing") => {
thread::sleep(Duration::from_millis(100));
}
_ => {
thread::sleep(Duration::from_millis(100));
}
}
// Check timeout
if start_time.elapsed() > timeout {
return Err(redis::RedisError::from((
redis::ErrorKind::IoError,
"Timeout waiting for task completion"
)));
}
}
}
fn wait_for_batch_completion(task_keys: &[String]) -> Result<f64, Box<dyn std::error::Error>> {
let client = Client::open(REDIS_URL)?;
let mut conn = client.get_connection()?;
let start_time = std::time::Instant::now();
let timeout = Duration::from_secs(30);
// Wait for all tasks to complete
loop {
let mut completed_count = 0;
let mut total_latency = 0u64;
for task_key in task_keys {
let status: Option<String> = conn.hget(task_key, "status")?;
match status.as_deref() {
Some("completed") | Some("error") => {
completed_count += 1;
// Get timing data
let created_at: u64 = conn.hget(task_key, "createdAt")?;
let updated_at: u64 = conn.hget(task_key, "updatedAt")?;
total_latency += updated_at - created_at;
}
_ => {} // Still pending or processing
}
}
if completed_count == task_keys.len() {
// All tasks completed, calculate average latency in milliseconds
let avg_latency_ms = (total_latency as f64 / task_keys.len() as f64) * 1000.0;
return Ok(avg_latency_ms);
}
// Check timeout
if start_time.elapsed() > timeout {
return Err(format!("Timeout waiting for batch completion. Completed: {}/{}", completed_count, task_keys.len()).into());
}
thread::sleep(Duration::from_millis(100));
}
}
fn cleanup_worker(mut worker: Child) -> Result<(), std::io::Error> {
worker.kill()?;
worker.wait()?;
Ok(())
}
fn bench_single_rhai_task(c: &mut Criterion) {
// Setup: ensure worker is built
let _ = Command::new("cargo")
.args(&["build", "--release", "--bin", "worker"])
.current_dir("src/worker")
.output()
.expect("Failed to build worker");
// Clean up before starting
cleanup_redis().expect("Failed to cleanup Redis");
// Start worker once and reuse it
let worker = start_worker().expect("Failed to start worker");
thread::sleep(Duration::from_millis(1000)); // Give worker time to start
let mut group = c.benchmark_group("rhai_task_execution");
group.sample_size(10); // Reduce sample size
group.measurement_time(Duration::from_secs(10)); // Reduce measurement time
group.bench_function("batch_task_latency", |b| {
b.iter_custom(|iters| {
let mut total_latency = Duration::ZERO;
for _i in 0..iters {
// Clean up Redis between iterations
cleanup_redis().expect("Failed to cleanup Redis");
// Create 100 tasks and measure average latency using Redis timestamps
let task_keys = create_batch_tasks(5000).expect("Failed to create batch tasks");
let avg_latency_ms = wait_for_batch_completion(&task_keys).expect("Failed to measure batch completion");
// Convert average latency to duration
total_latency += Duration::from_millis(avg_latency_ms as u64);
}
total_latency
});
});
group.finish();
// Cleanup worker
cleanup_worker(worker).expect("Failed to cleanup worker");
cleanup_redis().expect("Failed to cleanup Redis");
}
criterion_group!(benches, bench_single_rhai_task);
criterion_main!(benches);

View File

@ -6,16 +6,16 @@ publish = false # This is a package of examples, not meant to be published
[dependencies] [dependencies]
# Local Rhailib crates # Local Rhailib crates
# Allows 'use rhai_client::...'
rhai_client = { path = "../src/client" } rhai_client = { path = "../src/client" }
# Allows 'use worker_lib::...' worker = { path = "../src/worker" }
worker_lib = { path = "../src/worker", package = "worker" }
# External dependencies (versions aligned with other crates) # External dependencies
rhai = { version = "1.18.0", features = ["sync", "decimal"] } rhai = "1.18.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] } tokio = { version = "1", features = ["full"] }
log = "0.4" log = "0.4"
env_logger = "0.10" env_logger = "0.10"
serde_json = "1.0"
chrono = "0.4"
[[bin]] [[bin]]
name = "example_math_worker" name = "example_math_worker"
@ -24,3 +24,11 @@ path = "example_math_worker.rs"
[[bin]] [[bin]]
name = "example_string_worker" name = "example_string_worker"
path = "example_string_worker.rs" path = "example_string_worker.rs"
[[bin]]
name = "dedicated_reply_queue_demo"
path = "dedicated_reply_queue_demo.rs"
[[bin]]
name = "lua_client_demo"
path = "lua_client_demo.rs"

View File

@ -0,0 +1,113 @@
use log::{info, error, debug};
use rhai::Engine;
use rhai_client::{RhaiClient, RhaiClientError}; // RhaiTaskDetails is not directly used
use worker_lib::spawn_rhai_worker;
use std::time::Duration;
use tokio::sync::mpsc;
use serde_json::Value;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let redis_url = "redis://127.0.0.1/";
let circle_name = "reply_demo_circle";
let script_to_run = "let x = 40; x + 2"; // Simple script
info!("Starting Dedicated Reply Queue Demo...");
// 1. Create a Rhai Engine for the worker
let engine = Engine::new();
// 2. Setup shutdown channel for the worker
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
// 3. Spawn the worker
info!("Spawning worker for circle: {}", circle_name);
let worker_handle = spawn_rhai_worker(
0, // circle_id (can be arbitrary for this demo)
circle_name.to_string(),
engine,
redis_url.to_string(),
shutdown_rx,
false, // preserve_tasks
);
// Give the worker a moment to start up and connect (optional, but good for demo)
tokio::time::sleep(Duration::from_millis(500)).await;
// 4. Create RhaiClient
info!("Creating RhaiClient...");
let client = match RhaiClient::new(redis_url) {
Ok(c) => c,
Err(e) => {
error!("Failed to create RhaiClient: {}", e);
// Attempt to shutdown worker before exiting
let _ = shutdown_tx.send(()).await;
let _ = worker_handle.await;
// Explicitly cast the error to the trait object to satisfy the return type
return Err(Box::new(e) as Box<dyn std::error::Error>);
}
};
info!("RhaiClient created.");
// 5. Submit script and await result using the new mechanism
let task_timeout = Duration::from_secs(10);
let client_rpc_id: Option<Value> = Some(serde_json::json!({ "demo_request_id": "reply_queue_test_001" }));
info!("Submitting script to circle '{}' and awaiting result...", circle_name);
info!("Script: {}", script_to_run);
match client
.submit_script_and_await_result(
circle_name,
script_to_run.to_string(),
client_rpc_id,
task_timeout,
// poll_interval is no longer needed
)
.await
{
Ok(details) => {
info!("Task completed successfully!");
debug!("Full Task Details: {:#?}", details);
// The task_id is not part of the returned RhaiTaskDetails struct.
// We could modify the client to return (task_id, details) if needed,
// but for this demo, we'll just log the content of the returned details.
info!("Received details for script: {}", details.script);
info!("Status: {}", details.status);
if let Some(output) = details.output {
info!("Output: {}", output); // Expected: 42
assert_eq!(output, "42");
} else {
error!("Expected output, but got None.");
}
if let Some(error_msg) = details.error {
error!("Error: {}", error_msg);
}
}
Err(e) => {
error!("An error occurred while awaiting task result: {}", e);
// The specific error can be inspected if needed, e.g., for timeout
if let RhaiClientError::Timeout(task_id) = e {
info!("Task {} timed out.", task_id);
}
}
}
// 6. Shutdown the worker
info!("Sending shutdown signal to worker...");
if shutdown_tx.send(()).await.is_err() {
error!("Failed to send shutdown signal to worker. It might have already exited.");
}
info!("Waiting for worker to complete...");
match worker_handle.await {
Ok(Ok(_)) => info!("Worker exited successfully."),
Ok(Err(e)) => error!("Worker exited with an error: {}", e),
Err(e) => error!("Worker task panicked or was cancelled: {}", e),
}
info!("Dedicated Reply Queue Demo finished.");
Ok(())
}

View File

@ -1,8 +1,9 @@
use rhai::Engine; use rhai::Engine;
use rhai_client::RhaiClient; // To submit tasks use rhai_client::RhaiClient; // To submit tasks
use worker_lib::{run_worker_loop, Args as WorkerArgs}; // To run the worker
use std::time::Duration; use std::time::Duration;
use tokio::time::sleep; use tokio::time::sleep;
use worker_lib::spawn_rhai_worker;
// Custom function for Rhai // Custom function for Rhai
fn add(a: i64, b: i64) -> i64 { fn add(a: i64, b: i64) -> i64 {
@ -19,17 +20,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
math_engine.register_fn("add", add); math_engine.register_fn("add", add);
log::info!("Custom 'add' function registered with Rhai engine for Math Worker."); log::info!("Custom 'add' function registered with Rhai engine for Math Worker.");
let worker_args = WorkerArgs { let (shutdown_tx, shutdown_rx) = tokio::sync::mpsc::channel(1);
redis_url: "redis://127.0.0.1/".to_string(),
circles: vec!["math_circle".to_string()], // Worker listens on a specific queue
};
let worker_args_clone = worker_args.clone(); // Clone for the worker task
tokio::spawn(async move { tokio::spawn(async move {
log::info!("Math Worker task starting..."); log::info!("Math Worker task starting...");
if let Err(e) = run_worker_loop(math_engine, worker_args_clone).await { let _worker_handle = spawn_rhai_worker(
log::error!("Math Worker loop failed: {}", e); 0,
} "math_circle".to_string(),
math_engine,
"redis://127.0.0.1/".to_string(),
shutdown_rx,
false,
);
}); });
// Give the worker a moment to start and connect // Give the worker a moment to start and connect
@ -47,14 +48,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
log::info!("Submitting math script to 'math_circle' and awaiting result..."); log::info!("Submitting math script to 'math_circle' and awaiting result...");
let timeout_duration = Duration::from_secs(10); let timeout_duration = Duration::from_secs(10);
let poll_interval = Duration::from_millis(500);
match client.submit_script_and_await_result( match client.submit_script_and_await_result(
"math_circle", "math_circle",
script_content.to_string(), script_content.to_string(),
None, None,
timeout_duration, timeout_duration
poll_interval
).await { ).await {
Ok(details) => { Ok(details) => {
log::info!("Math Worker Example: Task finished. Status: {}, Output: {:?}, Error: {:?}", log::info!("Math Worker Example: Task finished. Status: {}, Output: {:?}, Error: {:?}",

View File

@ -1,8 +1,9 @@
use rhai::Engine; use rhai::Engine;
use rhai_client::RhaiClient; // To submit tasks use rhai_client::RhaiClient; // To submit tasks
use worker_lib::{run_worker_loop, Args as WorkerArgs}; // To run the worker
use std::time::Duration; use std::time::Duration;
use tokio::time::sleep; use tokio::time::sleep;
use worker_lib::spawn_rhai_worker;
// Custom function for Rhai // Custom function for Rhai
fn reverse_string(s: String) -> String { fn reverse_string(s: String) -> String {
@ -19,17 +20,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
string_engine.register_fn("reverse_it", reverse_string); string_engine.register_fn("reverse_it", reverse_string);
log::info!("Custom 'reverse_it' function registered with Rhai engine for String Worker."); log::info!("Custom 'reverse_it' function registered with Rhai engine for String Worker.");
let worker_args = WorkerArgs { let (shutdown_tx, shutdown_rx) = tokio::sync::mpsc::channel(1);
redis_url: "redis://127.0.0.1/".to_string(),
circles: vec!["string_circle".to_string()], // Worker listens on a specific queue
};
let worker_args_clone = worker_args.clone();
tokio::spawn(async move { tokio::spawn(async move {
log::info!("String Worker task starting..."); log::info!("String Worker task starting...");
if let Err(e) = run_worker_loop(string_engine, worker_args_clone).await { let _worker_handle = spawn_rhai_worker(
log::error!("String Worker loop failed: {}", e); 0,
} "string_circle".to_string(),
string_engine,
"redis://127.0.0.1/".to_string(),
shutdown_rx,
false,
);
}); });
// Give the worker a moment to start and connect // Give the worker a moment to start and connect
@ -47,20 +48,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
log::info!("Submitting string script to 'string_circle' and awaiting result..."); log::info!("Submitting string script to 'string_circle' and awaiting result...");
let timeout_duration = Duration::from_secs(10); let timeout_duration = Duration::from_secs(10);
let poll_interval = Duration::from_millis(500);
match client.submit_script_and_await_result( match client.submit_script_and_await_result(
"string_circle", "string_circle",
script_content.to_string(), script_content.to_string(),
None, None,
timeout_duration, timeout_duration
poll_interval
).await { ).await {
Ok(details) => { Ok(details) => {
log::info!("String Worker Example: Task finished. Status: {}, Output: {:?}, Error: {:?}", log::info!("String Worker Example: Task finished. Status: {}, Output: {:?}, Error: {:?}",
details.status, details.output, details.error); details.status, details.output, details.error);
if details.status == "completed" { if details.status == "completed" {
assert_eq!(details.output, Some("\"dlrow olleh\"".to_string())); // Rhai strings include quotes in `debug` format assert_eq!(details.output, Some("dlrow olleh".to_string()));
log::info!("String Worker Example: Assertion for output \"dlrow olleh\" passed!"); log::info!("String Worker Example: Assertion for output \"dlrow olleh\" passed!");
Ok(()) Ok(())
} else { } else {

View File

@ -0,0 +1,52 @@
use worker_lib::spawn_rhai_worker;
use rhai::Engine;
use tokio::sync::mpsc;
use tokio::signal;
use log::info;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize the logger
env_logger::init();
let redis_url = "redis://127.0.0.1/";
let circle_name = "default".to_string();
let mut engine = Engine::new(); // Create a new, simple Rhai engine
// Register a simple 'ping' function for the readiness check.
engine.register_fn("ping", || -> String {
"pong".to_string()
});
// Create a channel for the shutdown signal
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
info!("Spawning Rhai worker for circle: {}", circle_name);
// Spawn the worker
let worker_handle = spawn_rhai_worker(
1, // circle_id
circle_name.clone(),
engine,
redis_url.to_string(),
shutdown_rx,
false, // preserve_tasks
);
info!("Worker spawned. Press Ctrl+C to shut down.");
// Wait for Ctrl+C
signal::ctrl_c().await?;
info!("Ctrl+C received. Sending shutdown signal to worker.");
let _ = shutdown_tx.send(()).await;
// Wait for the worker to finish
if let Err(e) = worker_handle.await? {
eprintln!("Worker process finished with an error: {:?}", e);
}
info!("Worker has shut down gracefully.");
Ok(())
}

View File

@ -0,0 +1,133 @@
//! Demo script showing how to run the hybrid performance benchmark
//!
//! This example demonstrates:
//! 1. Starting workers programmatically
//! 2. Running the Lua batch script
//! 3. Collecting and displaying statistics
use rhailib::{RedisStatsCollector, WorkerManager, clear_redis_test_data, check_redis_connection};
use redis::{Client, Commands};
use std::fs;
use std::time::Duration;
const REDIS_URL: &str = "redis://localhost:6379";
const CIRCLE_NAME: &str = "demo_circle";
fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
println!("🚀 Rhailib Hybrid Performance Benchmark Demo");
println!("============================================");
// Check Redis connection
println!("📡 Checking Redis connection...");
check_redis_connection(REDIS_URL)?;
println!("✅ Redis connection successful");
// Clear any existing test data
println!("🧹 Clearing existing test data...");
clear_redis_test_data(REDIS_URL)?;
println!("✅ Test data cleared");
// Load Lua script
println!("📜 Loading Lua batch script...");
let lua_script = fs::read_to_string("scripts/run_rhai_batch.lua")?;
println!("✅ Lua script loaded ({} bytes)", lua_script.len());
// Start workers
println!("👷 Starting 2 worker processes...");
let mut worker_manager = WorkerManager::new();
worker_manager.start_workers(2, CIRCLE_NAME, REDIS_URL)?;
worker_manager.wait_for_workers_ready(Duration::from_secs(3))?;
println!("✅ Workers started and ready");
// Connect to Redis
let redis_client = Client::open(REDIS_URL)?;
let mut conn = redis_client.get_connection()?;
// Execute batch workload
println!("🎯 Submitting batch of 100 tasks...");
let batch_id = format!("demo_batch_{}", chrono::Utc::now().timestamp_millis());
let simple_script = "let x = 42; x * 2";
let start_time = std::time::Instant::now();
let result: redis::Value = redis::cmd("EVAL")
.arg(&lua_script)
.arg(0) // No keys
.arg(CIRCLE_NAME)
.arg(100) // task count
.arg(simple_script)
.arg(&batch_id)
.query(&mut conn)?;
let submission_time = start_time.elapsed();
println!("✅ Batch submitted in {:?}", submission_time);
// Parse result
if let redis::Value::Data(data) = result {
let response: serde_json::Value = serde_json::from_slice(&data)?;
println!("📊 Batch info: {}", serde_json::to_string_pretty(&response)?);
}
// Wait for completion and collect statistics
println!("⏳ Waiting for batch completion...");
let stats_collector = RedisStatsCollector::new(REDIS_URL)?;
let completed = stats_collector.wait_for_batch_completion(
&batch_id,
100,
Duration::from_secs(30),
)?;
if !completed {
println!("⚠️ Batch did not complete within timeout");
return Ok(());
}
println!("✅ Batch completed!");
// Collect and display statistics
println!("📈 Collecting performance statistics...");
let timings = stats_collector.collect_batch_timings(&batch_id)?;
let stats = stats_collector.calculate_stats(&timings);
println!("\n📊 PERFORMANCE RESULTS");
println!("======================");
println!("Total tasks: {}", stats.total_tasks);
println!("Completed tasks: {}", stats.completed_tasks);
println!("Failed tasks: {}", stats.failed_tasks);
println!("Error rate: {:.2}%", stats.error_rate);
println!("Throughput: {:.2} tasks/second", stats.throughput_tps);
println!("Batch duration: {:.2} ms", stats.batch_duration_ms);
println!("\nLatency Statistics:");
println!(" Min: {:.2} ms", stats.latency_stats.min_ms);
println!(" Max: {:.2} ms", stats.latency_stats.max_ms);
println!(" Mean: {:.2} ms", stats.latency_stats.mean_ms);
println!(" Median: {:.2} ms", stats.latency_stats.median_ms);
println!(" P95: {:.2} ms", stats.latency_stats.p95_ms);
println!(" P99: {:.2} ms", stats.latency_stats.p99_ms);
println!(" Std Dev: {:.2} ms", stats.latency_stats.std_dev_ms);
// Show some individual task timings
println!("\n🔍 Sample Task Timings (first 10):");
for (i, timing) in timings.iter().take(10).enumerate() {
println!(" Task {}: {} -> {} ({:.2}ms, status: {})",
i + 1,
timing.task_id,
timing.status,
timing.latency_ms,
timing.status
);
}
// Cleanup
println!("\n🧹 Cleaning up...");
stats_collector.cleanup_batch_data(&batch_id)?;
worker_manager.shutdown()?;
println!("✅ Cleanup complete");
println!("\n🎉 Demo completed successfully!");
Ok(())
}

63
examples/run_lua_demo.sh Executable file
View File

@ -0,0 +1,63 @@
#!/bin/bash
# Exit immediately if a command exits with a non-zero status.
set -e
CIRCLE_NAME="default"
TASK_QUEUE_KEY="rhai_tasks:${CIRCLE_NAME}"
# --- 1. Clean Up Previous State ---
echo "Cleaning up previous Redis state..."
redis-cli DEL "$TASK_QUEUE_KEY" > /dev/null 2>&1 || true
echo "Redis state cleaned."
# --- 2. Compile and Run the Worker ---
echo "Compiling and running the 'lua_client_demo' worker in the background..."
export RUST_LOG=info
cargo run -p rhailib-examples --bin lua_client_demo &
WORKER_PID=$!
echo "Worker started with PID: $WORKER_PID"
# --- 3. Wait for the Worker to be Ready using Ping-Pong ---
echo "Waiting for worker to be ready (ping-pong check)..."
ATTEMPTS=0
MAX_ATTEMPTS=15 # Wait for a maximum of 15 seconds
while [ $ATTEMPTS -lt $MAX_ATTEMPTS ]; do
# Send a 'ping()' script and check for a 'pong' in the output.
# The timeout for the ping itself is short (2 seconds).
PING_RESULT=$(redis-cli EVAL "$(cat ../scripts/run_rhai.lua)" 0 "$CIRCLE_NAME" "ping()" 2 || true)
if echo "$PING_RESULT" | grep -q "pong"; then
echo "Worker is ready. Received pong."
break
fi
echo "Ping failed or timed out. Retrying..."
sleep 1
ATTEMPTS=$((ATTEMPTS + 1))
done
if [ $ATTEMPTS -eq $MAX_ATTEMPTS ]; then
echo "Error: Timed out waiting for worker to respond to ping."
kill $WORKER_PID
exit 1
fi
# --- 4. Execute the Actual Script ---
echo ""
echo "Executing main Rhai script..."
RESULT=$(redis-cli EVAL "$(cat ../scripts/run_rhai.lua)" 0 "$CIRCLE_NAME" "let x = 100; x * 2" 5)
echo "Result from main script: $RESULT"
# --- 5. Shutdown the Worker ---
echo ""
echo "Shutting down the worker (PID: $WORKER_PID)..."
kill $WORKER_PID
wait $WORKER_PID
echo "Worker shut down."
echo ""
if echo "$RESULT" | grep -q "error"; then
echo "Demo completed with an error."
exit 1
else
echo "Demo completed successfully."
fi

117
run_benchmarks.sh Executable file
View File

@ -0,0 +1,117 @@
#!/bin/bash
# Hybrid Performance Benchmark Runner for Rhailib
# This script sets up the environment and runs the benchmarks
set -e
echo "🚀 Rhailib Hybrid Performance Benchmark Runner"
echo "=============================================="
# Check if Redis is running
echo "📡 Checking Redis connection..."
if ! redis-cli ping > /dev/null 2>&1; then
echo "❌ Redis is not running. Please start Redis server:"
echo " redis-server"
exit 1
fi
echo "✅ Redis is running"
# Check if we're in the right directory
if [ ! -f "Cargo.toml" ] || [ ! -d "scripts" ]; then
echo "❌ Please run this script from the rhailib root directory"
exit 1
fi
# Build the worker binary in release mode for performance
echo "🔨 Building worker binary in release mode..."
cd src/worker
cargo build --release --bin worker
cd ../..
echo "✅ Worker binary built (release mode)"
# Clear any existing Redis data
echo "🧹 Clearing Redis test data..."
redis-cli FLUSHDB > /dev/null
echo "✅ Redis data cleared"
# Parse command line arguments
BENCHMARK_TYPE="full"
TASK_COUNT=""
WORKER_COUNT=""
while [[ $# -gt 0 ]]; do
case $1 in
--demo)
BENCHMARK_TYPE="demo"
shift
;;
--quick)
BENCHMARK_TYPE="quick"
shift
;;
--tasks)
TASK_COUNT="$2"
shift 2
;;
--workers)
WORKER_COUNT="$2"
shift 2
;;
--help|-h)
echo "Usage: $0 [OPTIONS]"
echo ""
echo "Options:"
echo " --demo Run demo script instead of full benchmarks"
echo " --quick Run quick benchmarks (fewer configurations)"
echo " --tasks N Override task count for demo"
echo " --workers N Override worker count for demo"
echo " --help, -h Show this help message"
echo ""
echo "Examples:"
echo " $0 # Run full benchmarks"
echo " $0 --demo # Run demo script"
echo " $0 --quick # Run quick benchmarks"
echo " $0 --demo --tasks 50 --workers 4 # Custom demo"
exit 0
;;
*)
echo "❌ Unknown option: $1"
echo "Use --help for usage information"
exit 1
;;
esac
done
case $BENCHMARK_TYPE in
"demo")
echo "🎯 Running benchmark demo..."
if [ -n "$TASK_COUNT" ] || [ -n "$WORKER_COUNT" ]; then
echo "⚠️ Custom task/worker counts not yet supported in demo"
echo " Using default values (100 tasks, 2 workers)"
fi
cargo run --example run_benchmark_demo
;;
"quick")
echo "⚡ Running quick benchmarks..."
echo " This will test basic configurations only"
cargo bench --bench rhai_performance_bench -- --quick
;;
"full")
echo "🏁 Running full benchmark suite..."
echo " This may take several minutes..."
cargo bench --bench rhai_performance_bench
echo ""
echo "📊 Benchmark results saved to: target/criterion/"
echo " Open target/criterion/report/index.html to view detailed results"
;;
esac
echo ""
echo "🎉 Benchmark run completed!"
echo ""
echo "📈 Next steps:"
echo " • View HTML reports in target/criterion/report/"
echo " • Run 'cargo bench' for full Criterion benchmarks"
echo " • Run '$0 --demo' for a quick demonstration"
echo " • Check BENCHMARK_README.md for detailed documentation"

48
scripts/run_rhai.lua Normal file
View File

@ -0,0 +1,48 @@
--[[
Submits a Rhai script to a distributed worker and returns the task ID.
Since BLPOP cannot block inside Lua scripts (they execute atomically),
this script only submits the task and returns the task ID. The client
must then use BLPOP separately to wait for the result.
ARGV[1] (string): The target circle name (e.g., "default").
ARGV[2] (string): The Rhai script content to execute.
Returns:
- A JSON string containing the task ID and reply queue name.
]]
-- 1. Argument Validation
local circle_name = ARGV[1]
if not circle_name or circle_name == '' then
return cjson.encode({error = "ARGV[1] 'circle_name' is required."})
end
local rhai_script = ARGV[2]
if not rhai_script or rhai_script == '' then
return cjson.encode({error = "ARGV[2] 'rhai_script' is required."})
end
-- 2. Initialization
local task_details_prefix = "rhai_task_details:"
local tasks_queue_prefix = "rhai_tasks:"
local reply_queue_prefix = "rhai_reply:"
local task_id = redis.sha1hex(rhai_script .. redis.call('TIME')[1] .. redis.call('TIME')[2] .. math.random())
local reply_queue_key = reply_queue_prefix .. task_id
local task_details_key = task_details_prefix .. task_id
local task_queue_key = tasks_queue_prefix .. circle_name
-- 3. Task Creation & Queuing
redis.call('HSET', task_details_key,
'script', rhai_script,
'status', 'pending',
'replyToQueue', reply_queue_key
)
redis.call('LPUSH', task_queue_key, task_id)
-- 4. Return task information for client to wait on
return cjson.encode({
task_id = task_id,
reply_queue = reply_queue_key
})

View File

@ -0,0 +1,54 @@
-- Simple Batch Task Creation Script for Rhailib Benchmarking
-- Creates N tasks and returns their Redis keys for timing analysis
-- Script Arguments (ARGV):
-- ARGV[1]: circle_name - The worker circle to send tasks to (e.g., "bench_circle")
-- ARGV[2]: task_count - Number of tasks to create (N)
-- ARGV[3]: rhai_script_content - The Rhai script to execute for all tasks
-- ARGV[4]: batch_id - Batch identifier for grouping
-- Validate arguments
if #ARGV < 4 then
return redis.error_reply("Usage: EVAL script 0 circle_name task_count rhai_script_content batch_id")
end
local circle_name = ARGV[1]
local task_count = tonumber(ARGV[2])
local rhai_script_content = ARGV[3]
local batch_id = ARGV[4]
-- Validate task_count
if not task_count or task_count <= 0 or task_count > 10000 then
return redis.error_reply("task_count must be a positive integer between 1 and 10000")
end
-- Get current timestamp
local current_time = redis.call('TIME')[1]
local rhai_task_queue = 'rhai_tasks:' .. circle_name
local task_keys = {}
-- Create tasks and collect their keys
for i = 1, task_count do
-- Generate unique task ID
local task_id = 'task_' .. redis.call('INCR', 'global_task_counter')
local task_details_key = 'rhai_task_details:' .. task_id
-- Create task details hash with creation timestamp
redis.call('HSET', task_details_key,
'script', rhai_script_content,
'status', 'pending',
'batch_id', batch_id,
'createdAt', current_time,
'updatedAt', current_time,
'task_sequence', tostring(i)
)
-- Queue the task for workers
redis.call('LPUSH', rhai_task_queue, task_id)
-- Add key to return array
table.insert(task_keys, task_details_key)
end
-- Return array of task keys for benchmarking tool to analyze
return task_keys

BIN
src/.DS_Store vendored Normal file

Binary file not shown.

View File

@ -1,14 +1,14 @@
use chrono::Utc; use chrono::Utc;
use log::{debug, info, warn, error}; // Added error use log::{debug, info, warn, error}; // Added error
use redis::AsyncCommands; use redis::AsyncCommands;
use tokio::time::{sleep, Instant}; // For polling with timeout use std::time::Duration; // Duration is still used, Instant and sleep were removed
use std::time::Duration;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; // For client_rpc_id, though not directly used by this client's submit method use serde_json::Value; // For client_rpc_id, though not directly used by this client's submit method
use uuid::Uuid; use uuid::Uuid;
const REDIS_TASK_DETAILS_PREFIX: &str = "rhai_task_details:"; const REDIS_TASK_DETAILS_PREFIX: &str = "rhai_task_details:";
const REDIS_QUEUE_PREFIX: &str = "rhai_tasks:"; const REDIS_QUEUE_PREFIX: &str = "rhai_tasks:";
const REDIS_REPLY_QUEUE_PREFIX: &str = "rhai_reply:";
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub struct RhaiTaskDetails { pub struct RhaiTaskDetails {
@ -22,6 +22,8 @@ pub struct RhaiTaskDetails {
pub created_at: chrono::DateTime<chrono::Utc>, pub created_at: chrono::DateTime<chrono::Utc>,
#[serde(rename = "updatedAt")] #[serde(rename = "updatedAt")]
pub updated_at: chrono::DateTime<chrono::Utc>, pub updated_at: chrono::DateTime<chrono::Utc>,
#[serde(rename = "replyToQueue")]
pub reply_to_queue: Option<String>, // New field for dedicated reply queue
} }
#[derive(Debug)] #[derive(Debug)]
@ -67,17 +69,17 @@ impl RhaiClient {
Ok(Self { redis_client: client }) Ok(Self { redis_client: client })
} }
pub async fn submit_script( // Internal helper to submit script details and push to work queue
async fn submit_script_to_worker_queue(
&self, &self,
conn: &mut redis::aio::MultiplexedConnection,
circle_name: &str, circle_name: &str,
task_id: &str,
script: String, script: String,
client_rpc_id: Option<Value>, // Optional: if the caller has an RPC ID to associate client_rpc_id: Option<Value>,
) -> Result<String, RhaiClientError> { reply_to_queue_name: Option<String>, // Made this an Option
let mut conn = self.redis_client.get_multiplexed_async_connection().await?; ) -> Result<(), RhaiClientError> {
let task_id = Uuid::new_v4().to_string();
let now = Utc::now(); let now = Utc::now();
let task_details = RhaiTaskDetails { let task_details = RhaiTaskDetails {
script, script,
status: "pending".to_string(), status: "pending".to_string(),
@ -86,32 +88,71 @@ impl RhaiClient {
error: None, error: None,
created_at: now, created_at: now,
updated_at: now, updated_at: now,
reply_to_queue: reply_to_queue_name.clone(),
}; };
let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id); let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id);
let queue_key = format!("{}{}", REDIS_QUEUE_PREFIX, circle_name.replace(" ", "_").to_lowercase()); let worker_queue_key = format!("{}{}", REDIS_QUEUE_PREFIX, circle_name.replace(" ", "_").to_lowercase());
debug!( debug!(
"Submitting task_id: {} for circle: {} to queue: {}. Details: {:?}", "Preparing task_id: {} for circle: {} to worker_queue: {}. Details: {:?}",
task_id, circle_name, queue_key, task_details task_id, circle_name, worker_queue_key, task_details
); );
// Using HSET_MULTIPLE for efficiency if redis-rs supports it directly for struct fields. let mut hset_args: Vec<(String, String)> = vec![
// Otherwise, individual HSETs are fine. ("script".to_string(), task_details.script.clone()),
// For simplicity and directness with redis-rs async, individual HSETs are used here. ("status".to_string(), task_details.status.clone()),
conn.hset::<_, _, _, ()>(&task_key, "script", &task_details.script).await?; ("createdAt".to_string(), task_details.created_at.to_rfc3339()),
conn.hset::<_, _, _, ()>(&task_key, "status", &task_details.status).await?; ("updatedAt".to_string(), task_details.updated_at.to_rfc3339()),
if let Some(rpc_id_val) = &task_details.client_rpc_id { ];
conn.hset::<_, _, _, ()>(&task_key, "clientRpcId", serde_json::to_string(rpc_id_val)?).await?;
} else {
// Ensure the field exists even if null, or decide if it should be omitted
conn.hset::<_, _, _, ()>(&task_key, "clientRpcId", Value::Null.to_string()).await?;
}
conn.hset::<_, _, _, ()>(&task_key, "createdAt", task_details.created_at.to_rfc3339()).await?;
conn.hset::<_, _, _, ()>(&task_key, "updatedAt", task_details.updated_at.to_rfc3339()).await?;
// output and error fields are initially None, so they might not be set here or set as empty strings/null
conn.lpush::<_, _, ()>(&queue_key, &task_id).await?; if let Some(rpc_id_val) = &task_details.client_rpc_id {
hset_args.push(("clientRpcId".to_string(), serde_json::to_string(rpc_id_val)?));
} else {
hset_args.push(("clientRpcId".to_string(), Value::Null.to_string()));
}
if let Some(reply_q) = &task_details.reply_to_queue {
hset_args.push(("replyToQueue".to_string(), reply_q.clone()));
}
// Ensure hset_args is a slice of tuples (String, String)
// The redis crate's hset_multiple expects &[(K, V)]
// conn.hset_multiple::<_, String, String, ()>(&task_key, &hset_args).await?;
// Simpler:
// Explicitly type K, F, V for hset_multiple if inference is problematic.
// RV (return value of the command itself) is typically () for HSET type commands.
conn.hset_multiple::<_, _, _, ()>(&task_key, &hset_args).await?;
// lpush also infers its types, RV is typically i64 (length of list) or () depending on exact command variant
// For `redis::AsyncCommands::lpush`, it's `RedisResult<R>` where R: FromRedisValue
// Often this is the length of the list. Let's allow inference or specify if needed.
let _: redis::RedisResult<i64> = conn.lpush(&worker_queue_key, task_id).await;
Ok(())
}
// Public method for fire-and-forget submission (doesn't wait for result)
pub async fn submit_script(
&self,
circle_name: &str,
script: String,
client_rpc_id: Option<Value>,
) -> Result<String, RhaiClientError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
let task_id = Uuid::new_v4().to_string();
self.submit_script_to_worker_queue(
&mut conn,
circle_name,
&task_id,
script,
client_rpc_id,
None, // No reply queue for fire-and-forget
)
.await?;
Ok(task_id) Ok(task_id)
} }
@ -144,6 +185,7 @@ impl RhaiClient {
.and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&Utc)) .map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(Utc::now), // Provide a default .unwrap_or_else(Utc::now), // Provide a default
reply_to_queue: map.get("replyToQueue").cloned(),
}; };
Ok(Some(details)) Ok(Some(details))
} }
@ -151,51 +193,75 @@ impl RhaiClient {
} }
} }
// New method using dedicated reply queue
pub async fn submit_script_and_await_result( pub async fn submit_script_and_await_result(
&self, &self,
circle_name: &str, circle_name: &str,
script: String, script: String,
client_rpc_id: Option<Value>, client_rpc_id: Option<Value>,
timeout: Duration, timeout: Duration,
poll_interval: Duration,
) -> Result<RhaiTaskDetails, RhaiClientError> { ) -> Result<RhaiTaskDetails, RhaiClientError> {
let task_id = self.submit_script(circle_name, script, client_rpc_id).await?; let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
info!("Task {} submitted. Polling for result with timeout {:?}...", task_id, timeout); let task_id = Uuid::new_v4().to_string();
let reply_to_queue_name = format!("{}{}", REDIS_REPLY_QUEUE_PREFIX, Uuid::new_v4().to_string());
let start_time = Instant::now(); self.submit_script_to_worker_queue(
loop { &mut conn,
if start_time.elapsed() > timeout { circle_name,
warn!("Timeout waiting for task {}", task_id); &task_id,
return Err(RhaiClientError::Timeout(task_id.clone())); script,
} client_rpc_id,
Some(reply_to_queue_name.clone()),
)
.await?;
match self.get_task_status(&task_id).await { info!(
Ok(Some(details)) => { "Task {} submitted. Waiting for result on queue {} with timeout {:?}...",
debug!("Polled task {}: status = {}", task_id, details.status); task_id, reply_to_queue_name, timeout
if details.status == "completed" || details.status == "error" { );
// BLPOP on the reply queue
// The timeout for BLPOP is in seconds (integer)
let blpop_timeout_secs = timeout.as_secs().max(1); // Ensure at least 1 second for BLPOP timeout
match conn.blpop::<&String, Option<(String, String)>>(&reply_to_queue_name, blpop_timeout_secs as f64).await {
Ok(Some((_queue, result_message_str))) => {
// Attempt to deserialize the result message into RhaiTaskDetails or a similar structure
// For now, we assume the worker sends back a JSON string of RhaiTaskDetails
// or at least status, output, error.
// Let's refine what the worker sends. For now, assume it's a simplified result.
// The worker should ideally send a JSON string that can be parsed into RhaiTaskDetails.
// For this example, let's assume the worker sends a JSON string of a simplified result structure.
// A more robust approach would be for the worker to send the full RhaiTaskDetails (or relevant parts)
// and the client deserializes that.
// For now, let's assume the worker sends a JSON string of RhaiTaskDetails.
match serde_json::from_str::<RhaiTaskDetails>(&result_message_str) {
Ok(details) => {
info!("Task {} finished with status: {}", task_id, details.status); info!("Task {} finished with status: {}", task_id, details.status);
return Ok(details); // Optionally, delete the reply queue
} let _: redis::RedisResult<i32> = conn.del(&reply_to_queue_name).await;
// else status is "pending" or "processing", continue polling Ok(details)
}
Ok(None) => {
// This case should ideally not happen if submit_script succeeded and worker is running,
// unless the task details were manually deleted from Redis.
warn!("Task {} not found during polling. This might indicate an issue.", task_id);
// Depending on desired robustness, could retry a few times or return an error immediately.
// For now, let it continue polling up to timeout, or return a specific error.
// If it persists, it's effectively a timeout or a lost task.
// Let's consider it a lost task if it's not found after a short while post-submission.
if start_time.elapsed() > Duration::from_secs(5) { // Arbitrary short duration
return Err(RhaiClientError::TaskNotFound(task_id.clone()));
}
} }
Err(e) => { Err(e) => {
// Log error but continue polling unless it's a critical Redis error error!("Task {}: Failed to deserialize result message from reply queue: {}. Message: {}", task_id, e, result_message_str);
error!("Error polling task {}: {}. Will retry.", task_id, e); // Optionally, delete the reply queue
let _: redis::RedisResult<i32> = conn.del(&reply_to_queue_name).await;
Err(RhaiClientError::SerializationError(e))
} }
} }
sleep(poll_interval).await; }
Ok(None) => { // BLPOP timed out
warn!("Timeout waiting for result on reply queue {} for task {}", reply_to_queue_name, task_id);
// Optionally, delete the reply queue
let _: redis::RedisResult<i32> = conn.del(&reply_to_queue_name).await;
Err(RhaiClientError::Timeout(task_id))
}
Err(e) => { // Redis error
error!("Redis error on BLPOP for reply queue {}: {}", reply_to_queue_name, e);
// Optionally, delete the reply queue
let _: redis::RedisResult<i32> = conn.del(&reply_to_queue_name).await;
Err(RhaiClientError::RedisError(e))
}
} }
} }
} }

View File

@ -22,16 +22,17 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
register_timestamp_helpers(&mut engine); register_timestamp_helpers(&mut engine);
// Get the path to the script // Get the path to the script
let script_path = Path::new(file!()) let manifest_dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
.parent() let script_path = manifest_dir
.unwrap() .join("examples")
.join("calendar")
.join("calendar_script.rhai"); .join("calendar_script.rhai");
println!("\nRunning script: {}", script_path.display()); println!("\nRunning script: {}", script_path.display());
println!("---------------------"); println!("---------------------");
// Run the script // Run the script
match eval_file(&engine, &script_path.to_string_lossy()) { match eval_file(&engine, &script_path) {
Ok(result) => { Ok(result) => {
if !result.is_unit() { if !result.is_unit() {
println!("\nScript returned: {:?}", result); println!("\nScript returned: {:?}", result);

View File

@ -1,8 +1,10 @@
use rhai::{Engine, AST, Scope}; use rhai::{Engine, AST, Scope, EvalAltResult}; // Added EvalAltResult
use std::sync::Arc; use std::sync::Arc;
use std::sync::Mutex; // use std::sync::Mutex; // Unused
use std::collections::HashMap; // use std::collections::HashMap; // Unused
use heromodels::db::hero::OurDB; use heromodels::db::hero::OurDB;
use std::fs; // For file operations
use std::path::Path; // For path handling
// Export the mock database module // Export the mock database module
pub mod mock_db; pub mod mock_db;
@ -27,6 +29,8 @@ pub fn register_all_modules(engine: &mut Engine, db: Arc<OurDB>) {
// Register the calendar module if the feature is enabled // Register the calendar module if the feature is enabled
#[cfg(feature = "calendar")] #[cfg(feature = "calendar")]
heromodels::models::calendar::register_calendar_rhai_module(engine, db.clone()); heromodels::models::calendar::register_calendar_rhai_module(engine, db.clone());
heromodels::models::library::register_library_rhai_module(engine, db.clone());
heromodels::models::circle::register_circle_rhai_module(engine, db.clone());
// Register the flow module if the feature is enabled // Register the flow module if the feature is enabled
#[cfg(feature = "flow")] #[cfg(feature = "flow")]
@ -56,6 +60,21 @@ pub fn eval_script(engine: &Engine, script: &str) -> Result<rhai::Dynamic, Box<r
engine.eval::<rhai::Dynamic>(script) engine.eval::<rhai::Dynamic>(script)
} }
/// Evaluate a Rhai script file
pub fn eval_file(engine: &Engine, file_path: &Path) -> Result<rhai::Dynamic, Box<rhai::EvalAltResult>> {
match fs::read_to_string(file_path) {
Ok(script_content) => {
engine.eval::<rhai::Dynamic>(&script_content)
}
Err(io_err) => {
Err(Box::new(EvalAltResult::ErrorSystem(
format!("Failed to read script file: {}", file_path.display()),
Box::new(io_err),
)))
}
}
}
/// Compile a Rhai script to AST for repeated execution /// Compile a Rhai script to AST for repeated execution
pub fn compile_script(engine: &Engine, script: &str) -> Result<AST, Box<rhai::EvalAltResult>> { pub fn compile_script(engine: &Engine, script: &str) -> Result<AST, Box<rhai::EvalAltResult>> {
Ok(engine.compile(script)?) Ok(engine.compile(script)?)

8
src/lib.rs Normal file
View File

@ -0,0 +1,8 @@
//! Rhailib - Distributed Rhai Scripting Library
//!
//! This library provides infrastructure for executing Rhai scripts in a distributed
//! manner using Redis as a message broker and task queue.
// Re-export commonly used types
pub use redis;
pub use serde_json;

23
src/monitor/Cargo.toml Normal file
View File

@ -0,0 +1,23 @@
[package]
name = "monitor"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0"
clap = { version = "4.4", features = ["derive"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread", "signal", "time"] } # time feature might be needed later
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["fmt"] }
redis = { version = "0.25.0", features = ["tokio-comp"] } # For Redis communication
prettytable-rs = "0.10.0" # For displaying tasks in a formatted table
clearscreen = "2.0.1" # For clearing the terminal screen
chrono = { version = "0.4", features = ["serde"] } # For timestamps
futures = "0.3"
# If the monitor library needs to use parts of rhailib (e.g. Redis connections, task definitions):
# rhailib = { path = ".." } # Assuming monitor is a direct sub-directory of rhailib workspace member
[[bin]]
name = "monitor"
path = "src/main.rs"

67
src/monitor/README.md Normal file
View File

@ -0,0 +1,67 @@
# Rhai Worker Monitor (`monitor`)
`monitor` is a command-line tool designed to observe and display live information about Rhai workers managed by `rhailib`. It provides insights into Redis queue congestion and a table of tasks being processed by specified workers.
## Features (Planned)
* **Live Redis Queue Visualization**: Displays a textual, horizontal plot showing the number of tasks in the Redis queue for each monitored worker. The plot will be color-coded to indicate congestion levels and will update by polling the queue size.
* **Task Table**: Shows a table of tasks associated with each worker, including task hash, creation date, status (e.g., pending, running, completed, failed), and potentially other details.
## Prerequisites
* Rust and Cargo installed.
* Access to the Redis instance used by the Rhai workers.
## Building
Navigate to the `rhailib/monitor` crate's root directory and build the project:
```bash
cargo build
```
## Usage
To run the monitor, you need to specify which worker queues you want to observe using the `--workers` (or `-w`) flag. Provide a comma-separated list of worker names.
From the `rhailib/monitor` root directory:
```bash
cargo run -- --workers <worker_name_1>[,<worker_name_2>,...]
```
Or from the parent `rhailib` directory (workspace root):
```bash
cargo run -p monitor -- --workers <worker_name_1>[,<worker_name_2>,...]
```
**Examples:**
* Monitor a single worker named `my_default_worker` (from `rhailib/monitor`):
```bash
cargo run -- --workers my_default_worker
```
* Monitor multiple workers, `image_processing_worker` and `data_analysis_worker` (from `rhailib` workspace root):
```bash
cargo run -p monitor -- --workers image_processing_worker,data_analysis_worker
```
### Command-Line Options
* `-w, --workers <WORKERS>`: (Required) A comma-separated list of worker names to monitor.
(Future options might include Redis connection parameters, polling intervals, etc.)
## Development
The core logic for the monitor is located in `rhailib/monitor/src/`:
* `lib.rs`: Main library file, defines modules.
* `cli_logic.rs`: Handles argument parsing, Redis interaction, and orchestrates the display.
* `plot.rs`: Responsible for generating the textual queue visualization.
* `tasks.rs`: Responsible for fetching and displaying the task table.
The binary entry point is `rhailib/monitor/src/main.rs`.

37
src/monitor/cmd/main.rs Normal file
View File

@ -0,0 +1,37 @@
// File: /Users/timurgordon/code/git.ourworld.tf/herocode/rhailib/src/monitor/cmd/main.rs
use anyhow::Result;
use clap::Parser;
// This assumes that `rhailib/src/lib.rs` will have `pub mod monitor;`
// and `rhailib/src/monitor/mod.rs` will have `pub mod cli_logic;`
// and `cli_logic.rs` will contain `pub async fn start_monitoring`.
// The `crate::` prefix refers to the `rhailib` crate root.
#[derive(Parser, Debug)]
#[clap(author, version, about = "Rhai Worker Live Monitor", long_about = None)]
struct Args {
/// Comma-separated list of worker names to monitor
#[clap(short, long, value_delimiter = ',')]
workers: Vec<String>,
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
let args = Args::parse();
if args.workers.is_empty() {
eprintln!("Error: At least one worker name must be provided via --workers.");
// Consider returning an Err or using clap's built-in required attributes.
std::process::exit(1);
}
tracing::info!("Monitor CLI starting for workers: {:?}", args.workers);
// Call the monitoring logic from the `cli_logic` submodule within the `monitor` module
crate::monitor::cli_logic::start_monitoring(&args.workers).await?;
tracing::info!("Monitor CLI finished.");
Ok(())
}

View File

@ -0,0 +1,101 @@
// rhailib/monitor/src/cli_logic.rs
use anyhow::Result;
use futures::stream::{self, StreamExt};
// Import functions from sibling modules within the same crate
use crate::plot;
use crate::tasks::{self, RhaiTask};
use redis::{AsyncCommands, Client as RedisClient};
use std::collections::HashMap;
use tokio::time::{sleep, Duration};
use tokio::signal;
const REDIS_URL: &str = "redis://127.0.0.1/";
const POLLING_INTERVAL_MILLISECONDS: u64 = 10; // Increased polling interval for SCAN
const SCAN_COUNT: isize = 100; // Number of keys to fetch per SCAN iteration
/// Main monitoring logic.
pub async fn start_monitoring(worker_names: &[String]) -> Result<()> {
tracing::info!("Attempting to connect to Redis at {}", REDIS_URL);
let client = RedisClient::open(REDIS_URL)?;
let mut con = client.get_multiplexed_async_connection().await?;
tracing::info!("Successfully connected to Redis.");
let ping_result: String = redis::cmd("PING").query_async(&mut con).await?;
tracing::info!("Redis PING response: {}", ping_result);
tracing::info!("Starting live monitor. Configured workers: {:?}. Press Ctrl+C to exit.", worker_names);
loop {
tokio::select! {
_ = signal::ctrl_c() => {
print!("\r");
println!("Exiting Rhai Worker Monitor...");
break;
}
_ = async {
let mut current_con = con.clone(); // Clone for this iteration
clearscreen::clear().unwrap_or_else(|e| tracing::warn!("Failed to clear screen: {}", e));
println!("Rhai Worker Monitor (Press Ctrl+C to exit)");
println!(
"Polling Redis every {}ms. Last update: {}. Configured workers: {:?}",
POLLING_INTERVAL_MILLISECONDS,
chrono::Local::now().format("%Y-%m-%d %H:%M:%S"),
worker_names
);
let mut all_rhai_tasks: Vec<RhaiTask> = Vec::new();
let mut cursor: isize = 0;
loop {
// SCAN returns a tuple: (new_cursor, keys_array)
let (new_cursor, task_detail_keys): (isize, Vec<String>) = redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg("rhai_task_details:*")
.arg("COUNT")
.arg(SCAN_COUNT)
.query_async(&mut current_con)
.await?;
// Process keys found in this scan iteration
let tasks_futures = stream::iter(task_detail_keys)
.map(|key_with_prefix| {
let mut task_con = current_con.clone();
async move {
let task_id = key_with_prefix.strip_prefix("rhai_task_details:").unwrap_or(&key_with_prefix).to_string();
match task_con.hgetall::<_, HashMap<String, String>>(&key_with_prefix).await {
Ok(details_map) => Some(RhaiTask::from_redis_hash(task_id, &details_map)),
Err(e) => {
tracing::warn!("Could not fetch details for task key {}: {}", key_with_prefix, e);
None
}
}
}
})
.buffer_unordered(10) // Concurrently fetch details for 10 tasks
.collect::<Vec<_>>()
.await;
all_rhai_tasks.extend(tasks_futures.into_iter().flatten());
cursor = new_cursor;
if cursor == 0 { // SCAN returns 0 when iteration is complete
break;
}
}
// Sort tasks by creation date (optional, assuming created_at is parsable)
// For simplicity, we'll skip sorting for now as created_at is a string.
let pending_tasks_count = all_rhai_tasks.iter().filter(|task| task.status.to_lowercase() == "pending").count();
plot::display_queue_plot("Total Pending Tasks", pending_tasks_count).await?;
tasks::display_task_table(&all_rhai_tasks).await?;
sleep(Duration::from_millis(POLLING_INTERVAL_MILLISECONDS)).await;
Result::<()>::Ok(())
} => {}
}
}
Ok(())
}

10
src/monitor/src/lib.rs Normal file
View File

@ -0,0 +1,10 @@
// rhailib/monitor/src/lib.rs
// Declare the modules that make up this crate's library
pub mod cli_logic;
pub mod plot;
pub mod tasks;
// Re-export the main function to be used by the binary (src/main.rs)
// and potentially by other crates if this library is used as a dependency.
pub use cli_logic::start_monitoring;

33
src/monitor/src/main.rs Normal file
View File

@ -0,0 +1,33 @@
// rhailib/monitor/src/main.rs
use anyhow::Result;
use clap::Parser;
// Use the start_monitoring function from the monitor crate's library
use monitor::start_monitoring;
#[derive(Parser, Debug)]
#[clap(author, version, about = "Rhai Worker Monitor CLI", long_about = None)]
struct Args {
/// List of worker names to monitor, comma-separated
#[clap(short, long, value_delimiter = ',', required = true, num_args = 1..)]
workers: Vec<String>,
// TODO: Add other options like Redis connection details if not using a config file or env vars.
}
#[tokio::main]
async fn main() -> Result<()> {
// Initialize logging (e.g., tracing-subscriber)
// Consider making log level configurable via CLI args or env var.
tracing_subscriber::fmt::init();
let args = Args::parse();
tracing::info!("Starting monitor for workers: {:?}", args.workers);
// Call the main logic function from the monitor library
start_monitoring(&args.workers).await?;
tracing::info!("Monitor finished.");
Ok(())
}

23
src/monitor/src/plot.rs Normal file
View File

@ -0,0 +1,23 @@
// rhailib/monitor/src/plot.rs
use anyhow::Result;
/// Placeholder for queue plotting logic.
const MAX_BAR_WIDTH: usize = 50; // Max width of the bar in characters
const BAR_CHAR: char = '█'; // Character to use for the bar
pub async fn display_queue_plot(plot_label: &str, count: usize) -> Result<()> {
let bar_width = std::cmp::min(count, MAX_BAR_WIDTH);
let bar: String = std::iter::repeat(BAR_CHAR).take(bar_width).collect();
// ANSI escape code for green color can be added here if desired
// Example: let green_bar = format!("\x1b[32m{}\x1b[0m", bar);
println!(
"{:<27} [{:<width$}] ({})", // Adjusted label spacing
plot_label,
bar,
count,
width = MAX_BAR_WIDTH
);
Ok(())
}

90
src/monitor/src/tasks.rs Normal file
View File

@ -0,0 +1,90 @@
// rhailib/monitor/src/tasks.rs
use anyhow::Result;
use prettytable::{Cell, Row, Table, format};
use std::collections::HashMap;
#[derive(Debug, Clone)]
pub struct RhaiTask {
pub id: String,
pub script: Option<String>,
pub status: String,
pub created_at: Option<String>, // Keep as string for display, parsing can be complex
pub updated_at: Option<String>, // Keep as string, might be RFC3339 or Unix timestamp
pub client_rpc_id: Option<String>,
pub reply_to_queue: Option<String>,
pub output: Option<String>,
pub error: Option<String>,
}
impl RhaiTask {
pub fn from_redis_hash(task_id: String, details: &HashMap<String, String>) -> Self {
// Helper to get optional string, converting "null" string to None
let get_opt_string = |key: &str| -> Option<String> {
details.get(key).and_then(|s| {
if s.to_lowercase() == "null" || s.is_empty() {
None
} else {
Some(s.clone())
}
})
};
RhaiTask {
id: task_id,
script: get_opt_string("script"),
status: details.get("status").cloned().unwrap_or_else(|| "unknown".to_string()),
created_at: get_opt_string("createdAt"),
updated_at: get_opt_string("updatedAt"),
client_rpc_id: get_opt_string("clientRpcId"),
reply_to_queue: get_opt_string("replyToQueue"),
output: get_opt_string("output"),
error: get_opt_string("error"),
}
}
}
/// Displays all monitored Rhai tasks in a formatted table.
pub async fn display_task_table(tasks: &[RhaiTask]) -> Result<()> {
println!("\nAll Monitored Rhai Tasks:");
if tasks.is_empty() {
println!(" No tasks to display.");
return Ok(());
}
let mut table = Table::new();
table.set_format(*format::consts::FORMAT_BOX_CHARS);
table.add_row(Row::new(vec![
Cell::new("Task ID").style_spec("bFg"),
Cell::new("Status").style_spec("bFg"),
Cell::new("Created At").style_spec("bFg"),
Cell::new("Updated At").style_spec("bFg"),
Cell::new("Details (Output/Error)").style_spec("bFg"),
// Cell::new("Script (Excerpt)").style_spec("bFg"), // Optional: Add if needed
]));
for task in tasks {
let details_str = match (&task.output, &task.error) {
(Some(out), None) => format!("Output: {:.50}", out), // Truncate for display
(None, Some(err)) => format!("Error: {:.50}", err), // Truncate for display
(Some(out), Some(err)) => format!("Output: {:.30}... Error: {:.30}...", out, err),
(None, None) => "N/A".to_string(),
};
// let script_excerpt = task.script.as_ref().map_or("N/A".to_string(), |s| {
// if s.len() > 30 { format!("{:.27}...", s) } else { s.clone() }
// });
table.add_row(Row::new(vec![
Cell::new(&task.id[..std::cmp::min(task.id.len(), 12)]), // Show first 12 chars of ID
Cell::new(&task.status),
Cell::new(task.created_at.as_deref().unwrap_or("N/A")),
Cell::new(task.updated_at.as_deref().unwrap_or("N/A")),
Cell::new(&details_str),
// Cell::new(&script_excerpt),
]));
}
table.printstd();
Ok(())
}

2
src/repl/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
target
temp_db_for_example_worker_default_worker

View File

@ -0,0 +1,5 @@
#V2
.edit
quit
.edit
exit

1752
src/repl/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

21
src/repl/Cargo.toml Normal file
View File

@ -0,0 +1,21 @@
[package]
name = "ui_repl"
version = "0.1.0"
edition = "2024" # Keep 2024 unless issues arise
[dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time", "sync"] } # Added "time" for potential timeouts, "sync" for worker
url = "2" # For parsing Redis URL
tracing = "0.1" # For logging
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
log = "0.4" # rhai_client uses log crate
rustyline = { version = "13.0.0", features = ["derive"] } # For enhanced REPL input
tempfile = "3.8" # For creating temporary files for editing
rhai_client = { path = "../client" }
anyhow = "1.0" # For simpler error handling
worker_lib = { path = "../worker", package = "worker" }
engine = { path = "../engine" }
heromodels = { path = "../../../db/heromodels", features = ["rhai"] }
rhai = { version = "1.18.0" } # Match version used by worker/engine

77
src/repl/README.md Normal file
View File

@ -0,0 +1,77 @@
# Rhai REPL CLI for Circle WebSocket Servers
This crate provides a command-line interface (CLI) to interact with Rhai scripts executed on remote Circle WebSocket servers. It includes both an interactive REPL and a non-interactive example.
## Prerequisites
1. **Circle Orchestrator Running**: Ensure the `circles_orchestrator` is running. This application manages and starts the individual Circle WebSocket servers.
To run the orchestrator:
```bash
cd /path/to/herocode/circles/cmd
cargo run
```
By default, this will start servers based on the `circles.json` configuration (e.g., "Alpha Circle" on `ws://127.0.0.1:8081/ws`).
2. **Redis Server**: Ensure a Redis server is running and accessible at `redis://127.0.0.1:6379` (this is the default used by the orchestrator and its components).
## Usage
Navigate to this crate's directory:
```bash
cd /path/to/herocode/circles/ui_repl
```
### 1. Interactive REPL
The main binary of this crate is an interactive REPL.
**To run with default WebSocket URL (`ws://127.0.0.1:8081/ws`):**
```bash
cargo run
```
**To specify a WebSocket URL:**
```bash
cargo run ws://<your-circle-server-ip>:<port>/ws
# Example for "Beta Circle" if configured on port 8082:
# cargo run ws://127.0.0.1:8082/ws
```
Once connected, you can:
- Type single-line Rhai scripts directly and press Enter.
- Use Vi keybindings for editing the current input line (thanks to `rustyline`).
- Type `.edit` to open your `$EDITOR` (or `vi` by default) for multi-line script input. Save and close the editor to execute the script.
- Type `.run <filepath>` (or `run <filepath>`) to execute a Rhai script from a local file.
- Type `exit` or `quit` to close the REPL.
Command history is saved to `.rhai_repl_history.txt` in the directory where you run the REPL.
### 2. Non-Interactive Example (`connect_and_play`)
This example connects to a WebSocket server, sends a predefined Rhai script, prints the response, and then disconnects.
**To run with default WebSocket URL (`ws://127.0.0.1:8081/ws`):**
```bash
cargo run --example connect_and_play
```
**To specify a WebSocket URL for the example:**
```bash
cargo run --example connect_and_play ws://<your-circle-server-ip>:<port>/ws
# Example:
# cargo run --example connect_and_play ws://127.0.0.1:8082/ws
```
The example script is:
```rhai
let a = 10;
let b = 32;
let message = "Hello from example script!";
message + " Result: " + (a + b)
```
## Logging
Both the REPL and the example use the `tracing` crate for logging. You can control log levels using the `RUST_LOG` environment variable. For example, to see debug logs from the `circle_client_ws` library:
```bash
RUST_LOG=info,circle_client_ws=debug cargo run --example connect_and_play

View File

@ -0,0 +1,142 @@
use rhai_client::{RhaiClient, RhaiClientError, RhaiTaskDetails};
use std::env;
use std::time::Duration;
use std::sync::Arc;
use anyhow::Context;
use tracing_subscriber::EnvFilter;
use tokio::sync::mpsc;
use worker_lib::spawn_rhai_worker;
use engine::create_heromodels_engine;
use heromodels::db::hero::OurDB;
use std::path::PathBuf;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env().add_directive("connect_and_play=info".parse().unwrap()).add_directive("rhai_client=info".parse().unwrap()))
.init();
let args: Vec<String> = env::args().collect();
let redis_url = args.get(1).cloned().unwrap_or_else(|| {
let default_url = "redis://127.0.0.1/".to_string();
println!("No Redis URL provided. Defaulting to: {}", default_url);
default_url
});
let worker_name = args.get(2).cloned().unwrap_or_else(|| {
let default_worker = "default_worker".to_string();
println!("No worker name provided. Defaulting to: {}", default_worker);
default_worker
});
// Define DB path for the worker
let db_path_str = format!("./temp_db_for_example_worker_{}", worker_name);
let db_path = PathBuf::from(&db_path_str);
// Create shutdown channel for the worker
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
// Spawn a worker in the background
let worker_redis_url = redis_url.clone();
let worker_circle_name_for_task = worker_name.clone();
let db_path_for_task = db_path_str.clone();
log::info!("[Main] Spawning worker for circle '{}' with DB path '{}'", worker_circle_name_for_task, db_path_for_task);
let worker_join_handle = tokio::spawn(async move {
log::info!("[BG Worker] Starting for circle '{}' on Redis '{}'", worker_circle_name_for_task, worker_redis_url);
// The `reset: true` in OurDB::new handles pre-cleanup if the directory exists.
let db = Arc::new(OurDB::new(&db_path_for_task, true)
.expect("Failed to create temp DB for example worker"));
let mut engine = create_heromodels_engine(db);
engine.set_max_operations(0);
engine.set_max_expr_depths(0, 0);
engine.set_optimization_level(rhai::OptimizationLevel::Full);
if let Err(e) = spawn_rhai_worker(
1, // dummy circle_id
worker_circle_name_for_task.clone(),
engine,
worker_redis_url.clone(),
shutdown_rx, // Pass the receiver from main
false, // preserve_tasks
).await {
log::error!("[BG Worker] Failed to spawn or worker error for circle '{}': {}", worker_circle_name_for_task, e);
} else {
log::info!("[BG Worker] Worker for circle '{}' shut down gracefully.", worker_circle_name_for_task);
}
});
// Give the worker a moment to start up
tokio::time::sleep(Duration::from_secs(1)).await;
println!("Initializing RhaiClient for Redis at {} to target worker '{}'...", redis_url, worker_name);
let client = RhaiClient::new(&redis_url)
.with_context(|| format!("Failed to create RhaiClient for Redis URL: {}", redis_url))?;
println!("RhaiClient initialized.");
let script = "let a = 10; let b = 32; let message = \"Hello from example script!\"; message + \" Result: \" + (a + b)";
println!("\nSending script:\n```rhai\n{}\n```", script);
let timeout = Duration::from_secs(30);
match client.submit_script_and_await_result(&worker_name, script.to_string(), None, timeout).await {
Ok(task_details) => {
println!("\nWorker response:");
if let Some(ref output) = task_details.output {
println!("Output: {}", output);
}
if let Some(ref error_msg) = task_details.error {
eprintln!("Error: {}", error_msg);
}
if task_details.output.is_none() && task_details.error.is_none() {
println!("Worker finished with no explicit output or error. Status: {}", task_details.status);
}
}
Err(e) => match e {
RhaiClientError::Timeout(task_id) => {
eprintln!("\nError: Script execution timed out for task_id: {}.", task_id);
}
RhaiClientError::RedisError(redis_err) => {
eprintln!("\nError: Redis communication failed: {}. Check Redis connection and server status.", redis_err);
}
RhaiClientError::SerializationError(serde_err) => {
eprintln!("\nError: Failed to serialize/deserialize task data: {}.", serde_err);
}
RhaiClientError::TaskNotFound(task_id) => {
eprintln!("\nError: Task {} not found after submission.", task_id);
}
/* All RhaiClientError variants are handled, so _ arm is not strictly needed
unless RhaiClientError becomes non-exhaustive in the future. */
},
}
println!("\nExample client operations finished. Shutting down worker...");
// Send shutdown signal to the worker
if let Err(e) = shutdown_tx.send(()).await {
eprintln!("[Main] Failed to send shutdown signal to worker: {} (worker might have already exited or an error occurred)", e);
}
// Wait for the worker to finish
log::info!("[Main] Waiting for worker task to join...");
if let Err(e) = worker_join_handle.await {
eprintln!("[Main] Error waiting for worker task to join: {:?}", e);
} else {
log::info!("[Main] Worker task joined successfully.");
}
// Clean up the database directory
log::info!("[Main] Cleaning up database directory: {}", db_path.display());
if db_path.exists() {
if let Err(e) = std::fs::remove_dir_all(&db_path) {
eprintln!("[Main] Failed to remove database directory '{}': {}", db_path.display(), e);
} else {
log::info!("[Main] Successfully removed database directory: {}", db_path.display());
}
} else {
log::info!("[Main] Database directory '{}' not found, no cleanup needed.", db_path.display());
}
println!("Example fully completed and cleaned up.");
Ok(())
}

212
src/repl/src/main.rs Normal file
View File

@ -0,0 +1,212 @@
use tracing_subscriber::EnvFilter;
use rhai_client::{RhaiClient, RhaiClientError, RhaiTaskDetails};
use rustyline::error::ReadlineError;
use rustyline::{DefaultEditor, Config, EditMode};
use std::fs;
use std::process::Command;
use std::env;
use std::time::Duration;
use tempfile::Builder as TempFileBuilder;
use anyhow::Context;
// Default timeout for script execution
const DEFAULT_SCRIPT_TIMEOUT_SECONDS: u64 = 30;
async fn execute_script(client: &RhaiClient, circle_name: &str, script_content: String) {
if script_content.trim().is_empty() {
println!("Script is empty, not sending.");
return;
}
println!("Sending script to worker '{}':\n---\n{}\n---", circle_name, script_content);
let timeout = Duration::from_secs(DEFAULT_SCRIPT_TIMEOUT_SECONDS);
match client.submit_script_and_await_result(circle_name, script_content, None, timeout).await {
Ok(task_details) => {
if let Some(output) = &task_details.output {
println!("worker: {}", output);
}
if let Some(error_msg) = &task_details.error {
eprintln!("Worker error: {}", error_msg);
}
if task_details.output.is_none() && task_details.error.is_none() {
println!("Worker finished with no explicit output or error. Status: {}", task_details.status);
}
}
Err(e) => match e {
RhaiClientError::Timeout(task_id) => {
eprintln!("Error: Script execution timed out for task_id: {}.", task_id);
}
RhaiClientError::RedisError(redis_err) => {
eprintln!("Error: Redis communication failed: {}. Check Redis connection and server status.", redis_err);
}
RhaiClientError::SerializationError(serde_err) => {
eprintln!("Error: Failed to serialize/deserialize task data: {}.", serde_err);
}
RhaiClientError::TaskNotFound(task_id) => {
eprintln!("Error: Task {} not found after submission (this should be rare).", task_id);
}
},
}
}
async fn run_repl(redis_url: String, circle_name: String) -> anyhow::Result<()> {
println!("Initializing Rhai REPL for worker '{}' via Redis at {}...", circle_name, redis_url);
let client = RhaiClient::new(&redis_url)
.with_context(|| format!("Failed to create RhaiClient for Redis URL: {}", redis_url))?;
// No explicit connect() needed for rhai_client, connection is handled per-operation or pooled.
println!("RhaiClient initialized. Ready to send scripts to worker '{}'.", circle_name);
println!("Type Rhai scripts, '.edit' to use $EDITOR, '.run <path>' to execute a file, or 'exit'/'quit'.");
println!("Vi mode enabled for input line.");
let config = Config::builder()
.edit_mode(EditMode::Vi)
.auto_add_history(true) // Automatically add to history
.build();
let mut rl = DefaultEditor::with_config(config)?;
let history_file = ".rhai_repl_history.txt"; // Simple history file in current dir
if rl.load_history(history_file).is_err() {
// No history found or error loading, not critical
}
let prompt = format!("rhai ({}) @ {}> ", circle_name, redis_url);
loop {
let readline = rl.readline(&prompt);
match readline {
Ok(line) => {
let input = line.trim();
if input.eq_ignore_ascii_case("exit") || input.eq_ignore_ascii_case("quit") {
println!("Exiting REPL.");
break;
} else if input.eq_ignore_ascii_case(".edit") {
// Correct way to create a temp file with a suffix
let temp_file = TempFileBuilder::new()
.prefix("rhai_script_") // Optional: add a prefix
.suffix(".rhai")
.tempfile_in(".") // Create in current directory for simplicity
.with_context(|| "Failed to create temp file")?;
// You can pre-populate the temp file if needed:
// use std::io::Write; // Add this import if using write_all
// if let Err(e) = temp_file.as_file().write_all(b"// Start your Rhai script here\n") {
// eprintln!("Failed to write initial content to temp file: {}", e);
// }
let temp_path = temp_file.path().to_path_buf();
let editor_cmd_str = env::var("EDITOR").unwrap_or_else(|_| "vi".to_string());
let mut editor_parts = editor_cmd_str.split_whitespace();
let editor_executable = editor_parts.next().unwrap_or("vi"); // Default to vi if $EDITOR is empty string
let editor_args: Vec<&str> = editor_parts.collect();
println!("Launching editor: '{}' with args: {:?} for script editing. Save and exit editor to execute.", editor_executable, editor_args);
let mut command = Command::new(editor_executable);
command.args(editor_args); // Add any arguments from $EDITOR (like -w)
command.arg(&temp_path); // Add the temp file path as the last argument
let status = command.status();
match status {
Ok(exit_status) if exit_status.success() => {
match fs::read_to_string(&temp_path) {
Ok(script_content) => {
execute_script(&client, &circle_name, script_content).await;
}
Err(e) => eprintln!("Error reading temp file {:?}: {}", temp_path, e),
}
}
Ok(exit_status) => eprintln!("Editor exited with status: {}. Script not executed.", exit_status),
Err(e) => eprintln!("Failed to launch editor '{}': {}. Ensure it's in your PATH.", editor_executable, e), // Changed 'editor' to 'editor_executable'
}
// temp_file is automatically deleted when it goes out of scope
} else if input.starts_with(".run ") || input.starts_with("run ") {
let parts: Vec<&str> = input.splitn(2, ' ').collect();
if parts.len() == 2 {
let file_path = parts[1];
println!("Attempting to run script from file: {}", file_path);
match fs::read_to_string(file_path) {
Ok(script_content) => {
execute_script(&client, &circle_name, script_content).await;
}
Err(e) => eprintln!("Error reading file {}: {}", file_path, e),
}
} else {
eprintln!("Usage: .run <filepath>");
}
} else if !input.is_empty() {
execute_script(&client, &circle_name, input.to_string()).await;
}
// rl.add_history_entry(line.as_str()) is handled by auto_add_history(true)
}
Err(ReadlineError::Interrupted) => { // Ctrl-C
println!("Input interrupted. Type 'exit' or 'quit' to close.");
continue;
}
Err(ReadlineError::Eof) => { // Ctrl-D
println!("Exiting REPL (EOF).");
break;
}
Err(err) => {
eprintln!("Error reading input: {:?}", err);
break;
}
}
}
if rl.save_history(history_file).is_err() {
// Failed to save history, not critical
}
// No explicit disconnect for RhaiClient as it manages connections internally.
println!("Exited REPL.");
Ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env().add_directive("ui_repl=info".parse()?).add_directive("rhai_client=info".parse()?))
.init();
let args: Vec<String> = env::args().collect();
let redis_url_str = if args.len() > 1 {
args[1].clone()
} else {
let default_url = "redis://127.0.0.1/".to_string();
println!("No Redis URL provided. Defaulting to: {}", default_url);
default_url
};
let circle_name_str = if args.len() > 2 {
args[2].clone()
} else {
let default_circle = "default_worker".to_string();
println!("No worker/circle name provided. Defaulting to: {}", default_circle);
default_circle
};
println!("Usage: {} [redis_url] [worker_name]", args.get(0).map_or("ui_repl", |s| s.as_str()));
println!("Example: {} redis://127.0.0.1/ my_rhai_worker", args.get(0).map_or("ui_repl", |s| s.as_str()));
// Basic validation for Redis URL (scheme)
// A more robust validation might involve trying to parse it with redis::ConnectionInfo
if !redis_url_str.starts_with("redis://") {
eprintln!("Warning: Redis URL '{}' does not start with 'redis://'. Attempting to use it anyway.", redis_url_str);
}
if let Err(e) = run_repl(redis_url_str, circle_name_str).await {
eprintln!("REPL error: {:#}", e);
std::process::exit(1);
}
Ok(())
}

3
src/rhai_engine_ui/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
/target
/dist
Cargo.lock

View File

@ -0,0 +1,30 @@
[package]
name = "rhai-engine-ui"
version = "0.1.0"
edition = "2021"
[dependencies]
yew = { version = "0.21", features = ["csr"] }
wasm-bindgen = "0.2"
wasm-logger = "0.2"
gloo-net = "0.4"
gloo-timers = "0.3.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
web-sys = { version = "0.3", features = ["HtmlInputElement"] }
log = "0.4"
chrono = { version = "0.4", features = ["serde"] }
wasm-bindgen-futures = "0.4"
# Server-side dependencies (optional)
tokio = { version = "1", features = ["full"], optional = true }
axum = { version = "0.7", optional = true }
tower = { version = "0.4", optional = true }
tower-http = { version = "0.5.0", features = ["fs", "cors"], optional = true }
rand = { version = "0.8", optional = true }
redis = { version = "0.25", features = ["tokio-comp"], optional = true }
deadpool-redis = { version = "0.15.0", features = ["rt_tokio_1"], optional = true }
[features]
# This feature enables the server-side components
server = ["tokio", "axum", "tower", "tower-http", "rand", "redis", "deadpool-redis"]

View File

@ -0,0 +1,42 @@
# Rhai Engine Worker UI
A Yew-based WASM interface to monitor Rhai workers.
## Prerequisites
- Rust: Install from [rust-lang.org](https://www.rust-lang.org/tools/install)
- Trunk: Install with `cargo install trunk`
- A backend service providing the necessary API endpoints (see below).
## Backend API Requirements
This UI expects a backend service to be running that can provide data from Redis. The UI will make requests to the following (example) endpoints:
- `GET /api/worker/{worker_name}/tasks_and_stats`: Returns initial `WorkerData` including a list of `TaskSummary` and initial `QueueStats`.
- `WorkerData`: `{ "queue_stats": { "current_size": u32, "color_code": "string" }, "tasks": [TaskSummary] }`
- `TaskSummary`: `{ "hash": "string", "created_at": i64, "status": "string" }`
- `GET /api/worker/{worker_name}/queue_stats`: Returns current `QueueStats` for polling.
- `QueueStats`: `{ "current_size": u32, "color_code": "string" }`
- `GET /api/task/{task_hash}`: Returns `TaskDetails`.
- `TaskDetails`: `{ "hash": "string", "created_at": i64, "status": "string", "script_content": "string", "result": "optional_string", "error": "optional_string" }`
**Note:** The API endpoints are currently hardcoded with relative paths (e.g., `/api/...`). This assumes the backend API is served from the same host and port as the Trunk development server, or that a proxy is configured.
## Development
1. Navigate to the `rhai_engine_ui` directory:
```bash
cd /Users/timurgordon/code/git.ourworld.tf/herocode/rhailib/rhai_engine_ui/
```
2. Run the development server:
```bash
trunk serve --port 8081
```
3. Open your browser to `http://127.0.0.1:8081`.
## Building for Release
```bash
trunk build --release
```
This will output static files to the `dist` directory.

View File

@ -0,0 +1,2 @@
[build]
target = "index.html"

View File

@ -0,0 +1,15 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8" />
<title>Rhai Worker UI</title>
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link href="https://fonts.googleapis.com/css2?family=Inter:wght@400;500;600;700&display=swap" rel="stylesheet">
<link data-trunk rel="css" href="styles.css" />
<!-- Trunk will inject a script tag here for the WASM loader -->
</head>
<body>
<!-- The Yew app will be rendered here -->
</body>
</html>

View File

@ -0,0 +1,350 @@
use yew::prelude::*;
use gloo_net::http::Request;
use gloo_timers::callback::Interval;
use serde::{Deserialize, Serialize};
use wasm_bindgen_futures::spawn_local;
use web_sys::HtmlInputElement;
use yew::{html, Component, Context, Html, TargetCast};
// --- Data Structures (placeholders, to be refined based on backend API) ---
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
pub struct QueueStats {
pub current_size: u32,
pub color_code: String, // e.g., "green", "yellow", "red"
}
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
pub struct TaskSummary {
pub hash: String,
pub created_at: i64,
pub status: String,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct TaskDetails {
pub hash: String,
pub created_at: i64,
pub status: String,
pub script_content: String,
pub result: Option<String>,
pub error: Option<String>,
}
// Combined structure for initial fetch
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
pub struct WorkerDataResponse {
pub queue_stats: Option<QueueStats>,
pub tasks: Vec<TaskSummary>,
}
// --- Component ---
pub enum Msg {
UpdateWorkerName(String),
FetchData,
SetWorkerData(Result<WorkerDataResponse, String>),
SetQueueStats(Result<QueueStats, String>),
ViewTaskDetails(String), // Task hash
SetTaskDetails(Result<TaskDetails, String>),
ClearTaskDetails,
IntervalTick, // For interval timer, to trigger queue stats fetch
}
pub struct App {
worker_name_input: String,
worker_name_to_monitor: Option<String>,
tasks_list: Vec<TaskSummary>,
current_queue_stats: Option<QueueStats>,
selected_task_details: Option<TaskDetails>,
error_message: Option<String>,
is_loading_initial_data: bool,
is_loading_task_details: bool,
queue_poll_timer: Option<Interval>,
}
impl Component for App {
type Message = Msg;
type Properties = ();
fn create(_ctx: &Context<Self>) -> Self {
Self {
worker_name_input: "".to_string(),
worker_name_to_monitor: None,
tasks_list: Vec::new(),
current_queue_stats: None,
selected_task_details: None,
error_message: None,
is_loading_initial_data: false,
is_loading_task_details: false,
queue_poll_timer: None,
}
}
fn update(&mut self, ctx: &Context<Self>, msg: Self::Message) -> bool {
match msg {
Msg::UpdateWorkerName(name) => {
self.worker_name_input = name;
true
}
Msg::FetchData => {
if self.worker_name_input.trim().is_empty() {
self.error_message = Some("Please enter a worker name.".to_string());
return true;
}
let worker_name = self.worker_name_input.trim().to_string();
self.worker_name_to_monitor = Some(worker_name.clone());
self.error_message = None;
self.tasks_list.clear();
self.current_queue_stats = None;
self.selected_task_details = None;
self.is_loading_initial_data = true;
let link = ctx.link().clone();
let tasks_url = format!("/api/worker/{}/tasks_and_stats", worker_name);
spawn_local(async move {
match Request::get(&tasks_url).send().await {
Ok(response) => {
if response.ok() {
match response.json::<WorkerDataResponse>().await {
Ok(data) => link.send_message(Msg::SetWorkerData(Ok(data))),
Err(e) => link.send_message(Msg::SetWorkerData(Err(format!("Failed to parse worker data: {}", e)))),
}
} else {
link.send_message(Msg::SetWorkerData(Err(format!("API error: {} {}", response.status(), response.status_text()))));
}
}
Err(e) => link.send_message(Msg::SetWorkerData(Err(format!("Network error fetching worker data: {}", e)))),
}
});
// Set up polling for queue stats
let link_for_timer = ctx.link().clone();
let timer = Interval::new(5000, move || { // Poll every 5 seconds
link_for_timer.send_message(Msg::IntervalTick);
});
if let Some(old_timer) = self.queue_poll_timer.take() {
old_timer.cancel(); // Cancel previous timer if any
}
self.queue_poll_timer = Some(timer);
true
}
Msg::IntervalTick => {
if let Some(worker_name) = &self.worker_name_to_monitor {
let queue_stats_url = format!("/api/worker/{}/queue_stats", worker_name);
let link = ctx.link().clone();
spawn_local(async move {
match Request::get(&queue_stats_url).send().await {
Ok(response) => {
if response.ok() {
match response.json::<QueueStats>().await {
Ok(stats) => link.send_message(Msg::SetQueueStats(Ok(stats))),
Err(e) => link.send_message(Msg::SetQueueStats(Err(format!("Failed to parse queue stats: {}", e)))),
}
} else {
link.send_message(Msg::SetQueueStats(Err(format!("API error (queue_stats): {} {}", response.status(), response.status_text()))));
}
}
Err(e) => link.send_message(Msg::SetQueueStats(Err(format!("Network error fetching queue stats: {}", e)))),
}
});
}
false // No direct re-render, SetQueueStats will trigger it
}
Msg::SetWorkerData(Ok(data)) => {
self.tasks_list = data.tasks;
self.current_queue_stats = data.queue_stats;
self.error_message = None;
self.is_loading_initial_data = false;
true
}
Msg::SetWorkerData(Err(err_msg)) => {
self.error_message = Some(err_msg);
self.is_loading_initial_data = false;
if let Some(timer) = self.queue_poll_timer.take() {
timer.cancel();
}
true
}
Msg::SetQueueStats(Ok(stats)) => {
self.current_queue_stats = Some(stats);
// Don't clear main error message here, as this is a background update
true
}
Msg::SetQueueStats(Err(err_msg)) => {
log::error!("Failed to update queue stats: {}", err_msg);
// Optionally show a non-blocking error for queue stats
self.current_queue_stats = None;
true
}
Msg::ViewTaskDetails(hash) => {
self.is_loading_task_details = true;
self.selected_task_details = None; // Clear previous details
let task_details_url = format!("/api/task/{}", hash);
let link = ctx.link().clone();
spawn_local(async move {
match Request::get(&task_details_url).send().await {
Ok(response) => {
if response.ok() {
match response.json::<TaskDetails>().await {
Ok(details) => link.send_message(Msg::SetTaskDetails(Ok(details))),
Err(e) => link.send_message(Msg::SetTaskDetails(Err(format!("Failed to parse task details: {}", e)))),
}
} else {
link.send_message(Msg::SetTaskDetails(Err(format!("API error (task_details): {} {}", response.status(), response.status_text()))));
}
}
Err(e) => link.send_message(Msg::SetTaskDetails(Err(format!("Network error fetching task details: {}", e)))),
}
});
true
}
Msg::SetTaskDetails(Ok(details)) => {
self.selected_task_details = Some(details);
self.error_message = None; // Clear general error if task details load
self.is_loading_task_details = false;
true
}
Msg::SetTaskDetails(Err(err_msg)) => {
self.error_message = Some(format!("Error loading task details: {}", err_msg));
self.selected_task_details = None;
self.is_loading_task_details = false;
true
}
Msg::ClearTaskDetails => {
self.selected_task_details = None;
true
}
}
}
fn view(&self, ctx: &Context<Self>) -> Html {
let link = ctx.link();
let on_worker_name_input = link.callback(|e: InputEvent| {
let input: HtmlInputElement = e.target_unchecked_into();
Msg::UpdateWorkerName(input.value())
});
html! {
<div class="container">
<h1>{ "Rhai Worker Monitor" }</h1>
<div class="input-group">
<input type="text"
placeholder="Enter Worker Name (e.g., worker_default)"
value={self.worker_name_input.clone()}
oninput={on_worker_name_input.clone()}
disabled={self.is_loading_initial_data}
onkeypress={link.callback(move |e: KeyboardEvent| {
if e.key() == "Enter" { Msg::FetchData } else { Msg::UpdateWorkerName(e.target_unchecked_into::<HtmlInputElement>().value()) }
})}
/>
<button onclick={link.callback(|_| Msg::FetchData)} disabled={self.is_loading_initial_data || self.worker_name_input.trim().is_empty()}>
{ if self.is_loading_initial_data { "Loading..." } else { "Load Worker Data" } }
</button>
</div>
if let Some(err) = &self.error_message {
<p class="error">{ err }</p>
}
if self.worker_name_to_monitor.is_some() && !self.is_loading_initial_data && self.error_message.is_none() {
<h2>{ format!("Monitoring: {}", self.worker_name_to_monitor.as_ref().unwrap()) }</h2>
<h3>{ "Queue Status" }</h3>
<div class="queue-visualization">
{
if let Some(stats) = &self.current_queue_stats {
// TODO: Implement actual color coding and bar visualization
html! { <p>{format!("Tasks in queue: {} ({})", stats.current_size, stats.color_code)}</p> }
} else {
html! { <p>{ "Loading queue stats..." }</p> }
}
}
</div>
<h3>{ "Tasks" }</h3>
{ self.view_tasks_table(ctx) }
{ self.view_selected_task_details(ctx) }
} else if self.is_loading_initial_data {
<p>{ "Loading worker data..." }</p>
}
</div>
}
}
}
impl App {
fn view_tasks_table(&self, ctx: &Context<Self>) -> Html {
if self.tasks_list.is_empty() && self.worker_name_to_monitor.is_some() && !self.is_loading_initial_data {
return html! { <p>{ "No tasks found for this worker, or worker not found." }</p> };
}
if !self.tasks_list.is_empty() {
html! {
<table class="task-table">
<thead>
<tr>
<th>{ "Hash (click to view)" }</th>
<th>{ "Created At (UTC)" }</th>
<th>{ "Status" }</th>
</tr>
</thead>
<tbody>
{ for self.tasks_list.iter().map(|task| self.view_task_row(ctx, task)) }
</tbody>
</table>
}
} else {
html! {}
}
}
fn view_task_row(&self, ctx: &Context<Self>, task: &TaskSummary) -> Html {
let task_hash_clone = task.hash.clone();
let created_at_str = chrono::DateTime::from_timestamp(task.created_at, 0)
.map_or_else(|| "Invalid date".to_string(), |dt| dt.format("%Y-%m-%d %H:%M:%S").to_string());
html! {
<tr onclick={ctx.link().callback(move |_| Msg::ViewTaskDetails(task_hash_clone.clone()))}
style="cursor: pointer;">
<td>{ task.hash.chars().take(12).collect::<String>() }{ "..." }</td>
<td>{ created_at_str }</td>
<td>{ &task.status }</td>
</tr>
}
}
fn view_selected_task_details(&self, ctx: &Context<Self>) -> Html {
if self.is_loading_task_details {
return html! { <p>{ "Loading task details..." }</p> };
}
if let Some(details) = &self.selected_task_details {
let created_at_str = chrono::DateTime::from_timestamp(details.created_at, 0)
.map_or_else(|| "Invalid date".to_string(), |dt| dt.format("%Y-%m-%d %H:%M:%S UTC").to_string());
html! {
<div class="task-details-modal">
<h4>{ format!("Task Details: {}", details.hash) }</h4>
<p><strong>{ "Created At: " }</strong>{ created_at_str }</p>
<p><strong>{ "Status: " }</strong>{ &details.status }</p>
<p><strong>{ "Script Content:" }</strong></p>
<pre>{ &details.script_content }</pre>
if let Some(result) = &details.result {
<p><strong>{ "Result:" }</strong></p>
<pre>{ result }</pre>
}
if let Some(error) = &details.error {
<p><strong>{ "Error:" }</strong></p>
<pre style="color: red;">{ error }</pre>
}
<button onclick={ctx.link().callback(|_| Msg::ClearTaskDetails)}>{ "Close Details" }</button>
</div>
}
} else {
html! {}
}
}
}

View File

@ -0,0 +1,155 @@
// The 'app' module is shared between the server and the client.
mod app;
// --- SERVER-SIDE CODE --- //
#[cfg(feature = "server")]
mod server {
use axum::{
extract::{Path, State},
http::{Method, StatusCode},
routing::get,
Json,
Router,
};
use deadpool_redis::{Config, Pool, Runtime};
use redis::{from_redis_value, AsyncCommands, FromRedisValue, Value};
use std::collections::HashMap;
use std::env;
use std::net::SocketAddr;
use tower_http::cors::{Any, CorsLayer};
use tower_http::services::ServeDir;
// Import the shared application state and data structures
use crate::app::{QueueStats, TaskDetails, TaskSummary, WorkerDataResponse};
const REDIS_TASK_DETAILS_PREFIX: &str = "rhai_task_details:";
const REDIS_QUEUE_PREFIX: &str = "rhai_tasks:";
// The main function to run the server
pub async fn run() {
let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1/".to_string());
let cfg = Config::from_url(redis_url);
let pool = cfg.create_pool(Some(Runtime::Tokio1)).expect("Failed to create Redis pool");
let cors = CorsLayer::new().allow_methods([Method::GET]).allow_origin(Any);
let app = Router::new()
.route("/api/worker/:worker_name/tasks_and_stats", get(get_worker_data))
.route("/api/worker/:worker_name/queue_stats", get(get_queue_stats))
.route("/api/task/:hash", get(get_task_details))
.nest_service("/", ServeDir::new("dist"))
.with_state(pool)
.layer(cors);
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
println!("Backend server listening on http://{}", addr);
println!("Serving static files from './dist' directory.");
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
axum::serve(listener, app).await.unwrap();
}
// --- API Handlers (Live Redis Data) ---
async fn get_worker_data(
State(pool): State<Pool>,
Path(worker_name): Path<String>,
) -> Result<Json<WorkerDataResponse>, (StatusCode, String)> {
let mut conn = pool.get().await.map_err(internal_error)?;
let queue_key = format!("{}{}", REDIS_QUEUE_PREFIX, worker_name);
let task_ids: Vec<String> = conn.lrange(&queue_key, 0, -1).await.map_err(internal_error)?;
let mut tasks = Vec::new();
for task_id in task_ids {
let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id);
let task_details: redis::Value = conn.hgetall(&task_key).await.map_err(internal_error)?;
if let Ok(summary) = task_summary_from_redis_value(&task_details) {
tasks.push(summary);
}
}
let queue_stats = get_queue_stats_internal(&mut conn, &worker_name).await?;
Ok(Json(WorkerDataResponse { tasks, queue_stats: Some(queue_stats) }))
}
async fn get_queue_stats(
State(pool): State<Pool>,
Path(worker_name): Path<String>,
) -> Result<Json<QueueStats>, (StatusCode, String)> {
let mut conn = pool.get().await.map_err(internal_error)?;
let stats = get_queue_stats_internal(&mut conn, &worker_name).await?;
Ok(Json(stats))
}
async fn get_task_details(
State(pool): State<Pool>,
Path(hash): Path<String>,
) -> Result<Json<TaskDetails>, (StatusCode, String)> {
let mut conn = pool.get().await.map_err(internal_error)?;
let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, hash);
let task_details: redis::Value = conn.hgetall(&task_key).await.map_err(internal_error)?;
let details = task_details_from_redis_value(&task_details).map_err(internal_error)?;
Ok(Json(details))
}
// --- Internal Helper Functions ---
async fn get_queue_stats_internal(
conn: &mut deadpool_redis::Connection,
worker_name: &str,
) -> Result<QueueStats, (StatusCode, String)> {
let queue_key = format!("{}{}", REDIS_QUEUE_PREFIX, worker_name);
let size: u32 = conn.llen(&queue_key).await.map_err(internal_error)?;
let color_code = match size {
0..=10 => "green",
11..=50 => "yellow",
_ => "red",
}.to_string();
Ok(QueueStats { current_size: size, color_code })
}
fn internal_error<E: std::error::Error>(err: E) -> (StatusCode, String) {
(StatusCode::INTERNAL_SERVER_ERROR, err.to_string())
}
fn task_summary_from_redis_value(v: &Value) -> redis::RedisResult<TaskSummary> {
let map: HashMap<String, String> = from_redis_value(v)?;
Ok(TaskSummary {
hash: map.get("hash").cloned().unwrap_or_default(),
created_at: map.get("createdAt").and_then(|s| s.parse().ok()).unwrap_or_default(),
status: map.get("status").cloned().unwrap_or_else(|| "Unknown".to_string()),
})
}
fn task_details_from_redis_value(v: &Value) -> redis::RedisResult<TaskDetails> {
let map: HashMap<String, String> = from_redis_value(v)?;
Ok(TaskDetails {
hash: map.get("hash").cloned().unwrap_or_default(),
created_at: map.get("createdAt").and_then(|s| s.parse().ok()).unwrap_or_default(),
status: map.get("status").cloned().unwrap_or_else(|| "Unknown".to_string()),
script_content: map.get("script").cloned().unwrap_or_default(),
result: map.get("output").cloned(),
error: map.get("error").cloned(),
})
}
}
// --- MAIN ENTRY POINTS --- //
// Main function for the server binary
#[cfg(feature = "server")]
#[tokio::main]
async fn main() {
server::run().await;
}
// Main function for the WASM client (compiles when 'server' feature is not enabled)
#[cfg(not(feature = "server"))]
fn main() {
wasm_logger::init(wasm_logger::Config::default());
log::info!("Rhai Worker UI starting...");
yew::Renderer::<app::App>::new().render();
}

View File

@ -0,0 +1,173 @@
/* --- Dark, Sleek, and Modern UI --- */
:root {
--bg-color: #1a1a1a;
--primary-color: #252525;
--secondary-color: #333333;
--font-color: #e0e0e0;
--highlight-color: #00aaff;
--border-color: #444444;
--error-color: #ff4d4d;
--error-bg-color: rgba(255, 77, 77, 0.1);
}
body {
font-family: 'Inter', sans-serif;
margin: 0;
padding: 40px 20px;
background-color: var(--bg-color);
color: var(--font-color);
-webkit-font-smoothing: antialiased;
-moz-osx-font-smoothing: grayscale;
}
.container {
background-color: transparent;
max-width: 900px;
margin: auto;
}
h1, h2, h3, h4 {
color: var(--font-color);
font-weight: 600;
margin-bottom: 20px;
}
h1 {
text-align: center;
font-size: 2.5em;
letter-spacing: -1px;
}
.input-group {
margin-bottom: 30px;
display: flex;
gap: 10px;
}
input[type="text"] {
flex-grow: 1;
padding: 12px 15px;
border: 1px solid var(--border-color);
border-radius: 6px;
font-size: 1em;
background-color: var(--primary-color);
color: var(--font-color);
transition: border-color 0.3s, box-shadow 0.3s;
}
input[type="text"]:focus {
outline: none;
border-color: var(--highlight-color);
box-shadow: 0 0 0 3px rgba(0, 170, 255, 0.2);
}
button {
padding: 12px 20px;
background-color: var(--highlight-color);
color: #ffffff;
border: none;
border-radius: 6px;
cursor: pointer;
font-size: 1em;
font-weight: 500;
transition: background-color 0.3s;
}
button:hover {
background-color: #0088cc;
}
button:disabled {
background-color: var(--secondary-color);
cursor: not-allowed;
}
.error {
color: var(--error-color);
margin-top: 20px;
text-align: center;
padding: 12px;
border: 1px solid var(--error-color);
background-color: var(--error-bg-color);
border-radius: 6px;
}
.task-table {
width: 100%;
border-collapse: collapse;
margin-top: 30px;
}
.task-table th, .task-table td {
border-bottom: 1px solid var(--border-color);
padding: 15px;
text-align: left;
}
.task-table th {
font-weight: 600;
color: #a0a0a0;
text-transform: uppercase;
font-size: 0.85em;
letter-spacing: 0.5px;
}
.task-table tr {
transition: background-color 0.2s;
}
.task-table tr:hover {
background-color: var(--primary-color);
cursor: pointer;
}
.queue-visualization {
margin-top: 30px;
padding: 25px;
border: 1px solid var(--border-color);
background-color: var(--primary-color);
border-radius: 8px;
text-align: center;
font-size: 1.2em;
font-weight: 500;
}
.task-details-modal {
margin-top: 30px;
padding: 25px;
border: 1px solid var(--border-color);
background-color: var(--primary-color);
border-radius: 8px;
}
.task-details-modal h4 {
margin-top: 0;
font-size: 1.5em;
}
.task-details-modal p {
margin: 12px 0;
color: #c0c0c0;
}
.task-details-modal p strong {
color: var(--font-color);
font-weight: 500;
}
.task-details-modal pre {
background-color: var(--bg-color);
padding: 15px;
border-radius: 6px;
white-space: pre-wrap;
word-break: break-all;
max-height: 250px;
overflow-y: auto;
border: 1px solid var(--border-color);
font-family: 'Courier New', Courier, monospace;
}
.task-details-modal button {
margin-top: 20px;
}

BIN
src/worker/.DS_Store vendored Normal file

Binary file not shown.

View File

@ -1 +1,2 @@
/target /target
worker_rhai_temp_db

View File

@ -9,7 +9,7 @@ path = "src/lib.rs"
[[bin]] [[bin]]
name = "worker" name = "worker"
path = "src/bin/worker.rs" path = "cmd/worker.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@ -25,3 +25,5 @@ clap = { version = "4.4", features = ["derive"] }
uuid = { version = "1.6", features = ["v4", "serde"] } # Though task_id is string, uuid might be useful uuid = { version = "1.6", features = ["v4", "serde"] } # Though task_id is string, uuid might be useful
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
rhai_client = { path = "../client" } rhai_client = { path = "../client" }
engine = { path = "../engine" }
heromodels = { path = "../../../db/heromodels", features = ["rhai"] }

View File

@ -1,18 +1,86 @@
use rhai::Engine; use worker_lib::spawn_rhai_worker;
use worker_lib::{run_worker_loop, Args}; // Use the library name defined in Cargo.toml use engine::create_heromodels_engine;
use clap::Parser; // Required for Args::parse() to be in scope use heromodels::db::hero::OurDB;
use std::sync::Arc;
use clap::Parser;
use tokio::sync::mpsc;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Circle name to listen to
#[arg(short, long, default_value = "default")]
circle: String,
/// Redis URL
#[arg(short, long, default_value = "redis://localhost:6379")]
redis_url: String,
/// Worker ID for identification
#[arg(short, long, default_value = "worker_1")]
worker_id: String,
/// Preserve task details after completion (for benchmarking)
#[arg(long, default_value = "false")]
preserve_tasks: bool,
}
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
env_logger::init(); env_logger::init();
let args = Args::parse(); let args = Args::parse();
log::info!("Rhai Worker (binary) starting with default engine."); log::info!("Rhai Worker (binary) starting with performance-optimized engine.");
log::info!("Worker ID: {}, Circle: {}, Redis: {}", args.worker_id, args.circle, args.redis_url);
let engine = Engine::new(); // Initialize database with OurDB for the Rhai engine
// If specific default configurations are needed for the binary's engine, set them up here. // Using a temporary/in-memory like database for the worker
// For example: engine.set_max_operations(1_000_000); let db = Arc::new(OurDB::new("worker_rhai_temp_db", true).expect("Failed to create temporary DB for Rhai engine"));
let mut engine = create_heromodels_engine(db);
run_worker_loop(engine, args).await // Performance optimizations for benchmarking
engine.set_max_operations(0); // Unlimited operations for performance testing
engine.set_max_expr_depths(0, 0); // Unlimited expression depth
engine.set_max_string_size(0); // Unlimited string size
engine.set_max_array_size(0); // Unlimited array size
engine.set_max_map_size(0); // Unlimited map size
// Enable full optimization for maximum performance
engine.set_optimization_level(rhai::OptimizationLevel::Full);
log::info!("Engine configured for maximum performance");
// Create shutdown channel (for graceful shutdown, though not used in benchmarks)
let (_shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
// Spawn the worker
let worker_handle = spawn_rhai_worker(
1, // circle_id (not used but required)
args.circle,
engine,
args.redis_url,
shutdown_rx,
args.preserve_tasks,
);
// Wait for the worker to complete
match worker_handle.await {
Ok(result) => {
match result {
Ok(_) => {
log::info!("Worker completed successfully");
Ok(())
}
Err(e) => {
log::error!("Worker failed: {}", e);
Err(e)
}
}
}
Err(e) => {
log::error!("Worker task panicked: {}", e);
Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
}
}
} }

View File

@ -1,28 +1,17 @@
use chrono::Utc; use chrono::Utc;
use clap::Parser; use log::{debug, error, info};
use log::{debug, error, info}; // Removed warn as it wasn't used in the loop
use redis::AsyncCommands; use redis::AsyncCommands;
use rhai::{Engine, Scope}; // EvalAltResult is not directly returned by the loop use rhai::{Engine, Scope};
use std::collections::HashMap; // For hgetall result use std::collections::HashMap;
use tokio::task::JoinHandle;
// Re-export RhaiTaskDetails from rhai_client if needed by examples, use tokio::sync::mpsc; // For shutdown signal
// or examples can depend on rhai_client directly. use rhai_client::RhaiTaskDetails; // Import for constructing the reply message
// For now, the worker logic itself just interacts with the hash fields. use serde_json; // For serializing the reply message
const REDIS_TASK_DETAILS_PREFIX: &str = "rhai_task_details:"; const REDIS_TASK_DETAILS_PREFIX: &str = "rhai_task_details:";
const REDIS_QUEUE_PREFIX: &str = "rhai_tasks:"; const REDIS_QUEUE_PREFIX: &str = "rhai_tasks:";
const BLPOP_TIMEOUT_SECONDS: usize = 5; const BLPOP_TIMEOUT_SECONDS: usize = 5;
#[derive(Parser, Debug, Clone)] // Added Clone for potential use in examples
#[clap(author, version, about, long_about = None)]
pub struct Args {
#[clap(long, value_parser, default_value = "redis://127.0.0.1/")]
pub redis_url: String,
#[clap(short, long, value_parser, required = true, num_args = 1..)]
pub circles: Vec<String>,
}
// This function updates specific fields in the Redis hash. // This function updates specific fields in the Redis hash.
// It doesn't need to know the full RhaiTaskDetails struct, only the field names. // It doesn't need to know the full RhaiTaskDetails struct, only the field names.
async fn update_task_status_in_redis( async fn update_task_status_in_redis(
@ -35,110 +24,209 @@ async fn update_task_status_in_redis(
let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id); let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id);
let mut updates: Vec<(&str, String)> = vec![ let mut updates: Vec<(&str, String)> = vec![
("status", status.to_string()), ("status", status.to_string()),
("updatedAt", Utc::now().to_rfc3339()), // Ensure this field name matches what rhai_client sets/expects ("updatedAt", Utc::now().timestamp().to_string()),
]; ];
if let Some(out) = output { if let Some(out) = output {
updates.push(("output", out)); // Ensure this field name matches updates.push(("output", out));
} }
if let Some(err) = error_msg { if let Some(err) = error_msg {
updates.push(("error", err)); // Ensure this field name matches updates.push(("error", err));
} }
debug!("Updating task {} in Redis with status: {}, updates: {:?}", task_id, status, updates); debug!("Updating task {} in Redis with status: {}, updates: {:?}", task_id, status, updates);
conn.hset_multiple::<_, _, _, ()>(&task_key, &updates).await?; conn.hset_multiple::<_, _, _, ()>(&task_key, &updates).await?;
Ok(()) Ok(())
} }
pub async fn run_worker_loop(engine: Engine, args: Args) -> Result<(), Box<dyn std::error::Error>> { pub fn spawn_rhai_worker(
info!("Rhai Worker Loop starting. Connecting to Redis at {}", args.redis_url); _circle_id: u32, // For logging or specific logic if needed in the future
info!("Worker Loop will listen for tasks for circles: {:?}", args.circles); circle_name: String,
engine: Engine,
redis_url: String,
mut shutdown_rx: mpsc::Receiver<()>, // Add shutdown receiver
preserve_tasks: bool, // Flag to control task cleanup
) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
tokio::spawn(async move {
let queue_key = format!("{}{}", REDIS_QUEUE_PREFIX, circle_name.replace(" ", "_").to_lowercase());
info!(
"Rhai Worker for Circle '{}' starting. Connecting to Redis at {}. Listening on queue: {}. Waiting for tasks or shutdown signal.",
circle_name, redis_url, queue_key
);
let redis_client = redis::Client::open(args.redis_url.as_str())?; let redis_client = match redis::Client::open(redis_url.as_str()) {
let mut redis_conn = redis_client.get_multiplexed_async_connection().await?; Ok(client) => client,
info!("Worker Loop successfully connected to Redis."); Err(e) => {
error!("Worker for Circle '{}': Failed to open Redis client: {}", circle_name, e);
let queue_keys: Vec<String> = args return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
.circles }
.iter() };
.map(|name| format!("{}{}", REDIS_QUEUE_PREFIX, name.replace(" ", "_").to_lowercase())) let mut redis_conn = match redis_client.get_multiplexed_async_connection().await {
.collect(); Ok(conn) => conn,
Err(e) => {
info!("Worker Loop listening on Redis queues: {:?}", queue_keys); error!("Worker for Circle '{}': Failed to get Redis connection: {}", circle_name, e);
return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
}
};
info!("Worker for Circle '{}' successfully connected to Redis.", circle_name);
loop { loop {
let response: Option<(String, String)> = redis_conn let blpop_keys = vec![queue_key.clone()];
.blpop(&queue_keys, BLPOP_TIMEOUT_SECONDS as f64) tokio::select! {
.await?; // Listen for shutdown signal
_ = shutdown_rx.recv() => {
info!("Worker for Circle '{}': Shutdown signal received. Terminating loop.", circle_name);
break;
}
// Listen for tasks from Redis
blpop_result = redis_conn.blpop(&blpop_keys, BLPOP_TIMEOUT_SECONDS as f64) => {
debug!("Worker for Circle '{}': Attempting BLPOP on queue: {}", circle_name, queue_key);
let response: Option<(String, String)> = match blpop_result {
Ok(resp) => resp,
Err(e) => {
error!("Worker for Circle '{}': Redis BLPOP error on queue {}: {}. Worker for this circle might stop.", circle_name, queue_key, e);
return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
}
};
if let Some((queue_name, task_id)) = response { if let Some((_queue_name_recv, task_id)) = response {
info!("Worker Loop received task_id: {} from queue: {}", task_id, queue_name); info!("Worker for Circle '{}' received task_id: {} from queue: {}", circle_name, task_id, _queue_name_recv);
debug!("Worker for Circle '{}', Task {}: Processing started.", circle_name, task_id);
let task_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id); let task_details_key = format!("{}{}", REDIS_TASK_DETAILS_PREFIX, task_id);
debug!("Worker for Circle '{}', Task {}: Attempting HGETALL from key: {}", circle_name, task_id, task_details_key);
let task_details_map: Result<HashMap<String, String>, _> = let task_details_map_result: Result<HashMap<String, String>, _> =
redis_conn.hgetall(&task_key).await; redis_conn.hgetall(&task_details_key).await;
match task_details_map { match task_details_map_result {
Ok(details_map) => { Ok(details_map) => {
debug!("Worker for Circle '{}', Task {}: HGETALL successful. Details: {:?}", circle_name, task_id, details_map);
let script_content_opt = details_map.get("script").cloned(); let script_content_opt = details_map.get("script").cloned();
let reply_to_queue_opt = details_map.get("replyToQueue").cloned();
let client_rpc_id_str_opt = details_map.get("clientRpcId").cloned();
let created_at_str_opt = details_map.get("createdAt").cloned();
if let Some(script_content) = script_content_opt { if let Some(script_content) = script_content_opt {
info!("Worker Loop processing task_id: {}. Script: {:.50}...", task_id, script_content); info!("Worker for Circle '{}' processing task_id: {}. Script: {:.50}...", circle_name, task_id, script_content);
update_task_status_in_redis(&mut redis_conn, &task_id, "processing", None, None).await?; debug!("Worker for Circle '{}', Task {}: Attempting to update status to 'processing'.", circle_name, task_id);
if let Err(e) = update_task_status_in_redis(&mut redis_conn, &task_id, "processing", None, None).await {
error!("Worker for Circle '{}', Task {}: Failed to update status to 'processing': {}", circle_name, task_id, e);
} else {
debug!("Worker for Circle '{}', Task {}: Status updated to 'processing'.", circle_name, task_id);
}
let mut scope = Scope::new(); let mut scope = Scope::new();
// Examples can show how to pre-populate the scope via the engine or here debug!("Worker for Circle '{}', Task {}: Evaluating script with Rhai engine.", circle_name, task_id);
let mut final_status = "error".to_string(); // Default to error
let mut final_output: Option<String> = None;
let mut final_error_msg: Option<String> = None;
match engine.eval_with_scope::<rhai::Dynamic>(&mut scope, &script_content) { match engine.eval_with_scope::<rhai::Dynamic>(&mut scope, &script_content) {
Ok(result) => { Ok(result) => {
let output_str = format!("{:?}", result); let output_str = if result.is::<String>() {
info!("Worker Loop task {} completed. Output: {}", task_id, output_str); // If the result is a string, we can unwrap it directly.
update_task_status_in_redis( // This moves `result`, which is fine because it's the last time we use it in this branch.
&mut redis_conn, result.into_string().unwrap()
&task_id, } else {
"completed", result.to_string()
Some(output_str), };
None, info!("Worker for Circle '{}' task {} completed. Output: {}", circle_name, task_id, output_str);
) final_status = "completed".to_string();
.await?; final_output = Some(output_str);
} }
Err(e) => { Err(e) => {
let error_str = format!("{:?}", *e); // Dereference EvalAltResult let error_str = format!("{:?}", *e);
error!("Worker Loop task {} failed. Error: {}", task_id, error_str); error!("Worker for Circle '{}' task {} script evaluation failed. Error: {}", circle_name, task_id, error_str);
update_task_status_in_redis( final_error_msg = Some(error_str);
// final_status remains "error"
}
}
debug!("Worker for Circle '{}', Task {}: Attempting to update status to '{}'.", circle_name, task_id, final_status);
if let Err(e) = update_task_status_in_redis(
&mut redis_conn, &mut redis_conn,
&task_id, &task_id,
"error", &final_status,
None, final_output.clone(), // Clone for task hash update
Some(error_str), final_error_msg.clone(), // Clone for task hash update
) ).await {
.await?; error!("Worker for Circle '{}', Task {}: Failed to update final status to '{}': {}", circle_name, task_id, final_status, e);
} else {
debug!("Worker for Circle '{}', Task {}: Final status updated to '{}'.", circle_name, task_id, final_status);
} }
// Send to reply queue if specified
if let Some(reply_q) = reply_to_queue_opt {
let client_rpc_id = client_rpc_id_str_opt.and_then(|s| serde_json::from_str(&s).ok());
let created_at = created_at_str_opt
.and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(Utc::now); // Fallback, though createdAt should exist
let reply_details = RhaiTaskDetails {
script: script_content.clone(), // Include script for context in reply
status: final_status, // The final status
client_rpc_id,
output: final_output, // The final output
error: final_error_msg, // The final error
created_at, // Original creation time
updated_at: Utc::now(), // Time of this final update/reply
reply_to_queue: None, // This field is not relevant for the message content itself
};
match serde_json::to_string(&reply_details) {
Ok(reply_json) => {
let lpush_result: redis::RedisResult<i64> = redis_conn.lpush(&reply_q, &reply_json).await;
match lpush_result {
Ok(_) => debug!("Worker for Circle '{}', Task {}: Successfully sent result to reply queue {}", circle_name, task_id, reply_q),
Err(e_lpush) => error!("Worker for Circle '{}', Task {}: Failed to LPUSH result to reply queue {}: {}", circle_name, task_id, reply_q, e_lpush),
}
}
Err(e_json) => {
error!("Worker for Circle '{}', Task {}: Failed to serialize reply details for queue {}: {}", circle_name, task_id, reply_q, e_json);
}
}
}
// Clean up task details based on preserve_tasks flag
if !preserve_tasks {
// The worker is responsible for cleaning up the task details hash.
if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await {
error!("Worker for Circle '{}', Task {}: Failed to delete task details key '{}': {}", circle_name, task_id, task_details_key, e);
} else {
debug!("Worker for Circle '{}', Task {}: Cleaned up task details key '{}'.", circle_name, task_id, task_details_key);
} }
} else { } else {
debug!("Worker for Circle '{}', Task {}: Preserving task details (preserve_tasks=true)", circle_name, task_id);
}
} else { // Script content not found in hash
error!( error!(
"Worker Loop: Could not find script content for task_id: {} in Redis hash: {}", "Worker for Circle '{}', Task {}: Script content not found in Redis hash. Details map: {:?}",
task_id, task_key circle_name, task_id, details_map
); );
update_task_status_in_redis( // Clean up invalid task details based on preserve_tasks flag
&mut redis_conn, if !preserve_tasks {
&task_id, // Even if the script is not found, the worker should clean up the invalid task hash.
"error", if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await {
None, error!("Worker for Circle '{}', Task {}: Failed to delete invalid task details key '{}': {}", circle_name, task_id, task_details_key, e);
Some("Script content not found in Redis hash".to_string()), }
) } else {
.await?; debug!("Worker for Circle '{}', Task {}: Preserving invalid task details (preserve_tasks=true)", circle_name, task_id);
}
} }
} }
Err(e) => { Err(e) => {
error!( error!(
"Worker Loop: Failed to fetch details for task_id: {} from Redis. Error: {:?}", "Worker for Circle '{}', Task {}: Failed to fetch details (HGETALL) from Redis for key {}. Error: {:?}",
task_id, e circle_name, task_id, task_details_key, e
); );
} }
} }
} else { } else {
debug!("Worker Loop: BLPOP timed out. No new tasks."); debug!("Worker for Circle '{}': BLPOP timed out on queue {}. No new tasks. Checking for shutdown signal again.", circle_name, queue_key);
} }
} } // End of blpop_result match
// Loop is infinite, Ok(()) is effectively unreachable unless loop breaks } // End of tokio::select!
} // End of loop
info!("Worker for Circle '{}' has shut down.", circle_name);
Ok(())
})
} }

158
start_ws_servers.sh Executable file
View File

@ -0,0 +1,158 @@
#!/bin/bash
# Exit immediately if a command exits with a non-zero status.
# set -e # We will handle errors manually for cleanup
# Instead of set -e, we'll check command statuses and exit if needed after attempting cleanup.
# Default to not force killing processes
FORCE_KILL=false
# Parse command-line options
while getopts "f" opt; do
case ${opt} in
f )
FORCE_KILL=true
;;
\? )
echo "Invalid option: -$OPTARG" 1>&2
exit 1
;;
esac
done
shift $((OPTIND -1))
# Array to store PIDs of background processes
BG_PIDS=()
# Cleanup function
cleanup() {
echo "Caught signal, cleaning up background processes..."
for pid in "${BG_PIDS[@]}"; do
if ps -p "$pid" > /dev/null; then # Check if process exists
echo "Stopping process $pid..."
kill "$pid"
fi
done
# Wait for all background processes to terminate
for pid in "${BG_PIDS[@]}"; do
if ps -p "$pid" > /dev/null; then
wait "$pid" 2>/dev/null # Suppress "No such process" if already gone
fi
done
echo "All background processes stopped."
exit 0 # Exit script after cleanup
}
# Trap SIGINT (Ctrl+C) and SIGTERM
trap cleanup SIGINT SIGTERM
# Define circles and their base port
# The client will need to know these port assignments.
# Circle names should match what's in your mock data for consistency,
# but for the WS server, it's what the server identifies itself as.
# The client will use the lowercase_with_underscores version for the path.
# Define circles and their ports using indexed arrays
CIRCLE_NAMES=(
"OurWorld"
"My Personal Space"
"threefold"
"circles_app"
)
CIRCLE_PORTS=(
"9000"
"9001"
"9002"
"9003"
)
# Add more circles and their ports here if needed, ensuring arrays match
# Build the WebSocket server first
echo "Building circle_server_ws..."
cargo build --package circle_server_ws
if [ $? -ne 0 ]; then echo "Failed to build circle_server_ws"; cleanup; exit 1; fi
echo "Building rhai_worker..."
cargo build --package rhai_worker
if [ $? -ne 0 ]; then echo "Failed to build rhai_worker"; cleanup; exit 1; fi
# Paths to the compiled binaries
WS_SERVER_BINARY="./target/debug/circle_server_ws"
RHAI_WORKER_BINARY="./target/debug/rhai_worker"
if [ ! -f "$WS_SERVER_BINARY" ]; then
echo "Error: WebSocket server binary not found at $WS_SERVER_BINARY after build."
cleanup
exit 1
fi
if [ ! -f "$RHAI_WORKER_BINARY" ]; then
echo "Error: Rhai worker binary not found at $RHAI_WORKER_BINARY after build."
cleanup
exit 1
fi
echo "Starting WebSocket servers..."
for i in "${!CIRCLE_NAMES[@]}"; do
NAME="${CIRCLE_NAMES[i]}"
PORT="${CIRCLE_PORTS[i]}"
if [ "$FORCE_KILL" = true ]; then
echo "Checking if port $PORT is in use (force mode)..."
# lsof -i :<port> -t lists PIDs listening on the port
# The output might be empty or multiple PIDs.
# We'll kill any PID found.
PIDS_ON_PORT=$(lsof -i ":$PORT" -t 2>/dev/null || true) # Suppress lsof error if port not in use, || true ensures command doesn't fail script
if [ -n "$PIDS_ON_PORT" ]; then
for PID_TO_KILL in $PIDS_ON_PORT; do
echo "Port $PORT is in use by PID $PID_TO_KILL. Forcing kill..."
kill -9 "$PID_TO_KILL" # Force kill
# Add a small delay to give the OS time to free the port
sleep 0.5
done
else
echo "Port $PORT is free."
fi
fi
# The circle name passed to the server is the "identity" name.
# The client will still connect to ws://localhost:PORT/ws
echo "Starting server for '$NAME' on port $PORT..."
# Run in background
"$WS_SERVER_BINARY" --port "$PORT" --circle-name "$NAME" &
BG_PIDS+=($!) # Store PID of the last backgrounded process
done
echo "All WebSocket servers launched."
# Prepare circle names for rhai_worker
# It expects names like "OurWorld", "My Personal Space"
# We can directly use the CIRCLE_NAMES array
echo "Starting Rhai worker for circles: ${CIRCLE_NAMES[@]}..."
# Run rhai_worker in the background
# Assuming default Redis URL redis://127.0.0.1/
"$RHAI_WORKER_BINARY" --circles "${CIRCLE_NAMES[@]}" &
BG_PIDS+=($!) # Store PID of the Rhai worker
echo "Rhai worker launched."
echo "All processes launched. Press Ctrl+C to stop all servers and the worker."
# Wait for all background PIDs.
# If any of them exit prematurely, this script will also exit.
# The trap will handle cleanup if Ctrl+C is pressed.
for pid in "${BG_PIDS[@]}"; do
wait "$pid"
# If a process exits with an error, its exit code will be propagated by wait.
# The script will then exit due to `wait` itself exiting with that code.
# The trap should still run on SIGINT/SIGTERM.
# For other signals or unexpected exits, the trap might not run.
# More robust error handling for individual process failures could be added here.
done
# If all processes exited cleanly (e.g., were killed by the trap),
# the script will reach here. The trap's exit 0 will handle this.
# If they exited due to an error, `wait` would have exited the script.
cleanup # Call cleanup if all jobs finished normally (e.g. if they self-terminate)