From 059d5131e796355e96467f7d3678b25cd6fad2ed Mon Sep 17 00:00:00 2001
From: Lee Smet
Date: Thu, 4 Sep 2025 16:24:15 +0200
Subject: [PATCH] Listen for supervisor responses

Signed-off-by: Lee Smet
---
 src/clients/mycelium_client.rs   |  79 ++++++++++++++
 src/clients/supervisor_client.rs |  59 ++++++++++
 src/main.rs                      |   6 +-
 src/router.rs                    | 180 +++++++++++++++++++++++++++++--
 src/service.rs                   |  31 ++++++
 src/storage/redis.rs             |  76 +++++++++++++
 6 files changed, 423 insertions(+), 8 deletions(-)

diff --git a/src/clients/mycelium_client.rs b/src/clients/mycelium_client.rs
index 5b89867..7750851 100644
--- a/src/clients/mycelium_client.rs
+++ b/src/clients/mycelium_client.rs
@@ -3,6 +3,8 @@
 use std::sync::atomic::{AtomicU64, Ordering};
 
 use reqwest::Client as HttpClient;
+use base64::Engine;
+use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
 use serde_json::{Value, json};
 use thiserror::Error;
 
@@ -159,6 +161,83 @@ impl MyceliumClient {
             .and_then(|v| v.as_str())
             .map(|s| s.to_string())
     }
+
+    /// popMessage: retrieve an inbound message if one is available (optionally filtered by topic).
+    /// - peek: if true, do not remove the message from the queue
+    /// - timeout_secs: seconds to wait for a message (0 returns immediately)
+    /// - topic_plain: optional plain-text topic, base64-encoded per the Mycelium spec
+    /// Returns:
+    /// - Ok(Some(result_json)) on success, where result_json matches the InboundMessage schema
+    /// - Ok(None) when no message is ready (Mycelium signals this with error code 204)
+    pub async fn pop_message(
+        &self,
+        peek: Option<bool>,
+        timeout_secs: Option<u64>,
+        topic_plain: Option<&str>,
+    ) -> Result<Option<Value>, MyceliumClientError> {
+        // Build the positional params array: [peek, timeout, topic]
+        let mut params_array = vec![];
+        if let Some(p) = peek {
+            params_array.push(serde_json::Value::Bool(p));
+        } else {
+            params_array.push(serde_json::Value::Null);
+        }
+        if let Some(t) = timeout_secs {
+            params_array.push(serde_json::Value::Number(t.into()));
+        } else {
+            params_array.push(serde_json::Value::Null);
+        }
+        if let Some(tp) = topic_plain {
+            let topic_b64 = BASE64_STANDARD.encode(tp.as_bytes());
+            params_array.push(serde_json::Value::String(topic_b64));
+        } else {
+            params_array.push(serde_json::Value::Null);
+        }
+
+        let req = json!({
+            "jsonrpc": "2.0",
+            "id": self.next_id(),
+            "method": "popMessage",
+            "params": serde_json::Value::Array(params_array),
+        });
+
+        tracing::info!(%req, "calling popMessage");
+
+        let resp = self.http.post(&self.base_url).json(&req).send().await?;
+        let status = resp.status();
+        let body: Value = resp.json().await?;
+
+        // Handle the JSON-RPC error envelope specially for code 204 (no message ready)
+        if let Some(err) = body.get("error") {
+            let code = err.get("code").and_then(|v| v.as_i64()).unwrap_or(0);
+            let msg = err
+                .get("message")
+                .and_then(|v| v.as_str())
+                .unwrap_or("unknown error");
+
+            if code == 204 {
+                // No message ready
+                return Ok(None);
+            }
+            if code == 408 {
+                // Align with the transport timeout mapping used elsewhere
+                return Err(MyceliumClientError::TransportTimeout);
+            }
+            return Err(MyceliumClientError::RpcError(format!(
+                "code={code} msg={msg}"
+            )));
+        }
+
+        if !status.is_success() {
+            return Err(MyceliumClientError::RpcError(format!(
+                "HTTP {status}, body {body}"
+            )));
+        }
+
+        let result = body.get("result").ok_or_else(|| {
+            MyceliumClientError::InvalidResponse(format!("missing result in response: {body}"))
+        })?;
+        Ok(Some(result.clone()))
+    }
 }
 
 #[cfg(test)]
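A minimal polling sketch for the wrapper added above (illustrative only: the base URL and the "supervisor.rpc" topic are hypothetical placeholders; MyceliumClient::new and pop_message are the items from this hunk):

    // Wait up to 20s for one inbound message on the topic; peek=false dequeues it.
    let client = MyceliumClient::new("http://127.0.0.1:8990".to_string())?;
    if let Some(inbound) = client
        .pop_message(Some(false), Some(20), Some("supervisor.rpc"))
        .await?
    {
        // `inbound` follows Mycelium's InboundMessage schema; "payload" is base64.
        println!("inbound: {inbound}");
    }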
diff --git a/src/clients/supervisor_client.rs b/src/clients/supervisor_client.rs
index 7d48561..f19b592 100644
--- a/src/clients/supervisor_client.rs
+++ b/src/clients/supervisor_client.rs
@@ -99,6 +99,16 @@ impl SupervisorClient {
         })
     }
 
+    /// Build a supervisor JSON-RPC payload but force a specific id (used for correlation).
+    fn build_supervisor_payload_with_id(&self, method: &str, params: Value, id: u64) -> Value {
+        json!({
+            "jsonrpc": "2.0",
+            "id": id,
+            "method": method,
+            "params": params,
+        })
+    }
+
     fn encode_payload(payload: &Value) -> Result<String, SupervisorClientError> {
         let s = serde_json::to_string(payload)?;
         Ok(BASE64_STANDARD.encode(s.as_bytes()))
@@ -147,6 +157,42 @@ impl SupervisorClient {
         )))
     }
 
+    /// Variant of `call` that also returns the inner supervisor JSON-RPC id used in the payload.
+    /// This id is required to correlate asynchronous popMessage replies coming from Mycelium.
+    pub async fn call_with_ids(
+        &self,
+        method: &str,
+        params: Value,
+    ) -> Result<(String, u64), SupervisorClientError> {
+        let inner_id = self.next_id();
+        let inner = self.build_supervisor_payload_with_id(method, params, inner_id);
+        let payload_b64 = Self::encode_payload(&inner)?;
+        let result = self
+            .mycelium
+            .push_message(
+                &self.destination,
+                &Self::encode_topic(self.topic.as_bytes()),
+                &payload_b64,
+                None,
+            )
+            .await?;
+
+        let out_id = if let Some(id) = MyceliumClient::extract_message_id_from_result(&result) {
+            id
+        } else if let Some(arr) = result.as_array()
+            && arr.len() == 1
+            && let Some(id) = MyceliumClient::extract_message_id_from_result(&arr[0])
+        {
+            id
+        } else {
+            return Err(SupervisorClientError::InvalidResponse(format!(
+                "result did not contain message id: {result}"
+            )));
+        };
+
+        Ok((out_id, inner_id))
+    }
+
     /// Synchronous variant: wait for a JSON-RPC reply via the Mycelium reply_timeout and return the inner JSON-RPC "result".
     /// If the supervisor returns an error object, map it to RpcError.
     pub async fn call_sync(
@@ -308,6 +354,19 @@ impl SupervisorClient {
         self.call("job.run", params).await
     }
 
+    /// Typed wrapper returning both the outbound Mycelium id and the inner supervisor JSON-RPC id.
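+    /// Correlation sketch (illustrative, not part of the wrapper itself): the returned
+    /// `inner_id` is the JSON-RPC "id" to match against the "id" field of decoded
+    /// popMessage payloads, e.g.
+    ///
+    ///     let (out_id, inner_id) = client.job_run_with_ids(job).await?;
+    ///     // a later inbound reply: {"jsonrpc":"2.0","id":inner_id,"result":{"job_queued":...}}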
+    pub async fn job_run_with_ids(
+        &self,
+        job: Value,
+    ) -> Result<(String, u64), SupervisorClientError> {
+        let secret = self.need_secret()?;
+        let params = json!([{
+            "secret": secret,
+            "job": job
+        }]);
+        self.call_with_ids("job.run", params).await
+    }
+
     pub async fn job_start(
         &self,
         job_id: impl Into<String>,
diff --git a/src/main.rs b/src/main.rs
index 80853a2..7eab6d2 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -99,7 +99,7 @@ async fn main() {
     // Shared application state
     let state = Arc::new(herocoordinator::rpc::AppState::new(service));
 
-    // Start router workers (auto-discovered contexts)
+    // Start router workers (auto-discovered contexts) and a single global inbound listener
     {
         let base_url = format!("http://{}:{}", cli.mycelium_ip, cli.mycelium_port);
         let cfg = herocoordinator::router::RouterConfig {
@@ -110,6 +110,10 @@ async fn main() {
             transport_poll_interval_secs: 2,
             transport_poll_timeout_secs: 300,
         };
+        // Global inbound listener for supervisor replies via Mycelium popMessage
+        let _inbound_handle =
+            herocoordinator::router::start_inbound_listener(service_for_router.clone(), cfg.clone());
+        // Per-context outbound delivery loops
         let _auto_handle = herocoordinator::router::start_router_auto(service_for_router, cfg);
     }
diff --git a/src/router.rs b/src/router.rs
index df6532e..545228a 100644
--- a/src/router.rs
+++ b/src/router.rs
@@ -1,5 +1,7 @@
 use std::{collections::HashSet, sync::Arc};
 
+use base64::Engine;
+use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
 use serde_json::{Value, json};
 use tokio::sync::Semaphore;
 
@@ -151,21 +153,25 @@ async fn deliver_one(
     // Build supervisor method and params from Message
     let method = msg.message.clone();
     let params = build_params(&msg)?;
-
+    // Send:
     // If this is a job.run and we have a secret configured on the client,
-    // prefer the typed wrapper that injects the secret into inner supervisor params.
-    let out_id = if method == "job.run" {
+    // prefer the typed wrapper that injects the secret into the inner supervisor params
+    // and also capture the inner supervisor JSON-RPC id for correlation.
+    let (out_id, inner_id_opt) = if method == "job.run" {
         if let Some(j) = msg.job.first() {
             let jv = job_to_json(j)?;
-            // This uses SupervisorClient::job_run, which sets {"secret": "...", "job": <job>}
-            client.job_run(jv).await?
+            // Returns (outbound message id, inner supervisor JSON-RPC id)
+            let (out, inner) = client.job_run_with_ids(jv).await?;
+            (out, Some(inner))
         } else {
             // Fallback: no embedded job, use the generic call
-            client.call(&method, params).await?
+            let out = client.call(&method, params).await?;
+            (out, None)
         }
     } else {
-        client.call(&method, params).await?
+        let out = client.call(&method, params).await?;
+        (out, None)
     };
 
     // Store transport id and initial Sent status
@@ -184,6 +190,13 @@
         .update_message_status(context_id, caller_id, id, MessageStatus::Acknowledged)
         .await?;
 
+    // Record the correlation (inner supervisor JSON-RPC id -> job/message) for inbound popMessage handling
+    if let (Some(inner_id), Some(job_id)) = (inner_id_opt, job_id_opt) {
+        let _ = service
+            .supcorr_set(inner_id, context_id, caller_id, job_id, id)
+            .await;
+    }
+
     // Spawn transport-status poller
     {
         let service_poll = service.clone();
@@ -487,3 +500,156 @@ pub fn start_router_auto(service: AppService, cfg: RouterConfig) -> tokio::task::JoinHandle<()> {
         }
     })
 }
+
+/// Start a single global inbound listener that reads Mycelium popMessage with a topic filter,
+/// decodes supervisor JSON-RPC replies, and updates the correlated jobs/messages.
+/// It listens for async replies like {"result":{"job_queued":...}} carrying the same inner JSON-RPC id.
+pub fn start_inbound_listener(
+    service: AppService,
+    cfg: RouterConfig,
+) -> tokio::task::JoinHandle<()> {
+    tokio::spawn(async move {
+        // Initialize the Mycelium client (retry loop)
+        let mycelium = loop {
+            match MyceliumClient::new(cfg.base_url.clone()) {
+                Ok(c) => break c,
+                Err(e) => {
+                    error!(error=%e, "MyceliumClient init error (inbound listener)");
+                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+                }
+            }
+        };
+
+        loop {
+            // Poll for inbound supervisor messages on the configured topic
+            match mycelium
+                .pop_message(Some(false), Some(20), Some(cfg.topic.as_str()))
+                .await
+            {
+                Ok(Some(inb)) => {
+                    // Expect an InboundMessage with a base64 "payload"
+                    let Some(payload_b64) = inb.get("payload").and_then(|v| v.as_str()) else {
+                        // Not a payload-bearing message; ignore
+                        continue;
+                    };
+                    let Ok(raw) = BASE64_STANDARD.decode(payload_b64.as_bytes()) else {
+                        let _ = service
+                            .append_message_logs(
+                                0, // context not known yet
+                                0,
+                                0,
+                                vec![
+                                    "Inbound payload base64 decode error (supervisor reply)".into(),
+                                ],
+                            )
+                            .await;
+                        continue;
+                    };
+                    let Ok(rpc): Result<Value, serde_json::Error> = serde_json::from_slice(&raw) else {
+                        // Invalid JSON payload
+                        continue;
+                    };
+
+                    // Extract the inner supervisor JSON-RPC id (number preferred; string fallback)
+                    let inner_id_u64 = match rpc.get("id") {
+                        Some(Value::Number(n)) => n.as_u64(),
+                        Some(Value::String(s)) => s.parse::<u64>().ok(),
+                        _ => None,
+                    };
+                    let Some(inner_id) = inner_id_u64 else {
+                        // Cannot correlate without an id
+                        continue;
+                    };
+
+                    // Look up the correlation mapping
+                    match service.supcorr_get(inner_id).await {
+                        Ok(Some((context_id, caller_id, job_id, message_id))) => {
+                            // Determine success/error from the supervisor JSON-RPC envelope
+                            let is_success = rpc
+                                .get("result")
+                                .map(|res| {
+                                    res.get("job_queued").is_some()
+                                        || res.as_str().map(|s| s == "job_queued").unwrap_or(false)
+                                })
+                                .unwrap_or(false);
+
+                            if is_success {
+                                // Set to Dispatched (idempotent) per the spec choice, and append a log
+                                let _ = service
+                                    .update_job_status_unchecked(
+                                        context_id,
+                                        caller_id,
+                                        job_id,
+                                        JobStatus::Dispatched,
+                                    )
+                                    .await;
+                                let _ = service
+                                    .append_message_logs(
+                                        context_id,
+                                        caller_id,
+                                        message_id,
+                                        vec![format!(
+                                            "Supervisor reply for job {}: job_queued",
+                                            job_id
+                                        )],
+                                    )
+                                    .await;
+                                let _ = service.supcorr_del(inner_id).await;
+                            } else if let Some(err_obj) = rpc.get("error") {
+                                // Error path: set the job to Error and log the details
+                                let _ = service
+                                    .update_job_status_unchecked(
+                                        context_id,
+                                        caller_id,
+                                        job_id,
+                                        JobStatus::Error,
+                                    )
+                                    .await;
+                                let _ = service
+                                    .append_message_logs(
+                                        context_id,
+                                        caller_id,
+                                        message_id,
+                                        vec![format!(
+                                            "Supervisor error for job {}: {}",
+                                            job_id, err_obj
+                                        )],
+                                    )
+                                    .await;
+                                let _ = service.supcorr_del(inner_id).await;
+                            } else {
+                                // Unknown result; keep the correlation for a later, clearer reply
+                                let _ = service
+                                    .append_message_logs(
+                                        context_id,
+                                        caller_id,
+                                        message_id,
+                                        vec![
+                                            "Supervisor reply did not contain job_queued or error"
+                                                .to_string(),
+                                        ],
+                                    )
+                                    .await;
+                            }
+                        }
+                        Ok(None) => {
+                            // No correlation found; ignore (or log once)
+                        }
+                        Err(e) => {
+                            error!(error=%e, "supcorr_get error");
+                            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
+                        }
+                    }
+                }
+                Ok(None) => {
+                    // No message; continue polling
+                    continue;
+                }
+                Err(e) => {
+                    error!(error=%e, "popMessage error");
+                    tokio::time::sleep(std::time::Duration::from_millis(200)).await;
+                }
+            }
+        }
+    })
+}
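For reference, the three reply envelopes the listener distinguishes, as illustrative sketches (the ids and error contents are made up; only the job_queued convention and the error object shape come from the code above):

    {"jsonrpc": "2.0", "id": 7, "result": {"job_queued": true}}                    // success, object form
    {"jsonrpc": "2.0", "id": 7, "result": "job_queued"}                            // success, string form
    {"jsonrpc": "2.0", "id": 7, "error": {"code": -32000, "message": "no runner"}} // error path

Anything else leaves the correlation entry in place until a clearer reply arrives.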
diff --git a/src/service.rs b/src/service.rs
index 335b7fc..84c9754 100644
--- a/src/service.rs
+++ b/src/service.rs
@@ -1161,6 +1161,37 @@ impl AppService {
     pub async fn scan_runners(&self, context_id: u32) -> Result<Vec<u32>, BoxError> {
         self.redis.scan_runners(context_id).await
     }
+
+    /// Correlation map: store the mapping from an inner supervisor JSON-RPC id to context/caller/job/message.
+    pub async fn supcorr_set(
+        &self,
+        inner_id: u64,
+        context_id: u32,
+        caller_id: u32,
+        job_id: u32,
+        message_id: u32,
+    ) -> Result<(), BoxError> {
+        self.redis
+            .supcorr_set(inner_id, context_id, caller_id, job_id, message_id)
+            .await
+            .map_err(Into::into)
+    }
+
+    /// Correlation map: load a mapping by inner supervisor JSON-RPC id.
+    pub async fn supcorr_get(
+        &self,
+        inner_id: u64,
+    ) -> Result<Option<(u32, u32, u32, u32)>, BoxError> {
+        self.redis
+            .supcorr_get(inner_id)
+            .await
+            .map_err(Into::into)
+    }
+
+    /// Correlation map: delete a mapping by inner supervisor JSON-RPC id.
+    pub async fn supcorr_del(&self, inner_id: u64) -> Result<(), BoxError> {
+        self.redis.supcorr_del(inner_id).await.map_err(Into::into)
+    }
 }
 
 /// Auto-discovery helpers for contexts (wrappers over RedisDriver)
diff --git a/src/storage/redis.rs b/src/storage/redis.rs
index f04e9e5..41f5196 100644
--- a/src/storage/redis.rs
+++ b/src/storage/redis.rs
@@ -751,4 +751,80 @@ impl RedisDriver {
         out.sort_unstable();
         Ok(out)
     }
+
+    // -----------------------------
+    // Supervisor correlation mapping (DB 0)
+    // Key:   "supcorr:{inner_id_decimal}"
+    // Value: JSON {"context_id":u32,"caller_id":u32,"job_id":u32,"message_id":u32}
+    // TTL:   1 hour, to avoid leaks in case of crashes
+    // -----------------------------
+    pub async fn supcorr_set(
+        &self,
+        inner_id: u64,
+        context_id: u32,
+        caller_id: u32,
+        job_id: u32,
+        message_id: u32,
+    ) -> Result<()> {
+        let mut cm = self.manager_for_db(0).await?;
+        let key = format!("supcorr:{}", inner_id);
+        let val = serde_json::json!({
+            "context_id": context_id,
+            "caller_id": caller_id,
+            "job_id": job_id,
+            "message_id": message_id,
+        })
+        .to_string();
+        // SET key val EX 3600
+        let _: () = redis::cmd("SET")
+            .arg(&key)
+            .arg(&val)
+            .arg("EX")
+            .arg(3600)
+            .query_async(&mut cm)
+            .await
+            .map_err(|e| {
+                error!(db=0, key=%key, error=%e, "SET supcorr_set failed");
+                e
+            })?;
+        Ok(())
+    }
+
+    pub async fn supcorr_get(
+        &self,
+        inner_id: u64,
+    ) -> Result<Option<(u32, u32, u32, u32)>> {
+        let mut cm = self.manager_for_db(0).await?;
+        let key = format!("supcorr:{}", inner_id);
+        let res: Option<String> = redis::cmd("GET")
+            .arg(&key)
+            .query_async(&mut cm)
+            .await
+            .map_err(|e| {
+                error!(db=0, key=%key, error=%e, "GET supcorr_get failed");
+                e
+            })?;
+        if let Some(s) = res {
+            let v: Value = serde_json::from_str(&s)?;
+            let ctx = v.get("context_id").and_then(|x| x.as_u64()).unwrap_or(0) as u32;
+            let caller = v.get("caller_id").and_then(|x| x.as_u64()).unwrap_or(0) as u32;
+            let job = v.get("job_id").and_then(|x| x.as_u64()).unwrap_or(0) as u32;
+            let msg = v.get("message_id").and_then(|x| x.as_u64()).unwrap_or(0) as u32;
+            return Ok(Some((ctx, caller, job, msg)));
+        }
+        Ok(None)
+    }
+
+    pub async fn supcorr_del(&self, inner_id: u64) -> Result<()> {
+        let mut cm = self.manager_for_db(0).await?;
+        let key = format!("supcorr:{}", inner_id);
+        let _: i64 = redis::cmd("DEL")
+            .arg(&key)
+            .query_async(&mut cm)
+            .await
+            .map_err(|e| {
+                error!(db=0, key=%key, error=%e, "DEL supcorr_del failed");
+                e
+            })?;
+        Ok(())
+    }
 }
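A correlation round-trip sketch (illustrative only: `driver` stands for any constructed RedisDriver, and the ids are arbitrary):

    // Store, read back, and clear one mapping; this writes key "supcorr:42"
    // with the one-hour TTL implemented above.
    driver.supcorr_set(42, 1, 2, 3, 4).await?;
    assert_eq!(driver.supcorr_get(42).await?, Some((1, 2, 3, 4)));
    driver.supcorr_del(42).await?;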