use std::{collections::HashSet, sync::Arc};

use serde_json::{Value, json};
use tokio::sync::Semaphore;
use tracing::{error, info};

use crate::{
    clients::{Destination, MyceliumClient, SupervisorClient},
    models::{Job, JobStatus, Message, MessageStatus, ScriptType, TransportStatus},
    service::AppService,
};

#[derive(Clone, Debug)]
pub struct RouterConfig {
    pub context_ids: Vec<u32>,
    pub concurrency: usize,
    pub base_url: String, // e.g. http://127.0.0.1:8990
    pub topic: String,    // e.g. "supervisor.rpc"
    // Transport-status polling configuration
    pub transport_poll_interval_secs: u64, // e.g. 2
    pub transport_poll_timeout_secs: u64,  // e.g. 300 (5 minutes)
}

/// Start background router loops, one per context.
/// Each loop:
/// - BRPOPs msg_out with a 1s timeout
/// - Loads the Message by key and selects a Runner by script_type
/// - Sends the supervisor JSON-RPC via Mycelium
/// - On success: Message.status = Acknowledged
/// - On error: Message.status = Error, and a log line is appended
pub fn start_router(service: AppService, cfg: RouterConfig) -> Vec<tokio::task::JoinHandle<()>> {
    let mut handles = Vec::new();
    for ctx_id in cfg.context_ids.clone() {
        let service_cloned = service.clone();
        let cfg_cloned = cfg.clone();
        let handle = tokio::spawn(async move {
            let sem = Arc::new(Semaphore::new(cfg_cloned.concurrency));

            // Create a shared Mycelium client for this context loop (retry until available)
            let mycelium = loop {
                match MyceliumClient::new(cfg_cloned.base_url.clone()) {
                    Ok(c) => break Arc::new(c),
                    Err(e) => {
                        error!(context_id = ctx_id, error = %e, "MyceliumClient init error");
                        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                    }
                }
            };

            loop {
                // Pop the next message key (blocking with timeout)
                match service_cloned.brpop_msg_out(ctx_id, 1).await {
                    Ok(Some(key)) => {
                        let permit = {
                            // Acquire a concurrency permit (non-fair is fine);
                            // if the semaphore is exhausted, await until a slot frees up.
                            let sem = sem.clone();
                            match sem.acquire_owned().await {
                                Ok(p) => p,
                                Err(_) => {
                                    // Semaphore closed; exit the loop
                                    break;
                                }
                            }
                        };
                        let service_task = service_cloned.clone();
                        let cfg_task = cfg_cloned.clone();
                        tokio::spawn({
                            let mycelium = mycelium.clone();
                            async move {
                                // Hold the permit for the lifetime of the task
                                let _permit = permit;
                                if let Err(e) =
                                    deliver_one(&service_task, &cfg_task, ctx_id, &key, mycelium)
                                        .await
                                {
                                    error!(context_id = ctx_id, key = %key, error = %e, "Delivery error");
                                }
                            }
                        });
                    }
                    Ok(None) => {
                        // Timeout: just tick
                        continue;
                    }
                    Err(e) => {
                        error!(context_id = ctx_id, error = %e, "BRPOP error");
                        // Small backoff to avoid a busy-loop on persistent errors
                        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                    }
                }
            }
        });
        handles.push(handle);
    }
    handles
}
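// Example wiring (a sketch: `AppService` construction is elided, `concurrency`
// is an arbitrary example value, and the other fields use the example values
// from the RouterConfig comments above):
//
//     let cfg = RouterConfig {
//         context_ids: vec![1, 2],
//         concurrency: 8,
//         base_url: "http://127.0.0.1:8990".to_string(),
//         topic: "supervisor.rpc".to_string(),
//         transport_poll_interval_secs: 2,
//         transport_poll_timeout_secs: 300,
//     };
//     let handles = start_router(service.clone(), cfg);
//     for h in handles {
//         let _ = h.await;
//     }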
async fn deliver_one(
    service: &AppService,
    cfg: &RouterConfig,
    context_id: u32,
    msg_key: &str,
    mycelium: Arc<MyceliumClient>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Parse "message:{caller_id}:{id}"
    let (caller_id, id) = parse_message_key(msg_key)
        .ok_or_else(|| format!("invalid message key format: {}", msg_key))?;

    // Load the message
    let msg: Message = service.load_message(context_id, caller_id, id).await?;

    // Embedded job id (if any)
    let job_id_opt: Option<u32> = msg.job.first().map(|j| j.id);

    // Determine the routing script_type
    let desired: ScriptType = determine_script_type(&msg);

    // Discover runners and select a matching one
    let runners = service.scan_runners(context_id).await?;
    let Some(runner) = runners.into_iter().find(|r| r.script_type == desired) else {
        let log = format!(
            "No runner with script_type {:?} available in context {} for message {}",
            desired, context_id, msg_key
        );
        let _ = service
            .append_message_logs(context_id, caller_id, id, vec![log.clone()])
            .await;
        let _ = service
            .update_message_status(context_id, caller_id, id, MessageStatus::Error)
            .await;
        return Err(log.into());
    };

    // Build the SupervisorClient; prefer the runner's public key, else its IP
    let dest = if !runner.pubkey.trim().is_empty() {
        Destination::Pk(runner.pubkey.clone())
    } else {
        Destination::Ip(runner.address)
    };
    // Keep clones for the poller
    let dest_for_poller = dest.clone();
    let topic_for_poller = cfg.topic.clone();
    let client = SupervisorClient::new_with_client(
        mycelium.clone(),
        dest.clone(),
        cfg.topic.clone(),
        None, // secret
    );

    // Build the supervisor method and params from the Message
    let method = msg.message.clone();
    let params = build_params(&msg)?;

    // Send
    let out_id = client.call(&method, params).await?;

    // Store the transport id and the initial Sent status
    let _ = service
        .update_message_transport(
            context_id,
            caller_id,
            id,
            Some(out_id.clone()),
            Some(TransportStatus::Sent),
        )
        .await;

    // Mark as acknowledged on success
    service
        .update_message_status(context_id, caller_id, id, MessageStatus::Acknowledged)
        .await?;

    // Spawn the transport-status poller
    {
        let service_poll = service.clone();
        let poll_interval = std::time::Duration::from_secs(cfg.transport_poll_interval_secs);
        let poll_timeout = std::time::Duration::from_secs(cfg.transport_poll_timeout_secs);
        let out_id_cloned = out_id.clone();
        let mycelium = mycelium.clone();
        // Reply timeout for supervisor job.result: prefer message.timeout_result,
        // fall back to the router's transport poll timeout.
        let job_result_reply_timeout: u64 = if msg.timeout_result > 0 {
            msg.timeout_result as u64
        } else {
            cfg.transport_poll_timeout_secs
        };

        tokio::spawn(async move {
            let start = std::time::Instant::now();
            let client = mycelium;
            // Supervisor call context captured for sync status checks
            let sup_dest = dest_for_poller;
            let sup_topic = topic_for_poller;
            let job_id_opt = job_id_opt;
            let mut last_status: Option<TransportStatus> = Some(TransportStatus::Sent);

            loop {
                if start.elapsed() >= poll_timeout {
                    let _ = service_poll
                        .append_message_logs(
                            context_id,
                            caller_id,
                            id,
                            vec!["Transport-status polling timed out".to_string()],
                        )
                        .await;
                    // Leave the last known status; do not override
                    break;
                }

                match client.message_status(&out_id_cloned).await {
                    Ok(s) => {
                        if last_status.as_ref() != Some(&s) {
                            let _ = service_poll
                                .update_message_transport(
                                    context_id,
                                    caller_id,
                                    id,
                                    None,
                                    Some(s.clone()),
                                )
                                .await;
                            last_status = Some(s.clone());
                        }

                        // Stop on terminal states
                        if matches!(s, TransportStatus::Delivered | TransportStatus::Read) {
                            // On Read, fetch supervisor job.status and update the local
                            // job/message if the remote status is terminal
                            if matches!(s, TransportStatus::Read)
                                && let Some(job_id) = job_id_opt
                            {
                                let sup = SupervisorClient::new_with_client(
                                    client.clone(),
                                    sup_dest.clone(),
                                    sup_topic.clone(),
                                    None,
                                );
                                match sup.job_status_sync(job_id.to_string(), 10).await {
                                    Ok(remote_status) => {
                                        if let Some((mapped, terminal)) =
                                            map_supervisor_job_status(&remote_status)
                                        {
                                            if terminal {
                                                let _ = service_poll
                                                    .update_job_status_unchecked(
                                                        context_id,
                                                        caller_id,
                                                        job_id,
                                                        mapped.clone(),
                                                    )
                                                    .await;

                                                // After a terminal status, fetch supervisor
                                                // job.result and store it into Job.result
                                                let sup = SupervisorClient::new_with_client(
                                                    client.clone(),
                                                    sup_dest.clone(),
                                                    sup_topic.clone(),
                                                    None,
                                                );
                                                match sup
                                                    .job_result_sync(
                                                        job_id.to_string(),
                                                        job_result_reply_timeout,
                                                    )
                                                    .await
                                                {
                                                    Ok(result_map) => {
                                                        // Persist the result into the
                                                        // Job.result map (merge)
                                                        let _ = service_poll
                                                            .update_job_result_merge_unchecked(
                                                                context_id,
                                                                caller_id,
                                                                job_id,
                                                                result_map.clone(),
                                                            )
                                                            .await;
                                                        // Log which key was stored
                                                        // (success or error)
                                                        let key = result_map
                                                            .keys()
                                                            .next()
                                                            .cloned()
                                                            .unwrap_or_else(|| {
                                                                "unknown".to_string()
                                                            });
                                                        let _ = service_poll
                                                            .append_message_logs(
                                                                context_id,
                                                                caller_id,
                                                                id,
                                                                vec![format!(
                                                                    "Stored supervisor job.result for job {} ({})",
                                                                    job_id, key
                                                                )],
                                                            )
                                                            .await;
                                                    }
                                                    Err(e) => {
                                                        let _ = service_poll
                                                            .append_message_logs(
                                                                context_id,
                                                                caller_id,
                                                                id,
                                                                vec![format!(
                                                                    "job.result fetch error for job {}: {}",
                                                                    job_id, e
                                                                )],
                                                            )
                                                            .await;
                                                    }
                                                }

                                                // Mark the message as processed
                                                let _ = service_poll
                                                    .update_message_status(
                                                        context_id,
                                                        caller_id,
                                                        id,
                                                        MessageStatus::Processed,
                                                    )
                                                    .await;
                                                let _ = service_poll
                                                    .append_message_logs(
                                                        context_id,
                                                        caller_id,
                                                        id,
                                                        vec![format!(
                                                            "Supervisor job.status for job {} -> {} (mapped to {:?})",
                                                            job_id, remote_status, mapped
                                                        )],
                                                    )
                                                    .await;
                                            }
                                        } else {
                                            let _ = service_poll
                                                .append_message_logs(
                                                    context_id,
                                                    caller_id,
                                                    id,
                                                    vec![format!(
                                                        "Unknown supervisor status '{}' for job {}",
                                                        remote_status, job_id
                                                    )],
                                                )
                                                .await;
                                        }
                                    }
                                    Err(e) => {
                                        let _ = service_poll
                                            .append_message_logs(
                                                context_id,
                                                caller_id,
                                                id,
                                                vec![format!("job.status sync error: {}", e)],
                                            )
                                            .await;
                                    }
                                }
                            }
                            break;
                        }

                        if matches!(s, TransportStatus::Failed) {
                            let _ = service_poll
                                .append_message_logs(
                                    context_id,
                                    caller_id,
                                    id,
                                    vec![format!(
                                        "Transport failed for outbound id {out_id_cloned}"
                                    )],
                                )
                                .await;
                            break;
                        }
                    }
                    Err(e) => {
                        // Log and continue polling
                        let _ = service_poll
                            .append_message_logs(
                                context_id,
                                caller_id,
                                id,
                                vec![format!("messageStatus query error: {e}")],
                            )
                            .await;
                    }
                }

                tokio::time::sleep(poll_interval).await;
            }
        });
    }

    Ok(())
}
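// Wire-shape note (illustrative, not authoritative): for method "job.run" with
// one embedded job, build_params (below) yields a positional params array like
//
//     [ { "job": { /* serde_json serialization of the Job */ } } ]
//
// and every other method is sent with empty params `[]`.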
fn determine_script_type(msg: &Message) -> ScriptType {
    // Prefer the embedded job's script_type when present; otherwise fall back
    // to message.message_type
    match msg.job.first() {
        Some(j) => j.script_type.clone(),
        None => msg.message_type.clone(),
    }
}

fn build_params(msg: &Message) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
    // Minimal mapping:
    // - "job.run" with exactly one embedded job: [{ "job": <job> }]
    // - otherwise: []
    if msg.message == "job.run"
        && let Some(j) = msg.job.first()
    {
        let jv = job_to_json(j)?;
        return Ok(json!([ { "job": jv } ]));
    }
    Ok(json!([]))
}

fn job_to_json(job: &Job) -> Result<Value, Box<dyn std::error::Error + Send + Sync>> {
    Ok(serde_json::to_value(job)?)
}

fn parse_message_key(s: &str) -> Option<(u32, u32)> {
    // Expect "message:{caller_id}:{id}"
    let mut it = s.split(':');
    match (it.next(), it.next(), it.next(), it.next()) {
        (Some("message"), Some(caller), Some(id), None) => {
            let caller_id = caller.parse::<u32>().ok()?;
            let msg_id = id.parse::<u32>().ok()?;
            Some((caller_id, msg_id))
        }
        _ => None,
    }
}

/// Map supervisor job.status -> (local JobStatus, terminal)
fn map_supervisor_job_status(s: &str) -> Option<(JobStatus, bool)> {
    match s {
        "created" | "queued" => Some((JobStatus::Dispatched, false)),
        "running" => Some((JobStatus::Started, false)),
        "completed" => Some((JobStatus::Finished, true)),
        "failed" | "timeout" => Some((JobStatus::Error, true)),
        _ => None,
    }
}
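// Unit sketches for the pure helpers above. They exercise only functions
// defined in this file; `matches!` is used for JobStatus so no extra derives
// are required.
#[cfg(test)]
mod router_helper_tests {
    use super::*;

    #[test]
    fn parses_well_formed_message_keys() {
        assert_eq!(parse_message_key("message:12:34"), Some((12, 34)));
        // Wrong prefix, missing segment, extra segment, non-numeric ids
        assert_eq!(parse_message_key("job:12:34"), None);
        assert_eq!(parse_message_key("message:12"), None);
        assert_eq!(parse_message_key("message:12:34:56"), None);
        assert_eq!(parse_message_key("message:x:34"), None);
    }

    #[test]
    fn maps_supervisor_statuses() {
        assert!(matches!(
            map_supervisor_job_status("queued"),
            Some((JobStatus::Dispatched, false))
        ));
        assert!(matches!(
            map_supervisor_job_status("running"),
            Some((JobStatus::Started, false))
        ));
        assert!(matches!(
            map_supervisor_job_status("completed"),
            Some((JobStatus::Finished, true))
        ));
        assert!(matches!(
            map_supervisor_job_status("timeout"),
            Some((JobStatus::Error, true))
        ));
        assert!(map_supervisor_job_status("paused").is_none());
    }
}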
/// Auto-discover contexts periodically and ensure a router loop exists for each.
/// Returns the JoinHandle of the discovery task (the router loops themselves are detached).
pub fn start_router_auto(service: AppService, cfg: RouterConfig) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut active: HashSet<u32> = HashSet::new();
        loop {
            match service.list_context_ids().await {
                Ok(ids) => {
                    for ctx_id in ids {
                        if !active.contains(&ctx_id) {
                            // Spawn a loop for this newly discovered context
                            let cfg_ctx = RouterConfig {
                                context_ids: vec![ctx_id],
                                ..cfg.clone()
                            };
                            let _ = start_router(service.clone(), cfg_ctx);
                            active.insert(ctx_id);
                            info!(context_id = ctx_id, "Started loop for context");
                        }
                    }
                }
                Err(e) => {
                    error!(error = %e, "list_context_ids error");
                }
            }
            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
        }
    })
}
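// Example (a sketch; assumes a constructed AppService and a running Tokio
// runtime):
//
//     let discovery = start_router_auto(service, cfg);
//     // ... on shutdown, stop discovery; already-spawned loops are detached:
//     discovery.abort();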