Compare commits
7 Commits
66c89d2485
...
main
Author | SHA1 | Date | |
---|---|---|---|
|
688c42493e
|
||
|
a75fb9c55d
|
||
|
5ed9739d68
|
||
|
3cd1a55768
|
||
|
c860553acd
|
||
|
78a776877a
|
||
|
8cea17f4ec
|
26
README.md
26
README.md
@@ -1,2 +1,28 @@
|
||||
# herocoordinator
|
||||
|
||||
## Demo setup
|
||||
|
||||
A python script is provided in the [scripts directory](./scripts/supervisor_flow_demo.py). This script
|
||||
generates some demo jobs to be run by [a supervisor](https://git.ourworld.tf/herocode/supervisor).
|
||||
Communication happens over [mycelium](https://github.com/threefoldtech/mycelium). To run the demo a
|
||||
supervisor must be running, which uses a mycelium instance to read and write messages. A __different__
|
||||
mycelium instance needs to run for the coordinator (the supervisor can run on a different host than
|
||||
the coordinator, so long as the 2 mycelium instances used can reach eachother).
|
||||
|
||||
An example of a local setup:
|
||||
|
||||
```bash
|
||||
# Run a redis docker
|
||||
docker run -it -d -p 6379:6379 --name redis redis
|
||||
# Spawn mycelium node 1 with default settings. This also creates a TUN interface though that is not
|
||||
# necessary for the messages
|
||||
mycelium
|
||||
# Spawn mycelium node 2, connect to the first node
|
||||
mycelium --key-file key.bin --peers tcp://127.0.0.1:9651 --disable-quic --disable-peer-discovery --api-addr 127.0.0.1:9989 --jsonrpc-addr 127.0.0.1:9990 --no-tun -t 8651
|
||||
# Start the supervisor
|
||||
supervisor --admin-secret admin123 --user-secret user123 --register-secret register123 --mycelium-url http://127.0.0.1:9990 --topic supervisor.rpc
|
||||
# Start the coordinator
|
||||
cargo run # (alternatively if a compiled binary is present that can be run)
|
||||
# Finally, invoke the demo script
|
||||
python3 scripts/supervisor_flow_demo.py
|
||||
```
|
||||
|
@@ -48,7 +48,7 @@ impl SupervisorHub {
|
||||
fn spawn_pop_loop(hub: Arc<Self>) {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match hub.mycelium.pop_message(Some(false), Some(2), None).await {
|
||||
match hub.mycelium.pop_message(Some(false), Some(20), None).await {
|
||||
Ok(Some(inb)) => {
|
||||
// Extract and decode payload
|
||||
let Some(payload_b64) = inb.get("payload").and_then(|v| v.as_str()) else {
|
||||
|
43
src/dag.rs
43
src/dag.rs
@@ -3,7 +3,7 @@ use std::collections::{HashMap, HashSet, VecDeque};
|
||||
use std::fmt;
|
||||
|
||||
use crate::{
|
||||
models::{Flow, Job, ScriptType},
|
||||
models::{Flow, Job, JobStatus, ScriptType},
|
||||
storage::RedisDriver,
|
||||
};
|
||||
|
||||
@@ -212,6 +212,41 @@ pub async fn build_flow_dag(
|
||||
edges.sort_unstable();
|
||||
reverse_edges.sort_unstable();
|
||||
|
||||
// Populate runtime execution state from persisted Job.status()
|
||||
let mut started_set: HashSet<u32> = HashSet::new();
|
||||
let mut completed_set: HashSet<u32> = HashSet::new();
|
||||
let mut error_ids: Vec<u32> = Vec::new();
|
||||
|
||||
for (&jid, job) in &jobs {
|
||||
match job.status() {
|
||||
JobStatus::Finished => {
|
||||
completed_set.insert(jid);
|
||||
}
|
||||
JobStatus::Started => {
|
||||
started_set.insert(jid);
|
||||
}
|
||||
JobStatus::Dispatched => {
|
||||
// Consider Dispatched as "in-flight" for DAG runtime started set,
|
||||
// so queued/running work is visible in periodic snapshots.
|
||||
started_set.insert(jid);
|
||||
}
|
||||
JobStatus::Error => {
|
||||
error_ids.push(jid);
|
||||
}
|
||||
JobStatus::WaitingForPrerequisites => {
|
||||
// Neither started nor completed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Choose a deterministic failed job if any errors exist (smallest job id)
|
||||
let failed_job = if error_ids.is_empty() {
|
||||
None
|
||||
} else {
|
||||
error_ids.sort_unstable();
|
||||
Some(error_ids[0])
|
||||
};
|
||||
|
||||
let dag = FlowDag {
|
||||
flow_id,
|
||||
caller_id,
|
||||
@@ -222,9 +257,9 @@ pub async fn build_flow_dag(
|
||||
roots,
|
||||
leaves,
|
||||
levels,
|
||||
started: HashSet::new(),
|
||||
completed: HashSet::new(),
|
||||
failed_job: None,
|
||||
started: started_set,
|
||||
completed: completed_set,
|
||||
failed_job,
|
||||
};
|
||||
|
||||
Ok(dag)
|
||||
|
468
src/router.rs
468
src/router.rs
@@ -386,17 +386,32 @@ async fn deliver_one(
|
||||
|
||||
// Stop on terminal states
|
||||
if matches!(s, TransportStatus::Delivered | TransportStatus::Read) {
|
||||
// Only request a single job status/result per message
|
||||
if !requested_job_check {
|
||||
if let Some(job_id) = job_id_opt {
|
||||
// First consult Redis for the latest job state in case we already have a terminal update
|
||||
match service_poll.load_job(context_id, caller_id, job_id).await
|
||||
{
|
||||
Ok(job) => {
|
||||
match job.status() {
|
||||
JobStatus::Finished | JobStatus::Error => {
|
||||
// Local job is already terminal; skip supervisor job.status
|
||||
let _ = service_poll
|
||||
if let Some(job_id) = job_id_opt {
|
||||
// First consult Redis for the latest job state in case we already have a terminal update
|
||||
match service_poll.load_job(context_id, caller_id, job_id).await {
|
||||
Ok(job) => {
|
||||
// Promote to Started as soon as transport is delivered/read,
|
||||
// if currently Dispatched or WaitingForPrerequisites.
|
||||
// This makes DAG.started reflect "in-flight" work even when jobs
|
||||
// complete too quickly to observe an intermediate supervisor "running" status.
|
||||
if matches!(
|
||||
job.status(),
|
||||
JobStatus::Dispatched
|
||||
| JobStatus::WaitingForPrerequisites
|
||||
) {
|
||||
let _ = service_poll
|
||||
.update_job_status_unchecked(
|
||||
context_id,
|
||||
caller_id,
|
||||
job_id,
|
||||
JobStatus::Started,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
match job.status() {
|
||||
JobStatus::Finished | JobStatus::Error => {
|
||||
// Local job is already terminal; skip supervisor job.status
|
||||
let _ = service_poll
|
||||
.append_message_logs(
|
||||
context_id,
|
||||
caller_id,
|
||||
@@ -409,41 +424,41 @@ async fn deliver_one(
|
||||
)
|
||||
.await;
|
||||
|
||||
// If result is still empty, immediately request supervisor job.result
|
||||
if job.result.is_empty() {
|
||||
let sup = cache
|
||||
.get_or_create(
|
||||
sup_hub.clone(),
|
||||
sup_dest.clone(),
|
||||
sup_topic.clone(),
|
||||
secret_for_poller.clone(),
|
||||
)
|
||||
.await;
|
||||
match sup
|
||||
.job_result_wait(job_id.to_string())
|
||||
.await
|
||||
{
|
||||
Ok((_out2, reply2)) => {
|
||||
// Interpret reply synchronously: success/error/bare string
|
||||
let res = reply2.get("result");
|
||||
if let Some(obj) =
|
||||
res.and_then(|v| v.as_object())
|
||||
// If result is still empty, immediately request supervisor job.result
|
||||
if job.result.is_empty() {
|
||||
let sup = cache
|
||||
.get_or_create(
|
||||
sup_hub.clone(),
|
||||
sup_dest.clone(),
|
||||
sup_topic.clone(),
|
||||
secret_for_poller.clone(),
|
||||
)
|
||||
.await;
|
||||
match sup
|
||||
.job_result_wait(job_id.to_string())
|
||||
.await
|
||||
{
|
||||
Ok((_out2, reply2)) => {
|
||||
// Interpret reply synchronously: success/error/bare string
|
||||
let res = reply2.get("result");
|
||||
if let Some(obj) =
|
||||
res.and_then(|v| v.as_object())
|
||||
{
|
||||
if let Some(s) = obj
|
||||
.get("success")
|
||||
.and_then(|v| v.as_str())
|
||||
{
|
||||
if let Some(s) = obj
|
||||
.get("success")
|
||||
.and_then(|v| v.as_str())
|
||||
{
|
||||
let mut patch = std::collections::HashMap::new();
|
||||
patch.insert(
|
||||
"success".to_string(),
|
||||
s.to_string(),
|
||||
);
|
||||
let _ = service_poll
|
||||
let mut patch = std::collections::HashMap::new();
|
||||
patch.insert(
|
||||
"success".to_string(),
|
||||
s.to_string(),
|
||||
);
|
||||
let _ = service_poll
|
||||
.update_job_result_merge_unchecked(
|
||||
context_id, caller_id, job_id, patch,
|
||||
)
|
||||
.await;
|
||||
let _ = service_poll
|
||||
let _ = service_poll
|
||||
.update_message_status(
|
||||
context_id,
|
||||
caller_id,
|
||||
@@ -451,7 +466,24 @@ async fn deliver_one(
|
||||
MessageStatus::Processed,
|
||||
)
|
||||
.await;
|
||||
let _ = service_poll
|
||||
// Also mark job as Finished so the flow can progress (ignore invalid transitions)
|
||||
let _ = service_poll
|
||||
.update_job_status_unchecked(
|
||||
context_id, caller_id, job_id, JobStatus::Finished,
|
||||
)
|
||||
.await;
|
||||
let _ = service_poll
|
||||
.append_message_logs(
|
||||
context_id,
|
||||
caller_id,
|
||||
id,
|
||||
vec![format!(
|
||||
"Updated job {} status to Finished (sync)", job_id
|
||||
)],
|
||||
)
|
||||
.await;
|
||||
// Existing log about storing result
|
||||
let _ = service_poll
|
||||
.append_message_logs(
|
||||
context_id,
|
||||
caller_id,
|
||||
@@ -462,21 +494,21 @@ async fn deliver_one(
|
||||
)],
|
||||
)
|
||||
.await;
|
||||
} else if let Some(s) = obj
|
||||
.get("error")
|
||||
.and_then(|v| v.as_str())
|
||||
{
|
||||
let mut patch = std::collections::HashMap::new();
|
||||
patch.insert(
|
||||
"error".to_string(),
|
||||
s.to_string(),
|
||||
);
|
||||
let _ = service_poll
|
||||
} else if let Some(s) = obj
|
||||
.get("error")
|
||||
.and_then(|v| v.as_str())
|
||||
{
|
||||
let mut patch = std::collections::HashMap::new();
|
||||
patch.insert(
|
||||
"error".to_string(),
|
||||
s.to_string(),
|
||||
);
|
||||
let _ = service_poll
|
||||
.update_job_result_merge_unchecked(
|
||||
context_id, caller_id, job_id, patch,
|
||||
)
|
||||
.await;
|
||||
let _ = service_poll
|
||||
let _ = service_poll
|
||||
.update_message_status(
|
||||
context_id,
|
||||
caller_id,
|
||||
@@ -484,7 +516,24 @@ async fn deliver_one(
|
||||
MessageStatus::Processed,
|
||||
)
|
||||
.await;
|
||||
let _ = service_poll
|
||||
// Also mark job as Error so the flow can handle failure (ignore invalid transitions)
|
||||
let _ = service_poll
|
||||
.update_job_status_unchecked(
|
||||
context_id, caller_id, job_id, JobStatus::Error,
|
||||
)
|
||||
.await;
|
||||
let _ = service_poll
|
||||
.append_message_logs(
|
||||
context_id,
|
||||
caller_id,
|
||||
id,
|
||||
vec![format!(
|
||||
"Updated job {} status to Error (sync)", job_id
|
||||
)],
|
||||
)
|
||||
.await;
|
||||
// Existing log about storing result
|
||||
let _ = service_poll
|
||||
.append_message_logs(
|
||||
context_id,
|
||||
caller_id,
|
||||
@@ -495,29 +544,51 @@ async fn deliver_one(
|
||||
)],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
} else if let Some(s) =
|
||||
res.and_then(|v| v.as_str())
|
||||
{
|
||||
let mut patch = std::collections::HashMap::new();
|
||||
patch.insert(
|
||||
"success".to_string(),
|
||||
s.to_string(),
|
||||
}
|
||||
} else if let Some(s) =
|
||||
res.and_then(|v| v.as_str())
|
||||
{
|
||||
let mut patch =
|
||||
std::collections::HashMap::new(
|
||||
);
|
||||
let _ = service_poll
|
||||
patch.insert(
|
||||
"success".to_string(),
|
||||
s.to_string(),
|
||||
);
|
||||
let _ = service_poll
|
||||
.update_job_result_merge_unchecked(
|
||||
context_id, caller_id, job_id, patch,
|
||||
)
|
||||
.await;
|
||||
let _ = service_poll
|
||||
.update_message_status(
|
||||
let _ = service_poll
|
||||
.update_message_status(
|
||||
context_id,
|
||||
caller_id,
|
||||
id,
|
||||
MessageStatus::Processed,
|
||||
)
|
||||
.await;
|
||||
// Also mark job as Finished so the flow can progress (ignore invalid transitions)
|
||||
let _ = service_poll
|
||||
.update_job_status_unchecked(
|
||||
context_id,
|
||||
caller_id,
|
||||
job_id,
|
||||
JobStatus::Finished,
|
||||
)
|
||||
.await;
|
||||
let _ = service_poll
|
||||
.append_message_logs(
|
||||
context_id,
|
||||
caller_id,
|
||||
id,
|
||||
MessageStatus::Processed,
|
||||
vec![format!(
|
||||
"Updated job {} status to Finished (sync)", job_id
|
||||
)],
|
||||
)
|
||||
.await;
|
||||
let _ = service_poll
|
||||
// Existing log about storing result
|
||||
let _ = service_poll
|
||||
.append_message_logs(
|
||||
context_id,
|
||||
caller_id,
|
||||
@@ -528,8 +599,8 @@ async fn deliver_one(
|
||||
)],
|
||||
)
|
||||
.await;
|
||||
} else {
|
||||
let _ = service_poll
|
||||
} else {
|
||||
let _ = service_poll
|
||||
.append_message_logs(
|
||||
context_id,
|
||||
caller_id,
|
||||
@@ -537,10 +608,10 @@ async fn deliver_one(
|
||||
vec!["Supervisor job.result reply missing recognizable fields".to_string()],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = service_poll
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = service_poll
|
||||
.append_message_logs(
|
||||
context_id,
|
||||
caller_id,
|
||||
@@ -551,11 +622,11 @@ async fn deliver_one(
|
||||
)],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Result already present; nothing to fetch
|
||||
let _ = service_poll
|
||||
}
|
||||
} else {
|
||||
// Result already present; nothing to fetch
|
||||
let _ = service_poll
|
||||
.append_message_logs(
|
||||
context_id,
|
||||
caller_id,
|
||||
@@ -566,38 +637,56 @@ async fn deliver_one(
|
||||
)],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
// Not terminal yet -> request supervisor job.status as before
|
||||
_ => {
|
||||
let sup = cache
|
||||
.get_or_create(
|
||||
sup_hub.clone(),
|
||||
sup_dest.clone(),
|
||||
sup_topic.clone(),
|
||||
secret_for_poller.clone(),
|
||||
|
||||
// Mark processed and stop polling for this message
|
||||
let _ = service_poll
|
||||
.update_message_status(
|
||||
context_id,
|
||||
caller_id,
|
||||
id,
|
||||
MessageStatus::Processed,
|
||||
)
|
||||
.await;
|
||||
let _ = service_poll
|
||||
.append_message_logs(
|
||||
context_id,
|
||||
caller_id,
|
||||
id,
|
||||
vec![format!(
|
||||
"Terminal job {} detected; stopping transport polling",
|
||||
job_id
|
||||
)],
|
||||
)
|
||||
.await;
|
||||
match sup
|
||||
.job_status_wait(job_id.to_string())
|
||||
.await
|
||||
{
|
||||
Ok((_out_id, reply_status)) => {
|
||||
// Interpret status reply synchronously
|
||||
let result_opt =
|
||||
reply_status.get("result");
|
||||
let error_opt =
|
||||
reply_status.get("error");
|
||||
if let Some(err_obj) = error_opt {
|
||||
let _ = service_poll
|
||||
.update_job_status_unchecked(
|
||||
context_id,
|
||||
caller_id,
|
||||
job_id,
|
||||
JobStatus::Error,
|
||||
)
|
||||
.await;
|
||||
let _ = service_poll
|
||||
break;
|
||||
}
|
||||
// Not terminal yet -> request supervisor job.status as before
|
||||
_ => {
|
||||
let sup = cache
|
||||
.get_or_create(
|
||||
sup_hub.clone(),
|
||||
sup_dest.clone(),
|
||||
sup_topic.clone(),
|
||||
secret_for_poller.clone(),
|
||||
)
|
||||
.await;
|
||||
match sup.job_status_wait(job_id.to_string()).await
|
||||
{
|
||||
Ok((_out_id, reply_status)) => {
|
||||
// Interpret status reply synchronously
|
||||
let result_opt = reply_status.get("result");
|
||||
let error_opt = reply_status.get("error");
|
||||
if let Some(err_obj) = error_opt {
|
||||
let _ = service_poll
|
||||
.update_job_status_unchecked(
|
||||
context_id,
|
||||
caller_id,
|
||||
job_id,
|
||||
JobStatus::Error,
|
||||
)
|
||||
.await;
|
||||
let _ = service_poll
|
||||
.append_message_logs(
|
||||
context_id, caller_id, id,
|
||||
vec![format!(
|
||||
@@ -606,28 +695,25 @@ async fn deliver_one(
|
||||
)],
|
||||
)
|
||||
.await;
|
||||
} else if let Some(res) = result_opt {
|
||||
let status_candidate = res
|
||||
.get("status")
|
||||
.and_then(|v| v.as_str())
|
||||
.or_else(|| res.as_str());
|
||||
if let Some(remote_status) =
|
||||
status_candidate
|
||||
} else if let Some(res) = result_opt {
|
||||
let status_candidate = res
|
||||
.get("status")
|
||||
.and_then(|v| v.as_str())
|
||||
.or_else(|| res.as_str());
|
||||
if let Some(remote_status) =
|
||||
status_candidate
|
||||
{
|
||||
if let Some((mapped, terminal)) =
|
||||
map_supervisor_job_status(
|
||||
remote_status,
|
||||
)
|
||||
{
|
||||
if let Some((
|
||||
mapped,
|
||||
terminal,
|
||||
)) =
|
||||
map_supervisor_job_status(
|
||||
remote_status,
|
||||
)
|
||||
{
|
||||
let _ = service_poll
|
||||
let _ = service_poll
|
||||
.update_job_status_unchecked(
|
||||
context_id, caller_id, job_id, mapped.clone(),
|
||||
)
|
||||
.await;
|
||||
let _ = service_poll
|
||||
let _ = service_poll
|
||||
.append_message_logs(
|
||||
context_id, caller_id, id,
|
||||
vec![format!(
|
||||
@@ -637,23 +723,23 @@ async fn deliver_one(
|
||||
)
|
||||
.await;
|
||||
|
||||
// If terminal, request job.result now (handled above for local terminal case)
|
||||
if terminal {
|
||||
// trigger job.result only if result empty to avoid spam
|
||||
if let Ok(j_after) =
|
||||
service_poll
|
||||
.load_job(
|
||||
context_id,
|
||||
caller_id,
|
||||
job_id,
|
||||
)
|
||||
.await
|
||||
// If terminal, request job.result now (handled above for local terminal case)
|
||||
if terminal {
|
||||
// trigger job.result only if result empty to avoid spam
|
||||
if let Ok(j_after) =
|
||||
service_poll
|
||||
.load_job(
|
||||
context_id,
|
||||
caller_id,
|
||||
job_id,
|
||||
)
|
||||
.await
|
||||
{
|
||||
if j_after
|
||||
.result
|
||||
.is_empty()
|
||||
{
|
||||
if j_after
|
||||
.result
|
||||
.is_empty()
|
||||
{
|
||||
let sup2 = cache
|
||||
let sup2 = cache
|
||||
.get_or_create(
|
||||
sup_hub.clone(),
|
||||
sup_dest.clone(),
|
||||
@@ -661,7 +747,7 @@ async fn deliver_one(
|
||||
secret_for_poller.clone(),
|
||||
)
|
||||
.await;
|
||||
let _ = sup2.job_result_wait(job_id.to_string()).await
|
||||
let _ = sup2.job_result_wait(job_id.to_string()).await
|
||||
.and_then(|(_oid, reply2)| {
|
||||
// Minimal parse and store
|
||||
let res2 = reply2.get("result");
|
||||
@@ -679,40 +765,65 @@ async fn deliver_one(
|
||||
}
|
||||
Ok((String::new(), Value::Null))
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Mark processed and stop polling for this message
|
||||
let _ = service_poll
|
||||
.update_message_status(
|
||||
context_id,
|
||||
caller_id,
|
||||
id,
|
||||
MessageStatus::Processed,
|
||||
)
|
||||
.await;
|
||||
let _ = service_poll
|
||||
.append_message_logs(
|
||||
context_id,
|
||||
caller_id,
|
||||
id,
|
||||
vec![format!(
|
||||
"Terminal job {} detected from supervisor status; stopping transport polling",
|
||||
job_id
|
||||
)],
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = service_poll
|
||||
.append_message_logs(
|
||||
context_id,
|
||||
caller_id,
|
||||
id,
|
||||
vec![format!("job.status request error: {}", e)],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = service_poll
|
||||
.append_message_logs(
|
||||
context_id,
|
||||
caller_id,
|
||||
id,
|
||||
vec![format!(
|
||||
"job.status request error: {}",
|
||||
e
|
||||
)],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// If we cannot load the job, fall back to requesting job.status
|
||||
Err(_) => {
|
||||
let sup = cache
|
||||
.get_or_create(
|
||||
sup_hub.clone(),
|
||||
sup_dest.clone(),
|
||||
sup_topic.clone(),
|
||||
secret_for_poller.clone(),
|
||||
)
|
||||
.await;
|
||||
match sup.job_status_wait(job_id.to_string()).await {
|
||||
Ok((_out_id, _reply_status)) => {
|
||||
let _ = service_poll
|
||||
}
|
||||
// If we cannot load the job, fall back to requesting job.status
|
||||
Err(_) => {
|
||||
let sup = cache
|
||||
.get_or_create(
|
||||
sup_hub.clone(),
|
||||
sup_dest.clone(),
|
||||
sup_topic.clone(),
|
||||
secret_for_poller.clone(),
|
||||
)
|
||||
.await;
|
||||
match sup.job_status_wait(job_id.to_string()).await {
|
||||
Ok((_out_id, _reply_status)) => {
|
||||
let _ = service_poll
|
||||
.append_message_logs(
|
||||
context_id,
|
||||
caller_id,
|
||||
@@ -723,26 +834,25 @@ async fn deliver_one(
|
||||
)],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = service_poll
|
||||
.append_message_logs(
|
||||
context_id,
|
||||
caller_id,
|
||||
id,
|
||||
vec![format!(
|
||||
"job.status request error: {}",
|
||||
e
|
||||
)],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = service_poll
|
||||
.append_message_logs(
|
||||
context_id,
|
||||
caller_id,
|
||||
id,
|
||||
vec![format!(
|
||||
"job.status request error: {}",
|
||||
e
|
||||
)],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Ensure we only do this once
|
||||
requested_job_check = true;
|
||||
}
|
||||
// Ensure we only do this once
|
||||
requested_job_check = true;
|
||||
}
|
||||
// break;
|
||||
}
|
||||
|
@@ -672,10 +672,16 @@ impl AppService {
|
||||
let allowed = match current {
|
||||
JobStatus::Dispatched => matches!(
|
||||
new_status,
|
||||
JobStatus::WaitingForPrerequisites | JobStatus::Started | JobStatus::Error
|
||||
JobStatus::WaitingForPrerequisites
|
||||
| JobStatus::Started
|
||||
| JobStatus::Finished
|
||||
| JobStatus::Error
|
||||
),
|
||||
JobStatus::WaitingForPrerequisites => {
|
||||
matches!(new_status, JobStatus::Started | JobStatus::Error)
|
||||
matches!(
|
||||
new_status,
|
||||
JobStatus::Started | JobStatus::Finished | JobStatus::Error
|
||||
)
|
||||
}
|
||||
JobStatus::Started => matches!(new_status, JobStatus::Finished | JobStatus::Error),
|
||||
JobStatus::Finished | JobStatus::Error => false,
|
||||
@@ -714,10 +720,16 @@ impl AppService {
|
||||
let allowed = match current {
|
||||
JobStatus::Dispatched => matches!(
|
||||
new_status,
|
||||
JobStatus::WaitingForPrerequisites | JobStatus::Started | JobStatus::Error
|
||||
JobStatus::WaitingForPrerequisites
|
||||
| JobStatus::Started
|
||||
| JobStatus::Finished
|
||||
| JobStatus::Error
|
||||
),
|
||||
JobStatus::WaitingForPrerequisites => {
|
||||
matches!(new_status, JobStatus::Started | JobStatus::Error)
|
||||
matches!(
|
||||
new_status,
|
||||
JobStatus::Started | JobStatus::Finished | JobStatus::Error
|
||||
)
|
||||
}
|
||||
JobStatus::Started => matches!(new_status, JobStatus::Finished | JobStatus::Error),
|
||||
JobStatus::Finished | JobStatus::Error => false,
|
||||
|
Reference in New Issue
Block a user