Files
supervisor/cmd/supervisor.rs
Timur Gordon a47157aa71 fix: keep OpenRPC ServerHandle alive to prevent server shutdown
The ServerHandle was being dropped immediately after spawning, causing the
OpenRPC server to shut down. Now we properly await handle.stopped() to keep
the server running.
2025-10-27 14:34:22 +01:00

159 lines
5.5 KiB
Rust

//! # Hero Supervisor Binary
//!
//! Main supervisor binary that manages multiple actors and listens for jobs over Redis.
//! The supervisor is built from the actor configuration, starts the actors, and dispatches
//! jobs to the appropriate runners based on each job's runner field.
use hero_supervisor::{SupervisorApp, SupervisorBuilder};
use clap::Parser;
use log::{info, error};
use std::path::PathBuf;
/// Command line arguments for the supervisor
// NOTE: clap's derive macro turns the `///` doc comments in this struct into
// `--help` text, so they are user-facing strings; explanatory notes for
// maintainers are written as plain `//` comments instead.
#[derive(Parser, Debug)]
#[command(
    name = "supervisor",
    about = "Hero Supervisor - manages multiple actors and dispatches jobs"
)]
struct Args {
    /// Path to the configuration TOML file
    // Optional; `main` currently does not load it and falls back to the CLI values.
    #[arg(short, long, value_name = "FILE")]
    config: Option<PathBuf>,

    /// Redis URL for job queue
    #[arg(long, default_value = "redis://localhost:6379")]
    redis_url: String,

    /// Namespace for Redis keys
    // Defaults to the empty string.
    #[arg(long, default_value = "")]
    namespace: String,

    /// Admin secrets (can be specified multiple times)
    // Exposed as `--admin-secret`; each occurrence appends one entry.
    #[arg(long = "admin-secret", value_name = "SECRET")]
    admin_secrets: Vec<String>,

    /// User secrets (can be specified multiple times)
    // Exposed as `--user-secret`; each occurrence appends one entry.
    #[arg(long = "user-secret", value_name = "SECRET")]
    user_secrets: Vec<String>,

    /// Register secrets (can be specified multiple times)
    // Exposed as `--register-secret`; each occurrence appends one entry.
    #[arg(long = "register-secret", value_name = "SECRET")]
    register_secrets: Vec<String>,

    /// Mycelium daemon URL
    #[arg(long, default_value = "http://127.0.0.1:8990")]
    mycelium_url: String,

    /// Mycelium topic for supervisor RPC messages
    #[arg(long, default_value = "supervisor.rpc")]
    topic: String,

    /// Port for OpenRPC HTTP server
    #[arg(long, default_value_t = 3030)]
    port: u16,

    /// Bind address for OpenRPC HTTP server
    #[arg(long, default_value = "127.0.0.1")]
    bind_address: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize logging
env_logger::init();
info!("Starting Hero Supervisor");
// Parse command line arguments
let args = Args::parse();
// Create and initialize supervisor using builder pattern
let mut builder = SupervisorBuilder::new()
.redis_url(&args.redis_url)
.namespace(&args.namespace);
// Add secrets from CLI arguments
if !args.admin_secrets.is_empty() {
info!("Adding {} admin secret(s)", args.admin_secrets.len());
builder = builder.admin_secrets(args.admin_secrets);
}
if !args.user_secrets.is_empty() {
info!("Adding {} user secret(s)", args.user_secrets.len());
builder = builder.user_secrets(args.user_secrets);
}
if !args.register_secrets.is_empty() {
info!("Adding {} register secret(s)", args.register_secrets.len());
builder = builder.register_secrets(args.register_secrets);
}
let supervisor = match args.config {
Some(_config_path) => {
info!("Loading configuration from config file not yet implemented");
// For now, use CLI configuration
builder.build().await?
}
None => {
info!("Using CLI configuration");
builder.build().await?
}
};
// Print startup information
let server_url = format!("http://{}:{}", args.bind_address, args.port);
println!("\n╔════════════════════════════════════════════════════════════╗");
println!("║ Hero Supervisor Started ║");
println!("╚════════════════════════════════════════════════════════════╝");
println!(" 📡 OpenRPC Server: {}", server_url);
println!(" 🔗 Redis: {}", args.redis_url);
#[cfg(feature = "mycelium")]
if !args.mycelium_url.is_empty() {
println!(" 🌐 Mycelium: {}", args.mycelium_url);
} else {
println!(" 🌐 Mycelium: Disabled");
}
#[cfg(not(feature = "mycelium"))]
println!(" 🌐 Mycelium: Not compiled (use --features mycelium)");
println!("╚════════════════════════════════════════════════════════════╝\n");
// Start OpenRPC server in background
use std::sync::Arc;
use tokio::sync::Mutex;
use hero_supervisor::openrpc::start_http_openrpc_server;
let supervisor_arc = Arc::new(Mutex::new(supervisor.clone()));
let bind_addr = args.bind_address.clone();
let port = args.port;
tokio::spawn(async move {
info!("Starting OpenRPC server on {}:{}", bind_addr, port);
match start_http_openrpc_server(supervisor_arc, &bind_addr, port).await {
Ok(handle) => {
info!("OpenRPC server started successfully");
// Keep the server running by holding the handle
handle.stopped().await;
error!("OpenRPC server stopped unexpectedly");
}
Err(e) => {
error!("OpenRPC server error: {}", e);
}
}
});
// Give the server a moment to start
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
let mut app = SupervisorApp::new(supervisor, args.mycelium_url, args.topic);
// Start the complete supervisor application
app.start().await?;
Ok(())
}