Merge branch 'main' of https://git.ourworld.tf/herocode/hero
This commit is contained in:
@@ -1,4 +1,34 @@
|
||||
use hero_job::Job;
|
||||
//! # Actor Trait Abstraction
|
||||
//!
|
||||
//! This module provides a trait-based abstraction for Rhai actors that eliminates
|
||||
//! code duplication between synchronous and asynchronous actor implementations.
|
||||
//!
|
||||
//! The `Actor` trait defines the common interface and behavior, while specific
|
||||
//! implementations handle job processing differently (sync vs async).
|
||||
//!
|
||||
//! ## Architecture
|
||||
//!
|
||||
//! ```text
|
||||
//! ┌─────────────────┐ ┌─────────────────┐
|
||||
//! │ SyncActor │ │ AsyncActor │
|
||||
//! │ │ │ │
|
||||
//! │ process_job() │ │ process_job() │
|
||||
//! │ (sequential) │ │ (concurrent) │
|
||||
//! └─────────────────┘ └─────────────────┘
|
||||
//! │ │
|
||||
//! └───────┬───────────────┘
|
||||
//! │
|
||||
//! ┌───────▼───────┐
|
||||
//! │ Actor Trait │
|
||||
//! │ │
|
||||
//! │ spawn() │
|
||||
//! │ config │
|
||||
//! │ common loop │
|
||||
//! └───────────────┘
|
||||
//! ```
|
||||
|
||||
use hero_job::{Job, ScriptType};
|
||||
use hero_job::keys;
|
||||
use log::{debug, error, info};
|
||||
use redis::AsyncCommands;
|
||||
|
||||
@@ -7,7 +37,7 @@ use std::time::Duration;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use crate::{initialize_redis_connection, NAMESPACE_PREFIX, BLPOP_TIMEOUT_SECONDS};
|
||||
use crate::{initialize_redis_connection, BLPOP_TIMEOUT_SECONDS};
|
||||
|
||||
/// Configuration for actor instances
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -91,11 +121,14 @@ pub trait Actor: Send + Sync + 'static {
|
||||
tokio::spawn(async move {
|
||||
let actor_id = self.actor_id();
|
||||
let redis_url = self.redis_url();
|
||||
let queue_key = format!("{}{}", NAMESPACE_PREFIX, actor_id);
|
||||
// Canonical work queue based on script type (instance/group selection can be added later)
|
||||
let script_type = derive_script_type_from_actor_id(actor_id);
|
||||
let queue_key = keys::work_type(&script_type);
|
||||
info!(
|
||||
"{} Actor '{}' starting. Connecting to Redis at {}. Listening on queue: {}",
|
||||
"{} Actor '{}' starting. Type {:?}. Connecting to Redis at {}. Listening on queue: {}",
|
||||
self.actor_type(),
|
||||
actor_id,
|
||||
script_type,
|
||||
redis_url,
|
||||
queue_key
|
||||
);
|
||||
@@ -222,78 +255,18 @@ pub fn spawn_actor<W: Actor>(
|
||||
actor.spawn(shutdown_rx)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::engine::create_heromodels_engine;
|
||||
|
||||
// Mock actor for testing
|
||||
struct MockActor;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Actor for MockActor {
|
||||
async fn process_job(
|
||||
&self,
|
||||
_job: Job,
|
||||
_redis_conn: &mut redis::aio::MultiplexedConnection,
|
||||
) {
|
||||
// Mock implementation - do nothing
|
||||
// Engine would be owned by the actor implementation as a field
|
||||
}
|
||||
|
||||
fn actor_type(&self) -> &'static str {
|
||||
"Mock"
|
||||
}
|
||||
|
||||
fn actor_id(&self) -> &str {
|
||||
"mock_actor"
|
||||
}
|
||||
|
||||
fn redis_url(&self) -> &str {
|
||||
"redis://localhost:6379"
|
||||
}
|
||||
fn derive_script_type_from_actor_id(actor_id: &str) -> ScriptType {
|
||||
let lower = actor_id.to_lowercase();
|
||||
if lower.contains("sal") {
|
||||
ScriptType::SAL
|
||||
} else if lower.contains("osis") {
|
||||
ScriptType::OSIS
|
||||
} else if lower.contains("python") {
|
||||
ScriptType::Python
|
||||
} else if lower.contains("v") {
|
||||
ScriptType::V
|
||||
} else {
|
||||
// Default to OSIS when uncertain
|
||||
ScriptType::OSIS
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_actor_config_creation() {
|
||||
let config = ActorConfig::new(
|
||||
"test_actor".to_string(),
|
||||
"/tmp".to_string(),
|
||||
"redis://localhost:6379".to_string(),
|
||||
false,
|
||||
);
|
||||
|
||||
assert_eq!(config.actor_id, "test_actor");
|
||||
assert_eq!(config.db_path, "/tmp");
|
||||
assert_eq!(config.redis_url, "redis://localhost:6379");
|
||||
assert!(!config.preserve_tasks);
|
||||
assert!(config.default_timeout.is_none());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_actor_config_with_timeout() {
|
||||
let timeout = Duration::from_secs(300);
|
||||
let config = ActorConfig::new(
|
||||
"test_actor".to_string(),
|
||||
"/tmp".to_string(),
|
||||
"redis://localhost:6379".to_string(),
|
||||
false,
|
||||
).with_default_timeout(timeout);
|
||||
|
||||
assert_eq!(config.default_timeout, Some(timeout));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_spawn_actor_function() {
|
||||
let (_shutdown_tx, shutdown_rx) = mpsc::channel(1);
|
||||
let actor = Arc::new(MockActor);
|
||||
|
||||
let handle = spawn_actor(actor, shutdown_rx);
|
||||
|
||||
// The actor should be created successfully
|
||||
assert!(!handle.is_finished());
|
||||
|
||||
// Abort the actor for cleanup
|
||||
handle.abort();
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,4 +1,5 @@
|
||||
use hero_job::{Job, JobStatus};
|
||||
use hero_job::{Job, JobStatus, ScriptType};
|
||||
use hero_job::keys;
|
||||
use log::{debug, error, info};
|
||||
use redis::AsyncCommands;
|
||||
use rhai::{Dynamic, Engine};
|
||||
@@ -217,10 +218,11 @@ pub fn spawn_rhai_actor(
|
||||
preserve_tasks: bool,
|
||||
) -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>> {
|
||||
tokio::spawn(async move {
|
||||
let queue_key = format!("{}{}", NAMESPACE_PREFIX, actor_id);
|
||||
let script_type = derive_script_type_from_actor_id(&actor_id);
|
||||
let queue_key = keys::work_type(&script_type);
|
||||
info!(
|
||||
"Rhai Actor for Actor ID '{}' starting. Connecting to Redis at {}. Listening on queue: {}. Waiting for tasks or shutdown signal.",
|
||||
actor_id, redis_url, queue_key
|
||||
"Rhai Actor '{}' starting. Type {:?}. Connecting to Redis at {}. Listening on queue: {}. Waiting for tasks or shutdown signal.",
|
||||
actor_id, script_type, redis_url, queue_key
|
||||
);
|
||||
|
||||
let mut redis_conn = initialize_redis_connection(&actor_id, &redis_url).await?;
|
||||
@@ -259,6 +261,23 @@ pub fn spawn_rhai_actor(
|
||||
})
|
||||
}
|
||||
|
||||
// Helper to derive script type from actor_id for canonical queue selection
|
||||
fn derive_script_type_from_actor_id(actor_id: &str) -> ScriptType {
|
||||
let lower = actor_id.to_lowercase();
|
||||
if lower.contains("sal") {
|
||||
ScriptType::SAL
|
||||
} else if lower.contains("osis") {
|
||||
ScriptType::OSIS
|
||||
} else if lower.contains("python") {
|
||||
ScriptType::Python
|
||||
} else if lower == "v" || lower.contains(":v") || lower.contains(" v") {
|
||||
ScriptType::V
|
||||
} else {
|
||||
// Default to OSIS when uncertain
|
||||
ScriptType::OSIS
|
||||
}
|
||||
}
|
||||
|
||||
// Re-export the main trait-based interface for convenience
|
||||
pub use actor_trait::{Actor, ActorConfig, spawn_actor};
|
||||
|
||||
|
@@ -10,6 +10,7 @@ use crossterm::{
|
||||
execute,
|
||||
};
|
||||
use hero_job::{Job, JobStatus, ScriptType};
|
||||
use hero_job::keys;
|
||||
|
||||
use ratatui::{
|
||||
backend::{Backend, CrosstermBackend},
|
||||
@@ -457,9 +458,9 @@ impl App {
|
||||
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
|
||||
job.store_in_redis(&mut conn).await?;
|
||||
|
||||
// Add to work queue
|
||||
let queue_name = format!("hero:job:actor_queue:{}", self.actor_id.to_lowercase());
|
||||
let _: () = conn.lpush(&queue_name, &job_id).await?;
|
||||
// Add to work queue (canonical type queue)
|
||||
let queue_name = keys::work_type(&self.job_form.script_type);
|
||||
let _: () = conn.lpush(&queue_name, &job.id).await?;
|
||||
|
||||
self.status_message = Some(format!("Job {} dispatched successfully", job_id));
|
||||
|
||||
|
@@ -387,3 +387,47 @@ impl Job {
|
||||
Ok(job_ids)
|
||||
}
|
||||
}
|
||||
|
||||
// Canonical Redis key builders for queues and hashes
|
||||
pub mod keys {
|
||||
use super::{NAMESPACE_PREFIX, ScriptType};
|
||||
|
||||
// hero:job:{job_id}
|
||||
pub fn job_hash(job_id: &str) -> String {
|
||||
format!("{}{}", NAMESPACE_PREFIX, job_id)
|
||||
}
|
||||
|
||||
// hero:q:reply:{job_id}
|
||||
pub fn reply(job_id: &str) -> String {
|
||||
format!("hero:q:reply:{}", job_id)
|
||||
}
|
||||
|
||||
// hero:q:work:type:{script_type}
|
||||
pub fn work_type(script_type: &ScriptType) -> String {
|
||||
format!("hero:q:work:type:{}", script_type.actor_queue_suffix())
|
||||
}
|
||||
|
||||
// hero:q:work:type:{script_type}:group:{group}
|
||||
pub fn work_group(script_type: &ScriptType, group: &str) -> String {
|
||||
format!(
|
||||
"hero:q:work:type:{}:group:{}",
|
||||
script_type.actor_queue_suffix(),
|
||||
group
|
||||
)
|
||||
}
|
||||
|
||||
// hero:q:work:type:{script_type}:group:{group}:inst:{instance}
|
||||
pub fn work_instance(script_type: &ScriptType, group: &str, instance: &str) -> String {
|
||||
format!(
|
||||
"hero:q:work:type:{}:group:{}:inst:{}",
|
||||
script_type.actor_queue_suffix(),
|
||||
group,
|
||||
instance
|
||||
)
|
||||
}
|
||||
|
||||
// hero:q:ctl:type:{script_type}
|
||||
pub fn stop_type(script_type: &ScriptType) -> String {
|
||||
format!("hero:q:ctl:type:{}", script_type.actor_queue_suffix())
|
||||
}
|
||||
}
|
||||
|
52
core/supervisor/examples/simple_job.rs
Normal file
52
core/supervisor/examples/simple_job.rs
Normal file
@@ -0,0 +1,52 @@
|
||||
use hero_supervisor::{SupervisorBuilder, ScriptType};
|
||||
use hero_job::JobBuilder as CoreJobBuilder;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
// 1) Build a Supervisor
|
||||
let supervisor = SupervisorBuilder::new()
|
||||
.redis_url("redis://127.0.0.1/")
|
||||
.build()
|
||||
.await?;
|
||||
|
||||
// 2) Build a Job (using core job builder to set caller_id, context_id)
|
||||
let job = CoreJobBuilder::new()
|
||||
.caller_id("02abc...caller") // required
|
||||
.context_id("02def...context") // required
|
||||
.script_type(ScriptType::OSIS) // select the OSIS actor (matches configured osis_actor_1)
|
||||
.script("40 + 3") // simple Rhai script
|
||||
.timeout(std::time::Duration::from_secs(10))
|
||||
.build()?; // returns hero_job::Job
|
||||
|
||||
let job_id = job.id.clone();
|
||||
|
||||
// 3a) Store the job in Redis
|
||||
supervisor.create_job(&job).await?;
|
||||
|
||||
// 3b) Start the job (pushes ID to the actor’s Redis queue)
|
||||
supervisor.start_job(&job_id).await?;
|
||||
|
||||
// 3c) Wait until finished, then fetch output
|
||||
use tokio::time::sleep;
|
||||
|
||||
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
|
||||
loop {
|
||||
let status = supervisor.get_job_status(&job_id).await?;
|
||||
if status == hero_supervisor::JobStatus::Finished {
|
||||
break;
|
||||
}
|
||||
if std::time::Instant::now() >= deadline {
|
||||
println!("Job {} timed out waiting for completion (status: {:?})", job_id, status);
|
||||
break;
|
||||
}
|
||||
sleep(std::time::Duration::from_millis(250)).await;
|
||||
}
|
||||
|
||||
if let Some(output) = supervisor.get_job_output(&job_id).await? {
|
||||
println!("Job {} output: {}", job_id, output);
|
||||
} else {
|
||||
println!("Job {} completed with no output field set", job_id);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
@@ -408,7 +408,8 @@ impl Supervisor {
|
||||
|
||||
/// Get the hardcoded actor queue key for the script type
|
||||
fn get_actor_queue_key(&self, script_type: &ScriptType) -> String {
|
||||
format!("{}actor_queue:{}", NAMESPACE_PREFIX, script_type.actor_queue_suffix())
|
||||
// Canonical type queue
|
||||
hero_job::keys::work_type(script_type)
|
||||
}
|
||||
|
||||
pub fn new_job(&self) -> JobBuilder {
|
||||
@@ -586,14 +587,9 @@ impl Supervisor {
|
||||
job_id: String,
|
||||
script_type: &ScriptType
|
||||
) -> Result<(), SupervisorError> {
|
||||
let actor_queue_key = self.get_actor_queue_key(script_type);
|
||||
|
||||
// lpush also infers its types, RV is typically i64 (length of list) or () depending on exact command variant
|
||||
// For `redis::AsyncCommands::lpush`, it's `RedisResult<R>` where R: FromRedisValue
|
||||
// Often this is the length of the list. Let's allow inference or specify if needed.
|
||||
let _: redis::RedisResult<i64> =
|
||||
conn.lpush(&actor_queue_key, job_id.clone()).await;
|
||||
|
||||
// Canonical dispatch to type queue
|
||||
let actor_queue_key = hero_job::keys::work_type(script_type);
|
||||
let _: redis::RedisResult<i64> = conn.lpush(&actor_queue_key, job_id.clone()).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -675,7 +671,8 @@ impl Supervisor {
|
||||
) -> Result<String, SupervisorError> {
|
||||
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
|
||||
|
||||
let reply_queue_key = format!("{}:reply:{}", NAMESPACE_PREFIX, job.id); // Derived from the passed job_id
|
||||
// Canonical reply queue
|
||||
let reply_queue_key = hero_job::keys::reply(&job.id);
|
||||
|
||||
self.create_job_using_connection(
|
||||
&mut conn,
|
||||
@@ -692,13 +689,48 @@ impl Supervisor {
|
||||
job.timeout
|
||||
);
|
||||
|
||||
self.await_response_from_connection(
|
||||
&mut conn,
|
||||
&job.id,
|
||||
&reply_queue_key,
|
||||
job.timeout,
|
||||
)
|
||||
.await
|
||||
// Some actors update the job hash directly and do not use reply queues.
|
||||
// Poll the job hash for output until timeout to support both models.
|
||||
let start_time = std::time::Instant::now();
|
||||
|
||||
loop {
|
||||
// If output is present in the job hash, return it immediately
|
||||
match self.get_job_output(&job.id).await {
|
||||
Ok(Some(output)) => {
|
||||
// Optional: cleanup reply queue in case it was created
|
||||
let _: redis::RedisResult<i32> = conn.del(&reply_queue_key).await;
|
||||
return Ok(output);
|
||||
}
|
||||
Ok(None) => {
|
||||
// Check for error state
|
||||
match self.get_job_status(&job.id).await {
|
||||
Ok(JobStatus::Error) => {
|
||||
// Try to read the error field for context
|
||||
let mut conn2 = self.redis_client.get_multiplexed_async_connection().await?;
|
||||
let job_key = format!("{}{}", NAMESPACE_PREFIX, job.id);
|
||||
let err: Option<String> = conn2.hget(&job_key, "error").await.ok();
|
||||
return Err(SupervisorError::InvalidInput(
|
||||
err.unwrap_or_else(|| "Job failed".to_string())
|
||||
));
|
||||
}
|
||||
_ => {
|
||||
// keep polling
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
// Ignore transient read errors and continue polling
|
||||
}
|
||||
}
|
||||
|
||||
if start_time.elapsed() >= job.timeout {
|
||||
// On timeout, ensure any reply queue is cleaned up and return a Timeout error
|
||||
let _: redis::RedisResult<i32> = conn.del(&reply_queue_key).await;
|
||||
return Err(SupervisorError::Timeout(job.id.clone()));
|
||||
}
|
||||
|
||||
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
|
||||
}
|
||||
}
|
||||
|
||||
// Method to get job status
|
||||
@@ -772,7 +804,7 @@ impl Supervisor {
|
||||
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
|
||||
|
||||
// Get job details to determine script type and appropriate actor
|
||||
let job_key = format!("{}job:{}", NAMESPACE_PREFIX, job_id);
|
||||
let job_key = format!("{}{}", NAMESPACE_PREFIX, job_id);
|
||||
let job_data: std::collections::HashMap<String, String> = conn.hgetall(&job_key).await?;
|
||||
|
||||
if job_data.is_empty() {
|
||||
@@ -787,7 +819,8 @@ impl Supervisor {
|
||||
.map_err(|e| SupervisorError::InvalidInput(format!("Invalid script type: {}", e)))?;
|
||||
|
||||
// Use hardcoded stop queue key for this script type
|
||||
let stop_queue_key = format!("{}stop_queue:{}", NAMESPACE_PREFIX, script_type.actor_queue_suffix());
|
||||
// Stop queue per protocol: hero:stop_queue:{suffix}
|
||||
let stop_queue_key = format!("hero:stop_queue:{}", script_type.actor_queue_suffix());
|
||||
|
||||
// Push job ID to the stop queue
|
||||
conn.lpush::<_, _, ()>(&stop_queue_key, job_id).await?;
|
||||
@@ -799,7 +832,7 @@ impl Supervisor {
|
||||
/// Get logs for a job by reading from its log file
|
||||
pub async fn get_job_logs(&self, job_id: &str) -> Result<Option<String>, SupervisorError> {
|
||||
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
|
||||
let job_key = format!("{}job:{}", NAMESPACE_PREFIX, job_id);
|
||||
let job_key = format!("{}{}", NAMESPACE_PREFIX, job_id);
|
||||
|
||||
// Get the job data to find the log path
|
||||
let result_map: Option<std::collections::HashMap<String, String>> =
|
||||
@@ -922,7 +955,7 @@ impl Supervisor {
|
||||
for job_id in ready_job_ids {
|
||||
// Get job data to determine script type and select actor
|
||||
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
|
||||
let job_key = format!("{}job:{}", NAMESPACE_PREFIX, job_id);
|
||||
let job_key = format!("{}{}", NAMESPACE_PREFIX, job_id);
|
||||
let job_data: std::collections::HashMap<String, String> = conn.hgetall(&job_key).await?;
|
||||
|
||||
if let Some(script_type_str) = job_data.get("script_type") {
|
||||
|
Reference in New Issue
Block a user