Periodically verify the status of messages sent over mycelium
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
This commit is contained in:
		@@ -1,7 +1,12 @@
 | 
			
		||||
pub mod supervisor_client;
 | 
			
		||||
pub mod mycelium_client;
 | 
			
		||||
 | 
			
		||||
pub use supervisor_client::{
 | 
			
		||||
    Destination,
 | 
			
		||||
    SupervisorClient,
 | 
			
		||||
    SupervisorClientError,
 | 
			
		||||
};
 | 
			
		||||
pub use mycelium_client::{
 | 
			
		||||
    MyceliumClient,
 | 
			
		||||
    MyceliumClientError,
 | 
			
		||||
};
 | 
			
		||||
							
								
								
									
										97
									
								
								src/clients/mycelium_client.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										97
									
								
								src/clients/mycelium_client.rs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,97 @@
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use std::sync::atomic::{AtomicU64, Ordering};
 | 
			
		||||
 | 
			
		||||
use reqwest::Client as HttpClient;
 | 
			
		||||
use serde::Deserialize;
 | 
			
		||||
use serde_json::{Value, json};
 | 
			
		||||
use thiserror::Error;
 | 
			
		||||
 | 
			
		||||
use crate::models::TransportStatus;
 | 
			
		||||
 | 
			
		||||
/// Lightweight client for querying Mycelium transport status
 | 
			
		||||
#[derive(Clone)]
 | 
			
		||||
pub struct MyceliumClient {
 | 
			
		||||
    base_url: String,       // e.g. http://127.0.0.1:9651
 | 
			
		||||
    http: HttpClient,
 | 
			
		||||
    id_counter: Arc<AtomicU64>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug, Error)]
 | 
			
		||||
pub enum MyceliumClientError {
 | 
			
		||||
    #[error("HTTP error: {0}")]
 | 
			
