General code improvements

Signed-off-by: Lee Smet <lee.smet@hotmail.com>
This commit is contained in:
Lee Smet
2025-08-22 12:48:36 +02:00
parent 74995fa6fe
commit bc6cb16732
4 changed files with 32 additions and 19 deletions

View File

@@ -94,14 +94,14 @@ pub async fn build_flow_dag(
.await .await
.map_err(DagError::from)?; .map_err(DagError::from)?;
let caller_id = flow.caller_id(); let caller_id = flow.caller_id();
let flow_job_ids = flow.jobs().clone(); let flow_job_ids = flow.jobs();
// Build a set for faster membership tests // Build a set for faster membership tests
let job_id_set: HashSet<u32> = flow_job_ids.iter().copied().collect(); let job_id_set: HashSet<u32> = flow_job_ids.iter().copied().collect();
// Load all jobs // Load all jobs
let mut jobs: HashMap<u32, Job> = HashMap::with_capacity(flow_job_ids.len()); let mut jobs: HashMap<u32, Job> = HashMap::with_capacity(flow_job_ids.len());
for jid in &flow_job_ids { for jid in flow_job_ids {
let job = redis let job = redis
.load_job(context_id, caller_id, *jid) .load_job(context_id, caller_id, *jid)
.await .await
@@ -116,7 +116,7 @@ pub async fn build_flow_dag(
let mut rev_adj: HashMap<u32, Vec<u32>> = HashMap::with_capacity(jobs.len()); let mut rev_adj: HashMap<u32, Vec<u32>> = HashMap::with_capacity(jobs.len());
let mut in_degree: HashMap<u32, usize> = HashMap::with_capacity(jobs.len()); let mut in_degree: HashMap<u32, usize> = HashMap::with_capacity(jobs.len());
for &jid in &flow_job_ids { for &jid in flow_job_ids {
adj.entry(jid).or_default(); adj.entry(jid).or_default();
rev_adj.entry(jid).or_default(); rev_adj.entry(jid).or_default();
in_degree.entry(jid).or_insert(0); in_degree.entry(jid).or_insert(0);

View File

@@ -42,7 +42,7 @@ impl Flow {
pub fn context_id(&self) -> u32 { pub fn context_id(&self) -> u32 {
self.context_id self.context_id
} }
pub fn jobs(&self) -> &Vec<u32> { pub fn jobs(&self) -> &[u32] {
&self.jobs &self.jobs
} }
} }

View File

@@ -47,10 +47,10 @@ impl Job {
pub fn context_id(&self) -> u32 { pub fn context_id(&self) -> u32 {
self.context_id self.context_id
} }
pub fn depends(&self) -> &Vec<u32> { pub fn depends(&self) -> &[u32] {
&self.depends &self.depends
} }
pub fn prerequisites(&self) -> &Vec<String> { pub fn prerequisites(&self) -> &[String] {
&self.prerequisites &self.prerequisites
} }
pub fn script_type(&self) -> ScriptType { pub fn script_type(&self) -> ScriptType {

View File

@@ -1,4 +1,4 @@
use crate::dag::{build_flow_dag, DagResult, FlowDag}; use crate::dag::{DagResult, FlowDag, build_flow_dag};
use crate::models::{Actor, Context, Flow, Job, JobStatus, Message, Runner}; use crate::models::{Actor, Context, Flow, Job, JobStatus, Message, Runner};
use crate::storage::RedisDriver; use crate::storage::RedisDriver;
@@ -95,17 +95,24 @@ impl AppService {
Ok(job) Ok(job)
} }
pub async fn load_job(&self, context_id: u32, caller_id: u32, id: u32) -> Result<Job, BoxError> { pub async fn load_job(
&self,
context_id: u32,
caller_id: u32,
id: u32,
) -> Result<Job, BoxError> {
let job = self.redis.load_job(context_id, caller_id, id).await?; let job = self.redis.load_job(context_id, caller_id, id).await?;
Ok(job) Ok(job)
} }
/// Update a job status with transition validation. /// Update a job status with transition validation.
///
/// Allowed transitions: /// Allowed transitions:
/// - Dispatched -> WaitingForPrerequisites | Started | Error /// - Dispatched -> WaitingForPrerequisites | Started | Error
/// - WaitingForPrerequisites -> Started | Error /// - WaitingForPrerequisites -> Started | Error
/// - Started -> Finished | Error /// - Started -> Finished | Error
/// - Finished, Error -> terminal (no transitions) /// - Finished, Error -> terminal (no transitions)
///
/// If the new status equals the current status, this is a no-op. /// If the new status equals the current status, this is a no-op.
pub async fn update_job_status( pub async fn update_job_status(
&self, &self,
@@ -127,14 +134,10 @@ impl AppService {
new_status, new_status,
JobStatus::WaitingForPrerequisites | JobStatus::Started | JobStatus::Error JobStatus::WaitingForPrerequisites | JobStatus::Started | JobStatus::Error
), ),
JobStatus::WaitingForPrerequisites => matches!( JobStatus::WaitingForPrerequisites => {
new_status, matches!(new_status, JobStatus::Started | JobStatus::Error)
JobStatus::Started | JobStatus::Error }
), JobStatus::Started => matches!(new_status, JobStatus::Finished | JobStatus::Error),
JobStatus::Started => matches!(
new_status,
JobStatus::Finished | JobStatus::Error
),
JobStatus::Finished | JobStatus::Error => false, JobStatus::Finished | JobStatus::Error => false,
}; };
@@ -153,13 +156,23 @@ impl AppService {
// ----------------------------- // -----------------------------
// Message // Message
// ----------------------------- // -----------------------------
pub async fn create_message(&self, context_id: u32, message: Message) -> Result<Message, BoxError> { pub async fn create_message(
&self,
context_id: u32,
message: Message,
) -> Result<Message, BoxError> {
self.redis.save_message(context_id, &message).await?; self.redis.save_message(context_id, &message).await?;
Ok(message) Ok(message)
} }
pub async fn load_message(&self, context_id: u32, caller_id: u32, id: u32) -> Result<Message, BoxError> { pub async fn load_message(
&self,
context_id: u32,
caller_id: u32,
id: u32,
) -> Result<Message, BoxError> {
let msg = self.redis.load_message(context_id, caller_id, id).await?; let msg = self.redis.load_message(context_id, caller_id, id).await?;
Ok(msg) Ok(msg)
} }
} }