Compare commits

...

5 Commits

| Author | SHA1 | Message | Date |
| --- | --- | --- | --- |
| Lee Smet | 688c42493e | Add info on how to run demo script | 2025-09-09 11:10:46 +02:00 |
| Lee Smet | a75fb9c55d | Format code | 2025-09-08 14:25:35 +02:00 |
| Lee Smet | 5ed9739d68 | Properly update DAG view with started/finished jobs | 2025-09-08 14:12:40 +02:00 |
| Lee Smet | 3cd1a55768 | Fix job status transitions | 2025-09-08 13:37:42 +02:00 |
| Lee Smet | c860553acd | Stop polling when a job reached terminal status | 2025-09-08 12:07:26 +02:00 |

All commits are signed off by Lee Smet <lee.smet@hotmail.com>.
4 changed files with 197 additions and 8 deletions

View File

@@ -1,2 +1,28 @@
# herocoordinator
## Demo setup
A Python script is provided in the [scripts directory](./scripts/supervisor_flow_demo.py). It generates
some demo jobs to be run by [a supervisor](https://git.ourworld.tf/herocode/supervisor), with all
communication happening over [mycelium](https://github.com/threefoldtech/mycelium). To run the demo, a
supervisor must be running and attached to a mycelium instance, which it uses to read and write messages.
A __different__ mycelium instance needs to run for the coordinator (the supervisor can run on a different
host than the coordinator, as long as the two mycelium instances can reach each other).
An example of a local setup:
```bash
# Run a Redis container
docker run -it -d -p 6379:6379 --name redis redis
# Spawn mycelium node 1 with default settings. This also creates a TUN interface,
# though that is not needed for message passing
mycelium
# Spawn mycelium node 2, connect to the first node
mycelium --key-file key.bin --peers tcp://127.0.0.1:9651 --disable-quic --disable-peer-discovery --api-addr 127.0.0.1:9989 --jsonrpc-addr 127.0.0.1:9990 --no-tun -t 8651
# Start the supervisor
supervisor --admin-secret admin123 --user-secret user123 --register-secret register123 --mycelium-url http://127.0.0.1:9990 --topic supervisor.rpc
# Start the coordinator
cargo run # (alternatively, run a precompiled binary if one is present)
# Finally, invoke the demo script
python3 scripts/supervisor_flow_demo.py
```
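
Note that in this example the supervisor talks to mycelium node 2: its `--mycelium-url http://127.0.0.1:9990` matches node 2's `--jsonrpc-addr`, while the coordinator uses node 1, which was started with default settings.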

View File

@@ -3,7 +3,7 @@ use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt;
use crate::{
models::{Flow, Job, ScriptType},
models::{Flow, Job, JobStatus, ScriptType},
storage::RedisDriver,
};
@@ -212,6 +212,41 @@ pub async fn build_flow_dag(
edges.sort_unstable();
reverse_edges.sort_unstable();
// Populate runtime execution state from persisted Job.status()
let mut started_set: HashSet<u32> = HashSet::new();
let mut completed_set: HashSet<u32> = HashSet::new();
let mut error_ids: Vec<u32> = Vec::new();
for (&jid, job) in &jobs {
match job.status() {
JobStatus::Finished => {
completed_set.insert(jid);
}
JobStatus::Started => {
started_set.insert(jid);
}
JobStatus::Dispatched => {
// Consider Dispatched as "in-flight" for DAG runtime started set,
// so queued/running work is visible in periodic snapshots.
started_set.insert(jid);
}
JobStatus::Error => {
error_ids.push(jid);
}
JobStatus::WaitingForPrerequisites => {
// Neither started nor completed
}
}
}
// Choose a deterministic failed job if any errors exist (smallest job id)
let failed_job = if error_ids.is_empty() {
None
} else {
error_ids.sort_unstable();
Some(error_ids[0])
};
let dag = FlowDag {
flow_id,
caller_id,
@@ -222,9 +257,9 @@ pub async fn build_flow_dag(
roots,
leaves,
levels,
started: HashSet::new(),
completed: HashSet::new(),
failed_job: None,
started: started_set,
completed: completed_set,
failed_job,
};
Ok(dag)

View File

@@ -390,6 +390,24 @@ async fn deliver_one(
// First consult Redis for the latest job state in case we already have a terminal update
match service_poll.load_job(context_id, caller_id, job_id).await {
Ok(job) => {
// Promote to Started as soon as transport is delivered/read,
// if currently Dispatched or WaitingForPrerequisites.
// This makes DAG.started reflect "in-flight" work even when jobs
// complete too quickly to observe an intermediate supervisor "running" status.
if matches!(
job.status(),
JobStatus::Dispatched
| JobStatus::WaitingForPrerequisites
) {
let _ = service_poll
.update_job_status_unchecked(
context_id,
caller_id,
job_id,
JobStatus::Started,
)
.await;
}
match job.status() {
JobStatus::Finished | JobStatus::Error => {
// Local job is already terminal; skip supervisor job.status
@@ -448,6 +466,23 @@ async fn deliver_one(
MessageStatus::Processed,
)
.await;
// Also mark job as Finished so the flow can progress (ignore invalid transitions)
let _ = service_poll
.update_job_status_unchecked(
context_id, caller_id, job_id, JobStatus::Finished,
)
.await;
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Updated job {} status to Finished (sync)", job_id
)],
)
.await;
// Existing log about storing result
let _ = service_poll
.append_message_logs(
context_id,
@@ -481,6 +516,23 @@ async fn deliver_one(
MessageStatus::Processed,
)
.await;
// Also mark job as Error so the flow can handle failure (ignore invalid transitions)
let _ = service_poll
.update_job_status_unchecked(
context_id, caller_id, job_id, JobStatus::Error,
)
.await;
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Updated job {} status to Error (sync)", job_id
)],
)
.await;
// Existing log about storing result
let _ = service_poll
.append_message_logs(
context_id,
@@ -516,6 +568,26 @@ async fn deliver_one(
MessageStatus::Processed,
)
.await;
// Also mark job as Finished so the flow can progress (ignore invalid transitions)
let _ = service_poll
.update_job_status_unchecked(
context_id,
caller_id,
job_id,
JobStatus::Finished,
)
.await;
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Updated job {} status to Finished (sync)", job_id
)],
)
.await;
// Existing log about storing result
let _ = service_poll
.append_message_logs(
context_id,
@@ -566,6 +638,28 @@ async fn deliver_one(
)
.await;
}
// Mark processed and stop polling for this message
let _ = service_poll
.update_message_status(
context_id,
caller_id,
id,
MessageStatus::Processed,
)
.await;
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Terminal job {} detected; stopping transport polling",
job_id
)],
)
.await;
break;
}
// Not terminal yet -> request supervisor job.status as before
_ => {
@@ -673,6 +767,28 @@ async fn deliver_one(
});
}
}
// Mark processed and stop polling for this message
let _ = service_poll
.update_message_status(
context_id,
caller_id,
id,
MessageStatus::Processed,
)
.await;
let _ = service_poll
.append_message_logs(
context_id,
caller_id,
id,
vec![format!(
"Terminal job {} detected from supervisor status; stopping transport polling",
job_id
)],
)
.await;
break;
}
}
}

View File

@@ -672,10 +672,16 @@ impl AppService {
let allowed = match current {
JobStatus::Dispatched => matches!(
new_status,
JobStatus::WaitingForPrerequisites | JobStatus::Started | JobStatus::Error
JobStatus::WaitingForPrerequisites
| JobStatus::Started
| JobStatus::Finished
| JobStatus::Error
),
JobStatus::WaitingForPrerequisites => {
matches!(new_status, JobStatus::Started | JobStatus::Error)
matches!(
new_status,
JobStatus::Started | JobStatus::Finished | JobStatus::Error
)
}
JobStatus::Started => matches!(new_status, JobStatus::Finished | JobStatus::Error),
JobStatus::Finished | JobStatus::Error => false,
@@ -714,10 +720,16 @@ impl AppService {
let allowed = match current {
JobStatus::Dispatched => matches!(
new_status,
JobStatus::WaitingForPrerequisites | JobStatus::Started | JobStatus::Error
JobStatus::WaitingForPrerequisites
| JobStatus::Started
| JobStatus::Finished
| JobStatus::Error
),
JobStatus::WaitingForPrerequisites => {
matches!(new_status, JobStatus::Started | JobStatus::Error)
matches!(
new_status,
JobStatus::Started | JobStatus::Finished | JobStatus::Error
)
}
JobStatus::Started => matches!(new_status, JobStatus::Finished | JobStatus::Error),
JobStatus::Finished | JobStatus::Error => false,
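
Taken together, the two hunks above apply the same relaxation to both transition checks. Below is a condensed sketch of the resulting rules as a standalone predicate; the enum and function are illustrative assumptions only, mirroring the `allowed` match inside `AppService`:

```rust
// Illustrative sketch of the updated job status transition rules.
// The enum and the free function are assumptions for this example only;
// in the codebase the logic lives inside the `allowed` match in AppService.
#[derive(Clone, Copy)]
enum JobStatus {
    Dispatched,
    WaitingForPrerequisites,
    Started,
    Finished,
    Error,
}

fn transition_allowed(current: JobStatus, new_status: JobStatus) -> bool {
    use JobStatus::*;
    match current {
        // Dispatched work may now also go straight to Finished (or Error),
        // covering jobs that complete before a Started update is observed.
        Dispatched => matches!(
            new_status,
            WaitingForPrerequisites | Started | Finished | Error
        ),
        // Waiting work may likewise skip Started entirely.
        WaitingForPrerequisites => matches!(new_status, Started | Finished | Error),
        Started => matches!(new_status, Finished | Error),
        // Finished and Error are terminal.
        Finished | Error => false,
    }
}

fn main() {
    // Newly allowed: Dispatched -> Finished; still rejected: Finished -> Started.
    assert!(transition_allowed(JobStatus::Dispatched, JobStatus::Finished));
    assert!(!transition_allowed(JobStatus::Finished, JobStatus::Started));
}
```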