move rhailib to herolib
rhailib/_archive/worker/src/lib.rs (new file, 259 lines)
@@ -0,0 +1,259 @@
use chrono::Utc;
use log::{debug, error, info};
use redis::AsyncCommands;
use rhai::{Dynamic, Engine};
use rhai_dispatcher::RhaiTaskDetails; // Import for constructing the reply message
use serde_json; // For serializing the reply message
use std::collections::HashMap;
use tokio::sync::mpsc; // For shutdown signal
use tokio::task::JoinHandle;

const NAMESPACE_PREFIX: &str = "rhailib:";
const BLPOP_TIMEOUT_SECONDS: usize = 5;
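
The worker derives all of its Redis keys from this prefix. A quick sketch of the resulting key layout, as used further down in this file (illustrative comments only, not part of the original commit):

// Inferred Redis key layout:
//   task hash:   rhailib:<task_id>          (fields: script, status, output, error, createdAt, updatedAt, callerId, contextId)
//   task queue:  rhailib:<worker_id>        (the dispatcher LPUSHes task ids; the worker BLPOPs them)
//   reply queue: rhailib::reply:<task_id>   (note the double colon: NAMESPACE_PREFIX already ends in ':')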
/// Updates specific fields of a task's hash in Redis.
/// It doesn't need to know the full RhaiTaskDetails struct, only the field names.
async fn update_task_status_in_redis(
    conn: &mut redis::aio::MultiplexedConnection,
    task_id: &str,
    status: &str,
    output: Option<String>,
    error_msg: Option<String>,
) -> redis::RedisResult<()> {
    let task_key = format!("{}{}", NAMESPACE_PREFIX, task_id);
    let mut updates: Vec<(&str, String)> = vec![
        ("status", status.to_string()),
        ("updatedAt", Utc::now().timestamp().to_string()),
    ];
    if let Some(out) = output {
        updates.push(("output", out));
    }
    if let Some(err) = error_msg {
        updates.push(("error", err));
    }
    debug!(
        "Updating task {} in Redis with status: {}, updates: {:?}",
        task_id, status, updates
    );
    conn.hset_multiple::<_, _, _, ()>(&task_key, &updates)
        .await?;
    Ok(())
}
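
For context, a minimal client-side sketch (an assumption, not part of this commit) of how the status written by this function could be read back, using the same key layout and the redis crate APIs already imported above:

// Hypothetical polling helper; mirrors the task-hash layout used above.
async fn read_task_status(
    conn: &mut redis::aio::MultiplexedConnection,
    task_id: &str,
) -> redis::RedisResult<Option<String>> {
    let task_key = format!("{}{}", NAMESPACE_PREFIX, task_id);
    // HGET returns None while the hash or the "status" field does not exist yet.
    conn.hget(&task_key, "status").await
}
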
pub fn spawn_rhai_worker(
    worker_id: String,
    db_path: String,
    mut engine: Engine,
    redis_url: String,
    mut shutdown_rx: mpsc::Receiver<()>, // Shutdown signal receiver
    preserve_tasks: bool, // Flag to control task cleanup
) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
    tokio::spawn(async move {
        let queue_key = format!("{}{}", NAMESPACE_PREFIX, worker_id);
        info!(
            "Rhai Worker for Worker ID '{}' starting. Connecting to Redis at {}. Listening on queue: {}. Waiting for tasks or shutdown signal.",
            worker_id, redis_url, queue_key
        );

        let redis_client = match redis::Client::open(redis_url.as_str()) {
            Ok(client) => client,
            Err(e) => {
                error!(
                    "Worker for Worker ID '{}': Failed to open Redis client: {}",
                    worker_id, e
                );
                return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
            }
        };
        let mut redis_conn = match redis_client.get_multiplexed_async_connection().await {
            Ok(conn) => conn,
            Err(e) => {
                error!(
                    "Worker for Worker ID '{}': Failed to get Redis connection: {}",
                    worker_id, e
                );
                return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
            }
        };
        info!(
            "Worker for Worker ID '{}' successfully connected to Redis.",
            worker_id
        );
        loop {
            let blpop_keys = vec![queue_key.clone()];
            tokio::select! {
                // Listen for shutdown signal
                _ = shutdown_rx.recv() => {
                    info!("Worker for Worker ID '{}': Shutdown signal received. Terminating loop.", worker_id.clone());
                    break;
                }
                // Listen for tasks from Redis
                blpop_result = redis_conn.blpop(&blpop_keys, BLPOP_TIMEOUT_SECONDS as f64) => {
                    debug!("Worker for Worker ID '{}': Attempting BLPOP on queue: {}", worker_id.clone(), queue_key);
                    let response: Option<(String, String)> = match blpop_result {
                        Ok(resp) => resp,
                        Err(e) => {
                            error!("Worker '{}': Redis BLPOP error on queue {}: {}. Worker for this circle might stop.", worker_id, queue_key, e);
                            return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
                        }
                    };
                    if let Some((_queue_name_recv, task_id)) = response {
                        info!("Worker '{}' received task_id: {} from queue: {}", worker_id, task_id, _queue_name_recv);
                        debug!("Worker '{}', Task {}: Processing started.", worker_id, task_id);

                        let task_details_key = format!("{}{}", NAMESPACE_PREFIX, task_id);
                        debug!("Worker '{}', Task {}: Attempting HGETALL from key: {}", worker_id, task_id, task_details_key);

                        let task_details_map_result: Result<HashMap<String, String>, _> =
                            redis_conn.hgetall(&task_details_key).await;

                        match task_details_map_result {
                            Ok(details_map) => {
                                debug!("Worker '{}', Task {}: HGETALL successful. Details: {:?}", worker_id, task_id, details_map);
                                let script_content_opt = details_map.get("script").cloned();
                                let created_at_str_opt = details_map.get("createdAt").cloned();
                                let caller_id = details_map.get("callerId").cloned().expect("callerId field missing from Redis hash");
                                let context_id = details_map.get("contextId").cloned().expect("contextId field missing from Redis hash");
                                if context_id.is_empty() {
                                    error!("Worker '{}', Task {}: contextId field empty in Redis hash", worker_id, task_id);
                                    return Err("contextId field empty in Redis hash".into());
                                }
                                if caller_id.is_empty() {
                                    error!("Worker '{}', Task {}: callerId field empty in Redis hash", worker_id, task_id);
                                    return Err("callerId field empty in Redis hash".into());
                                }
                                if let Some(script_content) = script_content_opt {
                                    info!("Worker '{}' processing task_id: {}. Script: {:.50}...", context_id, task_id, script_content);
                                    debug!("Worker for Context ID '{}', Task {}: Attempting to update status to 'processing'.", context_id, task_id);
                                    if let Err(e) = update_task_status_in_redis(&mut redis_conn, &task_id, "processing", None, None).await {
                                        error!("Worker for Context ID '{}', Task {}: Failed to update status to 'processing': {}", context_id, task_id, e);
                                    } else {
                                        debug!("Worker for Context ID '{}', Task {}: Status updated to 'processing'.", context_id, task_id);
                                    }

                                    let mut db_config = rhai::Map::new();
                                    db_config.insert("DB_PATH".into(), db_path.clone().into());
                                    db_config.insert("CALLER_ID".into(), caller_id.clone().into());
                                    db_config.insert("CONTEXT_ID".into(), context_id.clone().into());
                                    engine.set_default_tag(Dynamic::from(db_config)); // Or pass via CallFnOptions

                                    debug!("Worker for Context ID '{}', Task {}: Evaluating script with Rhai engine.", context_id, task_id);

                                    let mut final_status = "error".to_string(); // Default to error
                                    let mut final_output: Option<String> = None;
                                    let mut final_error_msg: Option<String> = None;

                                    match engine.eval::<rhai::Dynamic>(&script_content) {
                                        Ok(result) => {
                                            let output_str = if result.is::<String>() {
                                                // If the result is a string, unwrap it directly.
                                                // This moves `result`, which is fine because this is its last use in this branch.
                                                result.into_string().unwrap()
                                            } else {
                                                result.to_string()
                                            };
                                            info!("Worker for Context ID '{}' task {} completed. Output: {}", context_id, task_id, output_str);
                                            final_status = "completed".to_string();
                                            final_output = Some(output_str);
                                        }
                                        Err(e) => {
                                            let error_str = format!("{:?}", *e);
                                            error!("Worker for Context ID '{}' task {} script evaluation failed. Error: {}", context_id, task_id, error_str);
                                            final_error_msg = Some(error_str);
                                            // final_status remains "error"
                                        }
                                    }

                                    debug!("Worker for Context ID '{}', Task {}: Attempting to update status to '{}'.", context_id, task_id, final_status);
                                    if let Err(e) = update_task_status_in_redis(
                                        &mut redis_conn,
                                        &task_id,
                                        &final_status,
                                        final_output.clone(), // Clone for task hash update
                                        final_error_msg.clone(), // Clone for task hash update
                                    ).await {
                                        error!("Worker for Context ID '{}', Task {}: Failed to update final status to '{}': {}", context_id, task_id, final_status, e);
                                    } else {
                                        debug!("Worker for Context ID '{}', Task {}: Final status updated to '{}'.", context_id, task_id, final_status);
                                    }

                                    // Send the full task details to the task's reply queue
                                    let created_at = created_at_str_opt
                                        .and_then(|s| chrono::DateTime::parse_from_rfc3339(&s).ok())
                                        .map(|dt| dt.with_timezone(&Utc))
                                        .unwrap_or_else(Utc::now); // Fallback, though createdAt should exist

                                    let reply_details = RhaiTaskDetails {
                                        task_id: task_id.to_string(),
                                        script: script_content.clone(), // Include script for context in reply
                                        status: final_status, // The final status
                                        output: final_output, // The final output
                                        error: final_error_msg, // The final error
                                        created_at, // Original creation time
                                        updated_at: Utc::now(), // Time of this final update/reply
                                        caller_id: caller_id.clone(),
                                        context_id: context_id.clone(),
                                        worker_id: worker_id.clone(),
                                    };
                                    let reply_queue_key = format!("{}:reply:{}", NAMESPACE_PREFIX, task_id);
                                    match serde_json::to_string(&reply_details) {
                                        Ok(reply_json) => {
                                            let lpush_result: redis::RedisResult<i64> = redis_conn.lpush(&reply_queue_key, &reply_json).await;
                                            match lpush_result {
                                                Ok(_) => debug!("Worker for Context ID '{}', Task {}: Successfully sent result to reply queue {}", context_id, task_id, reply_queue_key),
                                                Err(e_lpush) => error!("Worker for Context ID '{}', Task {}: Failed to LPUSH result to reply queue {}: {}", context_id, task_id, reply_queue_key, e_lpush),
                                            }
                                        }
                                        Err(e_json) => {
                                            error!("Worker for Context ID '{}', Task {}: Failed to serialize reply details for queue {}: {}", context_id, task_id, reply_queue_key, e_json);
                                        }
                                    }
                                    // Clean up task details based on preserve_tasks flag
                                    if !preserve_tasks {
                                        // The worker is responsible for cleaning up the task details hash.
                                        if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await {
                                            error!("Worker for Context ID '{}', Task {}: Failed to delete task details key '{}': {}", context_id, task_id, task_details_key, e);
                                        } else {
                                            debug!("Worker for Context ID '{}', Task {}: Cleaned up task details key '{}'.", context_id, task_id, task_details_key);
                                        }
                                    } else {
                                        debug!("Worker for Context ID '{}', Task {}: Preserving task details (preserve_tasks=true)", context_id, task_id);
                                    }
                                } else { // Script content not found in hash
                                    error!(
                                        "Worker for Context ID '{}', Task {}: Script content not found in Redis hash. Details map: {:?}",
                                        context_id, task_id, details_map
                                    );
                                    // Clean up invalid task details based on preserve_tasks flag
                                    if !preserve_tasks {
                                        // Even if the script is not found, the worker should clean up the invalid task hash.
                                        if let Err(e) = redis_conn.del::<_, ()>(&task_details_key).await {
                                            error!("Worker for Context ID '{}', Task {}: Failed to delete invalid task details key '{}': {}", context_id, task_id, task_details_key, e);
                                        }
                                    } else {
                                        debug!("Worker for Context ID '{}', Task {}: Preserving invalid task details (preserve_tasks=true)", context_id, task_id);
                                    }
                                }
                            }
                            Err(e) => {
                                error!(
                                    "Worker '{}', Task {}: Failed to fetch details (HGETALL) from Redis for key {}. Error: {:?}",
                                    worker_id, task_id, task_details_key, e
                                );
                            }
                        }
                    } else {
                        debug!("Worker '{}': BLPOP timed out on queue {}. No new tasks. Checking for shutdown signal again.", &worker_id, &queue_key);
                    }
                } // End of BLPOP select arm
            } // End of tokio::select!
        } // End of loop
        info!("Worker '{}' has shut down.", worker_id);
        Ok(())
    })
}
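
To round this off, a minimal sketch of how the worker might be driven end to end. The crate name `rhailib_worker` and the Redis/database locations are assumptions; in practice task submission goes through rhai_dispatcher:

// Hypothetical usage; crate name and paths are assumptions.
use rhai::Engine;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let engine = Engine::new();
    let (shutdown_tx, shutdown_rx) = mpsc::channel(1);

    // The worker BLPOPs task ids from the queue "rhailib:worker-1".
    let handle = rhailib_worker::spawn_rhai_worker(
        "worker-1".to_string(),           // worker_id (also names the task queue)
        "/tmp/rhai_db".to_string(),       // db_path (assumed location)
        engine,
        "redis://127.0.0.1/".to_string(),
        shutdown_rx,
        false,                            // preserve_tasks: delete task hashes when done
    );

    // ... submit tasks via rhai_dispatcher and await replies on rhailib::reply:<task_id> ...

    // Graceful shutdown: the worker breaks out of its loop on the next select.
    shutdown_tx.send(()).await?;
    handle.await??;
    Ok(())
}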