diff --git a/src/dag.rs b/src/dag.rs index 753a206..5a2b74c 100644 --- a/src/dag.rs +++ b/src/dag.rs @@ -94,14 +94,14 @@ pub async fn build_flow_dag( .await .map_err(DagError::from)?; let caller_id = flow.caller_id(); - let flow_job_ids = flow.jobs().clone(); + let flow_job_ids = flow.jobs(); // Build a set for faster membership tests let job_id_set: HashSet = flow_job_ids.iter().copied().collect(); // Load all jobs let mut jobs: HashMap = HashMap::with_capacity(flow_job_ids.len()); - for jid in &flow_job_ids { + for jid in flow_job_ids { let job = redis .load_job(context_id, caller_id, *jid) .await @@ -116,7 +116,7 @@ pub async fn build_flow_dag( let mut rev_adj: HashMap> = HashMap::with_capacity(jobs.len()); let mut in_degree: HashMap = HashMap::with_capacity(jobs.len()); - for &jid in &flow_job_ids { + for &jid in flow_job_ids { adj.entry(jid).or_default(); rev_adj.entry(jid).or_default(); in_degree.entry(jid).or_insert(0); diff --git a/src/models/flow.rs b/src/models/flow.rs index d726cbc..36b6922 100644 --- a/src/models/flow.rs +++ b/src/models/flow.rs @@ -42,7 +42,7 @@ impl Flow { pub fn context_id(&self) -> u32 { self.context_id } - pub fn jobs(&self) -> &Vec { + pub fn jobs(&self) -> &[u32] { &self.jobs } } diff --git a/src/models/job.rs b/src/models/job.rs index b3a7e79..18f7855 100644 --- a/src/models/job.rs +++ b/src/models/job.rs @@ -47,10 +47,10 @@ impl Job { pub fn context_id(&self) -> u32 { self.context_id } - pub fn depends(&self) -> &Vec { + pub fn depends(&self) -> &[u32] { &self.depends } - pub fn prerequisites(&self) -> &Vec { + pub fn prerequisites(&self) -> &[String] { &self.prerequisites } pub fn script_type(&self) -> ScriptType { diff --git a/src/service.rs b/src/service.rs index 5c5d609..c8cee0c 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,4 +1,4 @@ -use crate::dag::{build_flow_dag, DagResult, FlowDag}; +use crate::dag::{DagResult, FlowDag, build_flow_dag}; use crate::models::{Actor, Context, Flow, Job, JobStatus, Message, Runner}; use crate::storage::RedisDriver; @@ -95,17 +95,24 @@ impl AppService { Ok(job) } - pub async fn load_job(&self, context_id: u32, caller_id: u32, id: u32) -> Result { + pub async fn load_job( + &self, + context_id: u32, + caller_id: u32, + id: u32, + ) -> Result { let job = self.redis.load_job(context_id, caller_id, id).await?; Ok(job) } /// Update a job status with transition validation. + /// /// Allowed transitions: /// - Dispatched -> WaitingForPrerequisites | Started | Error /// - WaitingForPrerequisites -> Started | Error /// - Started -> Finished | Error /// - Finished, Error -> terminal (no transitions) + /// /// If the new status equals the current status, this is a no-op. pub async fn update_job_status( &self, @@ -127,14 +134,10 @@ impl AppService { new_status, JobStatus::WaitingForPrerequisites | JobStatus::Started | JobStatus::Error ), - JobStatus::WaitingForPrerequisites => matches!( - new_status, - JobStatus::Started | JobStatus::Error - ), - JobStatus::Started => matches!( - new_status, - JobStatus::Finished | JobStatus::Error - ), + JobStatus::WaitingForPrerequisites => { + matches!(new_status, JobStatus::Started | JobStatus::Error) + } + JobStatus::Started => matches!(new_status, JobStatus::Finished | JobStatus::Error), JobStatus::Finished | JobStatus::Error => false, }; @@ -153,13 +156,23 @@ impl AppService { // ----------------------------- // Message // ----------------------------- - pub async fn create_message(&self, context_id: u32, message: Message) -> Result { + pub async fn create_message( + &self, + context_id: u32, + message: Message, + ) -> Result { self.redis.save_message(context_id, &message).await?; Ok(message) } - pub async fn load_message(&self, context_id: u32, caller_id: u32, id: u32) -> Result { + pub async fn load_message( + &self, + context_id: u32, + caller_id: u32, + id: u32, + ) -> Result { let msg = self.redis.load_message(context_id, caller_id, id).await?; Ok(msg) } -} \ No newline at end of file +} +