This commit is contained in:
Timur Gordon
2025-08-08 09:54:50 +02:00
parent e66bee09cf
commit 89a3abee63
78 changed files with 12411 additions and 423 deletions

60
cmd/actor.rs Normal file
View File

@@ -0,0 +1,60 @@
use actor_system::AsyncWorker;
use clap::Parser;
use log::info;
use std::sync::Arc;
use tokio::sync::mpsc;
#[derive(Parser, Debug)]
#[command(name = "actor_system")]
#[command(about = "System Actor - Asynchronous job processing actor")]
struct Args {
/// Database path
#[arg(short, long, default_value = "/tmp/system_db")]
db_path: String,
/// Redis URL
#[arg(short, long, default_value = "redis://localhost:6379")]
redis_url: String,
/// Preserve completed tasks in Redis
#[arg(short, long)]
preserve_tasks: bool,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
env_logger::init();
let args = Args::parse();
info!("Starting System Actor");
// Create shutdown channel
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
// Setup signal handler for graceful shutdown
let shutdown_tx_clone = shutdown_tx.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c().await.expect("Failed to listen for Ctrl+C");
info!("Received Ctrl+C, initiating shutdown...");
let _ = shutdown_tx_clone.send(()).await;
});
// Create and start the actor
let actor = Arc::new(
AsyncWorker::builder()
.db_path(args.db_path)
.redis_url(args.redis_url)
.build()?
);
let handle = baobab_actor::spawn_actor(actor, shutdown_rx);
info!("System Actor started, waiting for jobs...");
// Wait for the actor to complete
handle.await??;
info!("System Actor shutdown complete");
Ok(())
}

View File

