Fetch job results once a job is finished

Signed-off-by: Lee Smet <lee.smet@hotmail.com>
Lee Smet
2025-08-28 17:09:04 +02:00
parent e5a6228448
commit fce0ccb2d8
3 changed files with 276 additions and 2 deletions


@@ -138,6 +138,54 @@ impl SupervisorClient {
)))
}
/// Synchronous variant: wait for a JSON-RPC reply via Mycelium reply_timeout, and return the inner JSON-RPC "result".
/// If the supervisor returns an error object, map to RpcError.
pub async fn call_sync(
&self,
method: &str,
params: Value,
reply_timeout_secs: u64,
) -> Result<Value, SupervisorClientError> {
let inner = self.build_supervisor_payload(method, params);
let payload_b64 = Self::encode_payload(&inner)?;
let result = self
.mycelium
.push_message(&self.destination, &self.topic, &payload_b64, Some(reply_timeout_secs))
.await?;
// Expect an InboundMessage-like value whose base64 payload contains the supervisor JSON-RPC response
let payload_field = if let Some(p) = result.get("payload").and_then(|v| v.as_str()) {
p.to_string()
} else if let Some(arr) = result.as_array() {
// Defensive: handle single-element array shape
if let Some(one) = arr.first() {
one.get("payload")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.ok_or_else(|| SupervisorClientError::InvalidResponse(format!("missing payload in result: {result}")))?
} else {
return Err(SupervisorClientError::TransportTimeout);
}
} else {
// No payload => no reply received within timeout (Mycelium would have returned just an id)
return Err(SupervisorClientError::TransportTimeout);
};
let raw = BASE64_STANDARD
.decode(payload_field.as_bytes())
.map_err(|e| SupervisorClientError::InvalidResponse(format!("invalid base64 payload: {e}")))?;
let rpc_resp: Value = serde_json::from_slice(&raw)?;
if let Some(err) = rpc_resp.get("error") {
return Err(SupervisorClientError::RpcError(err.to_string()));
}
let res = rpc_resp
.get("result")
.ok_or_else(|| SupervisorClientError::InvalidResponse(format!("missing result in supervisor reply: {rpc_resp}")))?;
Ok(res.clone())
}
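
For context, a minimal usage sketch (illustrative only, not part of this commit; the job id and the 10-second reply timeout are placeholders):

// Illustrative sketch: invoke a supervisor method synchronously and print
// the inner JSON-RPC result. `client` is assumed to be a SupervisorClient
// built elsewhere via new_with_client.
async fn demo_call_sync(client: &SupervisorClient) -> Result<(), SupervisorClientError> {
    let result = client
        .call_sync("job.status", serde_json::json!(["job-123"]), 10)
        .await?;
    println!("supervisor replied: {result}");
    Ok(())
}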
fn need_secret(&self) -> Result<&str, SupervisorClientError> {
self.secret
.as_deref()
@@ -257,6 +305,28 @@ impl SupervisorClient {
self.call("job.status", json!([job_id.into()])).await
}
/// Synchronous job.status: waits for the supervisor to reply and returns the status string.
/// The supervisor result may be an object with { status: "..." } or a bare string.
pub async fn job_status_sync(
&self,
job_id: impl Into<String>,
reply_timeout_secs: u64,
) -> Result<String, SupervisorClientError> {
let res = self
.call_sync("job.status", json!([job_id.into()]), reply_timeout_secs)
.await?;
let status = if let Some(s) = res.get("status").and_then(|v| v.as_str()) {
s.to_string()
} else if let Some(s) = res.as_str() {
s.to_string()
} else {
return Err(SupervisorClientError::InvalidResponse(format!(
"unexpected job.status result shape: {res}"
)));
};
Ok(status)
}
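
Building on this, a hypothetical poll loop (the 2-second interval and 10-second reply timeout are assumptions, not values from this commit; the terminal strings match the router's status mapping below):

// Illustrative sketch: poll job_status_sync until the supervisor reports a
// terminal status.
async fn wait_until_done(
    client: &SupervisorClient,
    job_id: u32,
) -> Result<String, SupervisorClientError> {
    loop {
        let status = client.job_status_sync(job_id.to_string(), 10).await?;
        if matches!(status.as_str(), "completed" | "failed" | "timeout") {
            return Ok(status);
        }
        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
    }
}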
pub async fn job_result(
&self,
job_id: impl Into<String>,
@@ -264,6 +334,45 @@ impl SupervisorClient {
self.call("job.result", json!([job_id.into()])).await
}
/// Synchronous job.result: waits for the supervisor to reply and returns a map
/// containing exactly one of:
/// - {"success": "..."} on success
/// - {"error": "..."} on error reported by the runner
/// Some servers may return a bare string; we treat that as {"success": "<string>"}.
pub async fn job_result_sync(
&self,
job_id: impl Into<String>,
reply_timeout_secs: u64,
) -> Result<std::collections::HashMap<String, String>, SupervisorClientError> {
let res = self
.call_sync("job.result", json!([job_id.into()]), reply_timeout_secs)
.await?;
use std::collections::HashMap;
let mut out: HashMap<String, String> = HashMap::new();
if let Some(obj) = res.as_object() {
if let Some(s) = obj.get("success").and_then(|v| v.as_str()) {
out.insert("success".to_string(), s.to_string());
return Ok(out);
}
if let Some(s) = obj.get("error").and_then(|v| v.as_str()) {
out.insert("error".to_string(), s.to_string());
return Ok(out);
}
return Err(SupervisorClientError::InvalidResponse(format!(
"unexpected job.result result shape: {res}"
)));
} else if let Some(s) = res.as_str() {
out.insert("success".to_string(), s.to_string());
return Ok(out);
}
Err(SupervisorClientError::InvalidResponse(format!(
"unexpected job.result result shape: {res}"
)))
}
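
A consuming sketch for the documented map shape (illustrative; the 30-second timeout is a placeholder):

// Illustrative sketch: fetch a finished job's result and branch on the single
// key ("success" or "error") the returned map is documented to contain.
async fn print_result(
    client: &SupervisorClient,
    job_id: u32,
) -> Result<(), SupervisorClientError> {
    let map = client.job_result_sync(job_id.to_string(), 30).await?;
    if let Some(out) = map.get("success") {
        println!("job {job_id} succeeded: {out}");
    } else if let Some(err) = map.get("error") {
        eprintln!("job {job_id} failed: {err}");
    }
    Ok(())
}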
pub async fn job_stop(
&self,
job_id: impl Into<String>,


@@ -5,7 +5,7 @@ use tokio::sync::Semaphore;
use crate::{
clients::{Destination, SupervisorClient, MyceliumClient},
- models::{Job, Message, MessageStatus, ScriptType, TransportStatus},
+ models::{Job, JobStatus, Message, MessageStatus, ScriptType, TransportStatus},
service::AppService,
};
@@ -110,6 +110,8 @@ async fn deliver_one(
// Load message
let msg: Message = service.load_message(context_id, caller_id, id).await?;
// Embedded job id (if any)
let job_id_opt: Option<u32> = msg.job.first().map(|j| j.id);
// Determine routing script_type
let desired: ScriptType = determine_script_type(&msg);
@@ -136,9 +138,12 @@ async fn deliver_one(
} else {
Destination::Ip(runner.address)
};
// Keep clones for poller usage
let dest_for_poller = dest.clone();
let topic_for_poller = cfg.topic.clone();
let client = SupervisorClient::new_with_client(
mycelium.clone(),
- dest,
+ dest.clone(),
cfg.topic.clone(),
None, // secret
);
@@ -173,11 +178,22 @@ async fn deliver_one(
let poll_timeout = std::time::Duration::from_secs(cfg.transport_poll_timeout_secs);
let out_id_cloned = out_id.clone();
let mycelium = mycelium.clone();
// Determine the reply timeout for supervisor job.result: prefer message.timeout_result, falling back to the router config timeout
let job_result_reply_timeout: u64 = if msg.timeout_result > 0 {
msg.timeout_result as u64
} else {
cfg.transport_poll_timeout_secs
};
tokio::spawn(async move {
let start = std::time::Instant::now();
let client = mycelium;
// Supervisor call context captured for sync status checks
let sup_dest = dest_for_poller;
let sup_topic = topic_for_poller;
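// Explicit rebind; documents that the embedded job id is captured by this task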
let job_id_opt = job_id_opt;
let mut last_status: Option<TransportStatus> = Some(TransportStatus::Sent);
loop {
@@ -211,6 +227,128 @@ async fn deliver_one(
// Stop on terminal states
if matches!(s, TransportStatus::Delivered | TransportStatus::Read) {
// On Read, fetch supervisor job.status and update local job/message if terminal
if matches!(s, TransportStatus::Read) {
if let Some(job_id) = job_id_opt {
let sup = SupervisorClient::new_with_client(
client.clone(),
sup_dest.clone(),
sup_topic.clone(),
None,
);
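// Short fixed reply timeout for the status probe; the result fetch below uses the configured timeout instead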
match sup.job_status_sync(job_id.to_string(), 10).await {
Ok(remote_status) => {
if let Some((mapped, terminal)) =
map_supervisor_job_status(&remote_status)
{
if terminal {
let _ = service_poll
.update_job_status_unchecked(
context_id,
caller_id,
job_id,
mapped.clone(),
)
.await;
// After terminal status, fetch supervisor job.result and store into Job.result
let sup = SupervisorClient::new_with_client(
client.clone(),
sup_dest.clone(),
sup_topic.clone(),
None,
);
match sup.job_result_sync(job_id.to_string(), job_result_reply_timeout).await {
Ok(result_map) => {
// Persist the result into the Job.result map (merge)
let _ = service_poll
.update_job_result_merge_unchecked(
context_id,
caller_id,
job_id,
result_map.clone(),
)
.await;
// Log which key was stored (success or error)
let key = result_map.keys().next().cloned().unwrap_or_else(|| "unknown".to_string());
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Stored supervisor job.result for job {} ({})",
job_id, key
)],
)
.await;
}
Err(e) => {
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"job.result fetch error for job {}: {}",
job_id, e
)],
)
.await;
}
}
// Mark message as processed
let _ = service_poll
.update_message_status(
context_id,
caller_id,
id,
MessageStatus::Processed,
)
.await;
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Supervisor job.status for job {} -> {} (mapped to {:?})",
job_id, remote_status, mapped
)],
)
.await;
}
} else {
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Unknown supervisor status '{}' for job {}",
remote_status, job_id
)],
)
.await;
}
}
Err(e) => {
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"job.status sync error: {}",
e
)],
)
.await;
}
}
}
}
break;
}
if matches!(s, TransportStatus::Failed) {
@@ -287,6 +425,17 @@ fn parse_message_key(s: &str) -> Option<(u32, u32)> {
}
}
/// Map supervisor job.status -> (local JobStatus, terminal)
fn map_supervisor_job_status(s: &str) -> Option<(JobStatus, bool)> {
match s {
"created" | "queued" => Some((JobStatus::Dispatched, false)),
"running" => Some((JobStatus::Started, false)),
"completed" => Some((JobStatus::Finished, true)),
"failed" | "timeout" => Some((JobStatus::Error, true)),
_ => None,
}
}
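
A test sketch pinning the mapping down (illustrative; the module name is made up, and matches! is used so JobStatus needs no PartialEq):

#[cfg(test)]
mod supervisor_status_mapping_tests {
    use super::*;

    #[test]
    fn maps_known_statuses() {
        // Terminal states flag true so the poller stores the result and stops.
        assert!(matches!(map_supervisor_job_status("completed"), Some((JobStatus::Finished, true))));
        assert!(matches!(map_supervisor_job_status("failed"), Some((JobStatus::Error, true))));
        assert!(matches!(map_supervisor_job_status("timeout"), Some((JobStatus::Error, true))));
        // Non-terminal states keep the poller looping.
        assert!(matches!(map_supervisor_job_status("queued"), Some((JobStatus::Dispatched, false))));
        assert!(matches!(map_supervisor_job_status("running"), Some((JobStatus::Started, false))));
        // Unknown statuses map to None and are only logged by the caller.
        assert!(map_supervisor_job_status("rebooting").is_none());
    }
}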
/// Auto-discover contexts periodically and ensure a router loop exists for each.
/// Returns a JoinHandle of the discovery task (router loops are detached).


@@ -974,6 +974,22 @@ impl AppService {
.await
}
/// Permission-bypassing variant that merges into a job's result field.
/// Intended for internal router/scheduler use where no actor identity is present.
pub async fn update_job_result_merge_unchecked(
&self,
context_id: u32,
caller_id: u32,
job_id: u32,
patch: HashMap<String, String>,
) -> Result<(), BoxError> {
// Ensure job exists, then write directly
let _ = self.redis.load_job(context_id, caller_id, job_id).await?;
self.redis
.update_job_result_merge(context_id, caller_id, job_id, patch)
.await
}
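
And a caller-side sketch (illustrative; the context, caller, and job ids are placeholders):

// Illustrative sketch: merge a success payload into a job's result map from
// internal (router/scheduler) code where no actor identity is available.
async fn store_demo_result(service: &AppService) -> Result<(), BoxError> {
    let mut patch = HashMap::new();
    patch.insert("success".to_string(), "job output".to_string());
    service
        .update_job_result_merge_unchecked(1, 1, 42, patch)
        .await
}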
pub async fn append_message_logs(
&self,
context_id: u32,