From 2c88114d4591a68173cf4c8a252cb5ee4adf83fa Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Fri, 5 Sep 2025 13:23:48 +0200 Subject: [PATCH] Remove notion of sync calls Signed-off-by: Lee Smet --- src/clients/supervisor_client.rs | 123 +-------- src/router.rs | 454 +++++++++++++++++++++---------- 2 files changed, 321 insertions(+), 256 deletions(-) diff --git a/src/clients/supervisor_client.rs b/src/clients/supervisor_client.rs index b17a5c3..a5daa07 100644 --- a/src/clients/supervisor_client.rs +++ b/src/clients/supervisor_client.rs @@ -193,66 +193,6 @@ impl SupervisorClient { Ok((out_id, inner_id)) } - /// Synchronous variant: wait for a JSON-RPC reply via Mycelium reply_timeout, and return the inner JSON-RPC "result". - /// If the supervisor returns an error object, map to RpcError. - pub async fn call_sync( - &self, - method: &str, - params: Value, - reply_timeout_secs: u64, - ) -> Result { - let inner = self.build_supervisor_payload(method, params); - let payload_b64 = Self::encode_payload(&inner)?; - - let result = self - .mycelium - .push_message( - &self.destination, - &Self::encode_topic(self.topic.as_bytes()), - &payload_b64, - Some(reply_timeout_secs), - ) - .await?; - - // Expect an InboundMessage-like with a base64 payload containing the supervisor JSON-RPC response - let payload_field = if let Some(p) = result.get("payload").and_then(|v| v.as_str()) { - p.to_string() - } else if let Some(arr) = result.as_array() { - // Defensive: handle single-element array shape - if let Some(one) = arr.get(0) { - one.get("payload") - .and_then(|v| v.as_str()) - .map(|s| s.to_string()) - .ok_or_else(|| { - SupervisorClientError::InvalidResponse(format!( - "missing payload in result: {result}" - )) - })? - } else { - return Err(SupervisorClientError::TransportTimeout); - } - } else { - // No payload => no reply received within timeout (Mycelium would have returned just an id) - return Err(SupervisorClientError::TransportTimeout); - }; - - let raw = BASE64_STANDARD - .decode(payload_field.as_bytes()) - .map_err(|e| { - SupervisorClientError::InvalidResponse(format!("invalid base64 payload: {e}")) - })?; - let rpc_resp: Value = serde_json::from_slice(&raw)?; - - if let Some(err) = rpc_resp.get("error") { - return Err(SupervisorClientError::RpcError(err.to_string())); - } - let res = rpc_resp.get("result").ok_or_else(|| { - SupervisorClientError::InvalidResponse(format!( - "missing result in supervisor reply: {rpc_resp}" - )) - })?; - Ok(res.clone()) - } fn need_secret(&self) -> Result<&str, SupervisorClientError> { self.secret @@ -386,28 +326,15 @@ impl SupervisorClient { self.call("job.status", json!([job_id.into()])).await } - /// Synchronous job.status: waits for the supervisor to reply and returns the status string. - /// The supervisor result may be an object with { status: "..." } or a bare string. - pub async fn job_status_sync( + /// Asynchronous job.status returning outbound and inner IDs for correlation + pub async fn job_status_with_ids( &self, job_id: impl Into, - reply_timeout_secs: u64, - ) -> Result { - let res = self - .call_sync("job.status", json!([job_id.into()]), reply_timeout_secs) - .await?; - let status = if let Some(s) = res.get("status").and_then(|v| v.as_str()) { - s.to_string() - } else if let Some(s) = res.as_str() { - s.to_string() - } else { - return Err(SupervisorClientError::InvalidResponse(format!( - "unexpected job.status result shape: {res}" - ))); - }; - Ok(status) + ) -> Result<(String, u64), SupervisorClientError> { + self.call_with_ids("job.status", json!([job_id.into()])).await } + pub async fn job_result( &self, job_id: impl Into, @@ -415,45 +342,15 @@ impl SupervisorClient { self.call("job.result", json!([job_id.into()])).await } - /// Synchronous job.result: waits for the supervisor to reply and returns a map - /// containing exactly one of: - /// - {"success": "..."} on success - /// - {"error": "..."} on error reported by the runner - /// Some servers may return a bare string; we treat that as {"success": ""}. - pub async fn job_result_sync( + /// Asynchronous job.result returning outbound and inner IDs for correlation + pub async fn job_result_with_ids( &self, job_id: impl Into, - reply_timeout_secs: u64, - ) -> Result, SupervisorClientError> { - let res = self - .call_sync("job.result", json!([job_id.into()]), reply_timeout_secs) - .await?; - - use std::collections::HashMap; - let mut out: HashMap = HashMap::new(); - - if let Some(obj) = res.as_object() { - if let Some(s) = obj.get("success").and_then(|v| v.as_str()) { - out.insert("success".to_string(), s.to_string()); - return Ok(out); - } - if let Some(s) = obj.get("error").and_then(|v| v.as_str()) { - out.insert("error".to_string(), s.to_string()); - return Ok(out); - } - return Err(SupervisorClientError::InvalidResponse(format!( - "unexpected job.result result shape: {res}" - ))); - } else if let Some(s) = res.as_str() { - out.insert("success".to_string(), s.to_string()); - return Ok(out); - } - - Err(SupervisorClientError::InvalidResponse(format!( - "unexpected job.result result shape: {res}" - ))) + ) -> Result<(String, u64), SupervisorClientError> { + self.call_with_ids("job.result", json!([job_id.into()])).await } + pub async fn job_stop( &self, job_id: impl Into, diff --git a/src/router.rs b/src/router.rs index e8a7da7..365ab41 100644 --- a/src/router.rs +++ b/src/router.rs @@ -204,12 +204,6 @@ async fn deliver_one( let poll_timeout = std::time::Duration::from_secs(cfg.transport_poll_timeout_secs); let out_id_cloned = out_id.clone(); let mycelium = mycelium.clone(); - // Determine reply timeout for supervisor job.result: prefer message.timeout_result, fallback to router config timeout - let job_result_reply_timeout: u64 = if msg.timeout_result > 0 { - msg.timeout_result as u64 - } else { - cfg.transport_poll_timeout_secs - }; tokio::spawn(async move { let start = std::time::Instant::now(); @@ -253,124 +247,35 @@ async fn deliver_one( // Stop on terminal states if matches!(s, TransportStatus::Delivered | TransportStatus::Read) { - // On Read, fetch supervisor job.status and update local job/message if terminal - if matches!(s, TransportStatus::Read) - && let Some(job_id) = job_id_opt - { + // On Read, request supervisor job.status asynchronously; inbound listener will handle replies + // if matches!(s, TransportStatus::Read) + // && let Some(job_id) = job_id_opt + if let Some(job_id) = job_id_opt { let sup = SupervisorClient::new_with_client( client.clone(), sup_dest.clone(), sup_topic.clone(), secret_for_poller.clone(), ); - match sup.job_status_sync(job_id.to_string(), 10).await { - Ok(remote_status) => { - if let Some((mapped, terminal)) = - map_supervisor_job_status(&remote_status) - { - if terminal { - let _ = service_poll - .update_job_status_unchecked( - context_id, - caller_id, - job_id, - mapped.clone(), - ) - .await; - - // After terminal status, fetch supervisor job.result and store into Job.result - let sup = SupervisorClient::new_with_client( - client.clone(), - sup_dest.clone(), - sup_topic.clone(), - secret_for_poller.clone(), - ); - match sup - .job_result_sync( - job_id.to_string(), - job_result_reply_timeout, - ) - .await - { - Ok(result_map) => { - // Persist the result into the Job.result map (merge) - let _ = service_poll - .update_job_result_merge_unchecked( - context_id, - caller_id, - job_id, - result_map.clone(), - ) - .await; - // Log which key was stored (success or error) - let key = result_map - .keys() - .next() - .cloned() - .unwrap_or_else(|| { - "unknown".to_string() - }); - let _ = service_poll - .append_message_logs( - context_id, - caller_id, - id, - vec![format!( - "Stored supervisor job.result for job {} ({})", - job_id, key - )], - ) - .await; - } - Err(e) => { - let _ = service_poll - .append_message_logs( - context_id, - caller_id, - id, - vec![format!( - "job.result fetch error for job {}: {}", - job_id, e - )], - ) - .await; - } - } - - // Mark message as processed - let _ = service_poll - .update_message_status( - context_id, - caller_id, - id, - MessageStatus::Processed, - ) - .await; - let _ = service_poll - .append_message_logs( - context_id, - caller_id, - id, - vec![format!( - "Supervisor job.status for job {} -> {} (mapped to {:?})", - job_id, remote_status, mapped - )], - ) - .await; - } - } else { - let _ = service_poll - .append_message_logs( - context_id, - caller_id, - id, - vec![format!( - "Unknown supervisor status '{}' for job {}", - remote_status, job_id - )], - ) - .await; - } + match sup.job_status_with_ids(job_id.to_string()).await { + Ok((_out_id, inner_id)) => { + // Correlate this status request to the message/job + let _ = service_poll + .supcorr_set( + inner_id, context_id, caller_id, job_id, id, + ) + .await; + let _ = service_poll + .append_message_logs( + context_id, + caller_id, + id, + vec![format!( + "Requested supervisor job.status for job {}", + job_id + )], + ) + .await; } Err(e) => { let _ = service_poll @@ -378,13 +283,13 @@ async fn deliver_one( context_id, caller_id, id, - vec![format!("job.status sync error: {}", e)], + vec![format!("job.status request error: {}", e)], ) .await; } } } - break; + // break; } if matches!(s, TransportStatus::Failed) { let _ = service_poll @@ -512,7 +417,7 @@ pub fn start_inbound_listener( // Initialize Mycelium client (retry loop) let mycelium = loop { match MyceliumClient::new(cfg.base_url.clone()) { - Ok(c) => break c, + Ok(c) => break Arc::new(c), Err(e) => { error!(error=%e, "MyceliumClient init error (inbound listener)"); tokio::time::sleep(std::time::Duration::from_secs(1)).await; @@ -566,16 +471,25 @@ pub fn start_inbound_listener( match service.supcorr_get(inner_id).await { Ok(Some((context_id, caller_id, job_id, message_id))) => { // Determine success/error from supervisor JSON-RPC envelope - let is_success = rpc - .get("result") - .map(|res| { - res.get("job_queued").is_some() - || res.as_str().map(|s| s == "job_queued").unwrap_or(false) + // Inspect result/error to route job.run/job.status/job.result replies + let result_opt = rpc.get("result"); + let error_opt = rpc.get("error"); + + // Handle job.run success (job_queued) + let is_job_queued = result_opt + .and_then(|res| { + if res.get("job_queued").is_some() { + Some(true) + } else if let Some(s) = res.as_str() { + Some(s == "job_queued") + } else { + None + } }) .unwrap_or(false); - if is_success { - // Set to Dispatched (idempotent) per spec choice, and append log + if is_job_queued { + // Set to Dispatched (idempotent) per spec, and append log let _ = service .update_job_status_unchecked( context_id, @@ -596,8 +510,11 @@ pub fn start_inbound_listener( ) .await; let _ = service.supcorr_del(inner_id).await; - } else if let Some(err_obj) = rpc.get("error") { - // Error path: set job Error and log details + continue; + } + + // Error envelope: set job Error and log + if let Some(err_obj) = error_opt { let _ = service .update_job_status_unchecked( context_id, @@ -618,20 +535,271 @@ pub fn start_inbound_listener( ) .await; let _ = service.supcorr_del(inner_id).await; - } else { - // Unknown result; keep correlation for a later, clearer reply - let _ = service - .append_message_logs( - context_id, - caller_id, - message_id, - vec![ - "Supervisor reply did not contain job_queued or error" - .to_string(), - ], - ) - .await; + continue; } + + // If we have a result, try to interpret it as job.status or job.result + if let Some(res) = result_opt { + // Try job.status: object {status: "..."} or bare string + let status_candidate = res + .get("status") + .and_then(|v| v.as_str()) + .or_else(|| res.as_str()); + + if let Some(remote_status) = status_candidate { + if let Some((mapped, terminal)) = + map_supervisor_job_status(remote_status) + { + // Update job status and log + let _ = service + .update_job_status_unchecked( + context_id, + caller_id, + job_id, + mapped.clone(), + ) + .await; + let _ = service + .append_message_logs( + context_id, + caller_id, + message_id, + vec![format!( + "Supervisor job.status for job {} -> {} (mapped to {:?})", + job_id, remote_status, mapped + )], + ) + .await; + // Done with this correlation id + let _ = service.supcorr_del(inner_id).await; + + // If terminal, request job.result asynchronously now + if terminal { + // Load job to determine script_type for runner selection + match service + .load_job(context_id, caller_id, job_id) + .await + { + Ok(job) => { + match service.scan_runners(context_id).await { + Ok(runners) => { + if let Some(runner) = + runners.into_iter().find(|r| { + r.script_type == job.script_type + }) + { + let dest = if !runner + .pubkey + .trim() + .is_empty() + { + Destination::Pk( + runner.pubkey.clone(), + ) + } else { + Destination::Ip(runner.address) + }; + let sup = SupervisorClient::new_with_client( + mycelium.clone(), + dest, + cfg.topic.clone(), + runner.secret.clone(), + ); + match sup + .job_result_with_ids( + job_id.to_string(), + ) + .await + { + Ok((_out2, inner2)) => { + let _ = service + .supcorr_set( + inner2, context_id, + caller_id, job_id, + message_id, + ) + .await; + let _ = service + .append_message_logs( + context_id, + caller_id, + message_id, + vec![format!( + "Requested supervisor job.result for job {}", + job_id + )], + ) + .await; + } + Err(e) => { + let _ = service + .append_message_logs( + context_id, + caller_id, + message_id, + vec![format!( + "job.result request error for job {}: {}", + job_id, e + )], + ) + .await; + } + } + } else { + let _ = service + .append_message_logs( + context_id, + caller_id, + message_id, + vec![format!( + "No runner with matching script_type found to request job.result for job {}", + job_id + )], + ) + .await; + } + } + Err(e) => { + let _ = service + .append_message_logs( + context_id, + caller_id, + message_id, + vec![format!( + "scan_runners error while requesting job.result for job {}: {}", + job_id, e + )], + ) + .await; + } + } + } + Err(e) => { + let _ = service + .append_message_logs( + context_id, + caller_id, + message_id, + vec![format!( + "load_job error while requesting job.result for job {}: {}", + job_id, e + )], + ) + .await; + } + } + } + continue; + } + } + + // Try job.result: object with success/error or bare string treated as success + if let Some(obj) = res.as_object() { + if let Some(s) = obj.get("success").and_then(|v| v.as_str()) { + let mut patch = std::collections::HashMap::new(); + patch.insert("success".to_string(), s.to_string()); + let _ = service + .update_job_result_merge_unchecked( + context_id, caller_id, job_id, patch, + ) + .await; + let _ = service + .update_message_status( + context_id, + caller_id, + message_id, + MessageStatus::Processed, + ) + .await; + let _ = service + .append_message_logs( + context_id, + caller_id, + message_id, + vec![format!( + "Stored supervisor job.result for job {} (success)", + job_id + )], + ) + .await; + let _ = service.supcorr_del(inner_id).await; + continue; + } + if let Some(s) = obj.get("error").and_then(|v| v.as_str()) { + let mut patch = std::collections::HashMap::new(); + patch.insert("error".to_string(), s.to_string()); + let _ = service + .update_job_result_merge_unchecked( + context_id, caller_id, job_id, patch, + ) + .await; + let _ = service + .update_message_status( + context_id, + caller_id, + message_id, + MessageStatus::Processed, + ) + .await; + let _ = service + .append_message_logs( + context_id, + caller_id, + message_id, + vec![format!( + "Stored supervisor job.result for job {} (error)", + job_id + )], + ) + .await; + let _ = service.supcorr_del(inner_id).await; + continue; + } + } else if let Some(s) = res.as_str() { + // Bare string => treat as success + let mut patch = std::collections::HashMap::new(); + patch.insert("success".to_string(), s.to_string()); + let _ = service + .update_job_result_merge_unchecked( + context_id, caller_id, job_id, patch, + ) + .await; + let _ = service + .update_message_status( + context_id, + caller_id, + message_id, + MessageStatus::Processed, + ) + .await; + let _ = service + .append_message_logs( + context_id, + caller_id, + message_id, + vec![format!( + "Stored supervisor job.result for job {} (success)", + job_id + )], + ) + .await; + let _ = service.supcorr_del(inner_id).await; + continue; + } + } + + // Unknown/unsupported supervisor reply; keep correlation for later + let _ = service + .append_message_logs( + context_id, + caller_id, + message_id, + vec![ + "Supervisor reply did not contain recognizable job.run/status/result fields" + .to_string(), + ], + ) + .await; } Ok(None) => { // No correlation found; ignore or log once