diff --git a/specs/openrpc.json b/specs/openrpc.json index db6c757..ea12271 100644 --- a/specs/openrpc.json +++ b/specs/openrpc.json @@ -438,6 +438,16 @@ "Processed" ] }, + "TransportStatus": { + "type": "string", + "enum": [ + "Queued", + "Sent", + "Delivered", + "Read", + "Failed" + ] + }, "MessageType": { "type": "string", "enum": [ @@ -779,6 +789,12 @@ }, "status": { "$ref": "#/components/schemas/MessageStatus" + }, + "transport_id": { + "type": "string" + }, + "transport_status": { + "$ref": "#/components/schemas/TransportStatus" } } }, diff --git a/src/clients/mod.rs b/src/clients/mod.rs index 0b8d24a..b9707f9 100644 --- a/src/clients/mod.rs +++ b/src/clients/mod.rs @@ -1,7 +1,12 @@ pub mod supervisor_client; +pub mod mycelium_client; pub use supervisor_client::{ Destination, SupervisorClient, SupervisorClientError, +}; +pub use mycelium_client::{ + MyceliumClient, + MyceliumClientError, }; \ No newline at end of file diff --git a/src/clients/mycelium_client.rs b/src/clients/mycelium_client.rs new file mode 100644 index 0000000..58557dc --- /dev/null +++ b/src/clients/mycelium_client.rs @@ -0,0 +1,97 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + +use reqwest::Client as HttpClient; +use serde::Deserialize; +use serde_json::{Value, json}; +use thiserror::Error; + +use crate::models::TransportStatus; + +/// Lightweight client for querying Mycelium transport status +#[derive(Clone)] +pub struct MyceliumClient { + base_url: String, // e.g. http://127.0.0.1:9651 + http: HttpClient, + id_counter: Arc, +} + +#[derive(Debug, Error)] +pub enum MyceliumClientError { + #[error("HTTP error: {0}")] + Http(#[from] reqwest::Error), + #[error("JSON error: {0}")] + Json(#[from] serde_json::Error), + #[error("JSON-RPC error: {0}")] + RpcError(String), + #[error("Invalid response: {0}")] + InvalidResponse(String), +} + +impl MyceliumClient { + pub fn new(base_url: impl Into) -> Result { + let url = base_url.into(); + let http = HttpClient::builder().build()?; + Ok(Self { + base_url: url, + http, + id_counter: Arc::new(AtomicU64::new(1)), + }) + } + + fn next_id(&self) -> u64 { + self.id_counter.fetch_add(1, Ordering::Relaxed) + } + + async fn jsonrpc(&self, method: &str, params: Value) -> Result { + let req = json!({ + "jsonrpc": "2.0", + "id": self.next_id(), + "method": method, + "params": [ params ] + }); + let resp = self.http.post(&self.base_url).json(&req).send().await?; + let status = resp.status(); + let body: Value = resp.json().await?; + if let Some(err) = body.get("error") { + let code = err.get("code").and_then(|v| v.as_i64()).unwrap_or(0); + let msg = err.get("message").and_then(|v| v.as_str()).unwrap_or("unknown error"); + return Err(MyceliumClientError::RpcError(format!("code={code} msg={msg}"))); + } + if !status.is_success() { + return Err(MyceliumClientError::RpcError(format!("HTTP {status}, body {body}"))); + } + Ok(body) + } + + /// Call messageStatus with an outbound message id (hex string) + pub async fn message_status(&self, id_hex: &str) -> Result { + let params = json!({ "id": id_hex }); + let body = self.jsonrpc("messageStatus", params).await?; + let result = body.get("result").ok_or_else(|| { + MyceliumClientError::InvalidResponse(format!("missing result in response: {body}")) + })?; + // Accept both { status: "..."} and bare "..." + let status_str = if let Some(s) = result.get("status").and_then(|v| v.as_str()) { + s.to_string() + } else if let Some(s) = result.as_str() { + s.to_string() + } else { + return Err(MyceliumClientError::InvalidResponse(format!("unexpected result shape: {result}"))); + }; + Self::map_status(&status_str).ok_or_else(|| { + MyceliumClientError::InvalidResponse(format!("unknown status: {status_str}")) + }) + } + + fn map_status(s: &str) -> Option { + match s { + "queued" => Some(TransportStatus::Queued), + "sent" => Some(TransportStatus::Sent), + "delivered" => Some(TransportStatus::Delivered), + "read" => Some(TransportStatus::Read), + "failed" => Some(TransportStatus::Failed), + _ => None, + } + } +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 2a31f9b..fb4f3f6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -97,6 +97,8 @@ async fn main() { concurrency: 32, base_url, topic: "supervisor.rpc".to_string(), + transport_poll_interval_secs: 2, + transport_poll_timeout_secs: 300, }; let _auto_handle = herocoordinator::router::start_router_auto(service_for_router, cfg); } diff --git a/src/models.rs b/src/models.rs index cc30447..467df98 100644 --- a/src/models.rs +++ b/src/models.rs @@ -10,6 +10,6 @@ pub use actor::Actor; pub use context::Context; pub use flow::{Flow, FlowStatus}; pub use job::{Job, JobStatus}; -pub use message::{Message, MessageFormatType, MessageStatus, MessageType}; +pub use message::{Message, MessageFormatType, MessageStatus, MessageType, TransportStatus}; pub use runner::Runner; pub use script_type::ScriptType; diff --git a/src/models/message.rs b/src/models/message.rs index 60cefca..e96ee2f 100644 --- a/src/models/message.rs +++ b/src/models/message.rs @@ -22,6 +22,12 @@ pub struct Message { pub timeout_ack: u32, /// Seconds for the receiver to send us a reply pub timeout_result: u32, + + /// Outbound transport id returned by Mycelium on push + pub transport_id: Option, + /// Latest transport status as reported by Mycelium + pub transport_status: Option, + pub job: Vec, pub logs: Vec, pub created_at: Timestamp, @@ -44,6 +50,15 @@ pub enum MessageStatus { Processed, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum TransportStatus { + Queued, + Sent, + Delivered, + Read, + Failed, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub enum MessageFormatType { Html, diff --git a/src/router.rs b/src/router.rs index d21e75d..908b0ff 100644 --- a/src/router.rs +++ b/src/router.rs @@ -4,8 +4,8 @@ use serde_json::{Value, json}; use tokio::sync::Semaphore; use crate::{ - clients::{Destination, SupervisorClient}, - models::{Job, Message, MessageStatus, ScriptType}, + clients::{Destination, SupervisorClient, MyceliumClient}, + models::{Job, Message, MessageStatus, ScriptType, TransportStatus}, service::AppService, }; @@ -15,7 +15,9 @@ pub struct RouterConfig { pub concurrency: usize, pub base_url: String, // e.g. http://127.0.0.1:8990 pub topic: String, // e.g. "supervisor.rpc" - // secret currently unused (None), add here later if needed + // Transport status polling configuration + pub transport_poll_interval_secs: u64, // e.g. 2 + pub transport_poll_timeout_secs: u64, // e.g. 300 (5 minutes) } /// Start background router loops, one per context. @@ -130,13 +132,116 @@ async fn deliver_one( let params = build_params(&msg)?; // Send - let _out_id = client.call(&method, params).await?; + let out_id = client.call(&method, params).await?; + + // Store transport id and initial Sent status + let _ = service + .update_message_transport( + context_id, + caller_id, + id, + Some(out_id.clone()), + Some(TransportStatus::Sent), + ) + .await; // Mark as acknowledged on success service .update_message_status(context_id, caller_id, id, MessageStatus::Acknowledged) .await?; + // Spawn transport-status poller + { + let service_poll = service.clone(); + let base_url = cfg.base_url.clone(); + let poll_interval = std::time::Duration::from_secs(cfg.transport_poll_interval_secs); + let poll_timeout = std::time::Duration::from_secs(cfg.transport_poll_timeout_secs); + let out_id_cloned = out_id.clone(); + + tokio::spawn(async move { + let start = std::time::Instant::now(); + let client = match MyceliumClient::new(base_url) { + Ok(c) => c, + Err(e) => { + let _ = service_poll + .append_message_logs( + context_id, + caller_id, + id, + vec![format!("MyceliumClient init error: {e}")], + ) + .await; + return; + } + }; + + let mut last_status: Option = Some(TransportStatus::Sent); + + loop { + if start.elapsed() >= poll_timeout { + let _ = service_poll + .append_message_logs( + context_id, + caller_id, + id, + vec!["Transport-status polling timed out".to_string()], + ) + .await; + // leave last known status; do not override + break; + } + + match client.message_status(&out_id_cloned).await { + Ok(s) => { + if last_status.as_ref() != Some(&s) { + let _ = service_poll + .update_message_transport( + context_id, + caller_id, + id, + None, + Some(s.clone()), + ) + .await; + last_status = Some(s.clone()); + } + + // Stop on terminal states + if matches!(s, TransportStatus::Delivered | TransportStatus::Read) { + break; + } + if matches!(s, TransportStatus::Failed) { + let _ = service_poll + .append_message_logs( + context_id, + caller_id, + id, + vec![format!( + "Transport failed for outbound id {out_id_cloned}" + )], + ) + .await; + break; + } + } + Err(e) => { + // Log and continue polling + let _ = service_poll + .append_message_logs( + context_id, + caller_id, + id, + vec![format!("messageStatus query error: {e}")], + ) + .await; + } + } + + tokio::time::sleep(poll_interval).await; + } + }); + } + Ok(()) } diff --git a/src/rpc.rs b/src/rpc.rs index 314d599..8ab1755 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -299,6 +299,8 @@ impl MessageCreate { timeout, timeout_ack, timeout_result, + transport_id: None, + transport_status: None, job: job.into_iter().map(JobCreate::into_domain).collect(), logs: Vec::new(), created_at: ts, diff --git a/src/service.rs b/src/service.rs index 1559969..d1c9422 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,7 +1,7 @@ use crate::dag::{DagError, DagResult, FlowDag, build_flow_dag}; use crate::models::{ Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageFormatType, MessageStatus, - Runner, + Runner, TransportStatus, }; use crate::storage::RedisDriver; @@ -508,6 +508,8 @@ impl AppService { timeout: job.timeout, timeout_ack: 10, timeout_result: job.timeout, + transport_id: None, + transport_status: None, job: vec![job.clone()], logs: Vec::new(), created_at: ts, @@ -589,6 +591,8 @@ impl AppService { timeout: job.timeout, timeout_ack: 10, timeout_result: job.timeout, + transport_id: None, + transport_status: None, job: vec![job.clone()], logs: Vec::new(), created_at: ts, @@ -817,6 +821,21 @@ impl AppService { .await } + pub async fn update_message_transport( + &self, + context_id: u32, + caller_id: u32, + id: u32, + transport_id: Option, + transport_status: Option, + ) -> Result<(), BoxError> { + // Ensure message exists (provides clearer error) + let _ = self.redis.load_message(context_id, caller_id, id).await?; + self.redis + .update_message_transport(context_id, caller_id, id, transport_id, transport_status) + .await + } + pub async fn update_flow_env_vars_merge( &self, context_id: u32, diff --git a/src/storage/redis.rs b/src/storage/redis.rs index b251d01..845f7a6 100644 --- a/src/storage/redis.rs +++ b/src/storage/redis.rs @@ -7,7 +7,7 @@ use serde_json::{Map as JsonMap, Value}; use tokio::sync::Mutex; use crate::models::{ - Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageStatus, Runner, + Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageStatus, Runner, TransportStatus, }; type Result = std::result::Result>; @@ -358,6 +358,40 @@ impl RedisDriver { Ok(()) } + /// Message: update transport_id / transport_status (optionally) and bump updated_at + pub async fn update_message_transport( + &self, + db: u32, + caller_id: u32, + id: u32, + transport_id: Option, + transport_status: Option, + ) -> Result<()> { + let mut cm = self.manager_for_db(db).await?; + let key = Self::message_key(caller_id, id); + + let mut pairs: Vec<(String, String)> = Vec::new(); + + if let Some(tid) = transport_id { + pairs.push(("transport_id".to_string(), tid)); + } + + if let Some(ts_status) = transport_status { + let status_str = match serde_json::to_value(&ts_status)? { + Value::String(s) => s, + v => v.to_string(), + }; + pairs.push(("transport_status".to_string(), status_str)); + } + + // Always bump updated_at + let ts = crate::time::current_timestamp(); + pairs.push(("updated_at".to_string(), ts.to_string())); + + let _: usize = cm.hset_multiple(key, &pairs).await?; + Ok(()) + } + /// Flow: merge env_vars map and bump updated_at pub async fn update_flow_env_vars_merge( &self,