From 74995fa6fe1af1a08db472a1924486359bc0faea Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Fri, 22 Aug 2025 12:45:50 +0200 Subject: [PATCH] Add job status update in service layer Signed-off-by: Lee Smet --- src/models/job.rs | 5 +++- src/service.rs | 70 ++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 73 insertions(+), 2 deletions(-) diff --git a/src/models/job.rs b/src/models/job.rs index 50205dd..b3a7e79 100644 --- a/src/models/job.rs +++ b/src/models/job.rs @@ -28,7 +28,7 @@ pub struct Job { status: JobStatus, } -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)] pub enum JobStatus { Dispatched, WaitingForPrerequisites, @@ -56,4 +56,7 @@ impl Job { pub fn script_type(&self) -> ScriptType { self.script_type.clone() } + pub fn status(&self) -> JobStatus { + self.status.clone() + } } diff --git a/src/service.rs b/src/service.rs index d91520b..5c5d609 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,9 +1,27 @@ use crate::dag::{build_flow_dag, DagResult, FlowDag}; -use crate::models::{Actor, Context, Flow, Job, Message, Runner}; +use crate::models::{Actor, Context, Flow, Job, JobStatus, Message, Runner}; use crate::storage::RedisDriver; pub type BoxError = Box; +#[derive(Debug)] +struct InvalidJobStatusTransition { + from: JobStatus, + to: JobStatus, +} + +impl std::fmt::Display for InvalidJobStatusTransition { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Invalid job status transition: {:?} -> {:?}", + self.from, self.to + ) + } +} + +impl std::error::Error for InvalidJobStatusTransition {} + pub struct AppService { redis: RedisDriver, } @@ -82,6 +100,56 @@ impl AppService { Ok(job) } + /// Update a job status with transition validation. + /// Allowed transitions: + /// - Dispatched -> WaitingForPrerequisites | Started | Error + /// - WaitingForPrerequisites -> Started | Error + /// - Started -> Finished | Error + /// - Finished, Error -> terminal (no transitions) + /// If the new status equals the current status, this is a no-op. + pub async fn update_job_status( + &self, + context_id: u32, + caller_id: u32, + id: u32, + new_status: JobStatus, + ) -> Result<(), BoxError> { + let job = self.redis.load_job(context_id, caller_id, id).await?; + let current = job.status(); + + if new_status == current { + // Idempotent: don't touch storage if no change + return Ok(()); + } + + let allowed = match current { + JobStatus::Dispatched => matches!( + new_status, + JobStatus::WaitingForPrerequisites | JobStatus::Started | JobStatus::Error + ), + JobStatus::WaitingForPrerequisites => matches!( + new_status, + JobStatus::Started | JobStatus::Error + ), + JobStatus::Started => matches!( + new_status, + JobStatus::Finished | JobStatus::Error + ), + JobStatus::Finished | JobStatus::Error => false, + }; + + if !allowed { + return Err(Box::new(InvalidJobStatusTransition { + from: current, + to: new_status, + })); + } + + self.redis + .update_job_status(context_id, caller_id, id, new_status) + .await?; + Ok(()) + } // ----------------------------- // Message // -----------------------------