		||||
    Http(#[from] reqwest::Error),
 | 
			
		||||
    #[error("JSON error: {0}")]
 | 
			
		||||
    Json(#[from] serde_json::Error),
 | 
			
		||||
    #[error("JSON-RPC error: {0}")]
 | 
			
		||||
    RpcError(String),
 | 
			
		||||
    #[error("Invalid response: {0}")]
 | 
			
		||||
    InvalidResponse(String),
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl MyceliumClient {
 | 
			
		||||
    pub fn new(base_url: impl Into<String>) -> Result<Self, MyceliumClientError> {
 | 
			
		||||
        let url = base_url.into();
 | 
			
		||||
        let http = HttpClient::builder().build()?;
 | 
			
		||||
        Ok(Self {
 | 
			
		||||
            base_url: url,
 | 
			
		||||
            http,
 | 
			
		||||
            id_counter: Arc::new(AtomicU64::new(1)),
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn next_id(&self) -> u64 {
 | 
			
		||||
        self.id_counter.fetch_add(1, Ordering::Relaxed)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn jsonrpc(&self, method: &str, params: Value) -> Result<Value, MyceliumClientError> {
 | 
			
		||||
        let req = json!({
 | 
			
		||||
            "jsonrpc": "2.0",
 | 
			
		||||
            "id": self.next_id(),
 | 
			
		||||
            "method": method,
 | 
			
		||||
            "params": [ params ]
 | 
			
		||||
        });
 | 
			
		||||
        let resp = self.http.post(&self.base_url).json(&req).send().await?;
 | 
			
		||||
        let status = resp.status();
 | 
			
		||||
        let body: Value = resp.json().await?;
 | 
			
		||||
        if let Some(err) = body.get("error") {
 | 
			
		||||
            let code = err.get("code").and_then(|v| v.as_i64()).unwrap_or(0);
 | 
			
		||||
            let msg = err.get("message").and_then(|v| v.as_str()).unwrap_or("unknown error");
 | 
			
		||||
            return Err(MyceliumClientError::RpcError(format!("code={code} msg={msg}")));
 | 
			
		||||
        }
 | 
			
		||||
        if !status.is_success() {
 | 
			
		||||
            return Err(MyceliumClientError::RpcError(format!("HTTP {status}, body {body}")));
 | 
			
		||||
        }
 | 
			
		||||
        Ok(body)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Call messageStatus with an outbound message id (hex string)
 | 
			
		||||
    pub async fn message_status(&self, id_hex: &str) -> Result<TransportStatus, MyceliumClientError> {
 | 
			
		||||
        let params = json!({ "id": id_hex });
 | 
			
		||||
        let body = self.jsonrpc("messageStatus", params).await?;
 | 
			
		||||
        let result = body.get("result").ok_or_else(|| {
 | 
			
		||||
            MyceliumClientError::InvalidResponse(format!("missing result in response: {body}"))
 | 
			
		||||
        })?;
 | 
			
		||||
        // Accept both { status: "..."} and bare "..."
 | 
			
		||||
        let status_str = if let Some(s) = result.get("status").and_then(|v| v.as_str()) {
 | 
			
		||||
            s.to_string()
 | 
			
		||||
        } else if let Some(s) = result.as_str() {
 | 
			
		||||
            s.to_string()
 | 
			
		||||
        } else {
 | 
			
		||||
            return Err(MyceliumClientError::InvalidResponse(format!("unexpected result shape: {result}")));
 | 
			
		||||
        };
 | 
			
		||||
        Self::map_status(&status_str).ok_or_else(|| {
 | 
			
		||||
            MyceliumClientError::InvalidResponse(format!("unknown status: {status_str}"))
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn map_status(s: &str) -> Option<TransportStatus> {
 | 
			
		||||
        match s {
 | 
			
		||||
            "queued" => Some(TransportStatus::Queued),
 | 
			
		||||
            "sent" => Some(TransportStatus::Sent),
 | 
			
		||||
            "delivered" => Some(TransportStatus::Delivered),
 | 
			
		||||
            "read" => Some(TransportStatus::Read),
 | 
			
		||||
            "failed" => Some(TransportStatus::Failed),
 | 
			
		||||
            _ => None,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@@ -97,6 +97,8 @@ async fn main() {
 | 
			
		||||
            concurrency: 32,
 | 
			
		||||
            base_url,
 | 
			
		||||
            topic: "supervisor.rpc".to_string(),
 | 
			
		||||
            transport_poll_interval_secs: 2,
 | 
			
		||||
            transport_poll_timeout_secs: 300,
 | 
			
		||||
        };
 | 
			
		||||
        let _auto_handle = herocoordinator::router::start_router_auto(service_for_router, cfg);
 | 
			
		||||
    }
 | 
			
		||||
 
 | 
			
		||||
@@ -10,6 +10,6 @@ pub use actor::Actor;
 | 
			
		||||
pub use context::Context;
 | 
			
		||||
pub use flow::{Flow, FlowStatus};
 | 
			
		||||
pub use job::{Job, JobStatus};
 | 
			
		||||
pub use message::{Message, MessageFormatType, MessageStatus, MessageType};
 | 
			
		||||
pub use message::{Message, MessageFormatType, MessageStatus, MessageType, TransportStatus};
 | 
			
		||||
pub use runner::Runner;
 | 
			
		||||
pub use script_type::ScriptType;
 | 
			
		||||
 
 | 
			
		||||
@@ -22,6 +22,12 @@ pub struct Message {
 | 
			
		||||
    pub timeout_ack: u32,
 | 
			
		||||
    /// Seconds for the receiver to send us a reply
 | 
			
		||||
    pub timeout_result: u32,
 | 
			
		||||
 | 
			
		||||
    /// Outbound transport id returned by Mycelium on push
 | 
			
		||||
    pub transport_id: Option<String>,
 | 
			
		||||
    /// Latest transport status as reported by Mycelium
 | 
			
		||||
    pub transport_status: Option<TransportStatus>,
 | 
			
		||||
 | 
			
		||||
    pub job: Vec<Job>,
 | 
			
		||||
    pub logs: Vec<Log>,
 | 
			
		||||
    pub created_at: Timestamp,
 | 
			
		||||
@@ -44,6 +50,15 @@ pub enum MessageStatus {
 | 
			
		||||
    Processed,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 | 
			
		||||
pub enum TransportStatus {
 | 
			
		||||
    Queued,
 | 
			
		||||
    Sent,
 | 
			
		||||
    Delivered,
 | 
			
		||||
    Read,
 | 
			
		||||
    Failed,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug, Clone, Serialize, Deserialize)]
 | 
			
		||||
pub enum MessageFormatType {
 | 
			
		||||
    Html,
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										113
									
								
								src/router.rs
									
									
									
									
									
								
							
							
						
						
									
										113
									
								
								src/router.rs
									
									
									
									
									
								
							@@ -4,8 +4,8 @@ use serde_json::{Value, json};
 | 
			
		||||
use tokio::sync::Semaphore;
 | 
			
		||||
 | 
			
		||||
use crate::{
 | 
			
		||||
    clients::{Destination, SupervisorClient},
 | 
			
		||||
    models::{Job, Message, MessageStatus, ScriptType},
 | 
			
		||||
    clients::{Destination, SupervisorClient, MyceliumClient},
 | 
			
		||||
    models::{Job, Message, MessageStatus, ScriptType, TransportStatus},
 | 
			
		||||
    service::AppService,
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
@@ -15,7 +15,9 @@ pub struct RouterConfig {
 | 
			
		||||
    pub concurrency: usize,
 | 
			
		||||
    pub base_url: String, // e.g. http://127.0.0.1:8990
 | 
			
		||||
    pub topic: String,    // e.g. "supervisor.rpc"
 | 
			
		||||
                          // secret currently unused (None), add here later if needed
 | 
			
		||||
    // Transport status polling configuration
 | 
			
		||||
    pub transport_poll_interval_secs: u64, // e.g. 2
 | 
			
		||||
    pub transport_poll_timeout_secs: u64,  // e.g. 300 (5 minutes)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Start background router loops, one per context.
 | 
			
		||||
@@ -130,13 +132,116 @@ async fn deliver_one(
 | 
			
		||||
    let params = build_params(&msg)?;
 | 
			
		||||
 | 
			
		||||
    // Send
 | 
			
		||||
    let _out_id = client.call(&method, params).await?;
 | 
			
		||||
    let out_id = client.call(&method, params).await?;
 | 
			
		||||
 | 
			
		||||
    // Store transport id and initial Sent status
 | 
			
		||||
    let _ = service
 | 
			
		||||
        .update_message_transport(
 | 
			
		||||
            context_id,
 | 
			
		||||
            caller_id,
 | 
			
		||||
            id,
 | 
			
		||||
            Some(out_id.clone()),
 | 
			
		||||
            Some(TransportStatus::Sent),
 | 
			
		||||
        )
 | 
			
		||||
        .await;
 | 
			
		||||
 | 
			
		||||
    // Mark as acknowledged on success
 | 
			
		||||
    service
 | 
			
		||||
        .update_message_status(context_id, caller_id, id, MessageStatus::Acknowledged)
 | 
			
		||||
        .await?;
 | 
			
		||||
 | 
			
		||||
    // Spawn transport-status poller
 | 
			
		||||
    {
 | 
			
		||||
        let service_poll = service.clone();
 | 
			
		||||
        let base_url = cfg.base_url.clone();
 | 
			
		||||
        let poll_interval = std::time::Duration::from_secs(cfg.transport_poll_interval_secs);
 | 
			
		||||
        let poll_timeout = std::time::Duration::from_secs(cfg.transport_poll_timeout_secs);
 | 
			
		||||
        let out_id_cloned = out_id.clone();
 | 
			
		||||
 | 
			
		||||
        tokio::spawn(async move {
 | 
			
		||||
            let start = std::time::Instant::now();
 | 
			
		||||
            let client = match MyceliumClient::new(base_url) {
 | 
			
		||||
                Ok(c) => c,
 | 
			
		||||
                Err(e) => {
 | 
			
		||||
                    let _ = service_poll
 | 
			
		||||
                        .append_message_logs(
 | 
			
		||||
                            context_id,
 | 
			
		||||
                            caller_id,
 | 
			
		||||
                            id,
 | 
			
		||||
                            vec![format!("MyceliumClient init error: {e}")],
 | 
			
		||||
                        )
 | 
			
		||||
                        .await;
 | 
			
		||||
                    return;
 | 
			
		||||
                }
 | 
			
		||||
            };
 | 
			
		||||
 | 
			
		||||
            let mut last_status: Option<TransportStatus> = Some(TransportStatus::Sent);
 | 
			
		||||
 | 
			
		||||
            loop {
 | 
			
		||||
                if start.elapsed() >= poll_timeout {
 | 
			
		||||
                    let _ = service_poll
 | 
			
		||||
                        .append_message_logs(
 | 
			
		||||
                            context_id,
 | 
			
		||||
                            caller_id,
 | 
			
		||||
                            id,
 | 
			
		||||
                            vec!["Transport-status polling timed out".to_string()],
 | 
			
		||||
                        )
 | 
			
		||||
                        .await;
 | 
			
		||||
                    // leave last known status; do not override
 | 
			
		||||
                    break;
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                match client.message_status(&out_id_cloned).await {
 | 
			
		||||
                    Ok(s) => {
 | 
			
		||||
                        if last_status.as_ref() != Some(&s) {
 | 
			
		||||
                            let _ = service_poll
 | 
			
		||||
                                .update_message_transport(
 | 
			
		||||
                                    context_id,
 | 
			
		||||
                                    caller_id,
 | 
			
		||||
                                    id,
 | 
			
		||||
                                    None,
 | 
			
		||||
                                    Some(s.clone()),
 | 
			
		||||
                                )
 | 
			
		||||
                                .await;
 | 
			
		||||
                            last_status = Some(s.clone());
 | 
			
		||||
                        }
 | 
			
		||||
 | 
			
		||||
                        // Stop on terminal states
 | 
			
		||||
                        if matches!(s, TransportStatus::Delivered | TransportStatus::Read) {
 | 
			
		||||
                            break;
 | 
			
		||||
                        }
 | 
			
		||||
                        if matches!(s, TransportStatus::Failed) {
 | 
			
		||||
                            let _ = service_poll
 | 
			
		||||
                                .append_message_logs(
 | 
			
		||||
                                    context_id,
 | 
			
		||||
                                    caller_id,
 | 
			
		||||
                                    id,
 | 
			
		||||
                                    vec![format!(
 | 
			
		||||
                                        "Transport failed for outbound id {out_id_cloned}"
 | 
			
		||||
                                    )],
 | 
			
		||||
                                )
 | 
			
		||||
                                .await;
 | 
			
		||||
                            break;
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                    Err(e) => {
 | 
			
		||||
                        // Log and continue polling
 | 
			
		||||
                        let _ = service_poll
 | 
			
		||||
                            .append_message_logs(
 | 
			
		||||
                                context_id,
 | 
			
		||||
                                caller_id,
 | 
			
		||||
                                id,
 | 
			
		||||
                                vec![format!("messageStatus query error: {e}")],
 | 
			
		||||
                            )
 | 
			
		||||
                            .await;
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                tokio::time::sleep(poll_interval).await;
 | 
			
		||||
            }
 | 
			
		||||
        });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -299,6 +299,8 @@ impl MessageCreate {
 | 
			
		||||
            timeout,
 | 
			
		||||
            timeout_ack,
 | 
			
		||||
            timeout_result,
 | 
			
		||||
            transport_id: None,
 | 
			
		||||
            transport_status: None,
 | 
			
		||||
            job: job.into_iter().map(JobCreate::into_domain).collect(),
 | 
			
		||||
            logs: Vec::new(),
 | 
			
		||||
            created_at: ts,
 | 
			
		||||
 
 | 
			
		||||
@@ -1,7 +1,7 @@
 | 
			
		||||
use crate::dag::{DagError, DagResult, FlowDag, build_flow_dag};
 | 
			
		||||
use crate::models::{
 | 
			
		||||
    Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageFormatType, MessageStatus,
 | 
			
		||||
    Runner,
 | 
			
		||||
    Runner, TransportStatus,
 | 
			
		||||
};
 | 
			
		||||
use crate::storage::RedisDriver;
 | 
			
		||||
 | 
			
		||||
@@ -508,6 +508,8 @@ impl AppService {
 | 
			
		||||
                                    timeout: job.timeout,
 | 
			
		||||
                                    timeout_ack: 10,
 | 
			
		||||
                                    timeout_result: job.timeout,
 | 
			
		||||
                                    transport_id: None,
 | 
			
		||||
                                    transport_status: None,
 | 
			
		||||
                                    job: vec![job.clone()],
 | 
			
		||||
                                    logs: Vec::new(),
 | 
			
		||||
                                    created_at: ts,
 | 
			
		||||
@@ -589,6 +591,8 @@ impl AppService {
 | 
			
		||||
                timeout: job.timeout,
 | 
			
		||||
                timeout_ack: 10,
 | 
			
		||||
                timeout_result: job.timeout,
 | 
			
		||||
                transport_id: None,
 | 
			
		||||
                transport_status: None,
 | 
			
		||||
                job: vec![job.clone()],
 | 
			
		||||
                logs: Vec::new(),
 | 
			
		||||
                created_at: ts,
 | 
			
		||||
@@ -817,6 +821,21 @@ impl AppService {
 | 
			
		||||
            .await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn update_message_transport(
 | 
			
		||||
        &self,
 | 
			
		||||
        context_id: u32,
 | 
			
		||||
        caller_id: u32,
 | 
			
		||||
        id: u32,
 | 
			
		||||
        transport_id: Option<String>,
 | 
			
		||||
        transport_status: Option<TransportStatus>,
 | 
			
		||||
    ) -> Result<(), BoxError> {
 | 
			
		||||
        // Ensure message exists (provides clearer error)
 | 
			
		||||
        let _ = self.redis.load_message(context_id, caller_id, id).await?;
 | 
			
		||||
        self.redis
 | 
			
		||||
            .update_message_transport(context_id, caller_id, id, transport_id, transport_status)
 | 
			
		||||
            .await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn update_flow_env_vars_merge(
 | 
			
		||||
        &self,
 | 
			
		||||
        context_id: u32,
 | 
			
		||||
 
 | 
			
		||||
@@ -7,7 +7,7 @@ use serde_json::{Map as JsonMap, Value};
 | 
			
		||||
use tokio::sync::Mutex;
 | 
			
		||||
 | 
			
		||||
use crate::models::{
 | 
			
		||||
    Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageStatus, Runner,
 | 
			
		||||
    Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageStatus, Runner, TransportStatus,
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
 | 
			
		||||
@@ -358,6 +358,40 @@ impl RedisDriver {
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Message: update transport_id / transport_status (optionally) and bump updated_at
 | 
			
		||||
    pub async fn update_message_transport(
 | 
			
		||||
        &self,
 | 
			
		||||
        db: u32,
 | 
			
		||||
        caller_id: u32,
 | 
			
		||||
        id: u32,
 | 
			
		||||
        transport_id: Option<String>,
 | 
			
		||||
        transport_status: Option<TransportStatus>,
 | 
			
		||||
    ) -> Result<()> {
 | 
			
		||||
        let mut cm = self.manager_for_db(db).await?;
 | 
			
		||||
        let key = Self::message_key(caller_id, id);
 | 
			
		||||
 | 
			
		||||
        let mut pairs: Vec<(String, String)> = Vec::new();
 | 
			
		||||
 | 
			
		||||
        if let Some(tid) = transport_id {
 | 
			
		||||
            pairs.push(("transport_id".to_string(), tid));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if let Some(ts_status) = transport_status {
 | 
			
		||||
            let status_str = match serde_json::to_value(&ts_status)? {
 | 
			
		||||
                Value::String(s) => s,
 | 
			
		||||
                v => v.to_string(),
 | 
			
		||||
            };
 | 
			
		||||
            pairs.push(("transport_status".to_string(), status_str));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // Always bump updated_at
 | 
			
		||||
        let ts = crate::time::current_timestamp();
 | 
			
		||||
        pairs.push(("updated_at".to_string(), ts.to_string()));
 | 
			
		||||
 | 
			
		||||
        let _: usize = cm.hset_multiple(key, &pairs).await?;
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Flow: merge env_vars map and bump updated_at
 | 
			
		||||
    pub async fn update_flow_env_vars_merge(
 | 
			
		||||
        &self,
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user