use std::net::IpAddr; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; use base64::Engine; use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; use reqwest::Client as HttpClient; use serde_json::{Value, json}; use thiserror::Error; /// Destination for Mycelium messages #[derive(Clone, Debug)] pub enum Destination { Ip(IpAddr), Pk(String), // 64-hex public key } #[derive(Clone)] pub struct SupervisorClient { base_url: String, // e.g. "http://127.0.0.1:8990" destination: Destination, // ip or pk topic: String, // e.g. "supervisor.rpc" secret: Option, // optional, required by several supervisor methods http: HttpClient, id_counter: Arc, // JSON-RPC id generator (for inner + outer requests) } #[derive(Debug, Error)] pub enum SupervisorClientError { #[error("HTTP error: {0}")] Http(#[from] reqwest::Error), #[error("JSON error: {0}")] Json(#[from] serde_json::Error), #[error("Transport timed out waiting for a reply (408)")] TransportTimeout, #[error("JSON-RPC error: {0}")] RpcError(String), #[error("Invalid response: {0}")] InvalidResponse(String), #[error("Missing secret for method requiring authentication")] MissingSecret, } impl SupervisorClient { /// Create a new client. base_url defaults to Mycelium spec "http://127.0.0.1:8990" if empty. pub fn new( base_url: impl Into, destination: Destination, topic: impl Into, secret: Option, ) -> Result { let mut url = base_url.into(); if url.is_empty() { url = "http://127.0.0.1:8990".to_string(); } let http = HttpClient::builder().build()?; Ok(Self { base_url: url, destination, topic: topic.into(), secret, http, id_counter: Arc::new(AtomicU64::new(1)), }) } fn next_id(&self) -> u64 { self.id_counter.fetch_add(1, Ordering::Relaxed) } fn build_dst(&self) -> Value { match &self.destination { Destination::Ip(ip) => json!({ "ip": ip.to_string() }), Destination::Pk(pk) => json!({ "pk": pk }), } } fn build_supervisor_payload(&self, method: &str, params: Value) -> Value { json!({ "jsonrpc": "2.0", "id": self.next_id(), "method": method, "params": params, }) } fn encode_payload(payload: &Value) -> Result { let s = serde_json::to_string(payload)?; Ok(BASE64_STANDARD.encode(s.as_bytes())) } fn build_push_request(&self, payload_b64: String) -> Value { let dst = self.build_dst(); let msg = json!({ "dst": dst, "topic": self.topic, "payload": payload_b64 }); // Async path: no reply_timeout attached json!({ "jsonrpc": "2.0", "id": self.next_id(), "method": "pushMessage", "params": [ { "message": msg } ] }) } async fn send_push(&self, req: &Value) -> Result { let resp = self.http.post(&self.base_url).json(req).send().await?; let status = resp.status(); let body: Value = resp.json().await?; // JSON-RPC error object handling if let Some(err) = body.get("error") { let code = err.get("code").and_then(|v| v.as_i64()).unwrap_or(0); let msg = err .get("message") .and_then(|v| v.as_str()) .unwrap_or("unknown error"); if code == 408 { return Err(SupervisorClientError::TransportTimeout); } return Err(SupervisorClientError::RpcError(format!( "code={code} msg={msg}" ))); } if !status.is_success() { return Err(SupervisorClientError::RpcError(format!( "HTTP status {status}, body {body}" ))); } Ok(body) } fn extract_message_id_from_result(result: &Value) -> Option { // Two possibilities per Mycelium spec oneOf: // - PushMessageResponseId: { "id": "0123456789abcdef" } // - InboundMessage: object containing "id" plus srcIp, ...; we still return id. result .get("id") .and_then(|v| v.as_str()) .map(|s| s.to_string()) } /// Generic call: build supervisor JSON-RPC message, wrap in Mycelium pushMessage, return outbound message id (hex). pub async fn call(&self, method: &str, params: Value) -> Result { let inner = self.build_supervisor_payload(method, params); let payload_b64 = Self::encode_payload(&inner)?; let push_req = self.build_push_request(payload_b64); let resp = self.send_push(&push_req).await?; // Expect "result" to be either inbound message or response id match resp.get("result") { Some(res_obj) => { if let Some(id) = Self::extract_message_id_from_result(res_obj) { return Ok(id); } // Some servers might return the oneOf wrapped, handle len==1 array defensively (not in spec but resilient) if let Some(arr) = res_obj.as_array() && arr.len() == 1 && let Some(id) = Self::extract_message_id_from_result(&arr[0]) { return Ok(id); } Err(SupervisorClientError::InvalidResponse(format!( "result did not contain message id: {res_obj}" ))) } None => Err(SupervisorClientError::InvalidResponse(format!( "missing result in response: {resp}" ))), } } fn need_secret(&self) -> Result<&str, SupervisorClientError> { self.secret .as_deref() .ok_or(SupervisorClientError::MissingSecret) } // ----------------------------- // Typed wrappers for Supervisor API // Asynchronous-only: returns outbound message id // ----------------------------- // Runners pub async fn list_runners(&self) -> Result { self.call("list_runners", json!([])).await } pub async fn register_runner( &self, name: impl Into, queue: impl Into, ) -> Result { let secret = self.need_secret()?; let params = json!([{ "secret": secret, "name": name.into(), "queue": queue.into() }]); self.call("register_runner", params).await } pub async fn remove_runner( &self, actor_id: impl Into, ) -> Result { self.call("remove_runner", json!([actor_id.into()])).await } pub async fn start_runner( &self, actor_id: impl Into, ) -> Result { self.call("start_runner", json!([actor_id.into()])).await } pub async fn stop_runner( &self, actor_id: impl Into, force: bool, ) -> Result { self.call("stop_runner", json!([actor_id.into(), force])) .await } pub async fn get_runner_status( &self, actor_id: impl Into, ) -> Result { self.call("get_runner_status", json!([actor_id.into()])) .await } pub async fn get_all_runner_status(&self) -> Result { self.call("get_all_runner_status", json!([])).await } pub async fn start_all(&self) -> Result { self.call("start_all", json!([])).await } pub async fn stop_all(&self, force: bool) -> Result { self.call("stop_all", json!([force])).await } pub async fn get_all_status(&self) -> Result { self.call("get_all_status", json!([])).await } // Jobs pub async fn jobs_create(&self, job: Value) -> Result { let secret = self.need_secret()?; let params = json!([{ "secret": secret, "job": job }]); self.call("jobs.create", params).await } pub async fn jobs_list(&self) -> Result { self.call("jobs.list", json!([])).await } pub async fn job_run(&self, job: Value) -> Result { let secret = self.need_secret()?; let params = json!([{ "secret": secret, "job": job }]); self.call("job.run", params).await } pub async fn job_start( &self, job_id: impl Into, ) -> Result { let secret = self.need_secret()?; let params = json!([{ "secret": secret, "job_id": job_id.into() }]); self.call("job.start", params).await } pub async fn job_status( &self, job_id: impl Into, ) -> Result { self.call("job.status", json!([job_id.into()])).await } pub async fn job_result( &self, job_id: impl Into, ) -> Result { self.call("job.result", json!([job_id.into()])).await } pub async fn job_stop( &self, job_id: impl Into, ) -> Result { let secret = self.need_secret()?; let params = json!([{ "secret": secret, "job_id": job_id.into() }]); self.call("job.stop", params).await } pub async fn job_delete( &self, job_id: impl Into, ) -> Result { let secret = self.need_secret()?; let params = json!([{ "secret": secret, "job_id": job_id.into() }]); self.call("job.delete", params).await } // Discovery pub async fn rpc_discover(&self) -> Result { self.call("rpc.discover", json!([])).await } } // ----------------------------- // Tests (serialization-only) // ----------------------------- #[cfg(test)] mod tests { use super::*; fn mk_client() -> SupervisorClient { SupervisorClient::new( "http://127.0.0.1:8990", Destination::Pk( "bb39b4a3a4efd70f3e05e37887677e02efbda14681d0acd3882bc0f754792c32".to_string(), ), "supervisor.rpc", Some("secret".to_string()), ) .unwrap() } #[test] fn builds_dst_ip_and_pk() { let c_ip = SupervisorClient::new( "http://127.0.0.1:8990", Destination::Ip("2001:db8::1".parse().unwrap()), "supervisor.rpc", None, ) .unwrap(); let v_ip = c_ip.build_dst(); assert_eq!(v_ip.get("ip").unwrap().as_str().unwrap(), "2001:db8::1"); let c_pk = mk_client(); let v_pk = c_pk.build_dst(); assert_eq!( v_pk.get("pk").unwrap().as_str().unwrap(), "bb39b4a3a4efd70f3e05e37887677e02efbda14681d0acd3882bc0f754792c32" ); } #[test] fn wraps_supervisor_payload_in_push_message() { let c = mk_client(); let payload = c.build_supervisor_payload("list_runners", json!([])); let b64 = SupervisorClient::encode_payload(&payload).unwrap(); let req = c.build_push_request(b64); assert_eq!(req.get("method").unwrap().as_str().unwrap(), "pushMessage"); let params = req.get("params").unwrap().as_array().unwrap(); let msg = params[0].get("message").unwrap(); assert!(msg.get("payload").is_some()); assert_eq!( msg.get("topic").unwrap().as_str().unwrap(), "supervisor.rpc" ); assert!(msg.get("dst").unwrap().get("pk").is_some()); } #[test] fn extract_message_id_works_for_both_variants() { // PushMessageResponseId let r1 = json!({"id":"0123456789abcdef"}); assert_eq!( SupervisorClient::extract_message_id_from_result(&r1).unwrap(), "0123456789abcdef" ); // InboundMessage-like let r2 = json!({ "id":"fedcba9876543210", "srcIp":"449:abcd:0123:defa::1", "payload":"hpV+" }); assert_eq!( SupervisorClient::extract_message_id_from_result(&r2).unwrap(), "fedcba9876543210" ); } }