//! Orchestrated Flow model for DAG-based workflow execution use heromodels_core::BaseModelData; use serde::{Deserialize, Serialize}; use std::collections::HashSet; use thiserror::Error; use super::orchestrated_flow_step::OrchestratedFlowStep; /// Extended Flow with orchestrator-specific steps #[derive(Debug, Clone, Serialize, Deserialize)] pub struct OrchestratedFlow { /// Base model data (id, created_at, updated_at) pub base_data: BaseModelData, /// Name of the flow pub name: String, /// Orchestrated steps with dependencies pub orchestrated_steps: Vec, } impl OrchestratedFlow { /// Create a new orchestrated flow pub fn new(name: &str) -> Self { Self { base_data: BaseModelData::new(), name: name.to_string(), orchestrated_steps: Vec::new(), } } /// Add a step to the flow pub fn add_step(mut self, step: OrchestratedFlowStep) -> Self { self.orchestrated_steps.push(step); self } /// Get the flow ID pub fn id(&self) -> u32 { self.base_data.id } /// Validate the DAG structure (no cycles) pub fn validate_dag(&self) -> Result<(), OrchestratorError> { let mut visited = HashSet::new(); let mut rec_stack = HashSet::new(); for step in &self.orchestrated_steps { if !visited.contains(&step.id()) { if self.has_cycle(step.id(), &mut visited, &mut rec_stack)? { return Err(OrchestratorError::CyclicDependency); } } } Ok(()) } /// Check for cycles in the dependency graph fn has_cycle( &self, step_id: u32, visited: &mut HashSet, rec_stack: &mut HashSet, ) -> Result { visited.insert(step_id); rec_stack.insert(step_id); let step = self.orchestrated_steps .iter() .find(|s| s.id() == step_id) .ok_or(OrchestratorError::StepNotFound(step_id))?; for &dep_id in &step.depends_on { if !visited.contains(&dep_id) { if self.has_cycle(dep_id, visited, rec_stack)? { return Ok(true); } } else if rec_stack.contains(&dep_id) { return Ok(true); } } rec_stack.remove(&step_id); Ok(false) } } /// Orchestrator errors #[derive(Error, Debug)] pub enum OrchestratorError { #[error("Database error: {0}")] DatabaseError(String), #[error("Executor error: {0}")] ExecutorError(String), #[error("No ready steps found - possible deadlock")] NoReadySteps, #[error("Step {0} failed: {1:?}")] StepFailed(u32, Option), #[error("Cyclic dependency detected in workflow")] CyclicDependency, #[error("Step {0} not found")] StepNotFound(u32), #[error("Invalid dependency: step {0} depends on non-existent step {1}")] InvalidDependency(u32, u32), } /// Flow execution status #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub enum FlowStatus { Pending, Running, Completed, Failed, } #[cfg(test)] mod tests { use super::*; #[test] fn test_orchestrated_flow_builder() { let step1 = OrchestratedFlowStep::new("step1").script("let x = 1;"); let step2 = OrchestratedFlowStep::new("step2").script("let y = 2;"); let flow = OrchestratedFlow::new("test_flow") .add_step(step1) .add_step(step2); assert_eq!(flow.name, "test_flow"); assert_eq!(flow.orchestrated_steps.len(), 2); } #[test] fn test_dag_validation_no_cycle() { let step1 = OrchestratedFlowStep::new("step1").script("let x = 1;"); let step2 = OrchestratedFlowStep::new("step2") .script("let y = 2;") .depends_on(step1.id()); let flow = OrchestratedFlow::new("test_flow") .add_step(step1) .add_step(step2); assert!(flow.validate_dag().is_ok()); } }