use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet, VecDeque}; use std::fmt; use crate::{ models::{Flow, Job, ScriptType}, storage::RedisDriver, }; pub type DagResult = Result; #[derive(Debug)] pub enum DagError { Storage(Box), MissingDependency { job: u32, depends_on: u32 }, CycleDetected { remaining: Vec }, UnknownJob { job: u32 }, DependenciesIncomplete { job: u32, missing: Vec }, FlowFailed { failed_job: u32 }, JobNotStarted { job: u32 }, } impl fmt::Display for DagError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { DagError::Storage(e) => write!(f, "Storage error: {}", e), DagError::MissingDependency { job, depends_on } => write!( f, "Job {} depends on {}, which is not part of the flow.jobs list", job, depends_on ), DagError::CycleDetected { remaining } => { write!(f, "Cycle detected; unresolved nodes: {:?}", remaining) } DagError::UnknownJob { job } => write!(f, "Unknown job id: {}", job), DagError::DependenciesIncomplete { job, missing } => write!( f, "Job {} cannot start; missing completed deps: {:?}", job, missing ), DagError::FlowFailed { failed_job } => { write!(f, "Flow failed due to job {}", failed_job) } DagError::JobNotStarted { job } => write!( f, "Job {} cannot be completed because it is not marked as started", job ), } } } impl std::error::Error for DagError {} impl From> for DagError { fn from(e: Box) -> Self { DagError::Storage(e) } } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct JobSummary { pub id: u32, pub depends: Vec, pub prerequisites: Vec, pub script_type: ScriptType, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FlowDag { pub flow_id: u32, pub caller_id: u32, pub context_id: u32, pub nodes: HashMap, pub edges: Vec<(u32, u32)>, // (from prerequisite, to job) pub reverse_edges: Vec<(u32, u32)>, // (from job, to prerequisite) pub roots: Vec, // in_degree == 0 pub leaves: Vec, // out_degree == 0 pub levels: Vec>, // topological layers for parallel execution // Runtime execution state pub started: HashSet, pub completed: HashSet, pub failed_job: Option, } pub async fn build_flow_dag( redis: &RedisDriver, context_id: u32, flow_id: u32, ) -> DagResult { // Load flow let flow: Flow = redis .load_flow(context_id, flow_id) .await .map_err(DagError::from)?; let caller_id = flow.caller_id(); let flow_job_ids = flow.jobs(); // Build a set for faster membership tests let job_id_set: HashSet = flow_job_ids.iter().copied().collect(); // Load all jobs let mut jobs: HashMap = HashMap::with_capacity(flow_job_ids.len()); for jid in flow_job_ids { let job = redis .load_job(context_id, caller_id, *jid) .await .map_err(DagError::from)?; jobs.insert(*jid, job); } // Validate dependencies and construct adjacency let mut edges: Vec<(u32, u32)> = Vec::new(); let mut reverse_edges: Vec<(u32, u32)> = Vec::new(); let mut adj: HashMap> = HashMap::with_capacity(jobs.len()); let mut rev_adj: HashMap> = HashMap::with_capacity(jobs.len()); let mut in_degree: HashMap = HashMap::with_capacity(jobs.len()); for &jid in flow_job_ids { adj.entry(jid).or_default(); rev_adj.entry(jid).or_default(); in_degree.entry(jid).or_insert(0); } for (&jid, job) in &jobs { for &dep in job.depends() { if !job_id_set.contains(&dep) { return Err(DagError::MissingDependency { job: jid, depends_on: dep, }); } // edge: dep -> jid edges.push((dep, jid)); reverse_edges.push((jid, dep)); adj.get_mut(&dep).unwrap().push(jid); rev_adj.get_mut(&jid).unwrap().push(dep); *in_degree.get_mut(&jid).unwrap() += 1; } } // Kahn's algorithm for topological sorting, with level construction let mut zero_in: VecDeque = in_degree .iter() .filter_map(|(k, v)| if *v == 0 { Some(*k) } else { None }) .collect(); let mut processed_count = 0usize; let mut levels: Vec> = Vec::new(); // To make deterministic, sort initial zero_in { let mut tmp: Vec = zero_in.iter().copied().collect(); tmp.sort_unstable(); zero_in = tmp.into_iter().collect(); } while !zero_in.is_empty() { let mut level: Vec = Vec::new(); // drain current frontier let mut next_zero: Vec = Vec::new(); let mut current_frontier: Vec = zero_in.drain(..).collect(); current_frontier.sort_unstable(); for u in current_frontier { level.push(u); processed_count += 1; if let Some(children) = adj.get(&u) { let mut sorted_children = children.clone(); sorted_children.sort_unstable(); for &v in &sorted_children { let d = in_degree.get_mut(&v).unwrap(); *d -= 1; if *d == 0 { next_zero.push(v); } } } } next_zero.sort_unstable(); zero_in = next_zero.into_iter().collect(); levels.push(level); } if processed_count != jobs.len() { let remaining: Vec = in_degree .into_iter() .filter_map(|(k, v)| if v > 0 { Some(k) } else { None }) .collect(); return Err(DagError::CycleDetected { remaining }); } // Roots and leaves let roots: Vec = levels.first().cloned().unwrap_or_default(); let leaves: Vec = adj .iter() .filter_map(|(k, v)| if v.is_empty() { Some(*k) } else { None }) .collect(); // Nodes map (JobSummary) let mut nodes: HashMap = HashMap::with_capacity(jobs.len()); for (&jid, job) in &jobs { let summary = JobSummary { id: jid, depends: job.depends().to_vec(), prerequisites: job.prerequisites().to_vec(), script_type: job.script_type(), }; nodes.insert(jid, summary); } // Sort edges deterministically edges.sort_unstable(); reverse_edges.sort_unstable(); let dag = FlowDag { flow_id, caller_id, context_id, nodes, edges, reverse_edges, roots, leaves, levels, started: HashSet::new(), completed: HashSet::new(), failed_job: None, }; Ok(dag) } impl FlowDag { /// Return all jobs that are ready to be processed. /// A job is ready if: /// - it exists in the DAG /// - it is not already started or completed /// - it has no dependencies, or all dependencies are completed /// /// If any job has failed, the entire flow is considered failed and an error is returned. pub fn ready_jobs(&self) -> DagResult> { if let Some(failed_job) = self.failed_job { return Err(DagError::FlowFailed { failed_job }); } let mut ready: Vec = Vec::new(); for (&jid, summary) in &self.nodes { if self.completed.contains(&jid) || self.started.contains(&jid) { continue; } let mut deps_ok = true; for dep in &summary.depends { if !self.completed.contains(dep) { deps_ok = false; break; } } if deps_ok { ready.push(jid); } } ready.sort_unstable(); Ok(ready) } /// Mark a job as started. /// Strict validation rules: /// - Unknown jobs are rejected with UnknownJob /// - If the flow has already failed, return FlowFailed /// - If the job is already started or completed, this is a no-op (idempotent) /// - If any dependency is not completed, return DependenciesIncomplete with the missing deps pub fn mark_job_started(&mut self, job: u32) -> DagResult<()> { if !self.nodes.contains_key(&job) { return Err(DagError::UnknownJob { job }); } if self.completed.contains(&job) || self.started.contains(&job) { return Ok(()); } if let Some(failed_job) = self.failed_job { return Err(DagError::FlowFailed { failed_job }); } let summary = self.nodes.get(&job).expect("checked contains_key"); let missing: Vec = summary .depends .iter() .copied() .filter(|d| !self.completed.contains(d)) .collect(); if !missing.is_empty() { return Err(DagError::DependenciesIncomplete { job, missing }); } self.started.insert(job); Ok(()) } /// Mark a job as completed. /// Strict validation rules: /// - Unknown jobs are rejected with UnknownJob /// - If the job is already completed, this is a no-op (idempotent) /// - If the flow has already failed, return FlowFailed /// - If the job was not previously started, return JobNotStarted pub fn mark_job_completed(&mut self, job: u32) -> DagResult<()> { if !self.nodes.contains_key(&job) { return Err(DagError::UnknownJob { job }); } if self.completed.contains(&job) { return Ok(()); } if let Some(failed_job) = self.failed_job { return Err(DagError::FlowFailed { failed_job }); } if !self.started.contains(&job) { return Err(DagError::JobNotStarted { job }); } self.started.remove(&job); self.completed.insert(job); Ok(()) } /// Mark a job as failed. /// Behavior: /// - Unknown jobs are rejected with UnknownJob /// - If a failure is already recorded: /// - If it is the same job, no-op (idempotent) /// - If it is a different job, return FlowFailed with the already-failed job /// - Otherwise record this job as the failed job pub fn mark_job_failed(&mut self, job: u32) -> DagResult<()> { if !self.nodes.contains_key(&job) { return Err(DagError::UnknownJob { job }); } match self.failed_job { Some(existing) if existing == job => Ok(()), Some(existing) => Err(DagError::FlowFailed { failed_job: existing, }), None => { self.failed_job = Some(job); Ok(()) } } } }