Allow updating DAG

Signed-off-by: Lee Smet <lee.smet@hotmail.com>
This commit is contained in:
Lee Smet
2025-08-22 10:59:46 +02:00
parent 1939a3d09d
commit e57c76fb36
2 changed files with 156 additions and 0 deletions

View File

@@ -14,6 +14,10 @@ pub enum DagError {
Storage(Box<dyn std::error::Error + Send + Sync>), Storage(Box<dyn std::error::Error + Send + Sync>),
MissingDependency { job: u32, depends_on: u32 }, MissingDependency { job: u32, depends_on: u32 },
CycleDetected { remaining: Vec<u32> }, CycleDetected { remaining: Vec<u32> },
UnknownJob { job: u32 },
DependenciesIncomplete { job: u32, missing: Vec<u32> },
FlowFailed { failed_job: u32 },
JobNotStarted { job: u32 },
} }
impl fmt::Display for DagError { impl fmt::Display for DagError {
@@ -28,6 +32,20 @@ impl fmt::Display for DagError {
DagError::CycleDetected { remaining } => { DagError::CycleDetected { remaining } => {
write!(f, "Cycle detected; unresolved nodes: {:?}", remaining) write!(f, "Cycle detected; unresolved nodes: {:?}", remaining)
} }
DagError::UnknownJob { job } => write!(f, "Unknown job id: {}", job),
DagError::DependenciesIncomplete { job, missing } => write!(
f,
"Job {} cannot start; missing completed deps: {:?}",
job, missing
),
DagError::FlowFailed { failed_job } => {
write!(f, "Flow failed due to job {}", failed_job)
}
DagError::JobNotStarted { job } => write!(
f,
"Job {} cannot be completed because it is not marked as started",
job
),
} }
} }
} }
@@ -59,6 +77,10 @@ pub struct FlowDag {
pub roots: Vec<u32>, // in_degree == 0 pub roots: Vec<u32>, // in_degree == 0
pub leaves: Vec<u32>, // out_degree == 0 pub leaves: Vec<u32>, // out_degree == 0
pub levels: Vec<Vec<u32>>, // topological layers for parallel execution pub levels: Vec<Vec<u32>>, // topological layers for parallel execution
// Runtime execution state
pub started: HashSet<u32>,
pub completed: HashSet<u32>,
pub failed_job: Option<u32>,
} }
pub async fn build_flow_dag( pub async fn build_flow_dag(
@@ -200,8 +222,122 @@ pub async fn build_flow_dag(
roots, roots,
leaves, leaves,
levels, levels,
started: HashSet::new(),
completed: HashSet::new(),
failed_job: None,
}; };
Ok(dag) Ok(dag)
} }
impl FlowDag {
/// Return all jobs that are ready to be processed.
/// A job is ready if:
/// - it exists in the DAG
/// - it is not already started or completed
/// - it has no dependencies, or all dependencies are completed
/// If any job has failed, the entire flow is considered failed and an error is returned.
pub fn ready_jobs(&self) -> DagResult<Vec<u32>> {
if let Some(failed_job) = self.failed_job {
return Err(DagError::FlowFailed { failed_job });
}
let mut ready: Vec<u32> = Vec::new();
for (&jid, summary) in &self.nodes {
if self.completed.contains(&jid) || self.started.contains(&jid) {
continue;
}
let mut deps_ok = true;
for dep in &summary.depends {
if !self.completed.contains(dep) {
deps_ok = false;
break;
}
}
if deps_ok {
ready.push(jid);
}
}
ready.sort_unstable();
Ok(ready)
}
/// Mark a job as started.
/// Strict validation rules:
/// - Unknown jobs are rejected with UnknownJob
/// - If the flow has already failed, return FlowFailed
/// - If the job is already started or completed, this is a no-op (idempotent)
/// - If any dependency is not completed, return DependenciesIncomplete with the missing deps
pub fn mark_job_started(&mut self, job: u32) -> DagResult<()> {
if !self.nodes.contains_key(&job) {
return Err(DagError::UnknownJob { job });
}
if self.completed.contains(&job) || self.started.contains(&job) {
return Ok(());
}
if let Some(failed_job) = self.failed_job {
return Err(DagError::FlowFailed { failed_job });
}
let summary = self.nodes.get(&job).expect("checked contains_key");
let missing: Vec<u32> = summary
.depends
.iter()
.copied()
.filter(|d| !self.completed.contains(d))
.collect();
if !missing.is_empty() {
return Err(DagError::DependenciesIncomplete { job, missing });
}
self.started.insert(job);
Ok(())
}
/// Mark a job as completed.
/// Strict validation rules:
/// - Unknown jobs are rejected with UnknownJob
/// - If the job is already completed, this is a no-op (idempotent)
/// - If the flow has already failed, return FlowFailed
/// - If the job was not previously started, return JobNotStarted
pub fn mark_job_completed(&mut self, job: u32) -> DagResult<()> {
if !self.nodes.contains_key(&job) {
return Err(DagError::UnknownJob { job });
}
if self.completed.contains(&job) {
return Ok(());
}
if let Some(failed_job) = self.failed_job {
return Err(DagError::FlowFailed { failed_job });
}
if !self.started.contains(&job) {
return Err(DagError::JobNotStarted { job });
}
self.started.remove(&job);
self.completed.insert(job);
Ok(())
}
/// Mark a job as failed.
/// Behavior:
/// - Unknown jobs are rejected with UnknownJob
/// - If a failure is already recorded:
/// - If it is the same job, no-op (idempotent)
/// - If it is a different job, return FlowFailed with the already-failed job
/// - Otherwise record this job as the failed job
pub fn mark_job_failed(&mut self, job: u32) -> DagResult<()> {
if !self.nodes.contains_key(&job) {
return Err(DagError::UnknownJob { job });
}
match self.failed_job {
Some(existing) if existing == job => Ok(()),
Some(existing) => Err(DagError::FlowFailed { failed_job: existing }),
None => {
self.failed_job = Some(job);
Ok(())
}
}
}
}

View File

@@ -59,6 +59,26 @@ fn dag_err(e: DagError) -> ErrorObjectOwned {
"DAG Cycle Detected", "DAG Cycle Detected",
Some(Value::String(e.to_string())), Some(Value::String(e.to_string())),
), ),
DagError::UnknownJob { .. } => ErrorObjectOwned::owned(
-32022,
"DAG Unknown Job",
Some(Value::String(e.to_string())),
),
DagError::DependenciesIncomplete { .. } => ErrorObjectOwned::owned(
-32023,
"DAG Dependencies Incomplete",
Some(Value::String(e.to_string())),
),
DagError::FlowFailed { .. } => ErrorObjectOwned::owned(
-32024,
"DAG Flow Failed",
Some(Value::String(e.to_string())),
),
DagError::JobNotStarted { .. } => ErrorObjectOwned::owned(
-32025,
"DAG Job Not Started",
Some(Value::String(e.to_string())),
),
} }
} }