Restructure: move src to core/ crate, move cmd/supervisor.rs to core/src/bin/supervisor.rs, create workspace

Timur Gordon
2025-11-06 23:30:18 +01:00
parent 0a617ad359
commit 6d518599b8
39 changed files with 824 additions and 140 deletions

core/src/app.rs (new file, 190 lines)

@@ -0,0 +1,190 @@
//! # Hero Supervisor Application
//!
//! Simplified supervisor application that wraps a built Supervisor instance.
//! Use SupervisorBuilder to construct the supervisor with all configuration,
//! then pass it to SupervisorApp for runtime management.
use crate::Supervisor;
#[cfg(feature = "mycelium")]
use crate::mycelium::MyceliumIntegration;
use log::{info, error, debug};
#[cfg(feature = "mycelium")]
use std::sync::Arc;
#[cfg(feature = "mycelium")]
use tokio::sync::Mutex;
/// Main supervisor application
pub struct SupervisorApp {
pub supervisor: Supervisor,
pub mycelium_url: String,
pub topic: String,
}
impl SupervisorApp {
/// Create a new supervisor application with a built supervisor
pub fn new(supervisor: Supervisor, mycelium_url: String, topic: String) -> Self {
Self {
supervisor,
mycelium_url,
topic,
}
}
/// Start the complete supervisor application
/// This method handles the entire application lifecycle:
/// - Starts all configured runners
/// - Connects to Mycelium daemon for message transport
/// - Sets up graceful shutdown handling
/// - Keeps the application running
pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> {
info!("Starting Hero Supervisor Application");
// Start all configured runners
self.start_all().await?;
// Start Mycelium integration
self.start_mycelium_integration().await?;
// Set up graceful shutdown
self.setup_graceful_shutdown().await;
// Keep the application running
info!("Supervisor is running. Press Ctrl+C to shutdown.");
self.run_main_loop().await;
Ok(())
}
/// Start the Mycelium integration
async fn start_mycelium_integration(&self) -> Result<(), Box<dyn std::error::Error>> {
#[cfg(feature = "mycelium")]
{
// Skip Mycelium if URL is empty
if self.mycelium_url.is_empty() {
info!("Mycelium integration disabled (no URL provided)");
return Ok(());
}
info!("Starting Mycelium integration...");
let supervisor_for_mycelium = Arc::new(Mutex::new(self.supervisor.clone()));
let mycelium_url = self.mycelium_url.clone();
let topic = self.topic.clone();
let mycelium_integration = MyceliumIntegration::new(
supervisor_for_mycelium,
mycelium_url,
topic,
);
// Start the Mycelium integration in a background task
let integration_handle = tokio::spawn(async move {
if let Err(e) = mycelium_integration.start().await {
error!("Mycelium integration error: {}", e);
}
});
// Give the integration a moment to start
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
info!("Mycelium integration started successfully");
            // Detach the task: dropping a tokio JoinHandle leaves the spawned task running
            drop(integration_handle);
}
#[cfg(not(feature = "mycelium"))]
{
info!("Mycelium integration not enabled (compile with --features mycelium)");
}
Ok(())
}
/// Set up graceful shutdown handling
async fn setup_graceful_shutdown(&self) {
tokio::spawn(async move {
tokio::signal::ctrl_c().await.expect("Failed to listen for ctrl+c");
info!("Received shutdown signal");
std::process::exit(0);
});
}
/// Main application loop
async fn run_main_loop(&self) {
// Keep the main thread alive
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
/// Start all configured runners
pub async fn start_all(&mut self) -> Result<(), Box<dyn std::error::Error>> {
info!("Starting all runners");
let results = self.supervisor.start_all().await;
let mut failed_count = 0;
for (runner_id, result) in results {
match result {
Ok(_) => info!("Runner {} started successfully", runner_id),
Err(e) => {
error!("Failed to start runner {}: {}", runner_id, e);
failed_count += 1;
}
}
}
if failed_count == 0 {
info!("All runners started successfully");
} else {
error!("Failed to start {} runners", failed_count);
}
Ok(())
}
/// Stop all configured runners
pub async fn stop_all(&mut self, force: bool) -> Result<(), Box<dyn std::error::Error>> {
info!("Stopping all runners (force: {})", force);
let results = self.supervisor.stop_all(force).await;
let mut failed_count = 0;
for (runner_id, result) in results {
match result {
Ok(_) => info!("Runner {} stopped successfully", runner_id),
Err(e) => {
error!("Failed to stop runner {}: {}", runner_id, e);
failed_count += 1;
}
}
}
if failed_count == 0 {
info!("All runners stopped successfully");
} else {
error!("Failed to stop {} runners", failed_count);
}
Ok(())
}
/// Get status of all runners
pub async fn get_status(&self) -> Result<Vec<(String, String)>, Box<dyn std::error::Error>> {
debug!("Getting status of all runners");
let statuses = self.supervisor.get_all_runner_status().await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error>)?;
let status_strings: Vec<(String, String)> = statuses
.into_iter()
.map(|(runner_id, status)| {
let status_str = format!("{:?}", status);
(runner_id, status_str)
})
.collect();
Ok(status_strings)
}
}
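
A minimal usage sketch (not part of this commit), assuming only the builder methods exercised in core/src/bin/supervisor.rs:

use hero_supervisor::{SupervisorApp, SupervisorBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build the supervisor with all configuration up front...
    let supervisor = SupervisorBuilder::new()
        .redis_url("redis://localhost:6379")
        .namespace("demo")
        .build()
        .await?;

    // ...then hand it to SupervisorApp for runtime management.
    // An empty Mycelium URL disables the integration (see start_mycelium_integration).
    let mut app = SupervisorApp::new(supervisor, String::new(), "supervisor.rpc".to_string());
    app.start().await
}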

core/src/auth.rs (new file, 134 lines)

@@ -0,0 +1,134 @@
//! Authentication and API key management
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use uuid::Uuid;
/// API key scope/permission level
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ApiKeyScope {
/// Full access - can manage keys, runners, jobs
Admin,
/// Can register new runners
Registrar,
/// Can create and manage jobs
User,
}
impl ApiKeyScope {
pub fn as_str(&self) -> &'static str {
match self {
ApiKeyScope::Admin => "admin",
ApiKeyScope::Registrar => "registrar",
ApiKeyScope::User => "user",
}
}
}
/// An API key with metadata
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApiKey {
/// The actual key value (UUID or custom string)
pub key: String,
/// Human-readable name for the key
pub name: String,
/// Permission scope
pub scope: ApiKeyScope,
/// When the key was created
pub created_at: String,
/// Optional expiration timestamp
pub expires_at: Option<String>,
}
impl ApiKey {
/// Create a new API key with a generated UUID
pub fn new(name: String, scope: ApiKeyScope) -> Self {
Self {
key: Uuid::new_v4().to_string(),
name,
scope,
created_at: chrono::Utc::now().to_rfc3339(),
expires_at: None,
}
}
/// Create a new API key with a specific key value
pub fn with_key(key: String, name: String, scope: ApiKeyScope) -> Self {
Self {
key,
name,
scope,
created_at: chrono::Utc::now().to_rfc3339(),
expires_at: None,
}
}
}
/// API key store
#[derive(Debug, Clone, Default)]
pub struct ApiKeyStore {
/// Map of key -> ApiKey
keys: HashMap<String, ApiKey>,
}
impl ApiKeyStore {
pub fn new() -> Self {
Self {
keys: HashMap::new(),
}
}
/// Add a new API key
pub fn add_key(&mut self, key: ApiKey) {
self.keys.insert(key.key.clone(), key);
}
/// Remove an API key by its key value
pub fn remove_key(&mut self, key: &str) -> Option<ApiKey> {
self.keys.remove(key)
}
/// Get an API key by its key value
pub fn get_key(&self, key: &str) -> Option<&ApiKey> {
self.keys.get(key)
}
/// Verify a key and return its metadata if valid
pub fn verify_key(&self, key: &str) -> Option<&ApiKey> {
self.get_key(key)
}
/// List all keys with a specific scope
pub fn list_keys_by_scope(&self, scope: ApiKeyScope) -> Vec<&ApiKey> {
self.keys
.values()
.filter(|k| k.scope == scope)
.collect()
}
/// List all keys
pub fn list_all_keys(&self) -> Vec<&ApiKey> {
self.keys.values().collect()
}
/// Count keys by scope
pub fn count_by_scope(&self, scope: ApiKeyScope) -> usize {
self.keys.values().filter(|k| k.scope == scope).count()
}
/// Bootstrap with an initial admin key
pub fn bootstrap_admin_key(&mut self, name: String) -> ApiKey {
let key = ApiKey::new(name, ApiKeyScope::Admin);
self.add_key(key.clone());
key
}
}
/// Response for auth verification
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthVerifyResponse {
pub valid: bool,
pub name: String,
pub scope: String,
}
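
A short sketch of the store's lifecycle, using only the types defined in this file:

use hero_supervisor::auth::{ApiKeyScope, ApiKeyStore};

fn main() {
    let mut store = ApiKeyStore::new();

    // Bootstrap an initial admin key; the generated UUID is only returned here.
    let admin = store.bootstrap_admin_key("initial-admin".to_string());
    println!("admin key: {}", admin.key);

    // Later, verify an incoming key and inspect its scope.
    match store.verify_key(&admin.key) {
        Some(k) if k.scope == ApiKeyScope::Admin => println!("{} has admin access", k.name),
        Some(k) => println!("{} has {} access", k.name, k.scope.as_str()),
        None => println!("unknown key"),
    }
}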

core/src/bin/supervisor.rs (new file, 176 lines)

@@ -0,0 +1,176 @@
//! # Hero Supervisor Binary
//!
//! Main supervisor binary that manages multiple actors and listens to jobs over Redis.
//! The supervisor is built with actor configuration, starts the actors, and dispatches
//! jobs to the appropriate runners based on each job's runner field.
use hero_supervisor::{SupervisorApp, SupervisorBuilder};
use clap::Parser;
use log::{info, error};
use std::path::PathBuf;
/// Command line arguments for the supervisor
#[derive(Parser, Debug)]
#[command(name = "supervisor")]
#[command(about = "Hero Supervisor - manages multiple actors and dispatches jobs")]
struct Args {
/// Path to the configuration TOML file
#[arg(short, long, value_name = "FILE")]
config: Option<PathBuf>,
/// Redis URL for job queue
#[arg(long, default_value = "redis://localhost:6379")]
redis_url: String,
/// Namespace for Redis keys
#[arg(long, default_value = "")]
namespace: String,
/// Admin secrets (can be specified multiple times)
#[arg(long = "admin-secret", value_name = "SECRET")]
admin_secrets: Vec<String>,
/// User secrets (can be specified multiple times)
#[arg(long = "user-secret", value_name = "SECRET")]
user_secrets: Vec<String>,
/// Register secrets (can be specified multiple times)
#[arg(long = "register-secret", value_name = "SECRET")]
register_secrets: Vec<String>,
/// Mycelium daemon URL
#[arg(long, default_value = "http://127.0.0.1:8990")]
mycelium_url: String,
/// Mycelium topic for supervisor RPC messages
#[arg(long, default_value = "supervisor.rpc")]
topic: String,
/// Port for OpenRPC HTTP server
#[arg(long, default_value = "3030")]
port: u16,
/// Bind address for OpenRPC HTTP server
#[arg(long, default_value = "127.0.0.1")]
bind_address: String,
/// Bootstrap an initial admin API key with the given name
#[arg(long = "bootstrap-admin-key", value_name = "NAME")]
bootstrap_admin_key: Option<String>,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize logging
env_logger::init();
info!("Starting Hero Supervisor");
// Parse command line arguments
let args = Args::parse();
// Create and initialize supervisor using builder pattern
let mut builder = SupervisorBuilder::new()
.redis_url(&args.redis_url)
.namespace(&args.namespace);
// Add secrets from CLI arguments
if !args.admin_secrets.is_empty() {
info!("Adding {} admin secret(s)", args.admin_secrets.len());
builder = builder.admin_secrets(args.admin_secrets);
}
if !args.user_secrets.is_empty() {
info!("Adding {} user secret(s)", args.user_secrets.len());
builder = builder.user_secrets(args.user_secrets);
}
if !args.register_secrets.is_empty() {
info!("Adding {} register secret(s)", args.register_secrets.len());
builder = builder.register_secrets(args.register_secrets);
}
let supervisor = match args.config {
Some(_config_path) => {
info!("Loading configuration from config file not yet implemented");
// For now, use CLI configuration
builder.build().await?
}
None => {
info!("Using CLI configuration");
builder.build().await?
}
};
// Bootstrap admin key if requested
if let Some(admin_key_name) = args.bootstrap_admin_key {
info!("Bootstrapping admin API key: {}", admin_key_name);
let admin_key = supervisor.bootstrap_admin_key(admin_key_name).await;
println!("\n╔════════════════════════════════════════════════════════════╗");
println!("║ 🔑 Admin API Key Created ║");
println!("╚════════════════════════════════════════════════════════════╝");
println!(" Name: {}", admin_key.name);
println!(" Key: {}", admin_key.key);
println!(" Scope: {}", admin_key.scope.as_str());
println!(" ⚠️ SAVE THIS KEY - IT WILL NOT BE SHOWN AGAIN!");
println!("╚════════════════════════════════════════════════════════════╝\n");
}
// Print startup information
let server_url = format!("http://{}:{}", args.bind_address, args.port);
println!("\n╔════════════════════════════════════════════════════════════╗");
println!("║ Hero Supervisor Started ║");
println!("╚════════════════════════════════════════════════════════════╝");
println!(" 📡 OpenRPC Server: {}", server_url);
println!(" 🔗 Redis: {}", args.redis_url);
#[cfg(feature = "mycelium")]
if !args.mycelium_url.is_empty() {
println!(" 🌐 Mycelium: {}", args.mycelium_url);
} else {
println!(" 🌐 Mycelium: Disabled");
}
#[cfg(not(feature = "mycelium"))]
println!(" 🌐 Mycelium: Not compiled (use --features mycelium)");
println!("╚════════════════════════════════════════════════════════════╝\n");
// Start OpenRPC server in background
use std::sync::Arc;
use tokio::sync::Mutex;
use hero_supervisor::openrpc::start_http_openrpc_server;
let supervisor_arc = Arc::new(Mutex::new(supervisor.clone()));
let bind_addr = args.bind_address.clone();
let port = args.port;
tokio::spawn(async move {
info!("Starting OpenRPC server on {}:{}", bind_addr, port);
match start_http_openrpc_server(supervisor_arc, &bind_addr, port).await {
Ok(handle) => {
info!("OpenRPC server started successfully");
// Keep the server running by holding the handle
handle.stopped().await;
error!("OpenRPC server stopped unexpectedly");
}
Err(e) => {
error!("OpenRPC server error: {}", e);
}
}
});
// Give the server a moment to start
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
let mut app = SupervisorApp::new(supervisor, args.mycelium_url, args.topic);
// Start the complete supervisor application
app.start().await?;
Ok(())
}
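
For reference, a hedged client-side sketch (not part of this commit) of calling the OpenRPC HTTP server this binary starts. The JSON-RPC method name list_runners is assumed from the routing table in core/src/mycelium.rs, and the URL matches the --bind-address/--port defaults above:

use serde_json::{json, Value};

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // JSON-RPC 2.0 request against the supervisor's OpenRPC endpoint.
    let request = json!({
        "jsonrpc": "2.0",
        "method": "list_runners", // assumed method name
        "params": [],
        "id": 1
    });
    let response: Value = reqwest::Client::new()
        .post("http://127.0.0.1:3030") // default --bind-address and --port
        .json(&request)
        .send()
        .await?
        .json()
        .await?;
    println!("{}", response);
    Ok(())
}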

core/src/job.rs (new file, 3 lines)

@@ -0,0 +1,3 @@
// Re-export job types from the hero-job crate
pub use hero_job::{Job, JobBuilder, JobStatus, JobError, ScriptType};
use hero_job_client::{Client, ClientBuilder};
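
A hedged sketch of building a job through the re-exported JobBuilder; the builder calls mirror those in core/src/openrpc/tests.rs, since the hero-job API itself is not shown in this diff:

use hero_supervisor::job::{Job, JobBuilder};

fn build_demo_job() -> Job {
    JobBuilder::new()
        .id("demo-job-1".to_string())
        .caller_id("demo-client".to_string())
        .context_id("demo-context".to_string())
        .script("print('hello')".to_string())
        .timeout(30)
        .build()
        .expect("valid job")
}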

core/src/lib.rs (new file, 25 lines)

@@ -0,0 +1,25 @@
//! Hero Supervisor - Actor management for the Hero ecosystem.
//!
//! See README.md for detailed documentation and usage examples.
pub mod runner;
pub mod job;
pub mod supervisor;
pub mod app;
pub mod openrpc;
pub mod auth;
pub mod services;
#[cfg(feature = "mycelium")]
pub mod mycelium;
// Re-export main types for convenience
pub use runner::{Runner, RunnerConfig, RunnerResult, RunnerStatus};
// pub use sal_service_manager::{ProcessManager, SimpleProcessManager, TmuxProcessManager};
pub use supervisor::{Supervisor, SupervisorBuilder, ProcessManagerType};
pub use hero_job::{Job, JobBuilder, JobStatus, JobError};
use hero_job_client::{Client, ClientBuilder};
pub use app::SupervisorApp;
#[cfg(feature = "mycelium")]
pub use mycelium::{MyceliumIntegration, MyceliumServer};

core/src/mycelium.rs (new file, 519 lines)

@@ -0,0 +1,519 @@
//! # Mycelium Integration for Hero Supervisor
//!
//! This module integrates the supervisor with Mycelium's message transport system.
//! Instead of running its own server, it connects to an existing Mycelium daemon
//! and listens for incoming supervisor RPC messages via the daemon's HTTP JSON-RPC API.
use std::sync::Arc;
use tokio::sync::Mutex;
use serde_json::{Value, json};
use log::{info, error, debug, trace};
use base64::Engine;
use reqwest::Client as HttpClient;
use crate::Supervisor;
use tokio::time::{sleep, Duration};
/// Mycelium integration that connects to a Mycelium daemon and handles supervisor RPC messages
pub struct MyceliumIntegration {
supervisor: Arc<Mutex<Supervisor>>,
mycelium_url: String,
http_client: HttpClient,
topic: String,
running: Arc<Mutex<bool>>,
}
impl MyceliumIntegration {
pub fn new(supervisor: Arc<Mutex<Supervisor>>, mycelium_url: String, topic: String) -> Self {
Self {
supervisor,
mycelium_url,
http_client: HttpClient::new(),
topic,
running: Arc::new(Mutex::new(false)),
}
}
/// Start listening for messages on the Mycelium network
pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
info!("Starting Mycelium integration with daemon at {}", self.mycelium_url);
// Skip connection test for now due to API compatibility issues
// TODO: Fix Mycelium API compatibility
info!("Skipping connection test - assuming Mycelium daemon is running");
// Set running flag
{
let mut running = self.running.lock().await;
*running = true;
}
info!("Mycelium integration started successfully, listening on topic: {}", self.topic);
// Start message polling loop
let supervisor = Arc::clone(&self.supervisor);
let http_client = self.http_client.clone();
let mycelium_url = self.mycelium_url.clone();
let topic = self.topic.clone();
let running = Arc::clone(&self.running);
tokio::spawn(async move {
Self::message_loop(supervisor, http_client, mycelium_url, topic, running).await;
});
Ok(())
}
    /// Test connection to Mycelium daemon using JSON-RPC
    #[allow(dead_code)] // currently unused; start() skips the connection test (see TODO above)
    async fn test_connection(&self) -> Result<(), Box<dyn std::error::Error>> {
let test_request = json!({
"jsonrpc": "2.0",
"method": "getInfo",
"params": [],
"id": 1
});
let response = self.http_client
.post(&self.mycelium_url)
.json(&test_request)
.send()
.await?;
if response.status().is_success() {
let result: Value = response.json().await?;
if result.get("result").is_some() {
info!("Successfully connected to Mycelium daemon at {}", self.mycelium_url);
Ok(())
} else {
error!("Mycelium daemon returned error: {}", result);
Err("Mycelium daemon returned error".into())
}
} else {
let status = response.status();
let text = response.text().await.unwrap_or_default();
error!("Failed to connect to Mycelium daemon: {} - {}", status, text);
Err(format!("Mycelium connection failed: {}", status).into())
}
}
/// Handle incoming supervisor RPC message (called by Mycelium daemon via pushMessage)
pub async fn handle_supervisor_message(
&self,
payload_b64: &str,
reply_info: Option<(String, String)>,
) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
// Decode the base64 payload
let payload_bytes = base64::engine::general_purpose::STANDARD
.decode(payload_b64.as_bytes())?;
let payload_str = String::from_utf8(payload_bytes)?;
info!("Received supervisor message: {}", payload_str);
// Parse the JSON-RPC request
let request: Value = serde_json::from_str(&payload_str)?;
debug!("Decoded supervisor RPC: {}", request);
// Extract method and params from supervisor JSON-RPC
let method = request.get("method")
.and_then(|v| v.as_str())
.ok_or("missing method")?;
let rpc_params = request.get("params")
.cloned()
.unwrap_or(json!([]));
let rpc_id = request.get("id").cloned();
// Route to appropriate supervisor method
let result = self.route_supervisor_call(method, rpc_params).await?;
// If we have reply info, send the response back via Mycelium
if let Some((src_ip, _msg_id)) = reply_info {
let supervisor_response = json!({
"jsonrpc": "2.0",
"id": rpc_id,
"result": result
});
let response_b64 = base64::engine::general_purpose::STANDARD
.encode(serde_json::to_string(&supervisor_response)?.as_bytes());
info!("Sending response back to client at {}: {}", src_ip, supervisor_response);
// Send reply back to the client
match self.send_reply(&src_ip, &response_b64).await {
Ok(()) => info!("✅ Response sent successfully to {}", src_ip),
Err(e) => error!("❌ Failed to send response to {}: {}", src_ip, e),
}
}
Ok(Some("handled".to_string()))
}
/// Send a reply message back to a client using Mycelium JSON-RPC
async fn send_reply(
&self,
dst_ip: &str,
payload_b64: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Send response to a dedicated response topic
let response_topic = "supervisor.response";
let topic_b64 = base64::engine::general_purpose::STANDARD.encode(response_topic.as_bytes());
let message_info = json!({
"dst": { "ip": dst_ip },
"topic": topic_b64,
"payload": payload_b64 // payload_b64 is already base64 encoded
});
let push_request = json!({
"jsonrpc": "2.0",
"method": "pushMessage",
"params": [message_info, null],
"id": 1
});
let response = self.http_client
.post(&self.mycelium_url)
.json(&push_request)
.send()
.await?;
if response.status().is_success() {
let result: Value = response.json().await?;
if result.get("result").is_some() {
debug!("Sent reply to {}", dst_ip);
Ok(())
} else {
error!("Failed to send reply, Mycelium error: {}", result);
Err("Mycelium pushMessage failed".into())
}
} else {
let status = response.status();
let text = response.text().await.unwrap_or_default();
error!("Failed to send reply: {} - {}", status, text);
Err(format!("Failed to send reply: {}", status).into())
}
}
/// Route supervisor method calls to the appropriate supervisor functions
async fn route_supervisor_call(
&self,
method: &str,
params: Value,
) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
let mut supervisor_guard = self.supervisor.lock().await;
match method {
"list_runners" => {
// list_runners doesn't require parameters
let runners = supervisor_guard.list_runners();
Ok(json!(runners))
}
"register_runner" => {
if let Some(param_obj) = params.as_array().and_then(|arr| arr.get(0)) {
let secret = param_obj.get("secret")
.and_then(|v| v.as_str())
.ok_or("missing secret")?;
let name = param_obj.get("name")
.and_then(|v| v.as_str())
.ok_or("missing name")?;
let queue = param_obj.get("queue")
.and_then(|v| v.as_str())
.ok_or("missing queue")?;
supervisor_guard.register_runner(secret, name, queue).await?;
Ok(json!("success"))
} else {
Err("invalid register_runner params".into())
}
}
"start_runner" => {
if let Some(actor_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) {
supervisor_guard.start_runner(actor_id).await?;
Ok(json!("success"))
} else {
Err("invalid start_runner params".into())
}
}
"stop_runner" => {
if let Some(arr) = params.as_array() {
let actor_id = arr.get(0).and_then(|v| v.as_str()).ok_or("missing actor_id")?;
let force = arr.get(1).and_then(|v| v.as_bool()).unwrap_or(false);
supervisor_guard.stop_runner(actor_id, force).await?;
Ok(json!("success"))
} else {
Err("invalid stop_runner params".into())
}
}
"get_runner_status" => {
if let Some(actor_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) {
let status = supervisor_guard.get_runner_status(actor_id).await?;
Ok(json!(format!("{:?}", status)))
} else {
Err("invalid get_runner_status params".into())
}
}
"get_all_runner_status" => {
let statuses = supervisor_guard.get_all_runner_status().await?;
let status_map: std::collections::HashMap<String, String> = statuses
.into_iter()
.map(|(id, status)| (id, format!("{:?}", status)))
.collect();
Ok(json!(status_map))
}
"start_all" => {
let results = supervisor_guard.start_all().await;
let status_results: Vec<(String, String)> = results
.into_iter()
.map(|(id, result)| {
let status = match result {
Ok(_) => "started".to_string(),
Err(e) => format!("error: {}", e),
};
(id, status)
})
.collect();
Ok(json!(status_results))
}
"stop_all" => {
let force = params.as_array()
.and_then(|arr| arr.get(0))
.and_then(|v| v.as_bool())
.unwrap_or(false);
let results = supervisor_guard.stop_all(force).await;
let status_results: Vec<(String, String)> = results
.into_iter()
.map(|(id, result)| {
let status = match result {
Ok(_) => "stopped".to_string(),
Err(e) => format!("error: {}", e),
};
(id, status)
})
.collect();
Ok(json!(status_results))
}
"job.run" => {
// Run job and wait for result (blocking)
if let Some(param_obj) = params.as_array().and_then(|arr| arr.get(0)) {
let _secret = param_obj.get("secret")
.and_then(|v| v.as_str())
.ok_or("missing secret")?;
let job_value = param_obj.get("job")
.ok_or("missing job")?;
let timeout = param_obj.get("timeout")
.and_then(|v| v.as_u64())
.unwrap_or(60);
// Deserialize the job
let job: hero_job::Job = serde_json::from_value(job_value.clone())
.map_err(|e| format!("invalid job format: {}", e))?;
let job_id = job.id.clone();
let runner_name = job.runner.clone();
// Verify signatures
job.verify_signatures()
.map_err(|e| format!("signature verification failed: {}", e))?;
info!("Job {} signature verification passed for signatories: {:?}",
job_id, job.signatories());
                    // Queue and wait for result, reusing the guard taken at the top of
                    // route_supervisor_call (re-locking the tokio Mutex here would deadlock).
                    let result = supervisor_guard.queue_and_wait(&runner_name, job, timeout)
.await
.map_err(|e| format!("job execution failed: {}", e))?;
Ok(json!({
"job_id": job_id,
"status": "completed",
"result": result
}))
} else {
Err("invalid job.run params".into())
}
}
"job.start" => {
// Start job without waiting (non-blocking)
if let Some(param_obj) = params.as_array().and_then(|arr| arr.get(0)) {
let _secret = param_obj.get("secret")
.and_then(|v| v.as_str())
.ok_or("missing secret")?;
let job_value = param_obj.get("job")
.ok_or("missing job")?;
// Deserialize the job
let job: hero_job::Job = serde_json::from_value(job_value.clone())
.map_err(|e| format!("invalid job format: {}", e))?;
let job_id = job.id.clone();
let runner_name = job.runner.clone();
// Verify signatures
job.verify_signatures()
.map_err(|e| format!("signature verification failed: {}", e))?;
info!("Job {} signature verification passed for signatories: {:?}",
job_id, job.signatories());
                    // Queue the job without waiting, reusing the existing guard
                    // (re-locking here would deadlock).
                    supervisor_guard.queue_job_to_runner(&runner_name, job)
.await
.map_err(|e| format!("failed to queue job: {}", e))?;
Ok(json!({
"job_id": job_id,
"status": "queued"
}))
} else {
Err("invalid job.start params".into())
}
}
"job.status" => {
if let Some(_job_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) {
// TODO: Implement actual job status lookup
Ok(json!({"status": "completed"}))
} else {
Err("invalid job.status params".into())
}
}
"job.result" => {
if let Some(_job_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) {
// TODO: Implement actual job result lookup
Ok(json!({"success": "job completed successfully"}))
} else {
Err("invalid job.result params".into())
}
}
"rpc.discover" => {
let methods = vec![
"list_runners", "register_runner", "start_runner", "stop_runner",
"get_runner_status", "get_all_runner_status", "start_all", "stop_all",
"job.run", "job.start", "job.status", "job.result", "rpc.discover"
];
Ok(json!(methods))
}
_ => {
error!("Unknown method: {}", method);
Err(format!("unknown method: {}", method).into())
}
}
}
/// Message polling loop that listens for incoming messages
async fn message_loop(
supervisor: Arc<Mutex<Supervisor>>,
http_client: HttpClient,
mycelium_url: String,
topic: String,
running: Arc<Mutex<bool>>,
) {
info!("Starting message polling loop for topic: {} (base64: {})", topic, base64::engine::general_purpose::STANDARD.encode(topic.as_bytes()));
while {
let running_guard = running.lock().await;
*running_guard
} {
// Poll for messages using Mycelium JSON-RPC API
// Topic needs to be base64 encoded for the RPC API
let topic_b64 = base64::engine::general_purpose::STANDARD.encode(topic.as_bytes());
let poll_request = json!({
"jsonrpc": "2.0",
"method": "popMessage",
"params": [null, 1, topic_b64], // Reduced timeout to 1 second
"id": 1
});
debug!("Polling for messages with request: {}", poll_request);
match tokio::time::timeout(
Duration::from_secs(10),
http_client.post(&mycelium_url).json(&poll_request).send()
).await {
Ok(Ok(response)) => {
if response.status().is_success() {
match response.json::<Value>().await {
Ok(rpc_response) => {
if let Some(message) = rpc_response.get("result") {
debug!("Received message: {}", message);
// Extract message details
if let (Some(payload), Some(src_ip), Some(msg_id)) = (
message.get("payload").and_then(|v| v.as_str()),
message.get("srcIp").and_then(|v| v.as_str()),
message.get("id").and_then(|v| v.as_str()),
) {
// Create a temporary integration instance to handle the message
let integration = MyceliumIntegration {
supervisor: Arc::clone(&supervisor),
mycelium_url: mycelium_url.clone(),
http_client: http_client.clone(),
topic: topic.clone(),
running: Arc::clone(&running),
};
let reply_info = Some((src_ip.to_string(), msg_id.to_string()));
if let Err(e) = integration.handle_supervisor_message(payload, reply_info).await {
error!("Error handling supervisor message: {}", e);
}
}
} else if let Some(error) = rpc_response.get("error") {
let error_code = error.get("code").and_then(|c| c.as_i64()).unwrap_or(0);
if error_code == -32014 {
// Timeout - no message available, continue polling
trace!("No messages available (timeout)");
} else {
error!("Mycelium RPC error: {}", error);
sleep(Duration::from_secs(1)).await;
}
} else {
trace!("No messages available");
}
}
Err(e) => {
error!("Failed to parse RPC response JSON: {}", e);
}
}
} else {
let status = response.status();
let text = response.text().await.unwrap_or_default();
error!("Message polling error: {} - {}", status, text);
sleep(Duration::from_secs(1)).await;
}
}
Ok(Err(e)) => {
error!("HTTP request failed: {}", e);
sleep(Duration::from_secs(1)).await;
}
Err(_) => {
error!("Polling request timed out after 10 seconds");
sleep(Duration::from_secs(1)).await;
}
}
}
info!("Message polling loop stopped");
}
}
// Legacy type alias for backward compatibility
pub type MyceliumServer = MyceliumIntegration;
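
For orientation, a client-side sketch (not part of this commit) of the message format this integration consumes: a base64-encoded JSON-RPC request pushed to the supervisor's topic via the Mycelium daemon's pushMessage endpoint, matching the encoding used in send_reply and message_loop above. The daemon URL and destination IP are placeholders.

use base64::Engine;
use serde_json::json;

async fn push_list_runners(dst_ip: &str) -> Result<(), reqwest::Error> {
    let engine = base64::engine::general_purpose::STANDARD;

    // The inner supervisor JSON-RPC call, encoded the same way handle_supervisor_message decodes it.
    let rpc = json!({ "jsonrpc": "2.0", "method": "list_runners", "params": [], "id": 1 });
    let payload_b64 = engine.encode(serde_json::to_string(&rpc).unwrap());
    let topic_b64 = engine.encode("supervisor.rpc");

    // Outer Mycelium pushMessage envelope, mirroring send_reply above.
    let push = json!({
        "jsonrpc": "2.0",
        "method": "pushMessage",
        "params": [{ "dst": { "ip": dst_ip }, "topic": topic_b64, "payload": payload_b64 }, null],
        "id": 1
    });
    reqwest::Client::new()
        .post("http://127.0.0.1:8990") // placeholder daemon URL
        .json(&push)
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}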

core/src/openrpc.rs (new file, 1299 lines)

Diff suppressed: file too large.

core/src/openrpc/tests.rs (new file, 230 lines)

@@ -0,0 +1,230 @@
//! Tests for the new job API methods
#[cfg(test)]
mod job_api_tests {
use super::super::*;
use crate::supervisor::{Supervisor, SupervisorBuilder};
use crate::job::{Job, JobBuilder};
use std::sync::Arc;
use tokio::sync::Mutex;
use serde_json::json;
async fn create_test_supervisor() -> Arc<Mutex<Supervisor>> {
        let mut supervisor = SupervisorBuilder::new()
            .redis_url("redis://localhost:6379")
            .namespace("test_job_api")
            .build()
            .await
            .unwrap_or_else(|_| Supervisor::default());
supervisor.add_admin_secret("test-admin-secret".to_string());
supervisor.add_user_secret("test-user-secret".to_string());
Arc::new(Mutex::new(supervisor))
}
fn create_test_job() -> Job {
JobBuilder::new()
.id("test-job-123".to_string())
.caller_id("test-client".to_string())
.context_id("test-context".to_string())
.script("print('Hello World')".to_string())
.script_type(crate::job::ScriptType::Osis)
.timeout(30)
.build()
.unwrap()
}
#[tokio::test]
async fn test_jobs_create() {
let supervisor = create_test_supervisor().await;
let job = create_test_job();
let params = RunJobParams {
secret: "test-user-secret".to_string(),
job: job.clone(),
};
let result = supervisor.jobs_create(params).await;
assert!(result.is_ok());
let job_id = result.unwrap();
assert_eq!(job_id, job.id);
}
#[tokio::test]
async fn test_jobs_create_invalid_secret() {
let supervisor = create_test_supervisor().await;
let job = create_test_job();
let params = RunJobParams {
secret: "invalid-secret".to_string(),
job,
};
let result = supervisor.jobs_create(params).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_jobs_list() {
let supervisor = create_test_supervisor().await;
let result = supervisor.jobs_list().await;
// Should not error even if Redis is not available (will return empty list or error)
// The important thing is that the method signature works
assert!(result.is_ok() || result.is_err());
}
#[tokio::test]
async fn test_job_run_success_format() {
let supervisor = create_test_supervisor().await;
let job = create_test_job();
let params = RunJobParams {
secret: "test-user-secret".to_string(),
job,
};
let result = supervisor.job_run(params).await;
// The result should be a JobResult enum
match result {
Ok(JobResult::Success { success: _ }) => {
// Success case - job executed and returned output
},
Ok(JobResult::Error { error: _ }) => {
// Error case - job failed but method worked
},
Err(_) => {
// Method error (authentication, etc.)
// This is acceptable for testing without actual runners
}
}
}
#[tokio::test]
async fn test_job_start() {
let supervisor = create_test_supervisor().await;
let params = StartJobParams {
secret: "test-user-secret".to_string(),
job_id: "test-job-123".to_string(),
};
let result = supervisor.job_start(params).await;
// Should fail gracefully if job doesn't exist
assert!(result.is_err() || result.is_ok());
}
#[tokio::test]
async fn test_job_start_invalid_secret() {
let supervisor = create_test_supervisor().await;
let params = StartJobParams {
secret: "invalid-secret".to_string(),
job_id: "test-job-123".to_string(),
};
let result = supervisor.job_start(params).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_job_status() {
let supervisor = create_test_supervisor().await;
let result = supervisor.job_status("test-job-123".to_string()).await;
// Should return error for non-existent job
assert!(result.is_err());
}
#[tokio::test]
async fn test_job_result() {
let supervisor = create_test_supervisor().await;
let result = supervisor.job_result("test-job-123".to_string()).await;
// Should return error for non-existent job
assert!(result.is_err());
}
#[test]
fn test_job_result_enum_serialization() {
let success_result = JobResult::Success {
success: "Job completed successfully".to_string(),
};
let serialized = serde_json::to_string(&success_result).unwrap();
assert!(serialized.contains("success"));
assert!(serialized.contains("Job completed successfully"));
let error_result = JobResult::Error {
error: "Job failed with error".to_string(),
};
let serialized = serde_json::to_string(&error_result).unwrap();
assert!(serialized.contains("error"));
assert!(serialized.contains("Job failed with error"));
}
#[test]
fn test_job_status_response_serialization() {
let status_response = JobStatusResponse {
job_id: "test-job-123".to_string(),
status: "running".to_string(),
created_at: "2023-01-01T00:00:00Z".to_string(),
started_at: Some("2023-01-01T00:00:05Z".to_string()),
completed_at: None,
};
let serialized = serde_json::to_string(&status_response).unwrap();
assert!(serialized.contains("test-job-123"));
assert!(serialized.contains("running"));
assert!(serialized.contains("2023-01-01T00:00:00Z"));
assert!(serialized.contains("2023-01-01T00:00:05Z"));
let deserialized: JobStatusResponse = serde_json::from_str(&serialized).unwrap();
assert_eq!(deserialized.job_id, "test-job-123");
assert_eq!(deserialized.status, "running");
assert_eq!(deserialized.started_at, Some("2023-01-01T00:00:05Z".to_string()));
assert_eq!(deserialized.completed_at, None);
}
#[test]
fn test_start_job_params_serialization() {
let params = StartJobParams {
secret: "test-secret".to_string(),
job_id: "job-123".to_string(),
};
let serialized = serde_json::to_string(&params).unwrap();
assert!(serialized.contains("test-secret"));
assert!(serialized.contains("job-123"));
let deserialized: StartJobParams = serde_json::from_str(&serialized).unwrap();
assert_eq!(deserialized.secret, "test-secret");
assert_eq!(deserialized.job_id, "job-123");
}
#[test]
fn test_method_naming_convention() {
// Test that method names follow the jobs./job. convention
// These should be the actual method names in the trait
let jobs_methods = vec!["jobs.create", "jobs.list"];
let job_methods = vec!["job.run", "job.start", "job.status", "job.result"];
// Verify naming convention
for method in jobs_methods {
assert!(method.starts_with("jobs."));
}
for method in job_methods {
assert!(method.starts_with("job."));
}
}
}

core/src/runner.rs (new file, 207 lines)

@@ -0,0 +1,207 @@
//! Runner implementation for actor process management.
// use sal_service_manager::{ProcessManagerError as ServiceProcessManagerError, ProcessStatus, ProcessConfig};
/// Simple process status enum to replace sal_service_manager dependency
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum ProcessStatus {
NotStarted,
Starting,
Running,
Stopping,
Stopped,
Failed,
Error(String),
}
/// Simple process config to replace sal_service_manager dependency
#[derive(Debug, Clone)]
pub struct ProcessConfig {
pub command: String,
pub args: Vec<String>,
pub working_dir: Option<String>,
pub env_vars: Vec<(String, String)>,
}
impl ProcessConfig {
pub fn new(command: String, args: Vec<String>, working_dir: Option<String>, env_vars: Vec<(String, String)>) -> Self {
Self {
command,
args,
working_dir,
env_vars,
}
}
}
/// Simple process manager error to replace sal_service_manager dependency
#[derive(Debug, thiserror::Error)]
pub enum ProcessManagerError {
#[error("Process execution failed: {0}")]
ExecutionFailed(String),
#[error("Process not found: {0}")]
ProcessNotFound(String),
#[error("IO error: {0}")]
IoError(String),
}
use std::path::PathBuf;
/// Represents the current status of an actor/runner (alias for ProcessStatus)
pub type RunnerStatus = ProcessStatus;
/// Log information structure with serialization support
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct LogInfo {
pub timestamp: String,
pub level: String,
pub message: String,
}
/// Runner configuration and state (merged from RunnerConfig)
#[derive(Debug, Clone)]
pub struct Runner {
/// Unique identifier for the runner
pub id: String,
pub name: String,
pub namespace: String,
/// Path to the actor binary
    pub command: PathBuf, // Command used to launch the runner; only relevant when the supervisor manages the process
/// Redis URL for job queue
pub redis_url: String,
/// Additional command-line arguments
pub extra_args: Vec<String>,
}
impl Runner {
/// Create a new runner from configuration
pub fn from_config(config: RunnerConfig) -> Self {
Self {
id: config.id,
name: config.name,
namespace: config.namespace,
command: config.command,
redis_url: config.redis_url,
extra_args: config.extra_args,
}
}
/// Create a new runner with extra arguments
pub fn with_args(
id: String,
name: String,
namespace: String,
command: PathBuf,
redis_url: String,
extra_args: Vec<String>,
) -> Self {
Self {
id,
name,
namespace,
command,
redis_url,
extra_args,
}
}
    /// Get the Redis queue key for this runner, prefixed with the namespace when one is set
    pub fn get_queue(&self) -> String {
        if self.namespace.is_empty() {
            format!("runner:{}", self.name)
        } else {
            format!("{}:runner:{}", self.namespace, self.name)
        }
    }
}
/// Result type for runner operations
pub type RunnerResult<T> = Result<T, RunnerError>;
/// Errors that can occur during runner operations
#[derive(Debug, thiserror::Error)]
pub enum RunnerError {
#[error("Actor '{actor_id}' not found")]
ActorNotFound { actor_id: String },
#[error("Actor '{actor_id}' is already running")]
ActorAlreadyRunning { actor_id: String },
#[error("Actor '{actor_id}' is not running")]
ActorNotRunning { actor_id: String },
#[error("Failed to start actor '{actor_id}': {reason}")]
StartupFailed { actor_id: String, reason: String },
#[error("Failed to stop actor '{actor_id}': {reason}")]
StopFailed { actor_id: String, reason: String },
#[error("Timeout waiting for actor '{actor_id}' to start")]
StartupTimeout { actor_id: String },
#[error("Job queue error for actor '{actor_id}': {reason}")]
QueueError { actor_id: String, reason: String },
#[error("Process manager error: {source}")]
ProcessManagerError {
#[from]
source: ProcessManagerError,
},
#[error("Configuration error: {reason}")]
ConfigError { reason: String },
#[error("Invalid secret: {0}")]
InvalidSecret(String),
#[error("IO error: {source}")]
IoError {
#[from]
source: std::io::Error,
},
#[error("Redis error: {source}")]
RedisError {
#[from]
source: redis::RedisError,
},
#[error("Job error: {source}")]
JobError {
#[from]
source: hero_job::JobError,
},
#[error("Job client error: {source}")]
JobClientError {
#[from]
source: hero_job_client::ClientError,
},
#[error("Job '{job_id}' not found")]
JobNotFound { job_id: String },
#[error("Authentication error: {message}")]
AuthenticationError { message: String },
}
// Type alias for backward compatibility
pub type RunnerConfig = Runner;
/// Convert Runner to ProcessConfig
pub fn runner_to_process_config(config: &Runner) -> ProcessConfig {
let mut args = vec![
config.id.clone(), // First positional argument is the runner ID
"--redis-url".to_string(),
config.redis_url.clone(),
];
// Add extra arguments (e.g., context configurations)
args.extend(config.extra_args.clone());
ProcessConfig::new(
config.command.to_string_lossy().to_string(),
args,
Some("/tmp".to_string()), // Default working directory since Runner doesn't have working_dir field
vec![]
)
}
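
A small sketch of constructing a Runner and deriving its queue key and process config from the helpers above:

use hero_supervisor::runner::{runner_to_process_config, Runner};
use std::path::PathBuf;

fn main() {
    let runner = Runner::with_args(
        "osis-1".to_string(),
        "osis".to_string(),
        "hero".to_string(), // namespace
        PathBuf::from("/usr/local/bin/osis-runner"),
        "redis://localhost:6379".to_string(),
        vec!["--verbose".to_string()],
    );

    // Namespaced queue key: "hero:runner:osis" (or "runner:osis" with an empty namespace).
    assert_eq!(runner.get_queue(), "hero:runner:osis");

    // The process config passes the runner ID as the first positional argument.
    let cfg = runner_to_process_config(&runner);
    assert_eq!(cfg.args[0], "osis-1");
}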

core/src/services.rs (new file, 312 lines)

@@ -0,0 +1,312 @@
//! Service layer for persistent storage of keys, runners, and jobs
//!
//! This module provides database/storage services for the supervisor.
//! Currently uses in-memory storage, but designed to be easily extended
//! to use Redis, PostgreSQL, or other persistent storage backends.
use crate::auth::{ApiKey, ApiKeyScope};
use crate::job::Job;
use crate::runner::Runner;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use serde::{Deserialize, Serialize};
/// Service for managing API keys
#[derive(Debug, Clone)]
pub struct ApiKeyService {
store: Arc<Mutex<HashMap<String, ApiKey>>>,
}
impl ApiKeyService {
/// Create a new API key service
pub fn new() -> Self {
Self {
store: Arc::new(Mutex::new(HashMap::new())),
}
}
/// Store an API key
pub async fn store(&self, key: ApiKey) -> Result<(), String> {
let mut store = self.store.lock().await;
store.insert(key.key.clone(), key);
Ok(())
}
/// Get an API key by its key string
pub async fn get(&self, key: &str) -> Option<ApiKey> {
let store = self.store.lock().await;
store.get(key).cloned()
}
/// List all API keys
pub async fn list(&self) -> Vec<ApiKey> {
let store = self.store.lock().await;
store.values().cloned().collect()
}
/// Remove an API key
pub async fn remove(&self, key: &str) -> Option<ApiKey> {
let mut store = self.store.lock().await;
store.remove(key)
}
/// Count API keys by scope
pub async fn count_by_scope(&self, scope: ApiKeyScope) -> usize {
let store = self.store.lock().await;
store.values().filter(|k| k.scope == scope).count()
}
/// Clear all API keys (for testing)
pub async fn clear(&self) {
let mut store = self.store.lock().await;
store.clear();
}
}
impl Default for ApiKeyService {
fn default() -> Self {
Self::new()
}
}
/// Service for managing runners
#[derive(Debug, Clone)]
pub struct RunnerService {
store: Arc<Mutex<HashMap<String, RunnerMetadata>>>,
}
/// Metadata about a runner for storage
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RunnerMetadata {
pub id: String,
pub name: String,
pub queue: String,
pub registered_at: String,
pub registered_by: String, // API key name that registered this runner
}
impl RunnerService {
/// Create a new runner service
pub fn new() -> Self {
Self {
store: Arc::new(Mutex::new(HashMap::new())),
}
}
/// Store runner metadata
pub async fn store(&self, metadata: RunnerMetadata) -> Result<(), String> {
let mut store = self.store.lock().await;
store.insert(metadata.id.clone(), metadata);
Ok(())
}
/// Get runner metadata by ID
pub async fn get(&self, id: &str) -> Option<RunnerMetadata> {
let store = self.store.lock().await;
store.get(id).cloned()
}
/// List all runners
pub async fn list(&self) -> Vec<RunnerMetadata> {
let store = self.store.lock().await;
store.values().cloned().collect()
}
/// Remove a runner
pub async fn remove(&self, id: &str) -> Option<RunnerMetadata> {
let mut store = self.store.lock().await;
store.remove(id)
}
/// Count total runners
pub async fn count(&self) -> usize {
let store = self.store.lock().await;
store.len()
}
/// Clear all runners (for testing)
pub async fn clear(&self) {
let mut store = self.store.lock().await;
store.clear();
}
}
impl Default for RunnerService {
fn default() -> Self {
Self::new()
}
}
/// Service for managing jobs
#[derive(Debug, Clone)]
pub struct JobService {
store: Arc<Mutex<HashMap<String, JobMetadata>>>,
}
/// Metadata about a job for storage
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobMetadata {
pub job_id: String,
pub runner: String,
pub created_at: String,
pub created_by: String, // API key name that created this job
pub status: String,
pub job: Job,
}
impl JobService {
/// Create a new job service
pub fn new() -> Self {
Self {
store: Arc::new(Mutex::new(HashMap::new())),
}
}
/// Store job metadata
pub async fn store(&self, metadata: JobMetadata) -> Result<(), String> {
let mut store = self.store.lock().await;
store.insert(metadata.job_id.clone(), metadata);
Ok(())
}
/// Get job metadata by ID
pub async fn get(&self, job_id: &str) -> Option<JobMetadata> {
let store = self.store.lock().await;
store.get(job_id).cloned()
}
/// List all jobs
pub async fn list(&self) -> Vec<JobMetadata> {
let store = self.store.lock().await;
store.values().cloned().collect()
}
/// List jobs by runner
pub async fn list_by_runner(&self, runner: &str) -> Vec<JobMetadata> {
let store = self.store.lock().await;
store.values()
.filter(|j| j.runner == runner)
.cloned()
.collect()
}
/// List jobs by creator (API key name)
pub async fn list_by_creator(&self, creator: &str) -> Vec<JobMetadata> {
let store = self.store.lock().await;
store.values()
.filter(|j| j.created_by == creator)
.cloned()
.collect()
}
/// Update job status
pub async fn update_status(&self, job_id: &str, status: String) -> Result<(), String> {
let mut store = self.store.lock().await;
if let Some(metadata) = store.get_mut(job_id) {
metadata.status = status;
Ok(())
} else {
Err(format!("Job not found: {}", job_id))
}
}
/// Remove a job
pub async fn remove(&self, job_id: &str) -> Option<JobMetadata> {
let mut store = self.store.lock().await;
store.remove(job_id)
}
/// Count total jobs
pub async fn count(&self) -> usize {
let store = self.store.lock().await;
store.len()
}
/// Clear all jobs (for testing)
pub async fn clear(&self) {
let mut store = self.store.lock().await;
store.clear();
}
}
impl Default for JobService {
fn default() -> Self {
Self::new()
}
}
/// Combined service container for all storage services
#[derive(Debug, Clone)]
pub struct Services {
pub api_keys: ApiKeyService,
pub runners: RunnerService,
pub jobs: JobService,
}
impl Services {
/// Create a new services container
pub fn new() -> Self {
Self {
api_keys: ApiKeyService::new(),
runners: RunnerService::new(),
jobs: JobService::new(),
}
}
/// Clear all data (for testing)
pub async fn clear_all(&self) {
self.api_keys.clear().await;
self.runners.clear().await;
self.jobs.clear().await;
}
}
impl Default for Services {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_api_key_service() {
let service = ApiKeyService::new();
        let key = ApiKey {
            key: "test-key".to_string(),
            name: "test".to_string(),
            scope: ApiKeyScope::User,
            created_at: chrono::Utc::now().to_rfc3339(),
            expires_at: None,
        };
service.store(key.clone()).await.unwrap();
assert_eq!(service.get("test-key").await.unwrap().name, "test");
assert_eq!(service.list().await.len(), 1);
service.remove("test-key").await;
assert!(service.get("test-key").await.is_none());
}
#[tokio::test]
async fn test_runner_service() {
let service = RunnerService::new();
let metadata = RunnerMetadata {
id: "runner1".to_string(),
name: "runner1".to_string(),
queue: "queue1".to_string(),
registered_at: "2024-01-01".to_string(),
registered_by: "admin".to_string(),
};
service.store(metadata.clone()).await.unwrap();
assert_eq!(service.get("runner1").await.unwrap().name, "runner1");
assert_eq!(service.count().await, 1);
service.remove("runner1").await;
assert!(service.get("runner1").await.is_none());
}
}
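
Finally, a hedged sketch of threading a job through the combined Services container; the Job fields match those used in core/src/mycelium.rs, and the chrono call matches the timestamp format used in core/src/auth.rs:

use hero_supervisor::services::{JobMetadata, Services};
use hero_supervisor::Job;

// A hypothetical helper tracking a job through the in-memory store.
async fn track_job(services: &Services, job: Job) -> Result<(), String> {
    let job_id = job.id.clone();
    services.jobs.store(JobMetadata {
        job_id: job_id.clone(),
        runner: job.runner.clone(),
        created_at: chrono::Utc::now().to_rfc3339(),
        created_by: "admin-key".to_string(),
        status: "queued".to_string(),
        job,
    }).await?;

    // Flip the status once a runner picks the job up.
    services.jobs.update_status(&job_id, "running".to_string()).await
}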

core/src/supervisor.rs (new file, 1107 lines)

Diff suppressed: file too large.