diff --git a/src/rpc.rs b/src/rpc.rs
index c481e1c..2a35123 100644
--- a/src/rpc.rs
+++ b/src/rpc.rs
@@ -541,6 +541,23 @@ pub fn build_module(state: Arc) -> RpcModule<()> {
             })
             .expect("register flow.dag");
     }
+    {
+        let state = state.clone();
+        module
+            .register_async_method("flow.start", move |params, _caller, _ctx| {
+                let state = state.clone();
+                async move {
+                    let p: FlowLoadParams = params.parse().map_err(invalid_params_err)?;
+                    let started: bool = state
+                        .service
+                        .flow_start(p.context_id, p.id)
+                        .await
+                        .map_err(storage_err)?;
+                    Ok::<_, ErrorObjectOwned>(started)
+                }
+            })
+            .expect("register flow.start");
+    }
 
     // Job
     {
diff --git a/src/service.rs b/src/service.rs
index 3047f44..b330323 100644
--- a/src/service.rs
+++ b/src/service.rs
@@ -1,12 +1,16 @@
-use crate::dag::{DagResult, FlowDag, build_flow_dag};
+use crate::dag::{DagError, DagResult, FlowDag, build_flow_dag};
 use crate::models::{
-    Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageStatus, Runner,
+    Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageFormatType, MessageStatus,
+    Runner,
 };
 use crate::storage::RedisDriver;
 use serde::Serialize;
 use serde_json::Value;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+use tokio::sync::Mutex;
+use tokio::time::{sleep, Duration};
 
 pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
 
@@ -112,10 +116,10 @@ fn contains_key_not_found(e: &BoxError) -> bool {
 fn has_duplicate_u32s(list: &Vec<Value>) -> bool {
     let mut seen = std::collections::HashSet::new();
     for it in list {
-        if let Some(x) = it.as_u64()
-            && !seen.insert(x)
-        {
-            return true;
+        if let Some(x) = it.as_u64() {
+            if !seen.insert(x) {
+                return true;
+            }
         }
     }
     false
@@ -306,12 +310,16 @@ fn validate_message(context_id: u32, msg: &Message) -> Result<(), BoxError> {
 // -----------------------------
 
 pub struct AppService {
-    redis: RedisDriver,
+    redis: Arc<RedisDriver>,
+    schedulers: Arc<Mutex<HashSet<(u32, u32)>>>,
 }
 
 impl AppService {
     pub fn new(redis: RedisDriver) -> Self {
-        Self { redis }
+        Self {
+            redis: Arc::new(redis),
+            schedulers: Arc::new(Mutex::new(HashSet::new())),
+        }
     }
 
     // -----------------------------
@@ -395,6 +403,371 @@ impl AppService {
         build_flow_dag(&self.redis, context_id, flow_id).await
     }
 
+    /// Start a background scheduler for a flow.
+    /// - Ticks every 1 second.
+    /// - Sets the Flow status to Started immediately.
+    /// - Dispatches jobs whose dependencies are all Finished: creates a Message, LPUSHes its key
+    ///   into msg_out, and marks the job as Dispatched.
+    /// - When all jobs are Finished, sets the Flow to Finished; if any job is in Error, sets the Flow to Error.
+    /// Returns:
+    /// - Ok(true) if a scheduler was started
+    /// - Ok(false) if a scheduler was already running for this (context_id, flow_id)
+    pub async fn flow_start(&self, context_id: u32, flow_id: u32) -> Result<bool, BoxError> {
+        // Ensure the flow exists (and load caller_id)
+        let flow = self.redis.load_flow(context_id, flow_id).await?;
+        let caller_id = flow.caller_id();
+
+        // Try to register this flow in the active scheduler set
+        {
+            let mut guard = self.schedulers.lock().await;
+            if !guard.insert((context_id, flow_id)) {
+                // Already running
+                return Ok(false);
+            }
+        }
+
+        // Clone resources for the background task
+        let redis = self.redis.clone();
+        let schedulers = self.schedulers.clone();
+
+        // Set Flow status to Started
+        let _ = redis
+            .update_flow_status(context_id, flow_id, FlowStatus::Started)
+            .await;
+
+        tokio::spawn(async move {
+            // Background loop
+            loop {
+                // Load the current flow; stop if missing
+                let flow = match redis.load_flow(context_id, flow_id).await {
+                    Ok(f) => f,
+                    Err(_) => break,
+                };
+
+                // Track aggregate state
+                let mut all_finished = true;
+                let mut any_error = false;
+
+                // Iterate jobs declared in the flow
+                for jid in flow.jobs() {
+                    // Load job
+                    let job = match redis.load_job(context_id, caller_id, *jid).await {
+                        Ok(j) => j,
+                        Err(_) => {
+                            // If a job is missing, treat it as an error state for the flow and stop
+                            any_error = true;
+                            all_finished = false;
+                            break;
+                        }
+                    };
+
+                    match job.status() {
+                        JobStatus::Finished => {
+                            // done
+                        }
+                        JobStatus::Error => {
+                            any_error = true;
+                            all_finished = false;
+                        }
+                        JobStatus::Dispatched | JobStatus::Started => {
+                            all_finished = false;
+                        }
+                        JobStatus::WaitingForPrerequisites => {
+                            all_finished = false;
+
+                            // Check that all dependencies are complete
+                            let mut deps_ok = true;
+                            for dep in job.depends() {
+                                match redis.load_job(context_id, caller_id, *dep).await {
+                                    Ok(dj) => {
+                                        if dj.status() != JobStatus::Finished {
+                                            deps_ok = false;
+                                            break;
+                                        }
+                                    }
+                                    Err(_) => {
+                                        deps_ok = false;
+                                        break;
+                                    }
+                                }
+                            }
+
+                            if deps_ok {
+                                // Build a Message embedding this job
+                                let ts = crate::time::current_timestamp();
+                                let msg_id: u32 = job.id(); // deterministic message id per job for now
+
+                                let message = Message {
+                                    id: msg_id,
+                                    caller_id: job.caller_id(),
+                                    context_id,
+                                    message: "job.run".to_string(),
+                                    message_type: job.script_type(),
+                                    message_format_type: MessageFormatType::Text,
+                                    timeout: job.timeout,
+                                    timeout_ack: 10,
+                                    timeout_result: job.timeout,
+                                    job: vec![job.clone()],
+                                    logs: Vec::new(),
+                                    created_at: ts,
+                                    updated_at: ts,
+                                    status: MessageStatus::Dispatched,
+                                };
+
+                                // Persist the message and enqueue it
+                                if redis.save_message(context_id, &message).await.is_ok() {
+                                    let _ = redis
+                                        .enqueue_msg_out(context_id, job.caller_id(), msg_id)
+                                        .await;
+                                    // Mark job as Dispatched
+                                    let _ = redis
+                                        .update_job_status(
+                                            context_id,
+                                            job.caller_id(),
+                                            job.id(),
+                                            JobStatus::Dispatched,
+                                        )
+                                        .await;
+                                }
+                            }
+                        }
+                    }
+                }
+
+                if any_error {
+                    let _ = redis
+                        .update_flow_status(context_id, flow_id, FlowStatus::Error)
+                        .await;
+                    break;
+                }
+                if all_finished {
+                    let _ = redis
+                        .update_flow_status(context_id, flow_id, FlowStatus::Finished)
+                        .await;
+                    break;
+                }
+
+                sleep(Duration::from_secs(1)).await;
+            }
+
+            // Remove from the active schedulers set
+            let mut guard = schedulers.lock().await;
+            guard.remove(&(context_id, flow_id));
+        });
+
+        Ok(true)
+    }
+
+    /// Execute a flow: compute the DAG, create Message entries for ready jobs, and enqueue their keys to msg_out.
+    /// Returns the list of enqueued message keys ("message:{caller_id}:{id}") in deterministic order (by job id).
+    pub async fn flow_execute(&self, context_id: u32, flow_id: u32) -> DagResult<Vec<String>> {
+        let dag = build_flow_dag(&self.redis, context_id, flow_id).await?;
+        let mut ready = dag.ready_jobs()?;
+        ready.sort_unstable();
+
+        let mut queued: Vec<String> = Vec::with_capacity(ready.len());
+        for jid in ready {
+            // Load the concrete Job
+            let job = self
+                .redis
+                .load_job(context_id, dag.caller_id, jid)
+                .await
+                .map_err(DagError::from)?;
+
+            // Build a Message that embeds this job
+            let ts = crate::time::current_timestamp();
+            let msg_id: u32 = job.id(); // deterministic; adjust strategy later if needed
+
+            let message = Message {
+                id: msg_id,
+                caller_id: job.caller_id(),
+                context_id,
+                message: "job.run".to_string(),
+                message_type: job.script_type(), // uses ScriptType (matches the model)
+                message_format_type: MessageFormatType::Text,
+                timeout: job.timeout,
+                timeout_ack: 10,
+                timeout_result: job.timeout,
+                job: vec![job.clone()],
+                logs: Vec::new(),
+                created_at: ts,
+                updated_at: ts,
+                status: MessageStatus::Dispatched,
+            };
+
+            // Persist the Message and enqueue its key to the outbound queue
+            let _ = self
+                .create_message(context_id, message)
+                .await
+                .map_err(DagError::from)?;
+
+            self.redis
+                .enqueue_msg_out(context_id, job.caller_id(), msg_id)
+                .await
+                .map_err(DagError::from)?;
+
+            let key = format!("message:{}:{}", job.caller_id(), msg_id);
+            queued.push(key);
+        }
+
+        Ok(queued)
+    }
+
     // -----------------------------
     // Job
     // -----------------------------
diff --git a/src/storage/redis.rs b/src/storage/redis.rs
index c0b7e11..56d0ae9 100644
--- a/src/storage/redis.rs
+++ b/src/storage/redis.rs
@@ -533,4 +533,22 @@ impl RedisDriver {
         let _: usize = cm.hset_multiple(key, &pairs).await?;
         Ok(())
     }
+
+    // -----------------------------
+    // Queues (lists)
+    // -----------------------------
+
+    /// Push a value onto a Redis list using LPUSH in the given DB.
+    pub async fn lpush_list(&self, db: u32, list: &str, value: &str) -> Result<()> {
+        let mut cm = self.manager_for_db(db).await?;
+        let _: i64 = cm.lpush(list, value).await?;
+        Ok(())
+    }
+
+    /// Enqueue a message key onto the outbound queue (msg_out).
+    /// The value is the canonical message key "message:{caller_id}:{id}".
+    pub async fn enqueue_msg_out(&self, db: u32, caller_id: u32, id: u32) -> Result<()> {
+        let key = Self::message_key(caller_id, id);
+        self.lpush_list(db, "msg_out", &key).await
+    }
 }
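
Below is a minimal client-side sketch for exercising the new flow.start method. It assumes a jsonrpsee HTTP client (the "http-client" feature), a server listening on http://127.0.0.1:8080, and that FlowLoadParams deserializes from named context_id / id fields as the handler in src/rpc.rs suggests; the address and the two ids are placeholders.

// Sketch: call the flow.start RPC registered above.
// Assumptions: jsonrpsee with the "http-client" feature, a JSON-RPC server on
// 127.0.0.1:8080, and FlowLoadParams accepting object params {"context_id", "id"}.
use jsonrpsee::core::client::ClientT;
use jsonrpsee::core::params::ObjectParams;
use jsonrpsee::http_client::HttpClientBuilder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = HttpClientBuilder::default().build("http://127.0.0.1:8080")?;

    let mut params = ObjectParams::new();
    params.insert("context_id", 1u32)?;
    params.insert("id", 42u32)?;

    // flow.start returns true if a scheduler was started,
    // false if one was already running for this (context_id, flow_id).
    let started: bool = client.request("flow.start", params).await?;
    println!("flow.start -> {started}");
    Ok(())
}

Any JSON-RPC 2.0 client can issue the same request; only the method name and the two u32 parameters matter.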