fix mycelium impl

This commit is contained in:
Timur Gordon
2025-09-01 16:34:24 +02:00
parent 44b1dd4249
commit 77b4c66a10
6 changed files with 667 additions and 338 deletions

View File

@@ -5,8 +5,7 @@
//! then pass it to SupervisorApp for runtime management.
use crate::Supervisor;
use crate::openrpc::start_openrpc_servers;
use crate::mycelium::MyceliumServer;
use crate::mycelium::MyceliumIntegration;
use log::{info, error, debug};
use std::sync::Arc;
use tokio::sync::Mutex;
@@ -14,24 +13,24 @@ use tokio::sync::Mutex;
/// Main supervisor application
pub struct SupervisorApp {
pub supervisor: Supervisor,
pub bind_address: String,
pub port: u16,
pub mycelium_url: String,
pub topic: String,
}
impl SupervisorApp {
/// Create a new supervisor application with a built supervisor
pub fn new(supervisor: Supervisor, bind_address: String, port: u16) -> Self {
pub fn new(supervisor: Supervisor, mycelium_url: String, topic: String) -> Self {
Self {
supervisor,
bind_address,
port,
mycelium_url,
topic,
}
}
/// Start the complete supervisor application
/// This method handles the entire application lifecycle:
/// - Starts all configured runners
/// - Launches the OpenRPC server
/// - Connects to Mycelium daemon for message transport
/// - Sets up graceful shutdown handling
/// - Keeps the application running
pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> {
@@ -40,11 +39,8 @@ impl SupervisorApp {
// Start all configured runners
self.start_all().await?;
// Start OpenRPC server
self.start_openrpc_server().await?;
// Start Mycelium server
self.start_mycelium_server().await?;
// Start Mycelium integration
self.start_mycelium_integration().await?;
// Set up graceful shutdown
self.setup_graceful_shutdown().await;
@@ -56,27 +52,33 @@ impl SupervisorApp {
Ok(())
}
/// Start the OpenRPC server
async fn start_openrpc_server(&self) -> Result<(), Box<dyn std::error::Error>> {
info!("Starting OpenRPC server...");
/// Start the Mycelium integration
async fn start_mycelium_integration(&self) -> Result<(), Box<dyn std::error::Error>> {
info!("Starting Mycelium integration...");
let supervisor_for_openrpc = Arc::new(Mutex::new(self.supervisor.clone()));
let bind_address = self.bind_address.clone();
let port = self.port;
let supervisor_for_mycelium = Arc::new(Mutex::new(self.supervisor.clone()));
let mycelium_url = self.mycelium_url.clone();
let topic = self.topic.clone();
// Start the OpenRPC server in a background task
let server_handle = tokio::spawn(async move {
if let Err(e) = start_openrpc_servers(supervisor_for_openrpc, &bind_address, port).await {
error!("OpenRPC server error: {}", e);
let mycelium_integration = MyceliumIntegration::new(
supervisor_for_mycelium,
mycelium_url,
topic,
);
// Start the Mycelium integration in a background task
let integration_handle = tokio::spawn(async move {
if let Err(e) = mycelium_integration.start().await {
error!("Mycelium integration error: {}", e);
}
});
// Give the server a moment to start
// Give the integration a moment to start
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
info!("OpenRPC server started successfully");
info!("Mycelium integration started successfully");
// Store the handle for potential cleanup (we could add this to the struct if needed)
std::mem::forget(server_handle); // For now, let it run in background
// Store the handle for potential cleanup
std::mem::forget(integration_handle); // For now, let it run in background
Ok(())
}
@@ -150,36 +152,6 @@ impl SupervisorApp {
Ok(())
}
/// Start the Mycelium server
async fn start_mycelium_server(&self) -> Result<(), Box<dyn std::error::Error>> {
info!("Starting Mycelium server...");
let supervisor_for_mycelium = Arc::new(Mutex::new(self.supervisor.clone()));
let mycelium_port = 8990; // Standard Mycelium port
let bind_address = "127.0.0.1".to_string();
let mycelium_server = MyceliumServer::new(
supervisor_for_mycelium,
bind_address,
mycelium_port,
);
// Start the Mycelium server in a background task
let server_handle = tokio::spawn(async move {
if let Err(e) = mycelium_server.start().await {
error!("Mycelium server error: {}", e);
}
});
// Give the server a moment to start
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
info!("Mycelium server started successfully on port {}", mycelium_port);
// Store the handle for potential cleanup
std::mem::forget(server_handle); // For now, let it run in background
Ok(())
}
/// Get status of all runners
pub async fn get_status(&self) -> Result<Vec<(String, String)>, Box<dyn std::error::Error>> {

View File

@@ -16,4 +16,4 @@ pub use supervisor::{Supervisor, SupervisorBuilder, ProcessManagerType};
pub use hero_job::{Job, JobBuilder, JobStatus, JobError};
pub use hero_job::Client;
pub use app::SupervisorApp;
pub use mycelium::MyceliumServer;
pub use mycelium::{MyceliumIntegration, MyceliumServer};

View File

@@ -1,297 +1,309 @@
//! # Mycelium Server Integration for Hero Supervisor
//! # Mycelium Integration for Hero Supervisor
//!
//! This module implements a Mycelium-compatible JSON-RPC server that bridges
//! Mycelium transport messages to the supervisor's OpenRPC interface.
//! This module integrates the supervisor with Mycelium's message transport system.
//! Instead of running its own server, it connects to an existing Mycelium daemon
//! and listens for incoming supervisor RPC messages.
use std::sync::Arc;
use std::collections::HashMap;
use tokio::sync::Mutex;
use serde_json::{Value, json};
use log::{info, error, debug};
use base64::Engine;
use reqwest::Client as HttpClient;
use crate::Supervisor;
/// Mycelium server that handles pushMessage calls and forwards them to supervisor
pub struct MyceliumServer {
/// Mycelium integration that connects to a Mycelium daemon and handles supervisor RPC messages
pub struct MyceliumIntegration {
supervisor: Arc<Mutex<Supervisor>>,
bind_address: String,
port: u16,
mycelium_url: String,
http_client: HttpClient,
topic: String,
message_handlers: Arc<Mutex<HashMap<String, tokio::task::JoinHandle<()>>>>,
}
impl MyceliumServer {
pub fn new(supervisor: Arc<Mutex<Supervisor>>, bind_address: String, port: u16) -> Self {
impl MyceliumIntegration {
pub fn new(supervisor: Arc<Mutex<Supervisor>>, mycelium_url: String, topic: String) -> Self {
Self {
supervisor,
bind_address,
port,
mycelium_url,
http_client: HttpClient::new(),
topic,
message_handlers: Arc::new(Mutex::new(HashMap::new())),
}
}
/// Start the Mycelium-compatible JSON-RPC server
/// Start listening for messages on the Mycelium network
pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
use jsonrpsee::server::{ServerBuilder, RpcModule};
use tower_http::cors::{CorsLayer, Any};
info!("Starting Mycelium integration with daemon at {}", self.mycelium_url);
info!("Starting Mycelium server on {}:{}", self.bind_address, self.port);
// Test connection to Mycelium daemon
self.test_connection().await?;
let cors = CorsLayer::new()
.allow_methods(Any)
.allow_headers(Any)
.allow_origin(Any);
info!("Mycelium integration started successfully, listening on topic: {}", self.topic);
let server = ServerBuilder::default()
.build(format!("{}:{}", self.bind_address, self.port))
.await?;
let mut module = RpcModule::new(());
let supervisor_clone = self.supervisor.clone();
// Register pushMessage method
module.register_async_method("pushMessage", move |params, _, _| {
let supervisor = supervisor_clone.clone();
async move {
handle_push_message(supervisor, params).await
}
})?;
// Register messageStatus method (basic implementation)
module.register_async_method("messageStatus", |params, _, _| async move {
handle_message_status(params).await
})?;
let handle = server.start(module);
info!("Mycelium server started successfully on {}:{}", self.bind_address, self.port);
// Keep the server running
handle.stopped().await;
// Note: In a real implementation, we would need to implement a message listener
// that polls or subscribes to incoming messages from the Mycelium daemon.
// For now, this integration works with the existing client-server model
// where clients send pushMessage calls to the Mycelium daemon.
Ok(())
}
}
/// Handle pushMessage calls from Mycelium clients
async fn handle_push_message(
supervisor: Arc<Mutex<Supervisor>>,
params: jsonrpsee::types::Params<'_>,
) -> Result<Value, jsonrpsee::types::ErrorObjectOwned> {
// Parse params as array first, then get the first element
let params_array: Vec<Value> = params.parse()
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some(e.to_string())))?;
let params_value = params_array.get(0)
.ok_or_else(|| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some("missing params object".to_string())))?;
debug!("Received pushMessage: {}", params_value);
// Extract message from params
let message = params_value.get("message")
.ok_or_else(|| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some("missing message".to_string())))?;
// Extract payload (base64 encoded supervisor JSON-RPC)
let payload_b64 = message.get("payload")
.and_then(|v| v.as_str())
.ok_or_else(|| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some("missing payload".to_string())))?;
// Extract topic and destination (for logging/debugging)
let _topic = message.get("topic")
.and_then(|v| v.as_str())
.unwrap_or("supervisor.rpc");
let _dst = message.get("dst");
// Check if this is a reply timeout request
let reply_timeout = params_value.get("reply_timeout")
.and_then(|v| v.as_u64());
// Decode the supervisor JSON-RPC payload
let payload_bytes = base64::engine::general_purpose::STANDARD
.decode(payload_b64)
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some(format!("invalid base64: {}", e))))?;
let supervisor_rpc: Value = serde_json::from_slice(&payload_bytes)
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some(format!("invalid JSON: {}", e))))?;
debug!("Decoded supervisor RPC: {}", supervisor_rpc);
// Extract method and params from supervisor JSON-RPC
let method = supervisor_rpc.get("method")
.and_then(|v| v.as_str())
.ok_or_else(|| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some("missing method".to_string())))?;
let rpc_params = supervisor_rpc.get("params")
.cloned()
.unwrap_or(json!([]));
let rpc_id = supervisor_rpc.get("id").cloned();
// Route to appropriate supervisor method
let result = route_supervisor_call(supervisor, method, rpc_params).await
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32603, "Internal error", Some(e.to_string())))?;
// Generate message ID for tracking
let message_id = format!("{:016x}", rand::random::<u64>());
if let Some(_timeout) = reply_timeout {
// For sync calls, return the supervisor result as an InboundMessage
let supervisor_response = json!({
/// Test connection to Mycelium daemon
async fn test_connection(&self) -> Result<(), Box<dyn std::error::Error>> {
let test_req = json!({
"jsonrpc": "2.0",
"id": rpc_id,
"result": result
"id": 1,
"method": "messageStatus",
"params": [{ "id": "test" }]
});
let response_b64 = base64::engine::general_purpose::STANDARD
.encode(serde_json::to_string(&supervisor_response).unwrap().as_bytes());
let response = self.http_client
.post(&self.mycelium_url)
.json(&test_req)
.send()
.await?;
if response.status().is_success() {
info!("Successfully connected to Mycelium daemon");
Ok(())
} else {
Err(format!("Failed to connect to Mycelium daemon: {}", response.status()).into())
}
}
/// Handle incoming supervisor RPC message (called by Mycelium daemon via pushMessage)
pub async fn handle_supervisor_message(
&self,
payload_b64: &str,
reply_info: Option<(String, String)>, // (src_ip, message_id) for replies
) -> Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
// Decode the supervisor JSON-RPC payload
let payload_bytes = base64::engine::general_purpose::STANDARD
.decode(payload_b64)
.map_err(|e| format!("invalid base64: {}", e))?;
Ok(json!({
"id": message_id,
"srcIp": "127.0.0.1",
"payload": response_b64
}))
} else {
// For async calls, just return the message ID
Ok(json!({
"id": message_id
}))
let supervisor_rpc: Value = serde_json::from_slice(&payload_bytes)
.map_err(|e| format!("invalid JSON: {}", e))?;
debug!("Decoded supervisor RPC: {}", supervisor_rpc);
// Extract method and params from supervisor JSON-RPC
let method = supervisor_rpc.get("method")
.and_then(|v| v.as_str())
.ok_or("missing method")?;
let rpc_params = supervisor_rpc.get("params")
.cloned()
.unwrap_or(json!([]));
let rpc_id = supervisor_rpc.get("id").cloned();
// Route to appropriate supervisor method
let result = self.route_supervisor_call(method, rpc_params).await?;
// If we have reply info, send the response back via Mycelium
if let Some((src_ip, _msg_id)) = reply_info {
let supervisor_response = json!({
"jsonrpc": "2.0",
"id": rpc_id,
"result": result
});
let response_b64 = base64::engine::general_purpose::STANDARD
.encode(serde_json::to_string(&supervisor_response)?.as_bytes());
// Send reply back to the client
self.send_reply(&src_ip, &response_b64).await?;
}
Ok(Some("handled".to_string()))
}
/// Send a reply message back to a client
async fn send_reply(
&self,
dst_ip: &str,
payload_b64: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let reply_req = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "pushMessage",
"params": [{
"message": {
"dst": { "ip": dst_ip },
"topic": self.topic,
"payload": payload_b64
}
}]
});
let _response = self.http_client
.post(&self.mycelium_url)
.json(&reply_req)
.send()
.await?;
debug!("Sent reply to {}", dst_ip);
Ok(())
}
/// Route supervisor method calls to the appropriate supervisor functions
async fn route_supervisor_call(
&self,
method: &str,
params: Value,
) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
let mut supervisor_guard = self.supervisor.lock().await;
match method {
"list_runners" => {
let runners = supervisor_guard.list_runners();
Ok(json!(runners))
}
"register_runner" => {
if let Some(param_obj) = params.as_array().and_then(|arr| arr.get(0)) {
let secret = param_obj.get("secret")
.and_then(|v| v.as_str())
.ok_or("missing secret")?;
let name = param_obj.get("name")
.and_then(|v| v.as_str())
.ok_or("missing name")?;
let queue = param_obj.get("queue")
.and_then(|v| v.as_str())
.ok_or("missing queue")?;
supervisor_guard.register_runner(secret, name, queue).await?;
Ok(json!("success"))
} else {
Err("invalid register_runner params".into())
}
}
"start_runner" => {
if let Some(actor_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) {
supervisor_guard.start_runner(actor_id).await?;
Ok(json!("success"))
} else {
Err("invalid start_runner params".into())
}
}
"stop_runner" => {
if let Some(arr) = params.as_array() {
let actor_id = arr.get(0).and_then(|v| v.as_str()).ok_or("missing actor_id")?;
let force = arr.get(1).and_then(|v| v.as_bool()).unwrap_or(false);
supervisor_guard.stop_runner(actor_id, force).await?;
Ok(json!("success"))
} else {
Err("invalid stop_runner params".into())
}
}
"get_runner_status" => {
if let Some(actor_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) {
let status = supervisor_guard.get_runner_status(actor_id).await?;
Ok(json!(format!("{:?}", status)))
} else {
Err("invalid get_runner_status params".into())
}
}
"get_all_runner_status" => {
let statuses = supervisor_guard.get_all_runner_status().await?;
let status_map: std::collections::HashMap<String, String> = statuses
.into_iter()
.map(|(id, status)| (id, format!("{:?}", status)))
.collect();
Ok(json!(status_map))
}
"start_all" => {
let results = supervisor_guard.start_all().await;
let status_results: Vec<(String, String)> = results
.into_iter()
.map(|(id, result)| {
let status = match result {
Ok(_) => "started".to_string(),
Err(e) => format!("error: {}", e),
};
(id, status)
})
.collect();
Ok(json!(status_results))
}
"stop_all" => {
let force = params.as_array()
.and_then(|arr| arr.get(0))
.and_then(|v| v.as_bool())
.unwrap_or(false);
let results = supervisor_guard.stop_all(force).await;
let status_results: Vec<(String, String)> = results
.into_iter()
.map(|(id, result)| {
let status = match result {
Ok(_) => "stopped".to_string(),
Err(e) => format!("error: {}", e),
};
(id, status)
})
.collect();
Ok(json!(status_results))
}
"job.run" => {
if let Some(param_obj) = params.as_array().and_then(|arr| arr.get(0)) {
let _secret = param_obj.get("secret")
.and_then(|v| v.as_str())
.ok_or("missing secret")?;
let _job = param_obj.get("job")
.ok_or("missing job")?;
// TODO: Implement actual job execution
Ok(json!("job_queued"))
} else {
Err("invalid job.run params".into())
}
}
"job.status" => {
if let Some(_job_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) {
// TODO: Implement actual job status lookup
Ok(json!({"status": "completed"}))
} else {
Err("invalid job.status params".into())
}
}
"job.result" => {
if let Some(_job_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) {
// TODO: Implement actual job result lookup
Ok(json!({"success": "job completed successfully"}))
} else {
Err("invalid job.result params".into())
}
}
"rpc.discover" => {
let methods = vec![
"list_runners", "register_runner", "start_runner", "stop_runner",
"get_runner_status", "get_all_runner_status", "start_all", "stop_all",
"job.run", "job.status", "job.result", "rpc.discover"
];
Ok(json!(methods))
}
_ => {
error!("Unknown method: {}", method);
Err(format!("unknown method: {}", method).into())
}
}
}
}
/// Handle messageStatus calls
async fn handle_message_status(
params: jsonrpsee::types::Params<'_>,
) -> Result<Value, jsonrpsee::types::ErrorObjectOwned> {
// Parse params as array first, then get the first element
let params_array: Vec<Value> = params.parse()
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some(e.to_string())))?;
let params_value = params_array.get(0)
.ok_or_else(|| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some("missing params object".to_string())))?;
let _message_id = params_value.get("id")
.and_then(|v| v.as_str())
.ok_or_else(|| jsonrpsee::types::ErrorObjectOwned::owned(-32602, "Invalid params", Some("missing id".to_string())))?;
// For simplicity, always return "delivered" status
Ok(json!({
"status": "delivered"
}))
}
/// Route supervisor method calls to the appropriate supervisor functions
async fn route_supervisor_call(
supervisor: Arc<Mutex<Supervisor>>,
method: &str,
params: Value,
) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
let mut supervisor_guard = supervisor.lock().await;
match method {
"list_runners" => {
let runners = supervisor_guard.list_runners();
Ok(json!(runners))
}
"register_runner" => {
if let Some(param_obj) = params.as_array().and_then(|arr| arr.get(0)) {
let secret = param_obj.get("secret")
.and_then(|v| v.as_str())
.ok_or("missing secret")?;
let name = param_obj.get("name")
.and_then(|v| v.as_str())
.ok_or("missing name")?;
let queue = param_obj.get("queue")
.and_then(|v| v.as_str())
.ok_or("missing queue")?;
supervisor_guard.register_runner(secret, name, queue).await?;
Ok(json!("success"))
} else {
Err("invalid register_runner params".into())
}
}
"start_runner" => {
if let Some(actor_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) {
supervisor_guard.start_runner(actor_id).await?;
Ok(json!("success"))
} else {
Err("invalid start_runner params".into())
}
}
"stop_runner" => {
if let Some(arr) = params.as_array() {
let actor_id = arr.get(0).and_then(|v| v.as_str()).ok_or("missing actor_id")?;
let force = arr.get(1).and_then(|v| v.as_bool()).unwrap_or(false);
supervisor_guard.stop_runner(actor_id, force).await?;
Ok(json!("success"))
} else {
Err("invalid stop_runner params".into())
}
}
"get_runner_status" => {
if let Some(actor_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) {
let status = supervisor_guard.get_runner_status(actor_id).await?;
Ok(json!(format!("{:?}", status)))
} else {
Err("invalid get_runner_status params".into())
}
}
"get_all_runner_status" => {
let statuses = supervisor_guard.get_all_runner_status().await?;
let status_map: std::collections::HashMap<String, String> = statuses
.into_iter()
.map(|(id, status)| (id, format!("{:?}", status)))
.collect();
Ok(json!(status_map))
}
"job.run" => {
if let Some(param_obj) = params.as_array().and_then(|arr| arr.get(0)) {
let secret = param_obj.get("secret")
.and_then(|v| v.as_str())
.ok_or("missing secret")?;
let job = param_obj.get("job")
.ok_or("missing job")?;
// For now, return success - actual job execution would need more integration
Ok(json!("job_queued"))
} else {
Err("invalid job.run params".into())
}
}
"job.status" => {
if let Some(job_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) {
// For now, return a mock status
Ok(json!({"status": "completed"}))
} else {
Err("invalid job.status params".into())
}
}
"job.result" => {
if let Some(job_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) {
// For now, return a mock result
Ok(json!({"success": "job completed successfully"}))
} else {
Err("invalid job.result params".into())
}
}
"rpc.discover" => {
let methods = vec![
"list_runners", "register_runner", "start_runner", "stop_runner",
"get_runner_status", "get_all_runner_status",
"job.run", "job.status", "job.result", "rpc.discover"
];
Ok(json!(methods))
}
_ => {
error!("Unknown method: {}", method);
Err(format!("unknown method: {}", method).into())
}
}
}
// Legacy type alias for backward compatibility
pub type MyceliumServer = MyceliumIntegration;