Convert jobs to messages

Signed-off-by: Lee Smet <lee.smet@hotmail.com>
Lee Smet
2025-08-27 15:33:43 +02:00
parent fde456fd5e
commit 6f7fded175
3 changed files with 417 additions and 9 deletions

@@ -541,6 +541,23 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
})
.expect("register flow.dag");
}
{
let state = state.clone();
module
.register_async_method("flow.start", move |params, _caller, _ctx| {
let state = state.clone();
async move {
let p: FlowLoadParams = params.parse().map_err(invalid_params_err)?;
let started: bool = state
.service
.flow_start(p.context_id, p.id)
.await
.map_err(storage_err)?;
Ok::<_, ErrorObjectOwned>(started)
}
})
.expect("register flow.start");
}
// Job
{

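For reference, a call to the new endpoint would look roughly like the request below. This is an illustrative sketch, not part of the commit; it assumes FlowLoadParams deserializes named params with plain context_id and id fields (matching how the handler reads p.context_id and p.id) and that the reply is the boolean returned by flow_start.

Request:
{"jsonrpc": "2.0", "id": 1, "method": "flow.start", "params": {"context_id": 1, "id": 42}}

Response (true if a scheduler was started, false if one was already running for this flow):
{"jsonrpc": "2.0", "id": 1, "result": true}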
@@ -1,12 +1,16 @@
use crate::dag::{DagError, DagResult, FlowDag, build_flow_dag};
use crate::models::{
Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageFormatType, MessageStatus,
Runner,
};
use crate::storage::RedisDriver;
use serde::Serialize;
use serde_json::Value;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration};
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
@@ -112,10 +116,10 @@ fn contains_key_not_found(e: &BoxError) -> bool {
fn has_duplicate_u32s(list: &Vec<Value>) -> bool {
let mut seen = std::collections::HashSet::new();
for it in list {
if let Some(x) = it.as_u64() {
if !seen.insert(x) {
return true;
}
}
}
false
@@ -306,12 +310,16 @@ fn validate_message(context_id: u32, msg: &Message) -> Result<(), BoxError> {
// -----------------------------
pub struct AppService {
redis: Arc<RedisDriver>,
schedulers: Arc<Mutex<HashSet<(u32, u32)>>>,
}
impl AppService {
pub fn new(redis: RedisDriver) -> Self {
Self {
redis: Arc::new(redis),
schedulers: Arc::new(Mutex::new(HashSet::new())),
}
}
// -----------------------------
@@ -395,6 +403,371 @@ impl AppService {
build_flow_dag(&self.redis, context_id, flow_id).await
}
/// Start a background scheduler for a flow.
/// - Ticks every 1 second.
/// - Sets Flow status to Started immediately.
/// - Dispatches jobs whose dependencies are Finished: creates a Message, LPUSHes its key onto msg_out,
/// and marks the job as Dispatched.
/// - When all jobs are Finished, the Flow is set to Finished; if any job is Error, the Flow is set to Error.
/// Returns:
/// - Ok(true) if a scheduler was started
/// - Ok(false) if a scheduler was already running for this (context_id, flow_id)
pub async fn flow_start(&self, context_id: u32, flow_id: u32) -> Result<bool, BoxError> {
// Ensure flow exists (and load caller_id)
let flow = self.redis.load_flow(context_id, flow_id).await?;
let caller_id = flow.caller_id();
// Try to register this flow in the active scheduler set
{
let mut guard = self.schedulers.lock().await;
if !guard.insert((context_id, flow_id)) {
// Already running
return Ok(false);
}
}
// Clone resources for background task
let redis = self.redis.clone();
let schedulers = self.schedulers.clone();
// Set Flow status to Started
let _ = redis
.update_flow_status(context_id, flow_id, FlowStatus::Started)
.await;
tokio::spawn(async move {
// Background loop
loop {
// Load current flow; stop if missing
let flow = match redis.load_flow(context_id, flow_id).await {
Ok(f) => f,
Err(_) => break,
};
// Track aggregate state
let mut all_finished = true;
let mut any_error = false;
// Iterate jobs declared in the flow
for jid in flow.jobs() {
// Load job
let job = match redis.load_job(context_id, caller_id, *jid).await {
Ok(j) => j,
Err(_) => {
// If job is missing treat as error state for the flow and stop
any_error = true;
all_finished = false;
break;
}
};
match job.status() {
JobStatus::Finished => {
// done
}
JobStatus::Error => {
any_error = true;
all_finished = false;
}
JobStatus::Dispatched | JobStatus::Started => {
all_finished = false;
}
JobStatus::WaitingForPrerequisites => {
all_finished = false;
// Check dependencies complete
let mut deps_ok = true;
for dep in job.depends() {
match redis.load_job(context_id, caller_id, *dep).await {
Ok(dj) => {
if dj.status() != JobStatus::Finished {
deps_ok = false;
break;
}
}
Err(_) => {
deps_ok = false;
break;
}
}
}
if deps_ok {
// Build Message embedding this job
let ts = crate::time::current_timestamp();
let msg_id: u32 = job.id(); // deterministic message id per job for now
let message = Message {
id: msg_id,
caller_id: job.caller_id(),
context_id,
message: "job.run".to_string(),
message_type: job.script_type(),
message_format_type: MessageFormatType::Text,
timeout: job.timeout,
timeout_ack: 10,
timeout_result: job.timeout,
job: vec![job.clone()],
logs: Vec::new(),
created_at: ts,
updated_at: ts,
status: MessageStatus::Dispatched,
};
// Persist the message and enqueue it
if redis.save_message(context_id, &message).await.is_ok() {
let _ = redis
.enqueue_msg_out(context_id, job.caller_id(), msg_id)
.await;
// Mark job as Dispatched
let _ = redis
.update_job_status(
context_id,
job.caller_id(),
job.id(),
JobStatus::Dispatched,
)
.await;
}
}
}
}
}
if any_error {
let _ = redis
.update_flow_status(context_id, flow_id, FlowStatus::Error)
.await;
break;
}
if all_finished {
let _ = redis
.update_flow_status(context_id, flow_id, FlowStatus::Finished)
.await;
break;
}
sleep(Duration::from_secs(1)).await;
}
// Remove from active schedulers set
let mut guard = schedulers.lock().await;
guard.remove(&(context_id, flow_id));
});
Ok(true)
}
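The single-scheduler guarantee above comes purely from the shared HashSet guarded by a tokio Mutex. A minimal, self-contained sketch of that pattern (illustrative names, not part of this commit; requires only tokio):

use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::Mutex;

// Returns true only for the first caller registering a given (context_id, flow_id).
async fn try_register(schedulers: &Arc<Mutex<HashSet<(u32, u32)>>>, key: (u32, u32)) -> bool {
    // HashSet::insert returns false when the key is already present,
    // i.e. a scheduler for this pair is already running.
    schedulers.lock().await.insert(key)
}

#[tokio::main]
async fn main() {
    let schedulers = Arc::new(Mutex::new(HashSet::new()));
    assert!(try_register(&schedulers, (1, 42)).await); // first start wins
    assert!(!try_register(&schedulers, (1, 42)).await); // duplicate start is refused
    // When the background loop exits it removes its key, so the flow can be started again.
    schedulers.lock().await.remove(&(1, 42));
    assert!(try_register(&schedulers, (1, 42)).await);
}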
/// Execute a flow: compute DAG, create Message entries for ready jobs, and enqueue their keys to msg_out.
/// Returns the list of enqueued message keys ("message:{caller_id}:{id}") in deterministic order (by job id).
pub async fn flow_execute(&self, context_id: u32, flow_id: u32) -> DagResult<Vec<String>> {
let dag = build_flow_dag(&self.redis, context_id, flow_id).await?;
let mut ready = dag.ready_jobs()?;
ready.sort_unstable();
let mut queued: Vec<String> = Vec::with_capacity(ready.len());
for jid in ready {
// Load the concrete Job
let job = self
.redis
.load_job(context_id, dag.caller_id, jid)
.await
.map_err(DagError::from)?;
// Build a Message that embeds this job
let ts = crate::time::current_timestamp();
let msg_id: u32 = job.id(); // deterministic; adjust strategy later if needed
let message = Message {
id: msg_id,
caller_id: job.caller_id(),
context_id,
message: "job.run".to_string(),
message_type: job.script_type(), // uses ScriptType (matches model)
message_format_type: MessageFormatType::Text,
timeout: job.timeout,
timeout_ack: 10,
timeout_result: job.timeout,
job: vec![job.clone()],
logs: Vec::new(),
created_at: ts,
updated_at: ts,
status: MessageStatus::Dispatched,
};
// Persist the Message and enqueue its key to the outbound queue
let _ = self
.create_message(context_id, message)
.await
.map_err(DagError::from)?;
self.redis
.enqueue_msg_out(context_id, job.caller_id(), msg_id)
.await
.map_err(DagError::from)?;
let key = format!("message:{}:{}", job.caller_id(), msg_id);
queued.push(key);
}
Ok(queued)
}
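The key scheme and ordering promised here are easy to pin down in isolation. A small sketch (illustrative only, independent of the storage layer; caller_id 2 and job ids 3 and 7 are made up):

// Mirrors the "message:{caller_id}:{id}" convention used above.
fn message_key(caller_id: u32, id: u32) -> String {
    format!("message:{}:{}", caller_id, id)
}

fn main() {
    let mut ready = vec![7u32, 3u32];
    ready.sort_unstable(); // flow_execute sorts ready job ids, so output order is deterministic
    let keys: Vec<String> = ready.into_iter().map(|id| message_key(2, id)).collect();
    assert_eq!(keys, vec!["message:2:3", "message:2:7"]);
}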
// -----------------------------
// Job
// -----------------------------

@@ -533,4 +533,22 @@ impl RedisDriver {
let _: usize = cm.hset_multiple(key, &pairs).await?;
Ok(())
}
// -----------------------------
// Queues (lists)
// -----------------------------
/// Push a value onto a Redis list using LPUSH in the given DB.
pub async fn lpush_list(&self, db: u32, list: &str, value: &str) -> Result<()> {
let mut cm = self.manager_for_db(db).await?;
let _: i64 = cm.lpush(list, value).await?;
Ok(())
}
/// Enqueue a message key onto the outbound queue (msg_out).
/// The value is the canonical message key "message:{caller_id}:{id}".
pub async fn enqueue_msg_out(&self, db: u32, caller_id: u32, id: u32) -> Result<()> {
let key = Self::message_key(caller_id, id);
self.lpush_list(db, "msg_out", &key).await
}
}
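On the consuming side, a worker only needs the queue name and the key convention above. A rough sketch using the redis crate directly (the URL, the DB selection, and the assumption that messages are stored as Redis hashes are mine, not part of this commit; the DB index mirrors how enqueue_msg_out is called with context_id as db):

use redis::AsyncCommands;
use std::collections::HashMap;

#[tokio::main]
async fn main() -> redis::RedisResult<()> {
    // Context 1 lives in Redis DB 1 under this scheme (assumed deployment detail).
    let client = redis::Client::open("redis://127.0.0.1/1")?;
    let mut con = client.get_multiplexed_async_connection().await?;

    // Pop one message key from the outbound queue, e.g. "message:2:3".
    if let Some(key) = con.rpop::<_, Option<String>>("msg_out", None).await? {
        // Read the message fields back; assumes messages are stored as hashes.
        let fields: HashMap<String, String> = con.hgetall(&key).await?;
        println!("{key}: {fields:?}");
    }
    Ok(())
}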