@@ -1,302 +0,0 @@
//! System Worker Binary - Asynchronous actor for high-throughput concurrent processing
use clap::Parser;
use log::{error, info, warn};
use baobab_actor::async_actor_impl::AsyncWorker;
use baobab_actor::config::{ConfigError, WorkerConfig};
use baobab_actor::engine::create_heromodels_engine;
use baobab_actor::actor_trait::{spawn_actor, WorkerConfig as TraitWorkerConfig};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::signal;
use tokio::sync::mpsc;
#[derive(Parser, Debug)]
#[command(
name = "system",
version = "0.1.0",
about = "System Worker - Asynchronous Worker with Concurrent Job Processing",
long_about = "An asynchronous actor for Hero framework that processes multiple jobs \
concurrently with timeout support. Ideal for high-throughput scenarios \
where jobs can be executed in parallel."
)]
struct Args {
/// Path to TOML configuration file
#[arg(short, long, help = "Path to TOML configuration file")]
config: PathBuf,
/// Override actor ID from config
#[arg(long, help = "Override actor ID from configuration file")]
actor_id: Option<String>,
/// Override Redis URL from config
#[arg(long, help = "Override Redis URL from configuration file")]
redis_url: Option<String>,
/// Override database path from config
#[arg(long, help = "Override database path from configuration file")]
db_path: Option<String>,
/// Override default timeout in seconds
#[arg(long, help = "Override default job timeout in seconds")]
timeout: Option<u64>,
/// Enable verbose logging (debug level)
#[arg(short, long, help = "Enable verbose logging")]
verbose: bool,
/// Disable timestamps in log output
#[arg(long, help = "Remove timestamps from log output")]
no_timestamp: bool,
/// Show actor statistics periodically
#[arg(long, help = "Show periodic actor statistics")]
show_stats: bool,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let args = Args::parse();
// Load configuration from TOML file
let mut config = match WorkerConfig::from_file(&args.config) {
Ok(config) => config,
Err(e) => {
eprintln!("Failed to load configuration from {:?}: {}", args.config, e);
std::process::exit(1);
}
};
// Validate that this is an async actor configuration
if !config.is_async() {
eprintln!("Error: System actor requires an async actor configuration");
eprintln!("Expected: [actor_type] type = \"async\"");
eprintln!("Found: {:?}", config.actor_type);
std::process::exit(1);
}
// Apply command line overrides
if let Some(actor_id) = args.actor_id {
config.actor_id = actor_id;
}
if let Some(redis_url) = args.redis_url {
config.redis_url = redis_url;
}
if let Some(db_path) = args.db_path {
config.db_path = db_path;
}
// Override timeout if specified
if let Some(timeout_secs) = args.timeout {
if let baobab_actor::config::WorkerType::Async { ref mut default_timeout_seconds } = config.actor_type {
*default_timeout_seconds = timeout_secs;
}
}
// Configure logging
setup_logging(&config, args.verbose, args.no_timestamp)?;
info!("🚀 System Worker starting...");
info!("Worker ID: {}", config.actor_id);
info!("Redis URL: {}", config.redis_url);
info!("Database Path: {}", config.db_path);
info!("Preserve Tasks: {}", config.preserve_tasks);
if let Some(timeout) = config.get_default_timeout() {
info!("Default Timeout: {:?}", timeout);
}
// Create Rhai engine
let engine = create_heromodels_engine();
info!("✅ Rhai engine initialized");
// Create actor configuration for the trait-based interface
let mut actor_config = TraitWorkerConfig::new(
config.actor_id.clone(),
config.db_path.clone(),
config.redis_url.clone(),
config.preserve_tasks,
);
// Add timeout configuration for async actor
if let Some(timeout) = config.get_default_timeout() {
actor_config = actor_config.with_default_timeout(timeout);
}
// Create async actor instance
let actor = Arc::new(AsyncWorker::default());
info!("✅ Async actor instance created");
// Setup shutdown signal handling
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
// Spawn shutdown signal handler
let shutdown_tx_clone = shutdown_tx.clone();
tokio::spawn(async move {
if let Err(e) = signal::ctrl_c().await {
error!("Failed to listen for shutdown signal: {}", e);
return;
}
info!("🛑 Shutdown signal received");
if let Err(e) = shutdown_tx_clone.send(()).await {
error!("Failed to send shutdown signal: {}", e);
}
});
// Spawn statistics reporter if requested
if args.show_stats {
let actor_stats = Arc::clone(&actor);
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
interval.tick().await;
let running_count = actor_stats.running_job_count().await;
if running_count > 0 {
info!("📊 Worker Stats: {} jobs currently running", running_count);
} else {
info!("📊 Worker Stats: No jobs currently running");
}
}
});
}
// Spawn the actor
info!("🔄 Starting actor loop...");
let actor_handle = spawn_actor(actor, engine, shutdown_rx);
// Wait for the actor to complete
match actor_handle.await {
Ok(Ok(())) => {
info!("✅ System Worker shut down gracefully");
}
Ok(Err(e)) => {
error!("❌ System Worker encountered an error: {}", e);
std::process::exit(1);
}
Err(e) => {
error!("❌ Failed to join actor task: {}", e);
std::process::exit(1);
}
}
Ok(())
}
/// Setup logging based on configuration and command line arguments
fn setup_logging(
config: &WorkerConfig,
verbose: bool,
no_timestamp: bool,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut builder = env_logger::Builder::new();
// Determine log level
let log_level = if verbose {
"debug"
} else {
&config.logging.level
};
// Set log level
builder.filter_level(match log_level.to_lowercase().as_str() {
"trace" => log::LevelFilter::Trace,
"debug" => log::LevelFilter::Debug,
"info" => log::LevelFilter::Info,
"warn" => log::LevelFilter::Warn,
"error" => log::LevelFilter::Error,
_ => {
warn!("Invalid log level: {}. Using 'info'", log_level);
log::LevelFilter::Info
}
});
// Configure timestamps
let show_timestamps = !no_timestamp && config.logging.timestamps;
if !show_timestamps {
builder.format_timestamp(None);
}
builder.init();
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use tempfile::NamedTempFile;
#[test]
fn test_config_validation() {
let config_toml = r#"
actor_id = "test_system"
redis_url = "redis://localhost:6379"
db_path = "/tmp/test_db"
[actor_type]
type = "async"
default_timeout_seconds = 600
[logging]
level = "info"
"#;
let mut temp_file = NamedTempFile::new().unwrap();
temp_file.write_all(config_toml.as_bytes()).unwrap();
let config = WorkerConfig::from_file(temp_file.path()).unwrap();
assert!(!config.is_sync());
assert!(config.is_async());
assert_eq!(config.actor_id, "test_system");
assert_eq!(config.get_default_timeout(), Some(Duration::from_secs(600)));
}
#[test]
fn test_sync_config_rejection() {
let config_toml = r#"
actor_id = "test_system"
redis_url = "redis://localhost:6379"
db_path = "/tmp/test_db"
[actor_type]
type = "sync"
[logging]
level = "info"
"#;
let mut temp_file = NamedTempFile::new().unwrap();
temp_file.write_all(config_toml.as_bytes()).unwrap();
let config = WorkerConfig::from_file(temp_file.path()).unwrap();
assert!(config.is_sync());
assert!(!config.is_async());
// This would be rejected in main() function
}
#[test]
fn test_timeout_override() {
let config_toml = r#"
actor_id = "test_system"
redis_url = "redis://localhost:6379"
db_path = "/tmp/test_db"
[actor_type]
type = "async"
default_timeout_seconds = 300
"#;
let mut temp_file = NamedTempFile::new().unwrap();
temp_file.write_all(config_toml.as_bytes()).unwrap();
let mut config = WorkerConfig::from_file(temp_file.path()).unwrap();
assert_eq!(config.get_default_timeout(), Some(Duration::from_secs(300)));
// Test timeout override
if let baobab_actor::config::WorkerType::Async { ref mut default_timeout_seconds } = config.actor_type {
*default_timeout_seconds = 600;
}
assert_eq!(config.get_default_timeout(), Some(Duration::from_secs(600)));
}
}

