// Core modules pub mod async_runner; pub mod sync_runner; pub mod runner_trait; pub mod script_mode; // Public exports for convenience pub use runner_trait::{Runner, RunnerConfig, spawn_runner}; pub use async_runner::{AsyncRunner, spawn_async_runner}; pub use sync_runner::{SyncRunner, SyncRunnerConfig, spawn_sync_runner}; // Re-export job types from hero-job crate pub use hero_job::{Job, JobStatus, JobError, JobBuilder, JobSignature}; // Re-export job client pub use hero_job_client::{Client, ClientBuilder}; pub use redis::AsyncCommands; use log::{error, info}; const BLPOP_TIMEOUT_SECONDS: usize = 5; /// Initialize Redis connection for the runner pub async fn initialize_redis_connection( runner_id: &str, redis_url: &str, ) -> Result> { let redis_client = redis::Client::open(redis_url) .map_err(|e| { error!("Runner for Runner ID '{}': Failed to open Redis client: {}", runner_id, e); e })?; let redis_conn = redis_client.get_multiplexed_async_connection().await .map_err(|e| { error!("Runner for Runner ID '{}': Failed to get Redis connection: {}", runner_id, e); e })?; info!("Runner for Runner ID '{}' successfully connected to Redis.", runner_id); Ok(redis_conn) } // /// Load job from Redis using the supervisor's Job API // pub async fn load_job_from_redis( // redis_conn: &mut redis::aio::MultiplexedConnection, // job_id: &str, // runner_id: &str, // ) -> Result { // debug!("Runner '{}', Job {}: Loading job from Redis", runner_id, job_id); // // Load job data from Redis hash // let job_data: std::collections::HashMap = redis_conn.hgetall(&client.job_key(job_id)).await // .map_err(JobError::Redis)?; // if job_data.is_empty() { // return Err(JobError::NotFound(job_id.to_string())); // } // // Parse job from hash data using the supervisor's Job struct // let job = Job { // id: job_id.to_string(), // caller_id: job_data.get("caller_id").unwrap_or(&"".to_string()).clone(), // context_id: job_data.get("context_id").unwrap_or(&"".to_string()).clone(), // payload: job_data.get("payload").unwrap_or(&"".to_string()).clone(), // runner: job_data.get("runner").unwrap_or(&"default".to_string()).clone(), // executor: job_data.get("executor").unwrap_or(&"rhai".to_string()).clone(), // timeout: job_data.get("timeout").and_then(|s| s.parse().ok()).unwrap_or(300), // env_vars: serde_json::from_str(job_data.get("env_vars").unwrap_or(&"{}".to_string())) // .map_err(JobError::Serialization)?, // created_at: job_data.get("created_at") // .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) // .map(|dt| dt.with_timezone(&chrono::Utc)) // .unwrap_or_else(chrono::Utc::now), // updated_at: job_data.get("updated_at") // .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) // .map(|dt| dt.with_timezone(&chrono::Utc)) // .unwrap_or_else(chrono::Utc::now), // }; // Ok(job) // }