From e57c76fb36261c2877338580e58f24f8535df924 Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Fri, 22 Aug 2025 10:59:46 +0200 Subject: [PATCH] Allow updating DAG Signed-off-by: Lee Smet --- src/dag.rs | 136 +++++++++++++++++++++++++++++++++++++++++++++++++++++ src/rpc.rs | 20 ++++++++ 2 files changed, 156 insertions(+) diff --git a/src/dag.rs b/src/dag.rs index 845cff5..62cb2b7 100644 --- a/src/dag.rs +++ b/src/dag.rs @@ -14,6 +14,10 @@ pub enum DagError { Storage(Box), MissingDependency { job: u32, depends_on: u32 }, CycleDetected { remaining: Vec }, + UnknownJob { job: u32 }, + DependenciesIncomplete { job: u32, missing: Vec }, + FlowFailed { failed_job: u32 }, + JobNotStarted { job: u32 }, } impl fmt::Display for DagError { @@ -28,6 +32,20 @@ impl fmt::Display for DagError { DagError::CycleDetected { remaining } => { write!(f, "Cycle detected; unresolved nodes: {:?}", remaining) } + DagError::UnknownJob { job } => write!(f, "Unknown job id: {}", job), + DagError::DependenciesIncomplete { job, missing } => write!( + f, + "Job {} cannot start; missing completed deps: {:?}", + job, missing + ), + DagError::FlowFailed { failed_job } => { + write!(f, "Flow failed due to job {}", failed_job) + } + DagError::JobNotStarted { job } => write!( + f, + "Job {} cannot be completed because it is not marked as started", + job + ), } } } @@ -59,6 +77,10 @@ pub struct FlowDag { pub roots: Vec, // in_degree == 0 pub leaves: Vec, // out_degree == 0 pub levels: Vec>, // topological layers for parallel execution + // Runtime execution state + pub started: HashSet, + pub completed: HashSet, + pub failed_job: Option, } pub async fn build_flow_dag( @@ -200,8 +222,122 @@ pub async fn build_flow_dag( roots, leaves, levels, + started: HashSet::new(), + completed: HashSet::new(), + failed_job: None, }; Ok(dag) } +impl FlowDag { + /// Return all jobs that are ready to be processed. + /// A job is ready if: + /// - it exists in the DAG + /// - it is not already started or completed + /// - it has no dependencies, or all dependencies are completed + /// If any job has failed, the entire flow is considered failed and an error is returned. + pub fn ready_jobs(&self) -> DagResult> { + if let Some(failed_job) = self.failed_job { + return Err(DagError::FlowFailed { failed_job }); + } + + let mut ready: Vec = Vec::new(); + for (&jid, summary) in &self.nodes { + if self.completed.contains(&jid) || self.started.contains(&jid) { + continue; + } + let mut deps_ok = true; + for dep in &summary.depends { + if !self.completed.contains(dep) { + deps_ok = false; + break; + } + } + if deps_ok { + ready.push(jid); + } + } + ready.sort_unstable(); + Ok(ready) + } + + /// Mark a job as started. + /// Strict validation rules: + /// - Unknown jobs are rejected with UnknownJob + /// - If the flow has already failed, return FlowFailed + /// - If the job is already started or completed, this is a no-op (idempotent) + /// - If any dependency is not completed, return DependenciesIncomplete with the missing deps + pub fn mark_job_started(&mut self, job: u32) -> DagResult<()> { + if !self.nodes.contains_key(&job) { + return Err(DagError::UnknownJob { job }); + } + if self.completed.contains(&job) || self.started.contains(&job) { + return Ok(()); + } + if let Some(failed_job) = self.failed_job { + return Err(DagError::FlowFailed { failed_job }); + } + + let summary = self.nodes.get(&job).expect("checked contains_key"); + let missing: Vec = summary + .depends + .iter() + .copied() + .filter(|d| !self.completed.contains(d)) + .collect(); + + if !missing.is_empty() { + return Err(DagError::DependenciesIncomplete { job, missing }); + } + + self.started.insert(job); + Ok(()) + } + + /// Mark a job as completed. + /// Strict validation rules: + /// - Unknown jobs are rejected with UnknownJob + /// - If the job is already completed, this is a no-op (idempotent) + /// - If the flow has already failed, return FlowFailed + /// - If the job was not previously started, return JobNotStarted + pub fn mark_job_completed(&mut self, job: u32) -> DagResult<()> { + if !self.nodes.contains_key(&job) { + return Err(DagError::UnknownJob { job }); + } + if self.completed.contains(&job) { + return Ok(()); + } + if let Some(failed_job) = self.failed_job { + return Err(DagError::FlowFailed { failed_job }); + } + if !self.started.contains(&job) { + return Err(DagError::JobNotStarted { job }); + } + + self.started.remove(&job); + self.completed.insert(job); + Ok(()) + } + + /// Mark a job as failed. + /// Behavior: + /// - Unknown jobs are rejected with UnknownJob + /// - If a failure is already recorded: + /// - If it is the same job, no-op (idempotent) + /// - If it is a different job, return FlowFailed with the already-failed job + /// - Otherwise record this job as the failed job + pub fn mark_job_failed(&mut self, job: u32) -> DagResult<()> { + if !self.nodes.contains_key(&job) { + return Err(DagError::UnknownJob { job }); + } + match self.failed_job { + Some(existing) if existing == job => Ok(()), + Some(existing) => Err(DagError::FlowFailed { failed_job: existing }), + None => { + self.failed_job = Some(job); + Ok(()) + } + } + } +} diff --git a/src/rpc.rs b/src/rpc.rs index 851e0a1..121c2aa 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -59,6 +59,26 @@ fn dag_err(e: DagError) -> ErrorObjectOwned { "DAG Cycle Detected", Some(Value::String(e.to_string())), ), + DagError::UnknownJob { .. } => ErrorObjectOwned::owned( + -32022, + "DAG Unknown Job", + Some(Value::String(e.to_string())), + ), + DagError::DependenciesIncomplete { .. } => ErrorObjectOwned::owned( + -32023, + "DAG Dependencies Incomplete", + Some(Value::String(e.to_string())), + ), + DagError::FlowFailed { .. } => ErrorObjectOwned::owned( + -32024, + "DAG Flow Failed", + Some(Value::String(e.to_string())), + ), + DagError::JobNotStarted { .. } => ErrorObjectOwned::owned( + -32025, + "DAG Job Not Started", + Some(Value::String(e.to_string())), + ), } }