Listen for responses of supervisors
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
This commit is contained in:
@@ -3,6 +3,8 @@ use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
use reqwest::Client as HttpClient;
|
||||
|
||||
use base64::Engine;
|
||||
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
|
||||
use serde_json::{Value, json};
|
||||
use thiserror::Error;
|
||||
|
||||
@@ -159,6 +161,83 @@ impl MyceliumClient {
|
||||
.and_then(|v| v.as_str())
|
||||
.map(|s| s.to_string())
|
||||
}
|
||||
/// popMessage: retrieve an inbound message if available (optionally filtered by topic).
|
||||
/// - peek: if true, do not remove the message from the queue
|
||||
/// - timeout_secs: seconds to wait for a message (0 returns immediately)
|
||||
/// - topic_plain: optional plain-text topic which will be base64-encoded per Mycelium spec
|
||||
/// Returns:
|
||||
/// - Ok(Some(result_json)) on success, where result_json matches InboundMessage schema
|
||||
/// - Ok(None) when there is no message ready (Mycelium returns error code 204)
|
||||
pub async fn pop_message(
|
||||
&self,
|
||||
peek: Option<bool>,
|
||||
timeout_secs: Option<u64>,
|
||||
topic_plain: Option<&str>,
|
||||
) -> Result<Option<Value>, MyceliumClientError> {
|
||||
// Build params array
|
||||
let mut params_array = vec![];
|
||||
if let Some(p) = peek {
|
||||
params_array.push(serde_json::Value::Bool(p));
|
||||
} else {
|
||||
params_array.push(serde_json::Value::Null)
|
||||
}
|
||||
if let Some(t) = timeout_secs {
|
||||
params_array.push(serde_json::Value::Number(t.into()));
|
||||
} else {
|
||||
params_array.push(serde_json::Value::Null)
|
||||
}
|
||||
if let Some(tp) = topic_plain {
|
||||
let topic_b64 = BASE64_STANDARD.encode(tp.as_bytes());
|
||||
params_array.push(serde_json::Value::String(topic_b64));
|
||||
} else {
|
||||
params_array.push(serde_json::Value::Null)
|
||||
}
|
||||
|
||||
let req = json!({
|
||||
"jsonrpc": "2.0",
|
||||
"id": self.next_id(),
|
||||
"method": "popMessage",
|
||||
"params": serde_json::Value::Array(params_array),
|
||||
});
|
||||
|
||||
tracing::info!(%req, "calling popMessage");
|
||||
|
||||
let resp = self.http.post(&self.base_url).json(&req).send().await?;
|
||||
let status = resp.status();
|
||||
let body: Value = resp.json().await?;
|
||||
|
||||
// Handle JSON-RPC error envelope specially for code 204 (no message ready)
|
||||
if let Some(err) = body.get("error") {
|
||||
let code = err.get("code").and_then(|v| v.as_i64()).unwrap_or(0);
|
||||
let msg = err
|
||||
.get("message")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("unknown error");
|
||||
|
||||
if code == 204 {
|
||||
// No message ready
|
||||
return Ok(None);
|
||||
}
|
||||
if code == 408 {
|
||||
// Align with other transport timeout mapping
|
||||
return Err(MyceliumClientError::TransportTimeout);
|
||||
}
|
||||
return Err(MyceliumClientError::RpcError(format!(
|
||||
"code={code} msg={msg}"
|
||||
)));
|
||||
}
|
||||
|
||||
if !status.is_success() {
|
||||
return Err(MyceliumClientError::RpcError(format!(
|
||||
"HTTP {status}, body {body}"
|
||||
)));
|
||||
}
|
||||
|
||||
let result = body.get("result").ok_or_else(|| {
|
||||
MyceliumClientError::InvalidResponse(format!("missing result in response: {body}"))
|
||||
})?;
|
||||
Ok(Some(result.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@@ -99,6 +99,16 @@ impl SupervisorClient {
|
||||
})
|
||||
}
|
||||
|
||||
/// Build a supervisor JSON-RPC payload but force a specific id (used for correlation).
|
||||
fn build_supervisor_payload_with_id(&self, method: &str, params: Value, id: u64) -> Value {
|
||||
json!({
|
||||
"jsonrpc": "2.0",
|
||||
"id": id,
|
||||
"method": method,
|
||||
"params": params,
|
||||
})
|
||||
}
|
||||
|
||||
fn encode_payload(payload: &Value) -> Result<String, SupervisorClientError> {
|
||||
let s = serde_json::to_string(payload)?;
|
||||
Ok(BASE64_STANDARD.encode(s.as_bytes()))
|
||||
@@ -147,6 +157,42 @@ impl SupervisorClient {
|
||||
)))
|
||||
}
|
||||
|
||||
/// Variant of call that also returns the inner supervisor JSON-RPC id used in the payload.
|
||||
/// This id is required to correlate asynchronous popMessage replies coming from Mycelium.
|
||||
pub async fn call_with_ids(
|
||||
&self,
|
||||
method: &str,
|
||||
params: Value,
|
||||
) -> Result<(String, u64), SupervisorClientError> {
|
||||
let inner_id = self.next_id();
|
||||
let inner = self.build_supervisor_payload_with_id(method, params, inner_id);
|
||||
let payload_b64 = Self::encode_payload(&inner)?;
|
||||
let result = self
|
||||
.mycelium
|
||||
.push_message(
|
||||
&self.destination,
|
||||
&Self::encode_topic(self.topic.as_bytes()),
|
||||
&payload_b64,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let out_id = if let Some(id) = MyceliumClient::extract_message_id_from_result(&result) {
|
||||
id
|
||||
} else if let Some(arr) = result.as_array()
|
||||
&& arr.len() == 1
|
||||
&& let Some(id) = MyceliumClient::extract_message_id_from_result(&arr[0])
|
||||
{
|
||||
id
|
||||
} else {
|
||||
return Err(SupervisorClientError::InvalidResponse(format!(
|
||||
"result did not contain message id: {result}"
|
||||
)));
|
||||
};
|
||||
|
||||
Ok((out_id, inner_id))
|
||||
}
|
||||
|
||||
/// Synchronous variant: wait for a JSON-RPC reply via Mycelium reply_timeout, and return the inner JSON-RPC "result".
|
||||
/// If the supervisor returns an error object, map to RpcError.
|
||||
pub async fn call_sync(
|
||||
@@ -308,6 +354,19 @@ impl SupervisorClient {
|
||||
self.call("job.run", params).await
|
||||
}
|
||||
|
||||
/// Typed wrapper returning both outbound Mycelium id and inner supervisor JSON-RPC id.
|
||||
pub async fn job_run_with_ids(
|
||||
&self,
|
||||
job: Value,
|
||||
) -> Result<(String, u64), SupervisorClientError> {
|
||||
let secret = self.need_secret()?;
|
||||
let params = json!([{
|
||||
"secret": secret,
|
||||
"job": job
|
||||
}]);
|
||||
self.call_with_ids("job.run", params).await
|
||||
}
|
||||
|
||||
pub async fn job_start(
|
||||
&self,
|
||||
job_id: impl Into<String>,
|
||||
|
@@ -99,7 +99,7 @@ async fn main() {
|
||||
// Shared application state
|
||||
let state = Arc::new(herocoordinator::rpc::AppState::new(service));
|
||||
|
||||
// Start router workers (auto-discovered contexts)
|
||||
// Start router workers (auto-discovered contexts) and a single global inbound listener
|
||||
{
|
||||
let base_url = format!("http://{}:{}", cli.mycelium_ip, cli.mycelium_port);
|
||||
let cfg = herocoordinator::router::RouterConfig {
|
||||
@@ -110,6 +110,10 @@ async fn main() {
|
||||
transport_poll_interval_secs: 2,
|
||||
transport_poll_timeout_secs: 300,
|
||||
};
|
||||
// Global inbound listener for supervisor replies via Mycelium popMessage
|
||||
let _inbound_handle =
|
||||
herocoordinator::router::start_inbound_listener(service_for_router.clone(), cfg.clone());
|
||||
// Per-context outbound delivery loops
|
||||
let _auto_handle = herocoordinator::router::start_router_auto(service_for_router, cfg);
|
||||
}
|
||||
|
||||
|
180
src/router.rs
180
src/router.rs
@@ -1,5 +1,7 @@
|
||||
use std::{collections::HashSet, sync::Arc};
|
||||
|
||||
use base64::Engine;
|
||||
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
|
||||
use serde_json::{Value, json};
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
@@ -151,21 +153,25 @@ async fn deliver_one(
|
||||
// Build supervisor method and params from Message
|
||||
let method = msg.message.clone();
|
||||
let params = build_params(&msg)?;
|
||||
|
||||
|
||||
// Send
|
||||
// If this is a job.run and we have a secret configured on the client,
|
||||
// prefer the typed wrapper that injects the secret into inner supervisor params.
|
||||
let out_id = if method == "job.run" {
|
||||
// prefer the typed wrapper that injects the secret into inner supervisor params,
|
||||
// and also capture the inner supervisor JSON-RPC id for correlation.
|
||||
let (out_id, inner_id_opt) = if method == "job.run" {
|
||||
if let Some(j) = msg.job.first() {
|
||||
let jv = job_to_json(j)?;
|
||||
// This uses SupervisorClient::job_run, which sets {"secret": "...", "job": <job>}
|
||||
client.job_run(jv).await?
|
||||
// Returns (outbound message id, inner supervisor JSON-RPC id)
|
||||
let (out, inner) = client.job_run_with_ids(jv).await?;
|
||||
(out, Some(inner))
|
||||
} else {
|
||||
// Fallback: no embedded job, use the generic call
|
||||
client.call(&method, params).await?
|
||||
let out = client.call(&method, params).await?;
|
||||
(out, None)
|
||||
}
|
||||
} else {
|
||||
client.call(&method, params).await?
|
||||
let out = client.call(&method, params).await?;
|
||||
(out, None)
|
||||
};
|
||||
|
||||
// Store transport id and initial Sent status
|
||||
@@ -184,6 +190,13 @@ async fn deliver_one(
|
||||
.update_message_status(context_id, caller_id, id, MessageStatus::Acknowledged)
|
||||
.await?;
|
||||
|
||||
// Record correlation (inner supervisor JSON-RPC id -> job/message) for inbound popMessage handling
|
||||
if let (Some(inner_id), Some(job_id)) = (inner_id_opt, job_id_opt) {
|
||||
let _ = service
|
||||
.supcorr_set(inner_id, context_id, caller_id, job_id, id)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Spawn transport-status poller
|
||||
{
|
||||
let service_poll = service.clone();
|
||||
@@ -487,3 +500,156 @@ pub fn start_router_auto(service: AppService, cfg: RouterConfig) -> tokio::task:
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Start a single global inbound listener that reads Mycelium popMessage with topic filter,
|
||||
/// decodes supervisor JSON-RPC replies, and updates correlated jobs/messages.
|
||||
/// This listens for async replies like {"result":{"job_queued":...}} carrying the same inner JSON-RPC id.
|
||||
pub fn start_inbound_listener(
|
||||
service: AppService,
|
||||
cfg: RouterConfig,
|
||||
) -> tokio::task::JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
// Initialize Mycelium client (retry loop)
|
||||
let mycelium = loop {
|
||||
match MyceliumClient::new(cfg.base_url.clone()) {
|
||||
Ok(c) => break c,
|
||||
Err(e) => {
|
||||
error!(error=%e, "MyceliumClient init error (inbound listener)");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
loop {
|
||||
// Poll for inbound supervisor messages on the configured topic
|
||||
match mycelium
|
||||
.pop_message(Some(false), Some(20), Some(cfg.topic.as_str()))
|
||||
.await
|
||||
{
|
||||
Ok(Some(inb)) => {
|
||||
// Expect InboundMessage with base64 "payload"
|
||||
let Some(payload_b64) = inb.get("payload").and_then(|v| v.as_str()) else {
|
||||
// Not a payload-bearing message; ignore
|
||||
continue;
|
||||
};
|
||||
let Ok(raw) = BASE64_STANDARD.decode(payload_b64.as_bytes()) else {
|
||||
let _ = service
|
||||
.append_message_logs(
|
||||
0, // unknown context yet
|
||||
0,
|
||||
0,
|
||||
vec![
|
||||
"Inbound payload base64 decode error (supervisor reply)".into(),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
continue;
|
||||
};
|
||||
let Ok(rpc): Result<Value, _> = serde_json::from_slice(&raw) else {
|
||||
// Invalid JSON payload
|
||||
continue;
|
||||
};
|
||||
|
||||
// Extract inner supervisor JSON-RPC id (number preferred; string fallback)
|
||||
let inner_id_u64 = match rpc.get("id") {
|
||||
Some(Value::Number(n)) => n.as_u64(),
|
||||
Some(Value::String(s)) => s.parse::<u64>().ok(),
|
||||
_ => None,
|
||||
};
|
||||
let Some(inner_id) = inner_id_u64 else {
|
||||
// Cannot correlate without id
|
||||
continue;
|
||||
};
|
||||
|
||||
// Lookup correlation mapping
|
||||
match service.supcorr_get(inner_id).await {
|
||||
Ok(Some((context_id, caller_id, job_id, message_id))) => {
|
||||
// Determine success/error from supervisor JSON-RPC envelope
|
||||
let is_success = rpc
|
||||
.get("result")
|
||||
.map(|res| {
|
||||
res.get("job_queued").is_some()
|
||||
|| res.as_str().map(|s| s == "job_queued").unwrap_or(false)
|
||||
})
|
||||
.unwrap_or(false);
|
||||
|
||||
if is_success {
|
||||
// Set to Dispatched (idempotent) per spec choice, and append log
|
||||
let _ = service
|
||||
.update_job_status_unchecked(
|
||||
context_id,
|
||||
caller_id,
|
||||
job_id,
|
||||
JobStatus::Dispatched,
|
||||
)
|
||||
.await;
|
||||
let _ = service
|
||||
.append_message_logs(
|
||||
context_id,
|
||||
caller_id,
|
||||
message_id,
|
||||
vec![format!(
|
||||
"Supervisor reply for job {}: job_queued",
|
||||
job_id
|
||||
)],
|
||||
)
|
||||
.await;
|
||||
let _ = service.supcorr_del(inner_id).await;
|
||||
} else if let Some(err_obj) = rpc.get("error") {
|
||||
// Error path: set job Error and log details
|
||||
let _ = service
|
||||
.update_job_status_unchecked(
|
||||
context_id,
|
||||
caller_id,
|
||||
job_id,
|
||||
JobStatus::Error,
|
||||
)
|
||||
.await;
|
||||
let _ = service
|
||||
.append_message_logs(
|
||||
context_id,
|
||||
caller_id,
|
||||
message_id,
|
||||
vec![format!(
|
||||
"Supervisor error for job {}: {}",
|
||||
job_id, err_obj
|
||||
)],
|
||||
)
|
||||
.await;
|
||||
let _ = service.supcorr_del(inner_id).await;
|
||||
} else {
|
||||
// Unknown result; keep correlation for a later, clearer reply
|
||||
let _ = service
|
||||
.append_message_logs(
|
||||
context_id,
|
||||
caller_id,
|
||||
message_id,
|
||||
vec![
|
||||
"Supervisor reply did not contain job_queued or error"
|
||||
.to_string(),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
// No correlation found; ignore or log once
|
||||
}
|
||||
Err(e) => {
|
||||
error!(error=%e, "supcorr_get error");
|
||||
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
// No message; continue polling
|
||||
continue;
|
||||
}
|
||||
Err(e) => {
|
||||
error!(error=%e, "popMessage error");
|
||||
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@@ -1161,6 +1161,37 @@ impl AppService {
|
||||
pub async fn scan_runners(&self, context_id: u32) -> Result<Vec<Runner>, BoxError> {
|
||||
self.redis.scan_runners(context_id).await
|
||||
}
|
||||
|
||||
/// Correlation map: store mapping from inner supervisor JSON-RPC id to context/caller/job/message.
|
||||
pub async fn supcorr_set(
|
||||
&self,
|
||||
inner_id: u64,
|
||||
context_id: u32,
|
||||
caller_id: u32,
|
||||
job_id: u32,
|
||||
message_id: u32,
|
||||
) -> Result<(), BoxError> {
|
||||
self.redis
|
||||
.supcorr_set(inner_id, context_id, caller_id, job_id, message_id)
|
||||
.await
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
/// Correlation map: load mapping by inner supervisor JSON-RPC id.
|
||||
pub async fn supcorr_get(
|
||||
&self,
|
||||
inner_id: u64,
|
||||
) -> Result<Option<(u32, u32, u32, u32)>, BoxError> {
|
||||
self.redis
|
||||
.supcorr_get(inner_id)
|
||||
.await
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
/// Correlation map: delete mapping by inner supervisor JSON-RPC id.
|
||||
pub async fn supcorr_del(&self, inner_id: u64) -> Result<(), BoxError> {
|
||||
self.redis.supcorr_del(inner_id).await.map_err(Into::into)
|
||||
}
|
||||
}
|
||||
|
||||
/// Auto-discovery helpers for contexts (wrappers over RedisDriver)
|
||||
|
@@ -751,4 +751,80 @@ impl RedisDriver {
|
||||
out.sort_unstable();
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
// -----------------------------
|
||||
// Supervisor correlation mapping (DB 0)
|
||||
// Key: "supcorr:{inner_id_decimal}"
|
||||
// Value: JSON {"context_id":u32,"caller_id":u32,"job_id":u32,"message_id":u32}
|
||||
// TTL: 1 hour to avoid leaks in case of crashes
|
||||
pub async fn supcorr_set(
|
||||
&self,
|
||||
inner_id: u64,
|
||||
context_id: u32,
|
||||
caller_id: u32,
|
||||
job_id: u32,
|
||||
message_id: u32,
|
||||
) -> Result<()> {
|
||||
let mut cm = self.manager_for_db(0).await?;
|
||||
let key = format!("supcorr:{}", inner_id);
|
||||
let val = serde_json::json!({
|
||||
"context_id": context_id,
|
||||
"caller_id": caller_id,
|
||||
"job_id": job_id,
|
||||
"message_id": message_id,
|
||||
})
|
||||
.to_string();
|
||||
// SET key val EX 3600
|
||||
let _: () = redis::cmd("SET")
|
||||
.arg(&key)
|
||||
.arg(&val)
|
||||
.arg("EX")
|
||||
.arg(3600)
|
||||
.query_async(&mut cm)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!(db=0, key=%key, error=%e, "SET supcorr_set failed");
|
||||
e
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn supcorr_get(
|
||||
&self,
|
||||
inner_id: u64,
|
||||
) -> Result<Option<(u32, u32, u32, u32)>> {
|
||||
let mut cm = self.manager_for_db(0).await?;
|
||||
let key = format!("supcorr:{}", inner_id);
|
||||
let res: Option<String> = redis::cmd("GET")
|
||||
.arg(&key)
|
||||
.query_async(&mut cm)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!(db=0, key=%key, error=%e, "GET supcorr_get failed");
|
||||
e
|
||||
})?;
|
||||
if let Some(s) = res {
|
||||
let v: Value = serde_json::from_str(&s)?;
|
||||
let ctx = v.get("context_id").and_then(|x| x.as_u64()).unwrap_or(0) as u32;
|
||||
let caller = v.get("caller_id").and_then(|x| x.as_u64()).unwrap_or(0) as u32;
|
||||
let job = v.get("job_id").and_then(|x| x.as_u64()).unwrap_or(0) as u32;
|
||||
let msg = v.get("message_id").and_then(|x| x.as_u64()).unwrap_or(0) as u32;
|
||||
return Ok(Some((ctx, caller, job, msg)));
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
pub async fn supcorr_del(&self, inner_id: u64) -> Result<()> {
|
||||
let mut cm = self.manager_for_db(0).await?;
|
||||
let key = format!("supcorr:{}", inner_id);
|
||||
let _: i64 = redis::cmd("DEL")
|
||||
.arg(&key)
|
||||
.query_async(&mut cm)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!(db=0, key=%key, error=%e, "DEL supcorr_del failed");
|
||||
e
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user