156
cmd/terminal_ui.rs Normal file
View File

@@ -0,0 +1,156 @@
//! Simplified main function for Baobab Actor TUI
//!
//! This binary provides a clean entry point for the actor monitoring and job dispatch interface.
use anyhow::{Result, Context};
use baobab_actor::terminal_ui::{App, setup_and_run_tui};
use clap::Parser;
use hero_job::ScriptType;
use log::{info, warn, error};
use std::path::PathBuf;
use std::process::{Child, Command};
use tokio::signal;
#[derive(Parser)]
#[command(name = "baobab-actor-tui")]
#[command(about = "Terminal UI for Baobab Actor - Monitor and dispatch jobs to a single actor")]
struct Args {
/// Redis URL for job queue
#[arg(short, long, default_value = "redis://localhost:6379")]
redis_url: String,
/// Enable verbose logging
#[arg(short, long)]
verbose: bool,
}
/// Initialize logging based on verbosity level
fn init_logging(verbose: bool) {
if verbose {
env_logger::Builder::from_default_env()
.filter_level(log::LevelFilter::Debug)
.init();
} else {
env_logger::Builder::from_default_env()
.filter_level(log::LevelFilter::Info)
.init();
}
}
/// Create and configure the TUI application
fn create_app(args: &Args) -> Result<App> {
let actor_id = "sal".to_string();
// Get the crate root directory
let crate_root = std::env::var("CARGO_MANIFEST_DIR")
.unwrap_or_else(|_| ".".to_string());
let crate_root = PathBuf::from(crate_root);
let actor_path = crate_root.join("target/debug/actor_system");
let example_dir = Some(crate_root.join("examples/scripts"));
let mut app = App::new(
actor_id,
actor_path,
args.redis_url.clone(),
example_dir,
)?;
// Set the correct script type for the system actor
// System actor processes SAL (System Abstraction Layer) scripts, not OSIS scripts
app.job_form.script_type = ScriptType::SAL;
Ok(app)
}
/// Spawn the actor binary as a background process
fn spawn_actor_process(_args: &Args) -> Result<Child> {
// Get the crate root directory
let crate_root = std::env::var("CARGO_MANIFEST_DIR")
.unwrap_or_else(|_| ".".to_string());
let actor_path = PathBuf::from(crate_root).join("target/debug/actor_system");
info!("🎬 Spawning actor process: {}", actor_path.display());
let mut cmd = Command::new(&actor_path);
// Redirect stdout and stderr to null to prevent logs from interfering with TUI
cmd.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null());
// Spawn the process
let child = cmd
.spawn()
.with_context(|| format!("Failed to spawn actor process: {}", actor_path.display()))?;
info!("✅ Actor process spawned with PID: {}", child.id());
Ok(child)
}
/// Cleanup function to terminate actor process
fn cleanup_actor_process(mut actor_process: Child) {
info!("🧹 Cleaning up actor process...");
match actor_process.try_wait() {
Ok(Some(status)) => {
info!("Actor process already exited with status: {}", status);
}
Ok(None) => {
info!("Terminating actor process...");
if let Err(e) = actor_process.kill() {
error!("Failed to kill actor process: {}", e);
} else {
match actor_process.wait() {
Ok(status) => info!("Actor process terminated with status: {}", status),
Err(e) => error!("Failed to wait for actor process: {}", e),
}
}
}
Err(e) => {
error!("Failed to check actor process status: {}", e);
}
}
}
#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
// Initialize logging
init_logging(args.verbose);
let crate_root = std::env::var("CARGO_MANIFEST_DIR")
.unwrap_or_else(|_| ".".to_string());
info!("🚀 Starting Baobab Actor TUI...");
info!("Actor ID: sal (System Actor)");
info!("Actor Path: {}/target/debug/actor_system", crate_root);
info!("Redis URL: {}", args.redis_url);
info!("Script Type: SAL");
info!("Example Directory: {}/examples/scripts", crate_root);
// Spawn the actor process first
let actor_process = spawn_actor_process(&args)?;
// Give the actor a moment to start up
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
// Create app and run TUI
let app = create_app(&args)?;
// Set up signal handling for graceful shutdown
let result = tokio::select! {
tui_result = setup_and_run_tui(app) => {
info!("TUI exited");
tui_result
}
_ = signal::ctrl_c() => {
info!("Received Ctrl+C, shutting down...");
Ok(())
}
};
// Clean up the actor process
cleanup_actor_process(actor_process);
result
}