diff --git a/src/clients/mod.rs b/src/clients/mod.rs index b9707f9..3d14737 100644 --- a/src/clients/mod.rs +++ b/src/clients/mod.rs @@ -1,8 +1,9 @@ pub mod supervisor_client; pub mod mycelium_client; +pub mod types; +pub use types::Destination; pub use supervisor_client::{ - Destination, SupervisorClient, SupervisorClientError, }; diff --git a/src/clients/mycelium_client.rs b/src/clients/mycelium_client.rs index 58557dc..f4567cf 100644 --- a/src/clients/mycelium_client.rs +++ b/src/clients/mycelium_client.rs @@ -2,16 +2,17 @@ use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use reqwest::Client as HttpClient; -use serde::Deserialize; + use serde_json::{Value, json}; use thiserror::Error; use crate::models::TransportStatus; +use crate::clients::Destination; -/// Lightweight client for querying Mycelium transport status +/// Lightweight client for Mycelium JSON-RPC (send + query status) #[derive(Clone)] pub struct MyceliumClient { - base_url: String, // e.g. http://127.0.0.1:9651 + base_url: String, // e.g. http://127.0.0.1:8990 http: HttpClient, id_counter: Arc, } @@ -22,6 +23,8 @@ pub enum MyceliumClientError { Http(#[from] reqwest::Error), #[error("JSON error: {0}")] Json(#[from] serde_json::Error), + #[error("Transport timed out waiting for a reply (408)")] + TransportTimeout, #[error("JSON-RPC error: {0}")] RpcError(String), #[error("Invalid response: {0}")] @@ -56,6 +59,9 @@ impl MyceliumClient { if let Some(err) = body.get("error") { let code = err.get("code").and_then(|v| v.as_i64()).unwrap_or(0); let msg = err.get("message").and_then(|v| v.as_str()).unwrap_or("unknown error"); + if code == 408 { + return Err(MyceliumClientError::TransportTimeout); + } return Err(MyceliumClientError::RpcError(format!("code={code} msg={msg}"))); } if !status.is_success() { @@ -94,4 +100,109 @@ impl MyceliumClient { _ => None, } } + + /// Build params object for pushMessage without performing any network call. + /// Exposed for serializer-only tests and reuse. + pub(crate) fn build_push_params( + dst: &Destination, + topic: &str, + payload_b64: &str, + reply_timeout: Option, + ) -> Value { + let dst_v = match dst { + Destination::Ip(ip) => json!({ "ip": ip.to_string() }), + Destination::Pk(pk) => json!({ "pk": pk }), + }; + let message = json!({ + "dst": dst_v, + "topic": topic, + "payload": payload_b64, + }); + let mut params = json!({ "message": message }); + if let Some(rt) = reply_timeout { + params["reply_timeout"] = json!(rt); + } + params + } + + /// pushMessage: send a message with dst/topic/payload. Optional reply_timeout for sync replies. + pub async fn push_message( + &self, + dst: &Destination, + topic: &str, + payload_b64: &str, + reply_timeout: Option, + ) -> Result { + let params = Self::build_push_params(dst, topic, payload_b64, reply_timeout); + let body = self.jsonrpc("pushMessage", params).await?; + let result = body.get("result").ok_or_else(|| { + MyceliumClientError::InvalidResponse(format!("missing result in response: {body}")) + })?; + Ok(result.clone()) + } + + /// Helper to extract outbound message id from pushMessage result (InboundMessage or PushMessageResponseId) + pub fn extract_message_id_from_result(result: &Value) -> Option { + result.get("id").and_then(|v| v.as_str()).map(|s| s.to_string()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::clients::Destination; + + #[test] + fn build_push_params_shapes_ip_pk_and_timeout() { + // IP destination + let p1 = MyceliumClient::build_push_params( + &Destination::Ip("2001:db8::1".parse().unwrap()), + "supervisor.rpc", + "Zm9vYmFy", // "foobar" + Some(10), + ); + let msg1 = p1.get("message").unwrap(); + assert_eq!(msg1.get("topic").unwrap().as_str().unwrap(), "supervisor.rpc"); + assert_eq!(msg1.get("payload").unwrap().as_str().unwrap(), "Zm9vYmFy"); + assert_eq!( + msg1.get("dst").unwrap().get("ip").unwrap().as_str().unwrap(), + "2001:db8::1" + ); + assert_eq!(p1.get("reply_timeout").unwrap().as_u64().unwrap(), 10); + + // PK destination without timeout + let p2 = MyceliumClient::build_push_params( + &Destination::Pk("bb39b4a3a4efd70f3e05e37887677e02efbda14681d0acd3882bc0f754792c32".into()), + "supervisor.rpc", + "YmF6", // "baz" + None, + ); + let msg2 = p2.get("message").unwrap(); + assert_eq!( + msg2.get("dst").unwrap().get("pk").unwrap().as_str().unwrap(), + "bb39b4a3a4efd70f3e05e37887677e02efbda14681d0acd3882bc0f754792c32" + ); + assert!(p2.get("reply_timeout").is_none()); + } + + #[test] + fn extract_message_id_variants() { + // PushMessageResponseId + let r1 = json!({"id":"0123456789abcdef"}); + assert_eq!( + MyceliumClient::extract_message_id_from_result(&r1).unwrap(), + "0123456789abcdef" + ); + + // InboundMessage-like + let r2 = json!({ + "id":"fedcba9876543210", + "srcIp":"449:abcd:0123:defa::1", + "payload":"hpV+" + }); + assert_eq!( + MyceliumClient::extract_message_id_from_result(&r2).unwrap(), + "fedcba9876543210" + ); + } } \ No newline at end of file diff --git a/src/clients/supervisor_client.rs b/src/clients/supervisor_client.rs index 4cc4049..80b07d8 100644 --- a/src/clients/supervisor_client.rs +++ b/src/clients/supervisor_client.rs @@ -1,28 +1,20 @@ -use std::net::IpAddr; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use base64::Engine; use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; -use reqwest::Client as HttpClient; use serde_json::{Value, json}; use thiserror::Error; -/// Destination for Mycelium messages -#[derive(Clone, Debug)] -pub enum Destination { - Ip(IpAddr), - Pk(String), // 64-hex public key -} +use crate::clients::{Destination, MyceliumClient, MyceliumClientError}; #[derive(Clone)] pub struct SupervisorClient { - base_url: String, // e.g. "http://127.0.0.1:8990" - destination: Destination, // ip or pk - topic: String, // e.g. "supervisor.rpc" - secret: Option, // optional, required by several supervisor methods - http: HttpClient, - id_counter: Arc, // JSON-RPC id generator (for inner + outer requests) + mycelium: Arc, // Delegated Mycelium transport + destination: Destination, // ip or pk + topic: String, // e.g. "supervisor.rpc" + secret: Option, // optional, required by several supervisor methods + id_counter: Arc, // JSON-RPC id generator (for inner supervisor requests) } #[derive(Debug, Error)] @@ -41,8 +33,37 @@ pub enum SupervisorClientError { MissingSecret, } +impl From for SupervisorClientError { + fn from(e: MyceliumClientError) -> Self { + match e { + MyceliumClientError::TransportTimeout => SupervisorClientError::TransportTimeout, + MyceliumClientError::RpcError(m) => SupervisorClientError::RpcError(m), + MyceliumClientError::InvalidResponse(m) => SupervisorClientError::InvalidResponse(m), + MyceliumClientError::Http(err) => SupervisorClientError::Http(err), + MyceliumClientError::Json(err) => SupervisorClientError::Json(err), + } + } +} + impl SupervisorClient { - /// Create a new client. base_url defaults to Mycelium spec "http://127.0.0.1:8990" if empty. + /// Preferred constructor: provide a shared Mycelium client. + pub fn new_with_client( + mycelium: Arc, + destination: Destination, + topic: impl Into, + secret: Option, + ) -> Self { + Self { + mycelium, + destination, + topic: topic.into(), + secret, + id_counter: Arc::new(AtomicU64::new(1)), + } + } + + /// Backward-compatible constructor that builds a Mycelium client from base_url. + /// base_url defaults to Mycelium spec "http://127.0.0.1:8990" if empty. pub fn new( base_url: impl Into, destination: Destination, @@ -53,21 +74,15 @@ impl SupervisorClient { if url.is_empty() { url = "http://127.0.0.1:8990".to_string(); } - let http = HttpClient::builder().build()?; - Ok(Self { - base_url: url, - destination, - topic: topic.into(), - secret, - http, - id_counter: Arc::new(AtomicU64::new(1)), - }) + let mycelium = Arc::new(MyceliumClient::new(url)?); + Ok(Self::new_with_client(mycelium, destination, topic, secret)) } fn next_id(&self) -> u64 { self.id_counter.fetch_add(1, Ordering::Relaxed) } + /// Internal helper used by tests to inspect dst JSON shape. fn build_dst(&self) -> Value { match &self.destination { Destination::Ip(ip) => json!({ "ip": ip.to_string() }), @@ -89,49 +104,6 @@ impl SupervisorClient { Ok(BASE64_STANDARD.encode(s.as_bytes())) } - fn build_push_request(&self, payload_b64: String) -> Value { - let dst = self.build_dst(); - let msg = json!({ - "dst": dst, - "topic": self.topic, - "payload": payload_b64 - }); - - // Async path: no reply_timeout attached - json!({ - "jsonrpc": "2.0", - "id": self.next_id(), - "method": "pushMessage", - "params": [ { "message": msg } ] - }) - } - - async fn send_push(&self, req: &Value) -> Result { - let resp = self.http.post(&self.base_url).json(req).send().await?; - let status = resp.status(); - let body: Value = resp.json().await?; - // JSON-RPC error object handling - if let Some(err) = body.get("error") { - let code = err.get("code").and_then(|v| v.as_i64()).unwrap_or(0); - let msg = err - .get("message") - .and_then(|v| v.as_str()) - .unwrap_or("unknown error"); - if code == 408 { - return Err(SupervisorClientError::TransportTimeout); - } - return Err(SupervisorClientError::RpcError(format!( - "code={code} msg={msg}" - ))); - } - if !status.is_success() { - return Err(SupervisorClientError::RpcError(format!( - "HTTP status {status}, body {body}" - ))); - } - Ok(body) - } - fn extract_message_id_from_result(result: &Value) -> Option { // Two possibilities per Mycelium spec oneOf: // - PushMessageResponseId: { "id": "0123456789abcdef" } @@ -142,34 +114,28 @@ impl SupervisorClient { .map(|s| s.to_string()) } - /// Generic call: build supervisor JSON-RPC message, wrap in Mycelium pushMessage, return outbound message id (hex). + /// Generic call: build supervisor JSON-RPC message, send via Mycelium pushMessage, return outbound message id (hex). pub async fn call(&self, method: &str, params: Value) -> Result { let inner = self.build_supervisor_payload(method, params); let payload_b64 = Self::encode_payload(&inner)?; - let push_req = self.build_push_request(payload_b64); - let resp = self.send_push(&push_req).await?; + let result = self + .mycelium + .push_message(&self.destination, &self.topic, &payload_b64, None) + .await?; - // Expect "result" to be either inbound message or response id - match resp.get("result") { - Some(res_obj) => { - if let Some(id) = Self::extract_message_id_from_result(res_obj) { - return Ok(id); - } - // Some servers might return the oneOf wrapped, handle len==1 array defensively (not in spec but resilient) - if let Some(arr) = res_obj.as_array() - && arr.len() == 1 - && let Some(id) = Self::extract_message_id_from_result(&arr[0]) - { - return Ok(id); - } - Err(SupervisorClientError::InvalidResponse(format!( - "result did not contain message id: {res_obj}" - ))) - } - None => Err(SupervisorClientError::InvalidResponse(format!( - "missing result in response: {resp}" - ))), + if let Some(id) = MyceliumClient::extract_message_id_from_result(&result) { + return Ok(id); } + // Some servers might return the oneOf wrapped, handle len==1 array defensively (not in spec but resilient) + if let Some(arr) = result.as_array() + && arr.len() == 1 + && let Some(id) = MyceliumClient::extract_message_id_from_result(&arr[0]) + { + return Ok(id); + } + Err(SupervisorClientError::InvalidResponse(format!( + "result did not contain message id: {result}" + ))) } fn need_secret(&self) -> Result<&str, SupervisorClientError> { @@ -334,8 +300,10 @@ impl SupervisorClient { #[cfg(test)] mod tests { use super::*; + use std::net::IpAddr; fn mk_client() -> SupervisorClient { + // Uses the legacy constructor but will not issue real network calls in these tests. SupervisorClient::new( "http://127.0.0.1:8990", Destination::Pk( @@ -368,20 +336,21 @@ mod tests { } #[test] - fn wraps_supervisor_payload_in_push_message() { + fn encodes_supervisor_payload_b64() { let c = mk_client(); let payload = c.build_supervisor_payload("list_runners", json!([])); let b64 = SupervisorClient::encode_payload(&payload).unwrap(); - let req = c.build_push_request(b64); - assert_eq!(req.get("method").unwrap().as_str().unwrap(), "pushMessage"); - let params = req.get("params").unwrap().as_array().unwrap(); - let msg = params[0].get("message").unwrap(); - assert!(msg.get("payload").is_some()); + + // decode and compare round-trip JSON + let raw = base64::engine::general_purpose::STANDARD + .decode(b64.as_bytes()) + .unwrap(); + let decoded: Value = serde_json::from_slice(&raw).unwrap(); assert_eq!( - msg.get("topic").unwrap().as_str().unwrap(), - "supervisor.rpc" + decoded.get("method").unwrap().as_str().unwrap(), + "list_runners" ); - assert!(msg.get("dst").unwrap().get("pk").is_some()); + assert_eq!(decoded.get("jsonrpc").unwrap().as_str().unwrap(), "2.0"); } #[test] diff --git a/src/clients/types.rs b/src/clients/types.rs new file mode 100644 index 0000000..b21533b --- /dev/null +++ b/src/clients/types.rs @@ -0,0 +1,9 @@ +use std::net::IpAddr; + +/// Destination for Mycelium messages (shared by clients) +#[derive(Clone, Debug)] +pub enum Destination { + Ip(IpAddr), + /// 64-hex public key of the receiver node + Pk(String), +} \ No newline at end of file diff --git a/src/router.rs b/src/router.rs index 908b0ff..3177c47 100644 --- a/src/router.rs +++ b/src/router.rs @@ -34,6 +34,18 @@ pub fn start_router(service: AppService, cfg: RouterConfig) -> Vec break Arc::new(c), + Err(e) => { + eprintln!("[router ctx={}] MyceliumClient init error: {}", ctx_id, e); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } + }; + loop { // Pop next message key (blocking with timeout) match service_cloned.brpop_msg_out(ctx_id, 1).await { @@ -52,16 +64,19 @@ pub fn start_router(service: AppService, cfg: RouterConfig) -> Vec, ) -> Result<(), Box> { // Parse "message:{caller_id}:{id}" let (caller_id, id) = parse_message_key(msg_key) @@ -120,12 +136,12 @@ async fn deliver_one( } else { Destination::Ip(runner.address) }; - let client = SupervisorClient::new( - cfg.base_url.clone(), + let client = SupervisorClient::new_with_client( + mycelium.clone(), dest, cfg.topic.clone(), None, // secret - )?; + ); // Build supervisor method and params from Message let method = msg.message.clone(); @@ -153,27 +169,14 @@ async fn deliver_one( // Spawn transport-status poller { let service_poll = service.clone(); - let base_url = cfg.base_url.clone(); let poll_interval = std::time::Duration::from_secs(cfg.transport_poll_interval_secs); let poll_timeout = std::time::Duration::from_secs(cfg.transport_poll_timeout_secs); let out_id_cloned = out_id.clone(); + let mycelium = mycelium.clone(); tokio::spawn(async move { let start = std::time::Instant::now(); - let client = match MyceliumClient::new(base_url) { - Ok(c) => c, - Err(e) => { - let _ = service_poll - .append_message_logs( - context_id, - caller_id, - id, - vec![format!("MyceliumClient init error: {e}")], - ) - .await; - return; - } - }; + let client = mycelium; let mut last_status: Option = Some(TransportStatus::Sent);