Add job status update in service layer

Signed-off-by: Lee Smet <lee.smet@hotmail.com>
This commit is contained in:
Lee Smet
2025-08-22 12:45:50 +02:00
parent f30706a25a
commit 74995fa6fe
2 changed files with 73 additions and 2 deletions

View File

@@ -28,7 +28,7 @@ pub struct Job {
status: JobStatus, status: JobStatus,
} }
#[derive(Clone, Serialize, Deserialize)] #[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
pub enum JobStatus { pub enum JobStatus {
Dispatched, Dispatched,
WaitingForPrerequisites, WaitingForPrerequisites,
@@ -56,4 +56,7 @@ impl Job {
pub fn script_type(&self) -> ScriptType { pub fn script_type(&self) -> ScriptType {
self.script_type.clone() self.script_type.clone()
} }
pub fn status(&self) -> JobStatus {
self.status.clone()
}
} }

View File

@@ -1,9 +1,27 @@
use crate::dag::{build_flow_dag, DagResult, FlowDag}; use crate::dag::{build_flow_dag, DagResult, FlowDag};
use crate::models::{Actor, Context, Flow, Job, Message, Runner}; use crate::models::{Actor, Context, Flow, Job, JobStatus, Message, Runner};
use crate::storage::RedisDriver; use crate::storage::RedisDriver;
pub type BoxError = Box<dyn std::error::Error + Send + Sync>; pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
#[derive(Debug)]
struct InvalidJobStatusTransition {
from: JobStatus,
to: JobStatus,
}
impl std::fmt::Display for InvalidJobStatusTransition {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Invalid job status transition: {:?} -> {:?}",
self.from, self.to
)
}
}
impl std::error::Error for InvalidJobStatusTransition {}
pub struct AppService { pub struct AppService {
redis: RedisDriver, redis: RedisDriver,
} }
@@ -82,6 +100,56 @@ impl AppService {
Ok(job) Ok(job)
} }
/// Update a job status with transition validation.
/// Allowed transitions:
/// - Dispatched -> WaitingForPrerequisites | Started | Error
/// - WaitingForPrerequisites -> Started | Error
/// - Started -> Finished | Error
/// - Finished, Error -> terminal (no transitions)
/// If the new status equals the current status, this is a no-op.
pub async fn update_job_status(
&self,
context_id: u32,
caller_id: u32,
id: u32,
new_status: JobStatus,
) -> Result<(), BoxError> {
let job = self.redis.load_job(context_id, caller_id, id).await?;
let current = job.status();
if new_status == current {
// Idempotent: don't touch storage if no change
return Ok(());
}
let allowed = match current {
JobStatus::Dispatched => matches!(
new_status,
JobStatus::WaitingForPrerequisites | JobStatus::Started | JobStatus::Error
),
JobStatus::WaitingForPrerequisites => matches!(
new_status,
JobStatus::Started | JobStatus::Error
),
JobStatus::Started => matches!(
new_status,
JobStatus::Finished | JobStatus::Error
),
JobStatus::Finished | JobStatus::Error => false,
};
if !allowed {
return Err(Box::new(InvalidJobStatusTransition {
from: current,
to: new_status,
}));
}
self.redis
.update_job_status(context_id, caller_id, id, new_status)
.await?;
Ok(())
}
// ----------------------------- // -----------------------------
// Message // Message
// ----------------------------- // -----------------------------