diff --git a/src/clients/mod.rs b/src/clients/mod.rs index 3d14737..2eddc06 100644 --- a/src/clients/mod.rs +++ b/src/clients/mod.rs @@ -1,13 +1,7 @@ -pub mod supervisor_client; pub mod mycelium_client; +pub mod supervisor_client; pub mod types; +pub use mycelium_client::{MyceliumClient, MyceliumClientError}; +pub use supervisor_client::{SupervisorClient, SupervisorClientError}; pub use types::Destination; -pub use supervisor_client::{ - SupervisorClient, - SupervisorClientError, -}; -pub use mycelium_client::{ - MyceliumClient, - MyceliumClientError, -}; \ No newline at end of file diff --git a/src/clients/mycelium_client.rs b/src/clients/mycelium_client.rs index f4567cf..cdce9e1 100644 --- a/src/clients/mycelium_client.rs +++ b/src/clients/mycelium_client.rs @@ -6,13 +6,13 @@ use reqwest::Client as HttpClient; use serde_json::{Value, json}; use thiserror::Error; -use crate::models::TransportStatus; use crate::clients::Destination; +use crate::models::TransportStatus; /// Lightweight client for Mycelium JSON-RPC (send + query status) #[derive(Clone)] pub struct MyceliumClient { - base_url: String, // e.g. http://127.0.0.1:8990 + base_url: String, // e.g. http://127.0.0.1:8990 http: HttpClient, id_counter: Arc, } @@ -58,20 +58,30 @@ impl MyceliumClient { let body: Value = resp.json().await?; if let Some(err) = body.get("error") { let code = err.get("code").and_then(|v| v.as_i64()).unwrap_or(0); - let msg = err.get("message").and_then(|v| v.as_str()).unwrap_or("unknown error"); + let msg = err + .get("message") + .and_then(|v| v.as_str()) + .unwrap_or("unknown error"); if code == 408 { return Err(MyceliumClientError::TransportTimeout); } - return Err(MyceliumClientError::RpcError(format!("code={code} msg={msg}"))); + return Err(MyceliumClientError::RpcError(format!( + "code={code} msg={msg}" + ))); } if !status.is_success() { - return Err(MyceliumClientError::RpcError(format!("HTTP {status}, body {body}"))); + return Err(MyceliumClientError::RpcError(format!( + "HTTP {status}, body {body}" + ))); } Ok(body) } /// Call messageStatus with an outbound message id (hex string) - pub async fn message_status(&self, id_hex: &str) -> Result { + pub async fn message_status( + &self, + id_hex: &str, + ) -> Result { let params = json!({ "id": id_hex }); let body = self.jsonrpc("messageStatus", params).await?; let result = body.get("result").ok_or_else(|| { @@ -83,7 +93,9 @@ impl MyceliumClient { } else if let Some(s) = result.as_str() { s.to_string() } else { - return Err(MyceliumClientError::InvalidResponse(format!("unexpected result shape: {result}"))); + return Err(MyceliumClientError::InvalidResponse(format!( + "unexpected result shape: {result}" + ))); }; Self::map_status(&status_str).ok_or_else(|| { MyceliumClientError::InvalidResponse(format!("unknown status: {status_str}")) @@ -143,7 +155,10 @@ impl MyceliumClient { /// Helper to extract outbound message id from pushMessage result (InboundMessage or PushMessageResponseId) pub fn extract_message_id_from_result(result: &Value) -> Option { - result.get("id").and_then(|v| v.as_str()).map(|s| s.to_string()) + result + .get("id") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) } } @@ -162,24 +177,39 @@ mod tests { Some(10), ); let msg1 = p1.get("message").unwrap(); - assert_eq!(msg1.get("topic").unwrap().as_str().unwrap(), "supervisor.rpc"); + assert_eq!( + msg1.get("topic").unwrap().as_str().unwrap(), + "supervisor.rpc" + ); assert_eq!(msg1.get("payload").unwrap().as_str().unwrap(), "Zm9vYmFy"); assert_eq!( - msg1.get("dst").unwrap().get("ip").unwrap().as_str().unwrap(), + msg1.get("dst") + .unwrap() + .get("ip") + .unwrap() + .as_str() + .unwrap(), "2001:db8::1" ); assert_eq!(p1.get("reply_timeout").unwrap().as_u64().unwrap(), 10); // PK destination without timeout let p2 = MyceliumClient::build_push_params( - &Destination::Pk("bb39b4a3a4efd70f3e05e37887677e02efbda14681d0acd3882bc0f754792c32".into()), + &Destination::Pk( + "bb39b4a3a4efd70f3e05e37887677e02efbda14681d0acd3882bc0f754792c32".into(), + ), "supervisor.rpc", "YmF6", // "baz" None, ); let msg2 = p2.get("message").unwrap(); assert_eq!( - msg2.get("dst").unwrap().get("pk").unwrap().as_str().unwrap(), + msg2.get("dst") + .unwrap() + .get("pk") + .unwrap() + .as_str() + .unwrap(), "bb39b4a3a4efd70f3e05e37887677e02efbda14681d0acd3882bc0f754792c32" ); assert!(p2.get("reply_timeout").is_none()); @@ -205,4 +235,4 @@ mod tests { "fedcba9876543210" ); } -} \ No newline at end of file +} diff --git a/src/clients/supervisor_client.rs b/src/clients/supervisor_client.rs index f54f253..6da18b2 100644 --- a/src/clients/supervisor_client.rs +++ b/src/clients/supervisor_client.rs @@ -151,7 +151,12 @@ impl SupervisorClient { let result = self .mycelium - .push_message(&self.destination, &self.topic, &payload_b64, Some(reply_timeout_secs)) + .push_message( + &self.destination, + &self.topic, + &payload_b64, + Some(reply_timeout_secs), + ) .await?; // Expect an InboundMessage-like with a base64 payload containing the supervisor JSON-RPC response @@ -163,7 +168,11 @@ impl SupervisorClient { one.get("payload") .and_then(|v| v.as_str()) .map(|s| s.to_string()) - .ok_or_else(|| SupervisorClientError::InvalidResponse(format!("missing payload in result: {result}")))? + .ok_or_else(|| { + SupervisorClientError::InvalidResponse(format!( + "missing payload in result: {result}" + )) + })? } else { return Err(SupervisorClientError::TransportTimeout); } @@ -174,15 +183,19 @@ impl SupervisorClient { let raw = BASE64_STANDARD .decode(payload_field.as_bytes()) - .map_err(|e| SupervisorClientError::InvalidResponse(format!("invalid base64 payload: {e}")))?; + .map_err(|e| { + SupervisorClientError::InvalidResponse(format!("invalid base64 payload: {e}")) + })?; let rpc_resp: Value = serde_json::from_slice(&raw)?; if let Some(err) = rpc_resp.get("error") { return Err(SupervisorClientError::RpcError(err.to_string())); } - let res = rpc_resp - .get("result") - .ok_or_else(|| SupervisorClientError::InvalidResponse(format!("missing result in supervisor reply: {rpc_resp}")))?; + let res = rpc_resp.get("result").ok_or_else(|| { + SupervisorClientError::InvalidResponse(format!( + "missing result in supervisor reply: {rpc_resp}" + )) + })?; Ok(res.clone()) } diff --git a/src/clients/types.rs b/src/clients/types.rs index b21533b..c83180b 100644 --- a/src/clients/types.rs +++ b/src/clients/types.rs @@ -6,4 +6,4 @@ pub enum Destination { Ip(IpAddr), /// 64-hex public key of the receiver node Pk(String), -} \ No newline at end of file +} diff --git a/src/lib.rs b/src/lib.rs index 79c6230..3f689fb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,8 @@ -pub mod models; -pub mod storage; -pub mod service; -mod time; -pub mod dag; -pub mod rpc; pub mod clients; +pub mod dag; +pub mod models; pub mod router; +pub mod rpc; +pub mod service; +pub mod storage; +mod time; diff --git a/src/main.rs b/src/main.rs index 90bb073..e65055b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,8 +2,8 @@ use clap::Parser; use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; -use tracing::{info, warn, error}; -use tracing_subscriber::{fmt, EnvFilter}; +use tracing::{error, info, warn}; +use tracing_subscriber::{EnvFilter, fmt}; #[derive(Debug, Clone, Parser)] #[command( name = "herocoordinator", @@ -75,14 +75,14 @@ struct Cli { #[tokio::main] async fn main() { let cli = Cli::parse(); -// Initialize tracing subscriber (pretty formatter; controlled by RUST_LOG) -let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); -tracing_subscriber::fmt() - .with_env_filter(filter) - .pretty() - .with_target(true) - .with_level(true) - .init(); + // Initialize tracing subscriber (pretty formatter; controlled by RUST_LOG) + let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + tracing_subscriber::fmt() + .with_env_filter(filter) + .pretty() + .with_target(true) + .with_level(true) + .init(); let http_addr = SocketAddr::new(cli.api_http_ip, cli.api_http_port); let ws_addr = SocketAddr::new(cli.api_ws_ip, cli.api_ws_port); diff --git a/src/router.rs b/src/router.rs index aa7a6f2..d85a9ad 100644 --- a/src/router.rs +++ b/src/router.rs @@ -3,12 +3,12 @@ use std::{collections::HashSet, sync::Arc}; use serde_json::{Value, json}; use tokio::sync::Semaphore; -use tracing::{info, warn, error}; use crate::{ - clients::{Destination, SupervisorClient, MyceliumClient}, + clients::{Destination, MyceliumClient, SupervisorClient}, models::{Job, JobStatus, Message, MessageStatus, ScriptType, TransportStatus}, service::AppService, }; +use tracing::{error, info, warn}; #[derive(Clone, Debug)] pub struct RouterConfig { @@ -71,7 +71,8 @@ pub fn start_router(service: AppService, cfg: RouterConfig) -> Vec { // Persist the result into the Job.result map (merge) let _ = service_poll @@ -268,7 +275,13 @@ async fn deliver_one( ) .await; // Log which key was stored (success or error) - let key = result_map.keys().next().cloned().unwrap_or_else(|| "unknown".to_string()); + let key = result_map + .keys() + .next() + .cloned() + .unwrap_or_else(|| { + "unknown".to_string() + }); let _ = service_poll .append_message_logs( context_id, @@ -337,10 +350,7 @@ async fn deliver_one( context_id, caller_id, id, - vec![format!( - "job.status sync error: {}", - e - )], + vec![format!("job.status sync error: {}", e)], ) .await; } @@ -434,7 +444,6 @@ fn map_supervisor_job_status(s: &str) -> Option<(JobStatus, bool)> { } } - /// Auto-discover contexts periodically and ensure a router loop exists for each. /// Returns a JoinHandle of the discovery task (router loops are detached). pub fn start_router_auto(service: AppService, cfg: RouterConfig) -> tokio::task::JoinHandle<()> { @@ -452,7 +461,7 @@ pub fn start_router_auto(service: AppService, cfg: RouterConfig) -> tokio::task: }; let _ = start_router(service.clone(), cfg_ctx); active.insert(ctx_id); - info!(context_id=ctx_id, "Started loop for context"); + info!(context_id = ctx_id, "Started loop for context"); } } } diff --git a/src/rpc.rs b/src/rpc.rs index a9703c2..002655b 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -410,11 +410,7 @@ pub fn build_module(state: Arc) -> RpcModule<()> { let state = state.clone(); async move { let p: ActorLoadParams = params.parse().map_err(invalid_params_err)?; - let actor = state - .service - .load_actor(p.id) - .await - .map_err(storage_err)?; + let actor = state.service.load_actor(p.id).await.map_err(storage_err)?; Ok::<_, ErrorObjectOwned>(actor) } }) diff --git a/src/service.rs b/src/service.rs index 5b99eed..335b7fc 100644 --- a/src/service.rs +++ b/src/service.rs @@ -694,7 +694,7 @@ impl AppService { Ok(()) } -/// Bypass-permission variant to update a job status with transition validation. + /// Bypass-permission variant to update a job status with transition validation. /// This skips the executor permission check but enforces the same state transition rules. pub async fn update_job_status_unchecked( &self, diff --git a/src/storage.rs b/src/storage.rs index e1a6129..0f726e5 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1,4 +1,3 @@ - pub mod redis; pub use redis::RedisDriver; diff --git a/src/storage/redis.rs b/src/storage/redis.rs index 4e2a5f0..d4ff007 100644 --- a/src/storage/redis.rs +++ b/src/storage/redis.rs @@ -6,10 +6,11 @@ use serde::de::DeserializeOwned; use serde_json::{Map as JsonMap, Value}; use tokio::sync::Mutex; -use tracing::{error, warn, info, debug, trace}; use crate::models::{ - Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageStatus, Runner, TransportStatus, + Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageStatus, Runner, + TransportStatus, }; +use tracing::{debug, error, info, trace, warn}; type Result = std::result::Result>; @@ -219,7 +220,7 @@ impl RedisDriver { let key = Self::actor_key(id); self.hget_model(db, &key).await } -/// Save an Actor globally in DB 0 (Actor is context-independent) + /// Save an Actor globally in DB 0 (Actor is context-independent) pub async fn save_actor_global(&self, actor: &Actor) -> Result<()> { let json = serde_json::to_value(actor)?; let id = json @@ -717,10 +718,15 @@ impl RedisDriver { /// Register a context id in the global set "contexts" stored in DB 0. pub async fn register_context_id(&self, id: u32) -> Result<()> { let mut cm = self.manager_for_db(0).await?; - let _: i64 = redis::cmd("SADD").arg("contexts").arg(id).query_async(&mut cm).await.map_err(|e| { - error!(db=0, context_id=%id, error=%e, "SADD contexts failed"); - e - })?; + let _: i64 = redis::cmd("SADD") + .arg("contexts") + .arg(id) + .query_async(&mut cm) + .await + .map_err(|e| { + error!(db=0, context_id=%id, error=%e, "SADD contexts failed"); + e + })?; Ok(()) } @@ -728,10 +734,14 @@ impl RedisDriver { pub async fn list_context_ids(&self) -> Result> { let mut cm = self.manager_for_db(0).await?; // Using SMEMBERS and parsing into u32 - let vals: Vec = redis::cmd("SMEMBERS").arg("contexts").query_async(&mut cm).await.map_err(|e| { - error!(db=0, error=%e, "SMEMBERS contexts failed"); - e - })?; + let vals: Vec = redis::cmd("SMEMBERS") + .arg("contexts") + .query_async(&mut cm) + .await + .map_err(|e| { + error!(db=0, error=%e, "SMEMBERS contexts failed"); + e + })?; let mut out = Vec::with_capacity(vals.len()); for v in vals { if let Ok(n) = v.parse::() {