//! # Mycelium Integration for Hero Supervisor //! //! This module integrates the supervisor with Mycelium's message transport system. //! Instead of running its own server, it connects to an existing Mycelium daemon //! and listens for incoming supervisor RPC messages via HTTP REST API. use std::sync::Arc; use tokio::sync::Mutex; use serde_json::{Value, json}; use log::{info, error, debug, trace}; use base64::Engine; use reqwest::Client as HttpClient; use crate::Supervisor; use tokio::time::{sleep, Duration}; /// Mycelium integration that connects to a Mycelium daemon and handles supervisor RPC messages pub struct MyceliumIntegration { supervisor: Arc>, mycelium_url: String, http_client: HttpClient, topic: String, running: Arc>, } impl MyceliumIntegration { pub fn new(supervisor: Arc>, mycelium_url: String, topic: String) -> Self { Self { supervisor, mycelium_url, http_client: HttpClient::new(), topic, running: Arc::new(Mutex::new(false)), } } /// Start listening for messages on the Mycelium network pub async fn start(&self) -> Result<(), Box> { info!("Starting Mycelium integration with daemon at {}", self.mycelium_url); // Skip connection test for now due to API compatibility issues // TODO: Fix Mycelium API compatibility info!("Skipping connection test - assuming Mycelium daemon is running"); // Set running flag { let mut running = self.running.lock().await; *running = true; } info!("Mycelium integration started successfully, listening on topic: {}", self.topic); // Start message polling loop let supervisor = Arc::clone(&self.supervisor); let http_client = self.http_client.clone(); let mycelium_url = self.mycelium_url.clone(); let topic = self.topic.clone(); let running = Arc::clone(&self.running); tokio::spawn(async move { Self::message_loop(supervisor, http_client, mycelium_url, topic, running).await; }); Ok(()) } /// Test connection to Mycelium daemon using JSON-RPC async fn test_connection(&self) -> Result<(), Box> { let test_request = json!({ "jsonrpc": "2.0", "method": "getInfo", "params": [], "id": 1 }); let response = self.http_client .post(&self.mycelium_url) .json(&test_request) .send() .await?; if response.status().is_success() { let result: Value = response.json().await?; if result.get("result").is_some() { info!("Successfully connected to Mycelium daemon at {}", self.mycelium_url); Ok(()) } else { error!("Mycelium daemon returned error: {}", result); Err("Mycelium daemon returned error".into()) } } else { let status = response.status(); let text = response.text().await.unwrap_or_default(); error!("Failed to connect to Mycelium daemon: {} - {}", status, text); Err(format!("Mycelium connection failed: {}", status).into()) } } /// Handle incoming supervisor RPC message (called by Mycelium daemon via pushMessage) pub async fn handle_supervisor_message( &self, payload_b64: &str, reply_info: Option<(String, String)>, ) -> Result, Box> { // Decode the base64 payload let payload_bytes = base64::engine::general_purpose::STANDARD .decode(payload_b64.as_bytes())?; let payload_str = String::from_utf8(payload_bytes)?; info!("Received supervisor message: {}", payload_str); // Parse the JSON-RPC request let request: Value = serde_json::from_str(&payload_str)?; debug!("Decoded supervisor RPC: {}", request); // Extract method and params from supervisor JSON-RPC let method = request.get("method") .and_then(|v| v.as_str()) .ok_or("missing method")?; let rpc_params = request.get("params") .cloned() .unwrap_or(json!([])); let rpc_id = request.get("id").cloned(); // Route to appropriate supervisor method let result = self.route_supervisor_call(method, rpc_params).await?; // If we have reply info, send the response back via Mycelium if let Some((src_ip, _msg_id)) = reply_info { let supervisor_response = json!({ "jsonrpc": "2.0", "id": rpc_id, "result": result }); let response_b64 = base64::engine::general_purpose::STANDARD .encode(serde_json::to_string(&supervisor_response)?.as_bytes()); info!("Sending response back to client at {}: {}", src_ip, supervisor_response); // Send reply back to the client match self.send_reply(&src_ip, &response_b64).await { Ok(()) => info!("✅ Response sent successfully to {}", src_ip), Err(e) => error!("❌ Failed to send response to {}: {}", src_ip, e), } } Ok(Some("handled".to_string())) } /// Send a reply message back to a client using Mycelium JSON-RPC async fn send_reply( &self, dst_ip: &str, payload_b64: &str, ) -> Result<(), Box> { // Send response to a dedicated response topic let response_topic = "supervisor.response"; let topic_b64 = base64::engine::general_purpose::STANDARD.encode(response_topic.as_bytes()); let message_info = json!({ "dst": { "ip": dst_ip }, "topic": topic_b64, "payload": payload_b64 // payload_b64 is already base64 encoded }); let push_request = json!({ "jsonrpc": "2.0", "method": "pushMessage", "params": [message_info, null], "id": 1 }); let response = self.http_client .post(&self.mycelium_url) .json(&push_request) .send() .await?; if response.status().is_success() { let result: Value = response.json().await?; if result.get("result").is_some() { debug!("Sent reply to {}", dst_ip); Ok(()) } else { error!("Failed to send reply, Mycelium error: {}", result); Err("Mycelium pushMessage failed".into()) } } else { let status = response.status(); let text = response.text().await.unwrap_or_default(); error!("Failed to send reply: {} - {}", status, text); Err(format!("Failed to send reply: {}", status).into()) } } /// Route supervisor method calls to the appropriate supervisor functions async fn route_supervisor_call( &self, method: &str, params: Value, ) -> Result> { let mut supervisor_guard = self.supervisor.lock().await; match method { "list_runners" => { // list_runners doesn't require parameters let runners = supervisor_guard.list_runners(); Ok(json!(runners)) } "register_runner" => { if let Some(param_obj) = params.as_array().and_then(|arr| arr.get(0)) { let secret = param_obj.get("secret") .and_then(|v| v.as_str()) .ok_or("missing secret")?; let name = param_obj.get("name") .and_then(|v| v.as_str()) .ok_or("missing name")?; let queue = param_obj.get("queue") .and_then(|v| v.as_str()) .ok_or("missing queue")?; supervisor_guard.register_runner(secret, name, queue).await?; Ok(json!("success")) } else { Err("invalid register_runner params".into()) } } "start_runner" => { if let Some(actor_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) { supervisor_guard.start_runner(actor_id).await?; Ok(json!("success")) } else { Err("invalid start_runner params".into()) } } "stop_runner" => { if let Some(arr) = params.as_array() { let actor_id = arr.get(0).and_then(|v| v.as_str()).ok_or("missing actor_id")?; let force = arr.get(1).and_then(|v| v.as_bool()).unwrap_or(false); supervisor_guard.stop_runner(actor_id, force).await?; Ok(json!("success")) } else { Err("invalid stop_runner params".into()) } } "get_runner_status" => { if let Some(actor_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) { let status = supervisor_guard.get_runner_status(actor_id).await?; Ok(json!(format!("{:?}", status))) } else { Err("invalid get_runner_status params".into()) } } "get_all_runner_status" => { let statuses = supervisor_guard.get_all_runner_status().await?; let status_map: std::collections::HashMap = statuses .into_iter() .map(|(id, status)| (id, format!("{:?}", status))) .collect(); Ok(json!(status_map)) } "start_all" => { let results = supervisor_guard.runner_start_all().await; let status_results: Vec<(String, String)> = results .into_iter() .map(|(id, result)| { let status = match result { Ok(_) => "started".to_string(), Err(e) => format!("error: {}", e), }; (id, status) }) .collect(); Ok(json!(status_results)) } "stop_all" => { let force = params.as_array() .and_then(|arr| arr.get(0)) .and_then(|v| v.as_bool()) .unwrap_or(false); let results = supervisor_guard.runner_stop_all(force).await; let status_results: Vec<(String, String)> = results .into_iter() .map(|(id, result)| { let status = match result { Ok(_) => "stopped".to_string(), Err(e) => format!("error: {}", e), }; (id, status) }) .collect(); Ok(json!(status_results)) } "job.run" => { // Run job and wait for result (blocking) if let Some(param_obj) = params.as_array().and_then(|arr| arr.get(0)) { let _secret = param_obj.get("secret") .and_then(|v| v.as_str()) .ok_or("missing secret")?; let job_value = param_obj.get("job") .ok_or("missing job")?; let timeout = param_obj.get("timeout") .and_then(|v| v.as_u64()) .unwrap_or(60); // Deserialize the job let job: hero_job::Job = serde_json::from_value(job_value.clone()) .map_err(|e| format!("invalid job format: {}", e))?; let job_id = job.id.clone(); let runner_name = job.runner.clone(); // Verify signatures job.verify_signatures() .map_err(|e| format!("signature verification failed: {}", e))?; info!("Job {} signature verification passed for signatories: {:?}", job_id, job.signatories()); // Queue and wait for result let mut supervisor_guard = self.supervisor.lock().await; let result = supervisor_guard.queue_and_wait(&runner_name, job, timeout) .await .map_err(|e| format!("job execution failed: {}", e))?; Ok(json!({ "job_id": job_id, "status": "completed", "result": result })) } else { Err("invalid job.run params".into()) } } "job.start" => { // Start job without waiting (non-blocking) if let Some(param_obj) = params.as_array().and_then(|arr| arr.get(0)) { let _secret = param_obj.get("secret") .and_then(|v| v.as_str()) .ok_or("missing secret")?; let job_value = param_obj.get("job") .ok_or("missing job")?; // Deserialize the job let job: hero_job::Job = serde_json::from_value(job_value.clone()) .map_err(|e| format!("invalid job format: {}", e))?; let job_id = job.id.clone(); let runner_name = job.runner.clone(); // Verify signatures job.verify_signatures() .map_err(|e| format!("signature verification failed: {}", e))?; info!("Job {} signature verification passed for signatories: {:?}", job_id, job.signatories()); // Queue the job without waiting let mut supervisor_guard = self.supervisor.lock().await; supervisor_guard.queue_job_to_runner(&runner_name, job) .await .map_err(|e| format!("failed to queue job: {}", e))?; Ok(json!({ "job_id": job_id, "status": "queued" })) } else { Err("invalid job.start params".into()) } } "job.status" => { if let Some(_job_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) { // TODO: Implement actual job status lookup Ok(json!({"status": "completed"})) } else { Err("invalid job.status params".into()) } } "job.result" => { if let Some(_job_id) = params.as_array().and_then(|arr| arr.get(0)).and_then(|v| v.as_str()) { // TODO: Implement actual job result lookup Ok(json!({"success": "job completed successfully"})) } else { Err("invalid job.result params".into()) } } "rpc.discover" => { let methods = vec![ "list_runners", "register_runner", "start_runner", "stop_runner", "get_runner_status", "get_all_runner_status", "start_all", "stop_all", "job.run", "job.start", "job.status", "job.result", "rpc.discover" ]; Ok(json!(methods)) } _ => { error!("Unknown method: {}", method); Err(format!("unknown method: {}", method).into()) } } } /// Message polling loop that listens for incoming messages async fn message_loop( supervisor: Arc>, http_client: HttpClient, mycelium_url: String, topic: String, running: Arc>, ) { info!("Starting message polling loop for topic: {} (base64: {})", topic, base64::engine::general_purpose::STANDARD.encode(topic.as_bytes())); while { let running_guard = running.lock().await; *running_guard } { // Poll for messages using Mycelium JSON-RPC API // Topic needs to be base64 encoded for the RPC API let topic_b64 = base64::engine::general_purpose::STANDARD.encode(topic.as_bytes()); let poll_request = json!({ "jsonrpc": "2.0", "method": "popMessage", "params": [null, 1, topic_b64], // Reduced timeout to 1 second "id": 1 }); debug!("Polling for messages with request: {}", poll_request); match tokio::time::timeout( Duration::from_secs(10), http_client.post(&mycelium_url).json(&poll_request).send() ).await { Ok(Ok(response)) => { if response.status().is_success() { match response.json::().await { Ok(rpc_response) => { if let Some(message) = rpc_response.get("result") { debug!("Received message: {}", message); // Extract message details if let (Some(payload), Some(src_ip), Some(msg_id)) = ( message.get("payload").and_then(|v| v.as_str()), message.get("srcIp").and_then(|v| v.as_str()), message.get("id").and_then(|v| v.as_str()), ) { // Create a temporary integration instance to handle the message let integration = MyceliumIntegration { supervisor: Arc::clone(&supervisor), mycelium_url: mycelium_url.clone(), http_client: http_client.clone(), topic: topic.clone(), running: Arc::clone(&running), }; let reply_info = Some((src_ip.to_string(), msg_id.to_string())); if let Err(e) = integration.handle_supervisor_message(payload, reply_info).await { error!("Error handling supervisor message: {}", e); } } } else if let Some(error) = rpc_response.get("error") { let error_code = error.get("code").and_then(|c| c.as_i64()).unwrap_or(0); if error_code == -32014 { // Timeout - no message available, continue polling trace!("No messages available (timeout)"); } else { error!("Mycelium RPC error: {}", error); sleep(Duration::from_secs(1)).await; } } else { trace!("No messages available"); } } Err(e) => { error!("Failed to parse RPC response JSON: {}", e); } } } else { let status = response.status(); let text = response.text().await.unwrap_or_default(); error!("Message polling error: {} - {}", status, text); sleep(Duration::from_secs(1)).await; } } Ok(Err(e)) => { error!("HTTP request failed: {}", e); sleep(Duration::from_secs(1)).await; } Err(_) => { error!("Polling request timed out after 10 seconds"); sleep(Duration::from_secs(1)).await; } } } info!("Message polling loop stopped"); } } // Legacy type alias for backward compatibility pub type MyceliumServer = MyceliumIntegration;