refactor coordinator to use shared lib models and client

This commit is contained in:
Timur Gordon
2025-11-13 21:56:33 +01:00
parent 4b23e5eb7f
commit 84545f0d75
16 changed files with 729 additions and 1973 deletions

View File

@@ -11,13 +11,13 @@ use std::hash::{Hash, Hasher};
use tokio::sync::{Mutex, Semaphore};
use crate::{
clients::{Destination, MyceliumClient, SupervisorClient, SupervisorHub},
clients::{Destination, MyceliumClient, MyceliumTransport, SupervisorClient, SupervisorHub},
models::{Job, JobStatus, Message, MessageStatus, ScriptType, TransportStatus},
service::AppService,
};
use tracing::{error, info};
#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct RouterConfig {
pub context_ids: Vec<u32>,
pub concurrency: usize,
@@ -50,7 +50,7 @@ Concurrency:
*/
#[derive(Clone)]
struct SupervisorClientCache {
map: Arc<Mutex<HashMap<String, Arc<SupervisorClient>>>>,
map: Arc<Mutex<HashMap<String, Arc<SupervisorClient<MyceliumTransport>>>>>,
}
impl SupervisorClientCache {
@@ -83,7 +83,7 @@ impl SupervisorClientCache {
dest: Destination,
topic: String,
secret: Option<String>,
) -> Arc<SupervisorClient> {
) -> Arc<SupervisorClient<MyceliumTransport>> {
let key = Self::make_key(&dest, &topic, &secret);
{
@@ -99,7 +99,8 @@ impl SupervisorClientCache {
tracing::debug!(target: "router", cache="supervisor", hit=true, %topic, secret = %if secret.is_some() { "set" } else { "none" }, "SupervisorClient cache lookup (double-checked)");
return existing.clone();
}
let client = Arc::new(SupervisorClient::new_with_hub(hub, dest, secret.clone()));
let transport = MyceliumTransport::new(hub, dest);
let client = Arc::new(SupervisorClient::new(transport, secret.clone().unwrap_or_default()));
guard.insert(key, client.clone());
tracing::debug!(target: "router", cache="supervisor", hit=false, %topic, secret = %if secret.is_some() { "set" } else { "none" }, "SupervisorClient cache insert");
client
@@ -121,9 +122,8 @@ pub fn start_router(service: AppService, cfg: RouterConfig) -> Vec<tokio::task::
let handle = tokio::spawn(async move {
let sem = Arc::new(Semaphore::new(cfg_cloned.concurrency));
// Use the global SupervisorHub and its Mycelium client
// Use the global SupervisorHub
let sup_hub = cfg_cloned.sup_hub.clone();
let mycelium = sup_hub.mycelium();
let cache = Arc::new(SupervisorClientCache::new());
@@ -146,7 +146,6 @@ pub fn start_router(service: AppService, cfg: RouterConfig) -> Vec<tokio::task::
let service_task = service_cloned.clone();
let cfg_task = cfg_cloned.clone();
tokio::spawn({
let mycelium = mycelium.clone();
let cache = cache.clone();
let sup_hub = sup_hub.clone();
async move {
@@ -157,7 +156,6 @@ pub fn start_router(service: AppService, cfg: RouterConfig) -> Vec<tokio::task::
&cfg_task,
ctx_id,
&key,
mycelium,
sup_hub,
cache.clone(),
)
@@ -190,7 +188,6 @@ async fn deliver_one(
cfg: &RouterConfig,
context_id: u32,
msg_key: &str,
mycelium: Arc<MyceliumClient>,
sup_hub: Arc<SupervisorHub>,
cache: Arc<SupervisorClientCache>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
@@ -245,34 +242,33 @@ async fn deliver_one(
let method = msg.message.clone();
let params = build_params(&msg)?;
// Send
// If this is a job.run and we have a secret configured on the client,
// prefer the typed wrapper that injects the secret into inner supervisor params,
// and await the reply to capture job_queued immediately.
let (out_id, reply_opt) = if method == "job.run" {
// Send via the new client API
// The transport handles message correlation internally
let _result = if method == "job.run" {
if let Some(j) = msg.job.first() {
let jv = job_to_json(j)?;
// Returns (outbound message id, reply envelope)
let (out, reply) = client.job_run_wait(jv).await?;
(out, Some(reply))
// Use typed job_run method
let job = serde_json::from_value(job_to_json(j)?)?;
client.job_run(job, None).await?;
serde_json::Value::Null
} else {
// Fallback: no embedded job, use the generic call (await reply, discard)
let out = client.call(&method, params).await?;
(out, None)
// Generic call - not supported in new API, would need custom implementation
// For now, return error
return Err("job.run requires a job parameter".into());
}
} else {
let out = client.call(&method, params).await?;
(out, None)
// For other methods, we'd need to add them to the client or use a generic mechanism
// For now, this is a placeholder
return Err(format!("Method {} not yet supported with new client", method).into());
};
// Store transport id and initial Sent status
// Mark as delivered since the new client waits for replies
let _ = service
.update_message_transport(
context_id,
caller_id,
id,
Some(out_id.clone()),
Some(TransportStatus::Sent),
None, // No transport ID in new API
Some(TransportStatus::Delivered),
)
.await;
@@ -281,25 +277,9 @@ async fn deliver_one(
.update_message_status(context_id, caller_id, id, MessageStatus::Acknowledged)
.await?;
// If we got a job.run reply, interpret job_queued immediately
if let (Some(reply), Some(job_id)) = (reply_opt, msg.job.first().map(|j| j.id)) {
let result_opt = reply.get("result");
let error_opt = reply.get("error");
// Handle job.run success (job_queued)
let is_job_queued = result_opt
.and_then(|res| {
if res.get("job_queued").is_some() {
Some(true)
} else if let Some(s) = res.as_str() {
Some(s == "job_queued")
} else {
None
}
})
.unwrap_or(false);
if is_job_queued {
// For job.run, mark the job as dispatched
if method == "job.run" {
if let Some(job_id) = msg.job.first().map(|j| j.id) {
let _ = service
.update_job_status_unchecked(context_id, caller_id, job_id, JobStatus::Dispatched)
.await;
@@ -314,579 +294,12 @@ async fn deliver_one(
)],
)
.await;
} else if let Some(err_obj) = error_opt {
let _ = service
.update_job_status_unchecked(context_id, caller_id, job_id, JobStatus::Error)
.await;
let _ = service
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Supervisor error for job {}: {} (processed synchronously)",
job_id, err_obj
)],
)
.await;
}
}
// No correlation map needed; replies are handled synchronously via SupervisorHub
// No transport polling needed; the new client waits for replies synchronously
// Spawn transport-status poller
{
let service_poll = service.clone();
let poll_interval = std::time::Duration::from_secs(cfg.transport_poll_interval_secs);
let poll_timeout = std::time::Duration::from_secs(cfg.transport_poll_timeout_secs);
let out_id_cloned = out_id.clone();
let mycelium = mycelium.clone();
tokio::spawn(async move {
let start = std::time::Instant::now();
let client = mycelium;
// Supervisor call context captured for sync status checks
let sup_dest = dest_for_poller;
let sup_topic = topic_for_poller;
let job_id_opt = job_id_opt;
let mut last_status: Option<TransportStatus> = Some(TransportStatus::Sent);
// Ensure we only request supervisor job.status or job.result once per outbound message
let mut requested_job_check: bool = false;
loop {
if start.elapsed() >= poll_timeout {
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec!["Transport-status polling timed out".to_string()],
)
.await;
// leave last known status; do not override
break;
}
match client.message_status(&out_id_cloned).await {
Ok(s) => {
if last_status.as_ref() != Some(&s) {
let _ = service_poll
.update_message_transport(
context_id,
caller_id,
id,
None,
Some(s.clone()),
)
.await;
last_status = Some(s.clone());
}
// Stop on terminal states
if matches!(s, TransportStatus::Delivered | TransportStatus::Read) {
if let Some(job_id) = job_id_opt {
// First consult Redis for the latest job state in case we already have a terminal update
match service_poll.load_job(context_id, caller_id, job_id).await {
Ok(job) => {
// Promote to Started as soon as transport is delivered/read,
// if currently Dispatched or WaitingForPrerequisites.
// This makes DAG.started reflect "in-flight" work even when jobs
// complete too quickly to observe an intermediate supervisor "running" status.
if matches!(
job.status(),
JobStatus::Dispatched
| JobStatus::WaitingForPrerequisites
) {
let _ = service_poll
.update_job_status_unchecked(
context_id,
caller_id,
job_id,
JobStatus::Started,
)
.await;
}
match job.status() {
JobStatus::Finished | JobStatus::Error => {
// Local job is already terminal; skip supervisor job.status
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Local job {} status is terminal ({:?}); skipping supervisor job.status",
job_id,
job.status()
)],
)
.await;
// If result is still empty, immediately request supervisor job.result
if job.result.is_empty() {
let sup = cache
.get_or_create(
sup_hub.clone(),
sup_dest.clone(),
sup_topic.clone(),
secret_for_poller.clone(),
)
.await;
match sup
.job_result_wait(job_id.to_string())
.await
{
Ok((_out2, reply2)) => {
// Interpret reply synchronously: success/error/bare string
let res = reply2.get("result");
if let Some(obj) =
res.and_then(|v| v.as_object())
{
if let Some(s) = obj
.get("success")
.and_then(|v| v.as_str())
{
let mut patch = std::collections::HashMap::new();
patch.insert(
"success".to_string(),
s.to_string(),
);
let _ = service_poll
.update_job_result_merge_unchecked(
context_id, caller_id, job_id, patch,
)
.await;
let _ = service_poll
.update_message_status(
context_id,
caller_id,
id,
MessageStatus::Processed,
)
.await;
// Also mark job as Finished so the flow can progress (ignore invalid transitions)
let _ = service_poll
.update_job_status_unchecked(
context_id, caller_id, job_id, JobStatus::Finished,
)
.await;
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Updated job {} status to Finished (sync)", job_id
)],
)
.await;
// Existing log about storing result
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Stored supervisor job.result for job {} (success, sync)",
job_id
)],
)
.await;
} else if let Some(s) = obj
.get("error")
.and_then(|v| v.as_str())
{
let mut patch = std::collections::HashMap::new();
patch.insert(
"error".to_string(),
s.to_string(),
);
let _ = service_poll
.update_job_result_merge_unchecked(
context_id, caller_id, job_id, patch,
)
.await;
let _ = service_poll
.update_message_status(
context_id,
caller_id,
id,
MessageStatus::Processed,
)
.await;
// Also mark job as Error so the flow can handle failure (ignore invalid transitions)
let _ = service_poll
.update_job_status_unchecked(
context_id, caller_id, job_id, JobStatus::Error,
)
.await;
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Updated job {} status to Error (sync)", job_id
)],
)
.await;
// Existing log about storing result
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Stored supervisor job.result for job {} (error, sync)",
job_id
)],
)
.await;
}
} else if let Some(s) =
res.and_then(|v| v.as_str())
{
let mut patch =
std::collections::HashMap::new(
);
patch.insert(
"success".to_string(),
s.to_string(),
);
let _ = service_poll
.update_job_result_merge_unchecked(
context_id, caller_id, job_id, patch,
)
.await;
let _ = service_poll
.update_message_status(
context_id,
caller_id,
id,
MessageStatus::Processed,
)
.await;
// Also mark job as Finished so the flow can progress (ignore invalid transitions)
let _ = service_poll
.update_job_status_unchecked(
context_id,
caller_id,
job_id,
JobStatus::Finished,
)
.await;
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Updated job {} status to Finished (sync)", job_id
)],
)
.await;
// Existing log about storing result
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Stored supervisor job.result for job {} (success, sync)",
job_id
)],
)
.await;
} else {
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec!["Supervisor job.result reply missing recognizable fields".to_string()],
)
.await;
}
}
Err(e) => {
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"job.result request error for job {}: {}",
job_id, e
)],
)
.await;
}
}
} else {
// Result already present; nothing to fetch
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Job {} already has result; no supervisor calls needed",
job_id
)],
)
.await;
}
// Mark processed and stop polling for this message
let _ = service_poll
.update_message_status(
context_id,
caller_id,
id,
MessageStatus::Processed,
)
.await;
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Terminal job {} detected; stopping transport polling",
job_id
)],
)
.await;
break;
}
// Not terminal yet -> request supervisor job.status as before
_ => {
let sup = cache
.get_or_create(
sup_hub.clone(),
sup_dest.clone(),
sup_topic.clone(),
secret_for_poller.clone(),
)
.await;
match sup.job_status_wait(job_id.to_string()).await
{
Ok((_out_id, reply_status)) => {
// Interpret status reply synchronously
let result_opt = reply_status.get("result");
let error_opt = reply_status.get("error");
if let Some(err_obj) = error_opt {
let _ = service_poll
.update_job_status_unchecked(
context_id,
caller_id,
job_id,
JobStatus::Error,
)
.await;
let _ = service_poll
.append_message_logs(
context_id, caller_id, id,
vec![format!(
"Supervisor error for job {}: {} (sync)",
job_id, err_obj
)],
)
.await;
} else if let Some(res) = result_opt {
let status_candidate = res
.get("status")
.and_then(|v| v.as_str())
.or_else(|| res.as_str());
if let Some(remote_status) =
status_candidate
{
if let Some((mapped, terminal)) =
map_supervisor_job_status(
remote_status,
)
{
let _ = service_poll
.update_job_status_unchecked(
context_id, caller_id, job_id, mapped.clone(),
)
.await;
let _ = service_poll
.append_message_logs(
context_id, caller_id, id,
vec![format!(
"Supervisor job.status for job {} -> {} (mapped to {:?}, sync)",
job_id, remote_status, mapped
)],
)
.await;
// If terminal, request job.result now (handled above for local terminal case)
if terminal {
// trigger job.result only if result empty to avoid spam
if let Ok(j_after) =
service_poll
.load_job(
context_id,
caller_id,
job_id,
)
.await
{
if j_after
.result
.is_empty()
{
let sup2 = cache
.get_or_create(
sup_hub.clone(),
sup_dest.clone(),
sup_topic.clone(),
secret_for_poller.clone(),
)
.await;
let _ = sup2.job_result_wait(job_id.to_string()).await
.and_then(|(_oid, reply2)| {
// Minimal parse and store
let res2 = reply2.get("result");
if let Some(obj) = res2.and_then(|v| v.as_object()) {
if let Some(s) = obj.get("success").and_then(|v| v.as_str()) {
let mut patch = std::collections::HashMap::new();
patch.insert("success".to_string(), s.to_string());
tokio::spawn({
let service_poll = service_poll.clone();
async move {
let _ = service_poll.update_job_result_merge_unchecked(context_id, caller_id, job_id, patch).await;
}
});
}
}
Ok((String::new(), Value::Null))
});
}
}
// Mark processed and stop polling for this message
let _ = service_poll
.update_message_status(
context_id,
caller_id,
id,
MessageStatus::Processed,
)
.await;
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Terminal job {} detected from supervisor status; stopping transport polling",
job_id
)],
)
.await;
break;
}
}
}
}
}
Err(e) => {
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"job.status request error: {}",
e
)],
)
.await;
}
}
}
}
}
// If we cannot load the job, fall back to requesting job.status
Err(_) => {
let sup = cache
.get_or_create(
sup_hub.clone(),
sup_dest.clone(),
sup_topic.clone(),
secret_for_poller.clone(),
)
.await;
match sup.job_status_wait(job_id.to_string()).await {
Ok((_out_id, _reply_status)) => {
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Requested supervisor job.status for job {} (fallback; load_job failed, sync)",
job_id
)],
)
.await;
}
Err(e) => {
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"job.status request error: {}",
e
)],
)
.await;
}
}
}
}
// Ensure we only do this once
requested_job_check = true;
}
// break;
}
if matches!(s, TransportStatus::Failed) {
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Transport failed for outbound id {out_id_cloned}"
)],
)
.await;
break;
}
}
Err(e) => {
// Log and continue polling
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!("messageStatus query error: {e}")],
)
.await;
}
}
tokio::time::sleep(poll_interval).await;
}
});
}
Ok(())
}