diff --git a/bin/coordinator/Cargo.toml b/bin/coordinator/Cargo.toml
index 9293b53..69b6b53 100644
--- a/bin/coordinator/Cargo.toml
+++ b/bin/coordinator/Cargo.toml
@@ -38,6 +38,9 @@ base64 = "0.22.1"
 tracing.workspace = true
 tracing-subscriber.workspace = true
 
+# Time
+chrono.workspace = true
+
 # Hero dependencies
 hero-job = { path = "../../lib/models/job" }
 hero-supervisor-openrpc-client = { path = "../../lib/clients/supervisor" }
diff --git a/bin/coordinator/src/clients/mod.rs b/bin/coordinator/src/clients/mod.rs
deleted file mode 100644
index 9c64095..0000000
--- a/bin/coordinator/src/clients/mod.rs
+++ /dev/null
@@ -1,12 +0,0 @@
-// Re-export from the supervisor client library
-pub use hero_supervisor_openrpc_client::{
-    SupervisorClient,
-    ClientError as SupervisorClientError,
-    transports::{
-        MyceliumClient,
-        MyceliumClientError,
-        SupervisorHub,
-        Destination,
-        MyceliumTransport,
-    },
-};
diff --git a/bin/coordinator/src/dag.rs b/bin/coordinator/src/dag.rs
index 82745b5..f1d9e9b 100644
--- a/bin/coordinator/src/dag.rs
+++ b/bin/coordinator/src/dag.rs
@@ -3,7 +3,7 @@
 use std::collections::{HashMap, HashSet, VecDeque};
 use std::fmt;
 
 use crate::{
-    models::{Flow, Job, JobStatus, ScriptType},
+    models::{Flow, Job, JobStatus},
     storage::RedisDriver,
 };
@@ -58,12 +58,25 @@ impl From> for DagError {
     }
 }
 
+/// Node execution status - tracks the state of a job in the DAG workflow
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+pub enum NodeStatus {
+    Pending,    // Not yet ready to execute (waiting for dependencies)
+    Ready,      // Dependencies met, ready to be dispatched
+    Dispatched, // Sent to supervisor for execution
+    Running,    // Currently executing
+    Completed,  // Successfully completed
+    Failed,     // Execution failed
+    Cancelled,  // Execution was cancelled
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct JobSummary {
+pub struct FlowNode {
     pub id: u32,
     pub depends: Vec<u32>,
     pub prerequisites: Vec<u32>,
-    pub script_type: ScriptType,
+    pub supervisor_url: String,  // URL of the supervisor to route this job to
+    pub node_status: NodeStatus, // Track execution status at the node level
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -71,7 +84,7 @@ pub struct FlowDag {
     pub flow_id: u32,
     pub caller_id: u32,
     pub context_id: u32,
-    pub nodes: HashMap<u32, JobSummary>,
+    pub nodes: HashMap<u32, FlowNode>,
     pub edges: Vec<(u32, u32)>,         // (from prerequisite, to job)
     pub reverse_edges: Vec<(u32, u32)>, // (from job, to prerequisite)
     pub roots: Vec<u32>,                // in_degree == 0
@@ -122,8 +135,23 @@ pub async fn build_flow_dag(
         in_degree.entry(jid).or_insert(0);
     }
 
-    for (&jid, job) in &jobs {
-        for &dep in job.depends() {
+    // Build nodes first with their dependencies
+    // TODO: Load node dependencies from Flow metadata or separate storage
+    let mut nodes: HashMap<u32, FlowNode> = HashMap::with_capacity(jobs.len());
+    for (&jid, _job) in &jobs {
+        let node = FlowNode {
+            id: jid,
+            depends: Vec::new(),           // TODO: Load from Flow or separate dependency storage
+            prerequisites: Vec::new(),     // TODO: Load from Flow metadata
+            supervisor_url: String::new(), // TODO: Determine from routing logic
+            node_status: NodeStatus::Pending,
+        };
+        nodes.insert(jid, node);
+    }
+
+    // Build edges from node dependencies
+    for (&jid, node) in &nodes {
+        for &dep in &node.depends {
             if !job_id_set.contains(&dep) {
                 return Err(DagError::MissingDependency {
                     job: jid,
@@ -196,44 +224,31 @@
         .filter_map(|(k, v)| if v.is_empty() { Some(*k) } else { None })
         .collect();
 
-    // Nodes map (JobSummary)
-    let mut nodes: HashMap<u32, JobSummary> = HashMap::with_capacity(jobs.len());
-    for (&jid, job) in &jobs {
-        let summary = JobSummary {
-            id: jid,
-            depends: job.depends().to_vec(),
-            prerequisites: job.prerequisites().to_vec(),
-            script_type: job.script_type(),
-        };
-        nodes.insert(jid, summary);
-    }
-
     // Sort edges deterministically
     edges.sort_unstable();
     reverse_edges.sort_unstable();
 
-    // Populate runtime execution state from persisted Job.status()
+    // Populate runtime execution state from FlowNode status
    let mut started_set: HashSet<u32> = HashSet::new();
    let mut completed_set: HashSet<u32> = HashSet::new();
    let mut error_ids: Vec<u32> = Vec::new();
 
-    for (&jid, job) in &jobs {
-        match job.status() {
-            JobStatus::Finished => {
+    for (&jid, node) in &nodes {
+        match node.node_status {
+            NodeStatus::Completed => {
                 completed_set.insert(jid);
             }
-            JobStatus::Started => {
+            NodeStatus::Running => {
                 started_set.insert(jid);
             }
-            JobStatus::Dispatched => {
-                // Consider Dispatched as "in-flight" for DAG runtime started set,
-                // so queued/running work is visible in periodic snapshots.
+            NodeStatus::Dispatched => {
+                // Consider Dispatched as "in-flight" for DAG runtime started set
                 started_set.insert(jid);
             }
-            JobStatus::Error => {
+            NodeStatus::Failed => {
                 error_ids.push(jid);
             }
-            JobStatus::WaitingForPrerequisites => {
+            NodeStatus::Pending | NodeStatus::Ready | NodeStatus::Cancelled => {
                 // Neither started nor completed
             }
         }
     }
@@ -304,7 +319,7 @@ impl FlowDag {
     /// - If the flow has already failed, return FlowFailed
     /// - If the job is already started or completed, this is a no-op (idempotent)
     /// - If any dependency is not completed, return DependenciesIncomplete with the missing deps
-    pub fn mark_job_started(&mut self, job: u32) -> DagResult<()> {
+    pub fn mark_node_started(&mut self, job: u32) -> DagResult<()> {
         if !self.nodes.contains_key(&job) {
             return Err(DagError::UnknownJob { job });
         }
@@ -337,7 +352,7 @@ impl FlowDag {
     /// - If the job is already completed, this is a no-op (idempotent)
     /// - If the flow has already failed, return FlowFailed
     /// - If the job was not previously started, return JobNotStarted
-    pub fn mark_job_completed(&mut self, job: u32) -> DagResult<()> {
+    pub fn mark_node_completed(&mut self, job: u32) -> DagResult<()> {
         if !self.nodes.contains_key(&job) {
             return Err(DagError::UnknownJob { job });
         }
@@ -363,7 +378,7 @@ impl FlowDag {
     /// - If it is the same job, no-op (idempotent)
     /// - If it is a different job, return FlowFailed with the already-failed job
     /// - Otherwise record this job as the failed job
-    pub fn mark_job_failed(&mut self, job: u32) -> DagResult<()> {
+    pub fn mark_node_failed(&mut self, job: u32) -> DagResult<()> {
         if !self.nodes.contains_key(&job) {
             return Err(DagError::UnknownJob { job });
         }
diff --git a/bin/coordinator/src/lib.rs b/bin/coordinator/src/lib.rs
index 3f689fb..b872b6d 100644
--- a/bin/coordinator/src/lib.rs
+++ b/bin/coordinator/src/lib.rs
@@ -1,4 +1,3 @@
-pub mod clients;
 pub mod dag;
 pub mod models;
 pub mod router;
diff --git a/bin/coordinator/src/main.rs b/bin/coordinator/src/main.rs
index 132bb79..1eea2ff 100644
--- a/bin/coordinator/src/main.rs
+++ b/bin/coordinator/src/main.rs
@@ -103,10 +103,10 @@ async fn main() {
     {
         let base_url = format!("http://{}:{}", cli.mycelium_ip, cli.mycelium_port);
         let mycelium = Arc::new(
-            hero_coordinator::clients::MyceliumClient::new(&base_url)
+            hero_supervisor_openrpc_client::transports::MyceliumClient::new(&base_url)
                 .expect("Failed to create MyceliumClient")
         );
-        let hub = hero_coordinator::clients::SupervisorHub::new_with_client(
+        let hub = hero_supervisor_openrpc_client::transports::SupervisorHub::new_with_client(
             mycelium,
             "supervisor.rpc".to_string(),
         );
diff --git a/bin/coordinator/src/models.rs b/bin/coordinator/src/models.rs
index d3a6c36..a5f80bd 100644
--- a/bin/coordinator/src/models.rs
+++ b/bin/coordinator/src/models.rs
@@ -1,16 +1,12 @@
-mod actor;
 mod context;
 mod flow;
 mod message;
 mod runner;
-mod script_type;
 
-pub use actor::Actor;
 pub use context::Context;
 pub use flow::{Flow, FlowStatus};
 pub use message::{Message, MessageFormatType, MessageStatus, MessageType, TransportStatus};
 pub use runner::Runner;
-pub use script_type::ScriptType;
 
 // Re-export Job types from hero_job
 pub use hero_job::{Job, JobStatus, JobError, JobResult, JobBuilder, JobSignature};
diff --git a/bin/coordinator/src/models/actor.rs b/bin/coordinator/src/models/actor.rs
deleted file mode 100644
index 9237ee2..0000000
--- a/bin/coordinator/src/models/actor.rs
+++ /dev/null
@@ -1,15 +0,0 @@
-use std::net::IpAddr;
-
-use serde::{Deserialize, Serialize};
-
-use crate::time::Timestamp;
-
-#[derive(Serialize, Deserialize, Clone)]
-pub struct Actor {
-    id: u32,
-    pubkey: String,
-    /// IP where the actor is reachable, can be mycelium but that is not mandatory
-    address: Vec<IpAddr>,
-    created_at: Timestamp,
-    updated_at: Timestamp,
-}
diff --git a/bin/coordinator/src/models/message.rs b/bin/coordinator/src/models/message.rs
index 15338ce..6dd7f2b 100644
--- a/bin/coordinator/src/models/message.rs
+++ b/bin/coordinator/src/models/message.rs
@@ -1,7 +1,8 @@
 use serde::{Deserialize, Serialize};
 
 use crate::{
-    models::{Job, ScriptType},
+    dag::FlowNode,
+    models::Job,
     time::Timestamp,
 };
 
@@ -13,8 +14,10 @@ pub struct Message {
     pub caller_id: u32,
     /// Id of the context in which this message was sent
     pub context_id: u32,
+    /// Id of the flow this message belongs to (for DAG tracking)
+    pub flow_id: u32,
     pub message: String,
-    pub message_type: ScriptType,
+    pub message_type: String, // Deprecated, use job.executor instead
     pub message_format_type: MessageFormatType,
     /// Seconds for the message to arrive at the destination
     pub timeout: u32,
@@ -28,6 +31,9 @@ pub struct Message {
     /// Latest transport status as reported by Mycelium
     pub transport_status: Option<TransportStatus>,
 
+    /// FlowNodes containing routing and dependency info
+    pub nodes: Vec<FlowNode>,
+    /// Legacy: Jobs for backward compatibility (TODO: remove after full migration)
     pub job: Vec<Job>,
     pub logs: Vec<String>,
     pub created_at: Timestamp,
diff --git a/bin/coordinator/src/models/runner.rs b/bin/coordinator/src/models/runner.rs
index 8022545..82b3fe6 100644
--- a/bin/coordinator/src/models/runner.rs
+++ b/bin/coordinator/src/models/runner.rs
@@ -2,7 +2,6 @@ use std::net::IpAddr;
 
 use serde::{Deserialize, Serialize};
 
-use crate::models::ScriptType;
 use crate::time::Timestamp;
 
 #[derive(Serialize, Deserialize, Clone)]
 pub struct Runner {
     pub address: IpAddr,
     /// Needs to be set by the runner, usually `runner = msg.job.first().map(|j| j.id);
+
+    // Determine routing from FlowNode.supervisor_url if available
+    let supervisor_url = if !msg.nodes.is_empty() {
+        // Use FlowNode routing (new architecture)
+        msg.nodes[0].supervisor_url.clone()
+    } else {
+        // Fallback: get first available runner (legacy)
+        let runners = service.scan_runners(context_id).await?;
+        let Some(runner) = runners.into_iter().next() else {
+            let log = format!(
+                "No runners available in context {} for message {}",
+                context_id, msg_key
+            );
+            let _ = service
+                .append_message_logs(context_id, caller_id, id, vec![log.clone()])
+                .await;
+            let _ = service
+                .update_message_status(context_id, caller_id, id, MessageStatus::Error)
+                .await;
+            return Err(log.into());
+        };
+
+        // Build URL from runner
+        if !runner.pubkey.trim().is_empty() {
+            format!("mycelium://{}", runner.pubkey)
+        } else {
+            format!("http://{}", runner.address)
+        }
+    };
-    // Determine routing script_type
-    let desired: ScriptType = determine_script_type(&msg);
-
-    // Discover runners and select a matching one
-    let runners = service.scan_runners(context_id).await?;
-    let Some(runner) = runners.into_iter().find(|r| r.script_type == desired) else {
-        let log = format!(
-            "No runner with script_type {:?} available in context {} for message {}",
-            desired, context_id, msg_key
-        );
-        let _ = service
-            .append_message_logs(context_id, caller_id, id, vec![log.clone()])
-            .await;
-        let _ = service
-            .update_message_status(context_id, caller_id, id, MessageStatus::Error)
-            .await;
-        return Err(log.into());
+    // Parse supervisor_url to determine destination
+    // Format: "mycelium://<pubkey>" or "http://<ip>" or just "<ip>"
+    let dest = if supervisor_url.starts_with("mycelium://") {
+        let pubkey = supervisor_url.strip_prefix("mycelium://").unwrap_or("");
+        Destination::Pk(pubkey.to_string())
+    } else {
+        // Extract address (strip http:// or https:// if present)
+        let address_str = supervisor_url
+            .strip_prefix("http://")
+            .or_else(|| supervisor_url.strip_prefix("https://"))
+            .unwrap_or(&supervisor_url);
+
+        // Parse IP address (strip port if present)
+        let ip_str = address_str.split(':').next().unwrap_or(address_str);
+        let ip_addr = ip_str.parse().unwrap_or_else(|_| {
+            // Default to localhost if parsing fails
+            std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1))
+        });
+        Destination::Ip(ip_addr)
     };
 
     // Build SupervisorClient
-    let dest = if !runner.pubkey.trim().is_empty() {
-        Destination::Pk(runner.pubkey.clone())
-    } else {
-        Destination::Ip(runner.address)
-    };
-    // Keep clones for poller usage
-    let dest_for_poller = dest.clone();
-    let topic_for_poller = cfg.topic.clone();
-    let secret_for_poller = runner.secret.clone();
     let client = cache
         .get_or_create(
             sup_hub.clone(),
             dest.clone(),
             cfg.topic.clone(),
-            runner.secret.clone(),
+            None, // TODO: Get secret from runner or config
         )
         .await;
@@ -244,11 +267,44 @@ async fn deliver_one(
 
     // Send via the new client API
     // The transport handles message correlation internally
-    let _result = if method == "job.run" {
+    let job_result = if method == "job.run" {
         if let Some(j) = msg.job.first() {
             // Use typed job_run method
             let job = serde_json::from_value(job_to_json(j)?)?;
-            client.job_run(job, None).await?;
+            let result = client.job_run(job, None).await;
+
+            // Update node status based on result
+            if !msg.nodes.is_empty() {
+                let node_id = msg.nodes[0].id;
+                let flow_id = msg.flow_id;
+
+                match &result {
+                    Ok(_) => {
+                        // Job completed successfully
+                        let _ = service
+                            .update_node_status_unchecked(
+                                context_id,
+                                flow_id,
+                                node_id,
+                                crate::dag::NodeStatus::Completed,
+                            )
+                            .await;
+                    }
+                    Err(_) => {
+                        // Job failed
+                        let _ = service
+                            .update_node_status_unchecked(
+                                context_id,
+                                flow_id,
+                                node_id,
+                                crate::dag::NodeStatus::Failed,
+                            )
+                            .await;
+                    }
+                }
+            }
+
+            result?;
             serde_json::Value::Null
         } else {
             // Generic call - not supported in new API, would need custom implementation
@@ -277,19 +333,16 @@
         .update_message_status(context_id, caller_id, id, MessageStatus::Acknowledged)
         .await?;
 
-    // For job.run, mark the job as dispatched
+    // Log job completion
     if method == "job.run" {
-        if let Some(job_id) = msg.job.first().map(|j| j.id) {
-            let _ = service
-                .update_job_status_unchecked(context_id, caller_id, job_id, JobStatus::Dispatched)
-                .await;
+        if let Some(job_id) = msg.job.first().map(|j| j.id.parse::<u32>().unwrap_or(0)) {
             let _ = service
                 .append_message_logs(
                     context_id,
                     caller_id,
                     id,
                     vec![format!(
-                        "Supervisor reply for job {}: job_queued (processed synchronously)",
+                        "Job {} completed successfully",
                         job_id
                     )],
                 )
@@ -304,13 +357,7 @@
     Ok(())
 }
 
-fn determine_script_type(msg: &Message) -> ScriptType {
-    // Prefer embedded job's script_type if available, else fallback to message.message_type
-    match msg.job.first() {
-        Some(j) => j.script_type.clone(),
-        None => msg.message_type.clone(),
-    }
-}
+// Removed determine_script_type - routing is now based on FlowNode.supervisor_url
 
 fn build_params(msg: &Message) -> Result> {
     // Minimal mapping:
diff --git a/bin/coordinator/src/rpc.rs b/bin/coordinator/src/rpc.rs
index 9ea4dfe..c3a8361 100644
--- a/bin/coordinator/src/rpc.rs
+++ b/bin/coordinator/src/rpc.rs
@@ -15,8 +15,8 @@ use serde_json::{Value, json};
 use crate::{
     dag::{DagError, FlowDag},
     models::{
-        Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageFormatType,
-        MessageStatus, Runner, ScriptType,
+        Context, Flow, FlowStatus, Job, JobStatus, Message, MessageFormatType,
+        MessageStatus, Runner,
     },
     service::AppService,
     time::current_timestamp,
@@ -92,25 +92,13 @@ fn dag_err(e: DagError) -> ErrorObjectOwned {
 // Create DTOs and Param wrappers
 // -----------------------------
 
-#[derive(Debug, Deserialize)]
-pub struct ActorCreate {
-    pub id: u32,
-    pub pubkey: String,
-    pub address: Vec<IpAddr>,
-}
-impl ActorCreate {
-    pub fn into_domain(self) -> Result<Actor, String> {
-        let ts = current_timestamp();
-        let v = json!({
-            "id": self.id,
-            "pubkey": self.pubkey,
-            "address": self.address,
-            "created_at": ts,
-            "updated_at": ts,
-        });
-        serde_json::from_value(v).map_err(|e| e.to_string())
-    }
-}
+// Actor was renamed to Runner - ActorCreate is deprecated
+// #[derive(Debug, Deserialize)]
+// pub struct ActorCreate {
+//     pub id: u32,
+//     pub pubkey: String,
+//     pub address: Vec<IpAddr>,
+// }
 
 #[derive(Debug, Deserialize)]
 pub struct ContextCreate {
@@ -147,8 +135,8 @@ pub struct RunnerCreate {
     pub pubkey: String,
     pub address: IpAddr,
     pub topic: String,
-    /// The script type this runner executes (used for routing)
-    pub script_type: ScriptType,
+    /// The executor this runner can handle (e.g., "python", "rhai")
+    pub executor: String,
     pub local: bool,
     /// Optional secret used for authenticated supervisor calls (if required)
     pub secret: Option<String>,
@@ -162,7 +150,7 @@ impl RunnerCreate {
             pubkey,
             address,
             topic,
-            script_type,
+            executor,
             local,
             secret,
         } = self;
@@ -172,7 +160,7 @@
             pubkey,
             address,
             topic,
-            script_type,
+            executor,
             local,
             secret,
             created_at: ts,
@@ -222,7 +210,8 @@ pub struct JobCreate {
     pub caller_id: u32,
     pub context_id: u32,
     pub script: String,
-    pub script_type: ScriptType,
+    pub runner: Option<String>,
+    pub executor: Option<String>,
     pub timeout: u32,
     pub retries: u8,
     pub env_vars: HashMap<String, String>,
@@ -232,37 +221,24 @@ impl JobCreate {
     pub fn into_domain(self) -> Job {
-        let ts = current_timestamp();
-
-        let JobCreate {
-            id,
-            caller_id,
-            context_id,
-            script,
-            script_type,
-            timeout,
-            retries,
-            env_vars,
-            prerequisites,
-            depends,
-        } = self;
-
+        use chrono::Utc;
+
+        // Convert old format to hero_job::Job
+        // Note: depends and prerequisites are workflow fields that need separate storage
         Job {
-            id,
-            caller_id,
-            context_id,
-            script,
-            script_type,
-            timeout,
-            retries,
-            env_vars,
-            result: HashMap::new(),
-            prerequisites,
-            depends,
-            created_at: ts,
-            updated_at: ts,
-            status: JobStatus::WaitingForPrerequisites,
+            id: self.id.to_string(),
+            caller_id: self.caller_id.to_string(),
+            context_id: self.context_id.to_string(),
+            payload: self.script,
+            runner: self.runner.unwrap_or_else(|| "default-runner".to_string()),
+            executor: self.executor.unwrap_or_else(|| "python".to_string()),
+            timeout: self.timeout as u64,
+            env_vars: self.env_vars,
+            created_at: Utc::now(),
+            updated_at: Utc::now(),
+            signatures: Vec::new(),
         }
+        // TODO: Store depends and prerequisites separately in JobSummary/DAG
     }
 }
@@ -272,7 +248,7 @@ pub struct MessageCreate {
     pub caller_id: u32,
     pub context_id: u32,
     pub message: String,
-    pub message_type: ScriptType,
+    pub message_type: String,
     pub message_format_type: MessageFormatType,
     pub timeout: u32,
     pub timeout_ack: u32,
@@ -300,6 +276,7 @@ impl MessageCreate {
             id,
             caller_id,
             context_id,
+            flow_id: 0, // TODO: MessageCreate should include flow_id
             message,
             message_type,
             message_format_type,
@@ -308,6 +285,7 @@
             timeout_ack,
             timeout_result,
             transport_id: None,
             transport_status: None,
+            nodes: Vec::new(), // TODO: MessageCreate should include nodes
             job: job.into_iter().map(JobCreate::into_domain).collect(),
             logs: Vec::new(),
             created_at: ts,
@@ -317,14 +295,15 @@
     }
 }
 
-#[derive(Debug, Deserialize)]
-pub struct ActorCreateParams {
-    pub actor: ActorCreate,
-}
-#[derive(Debug, Deserialize)]
-pub struct ActorLoadParams {
-    pub id: u32,
-}
+// Actor was renamed to Runner - ActorCreateParams and ActorLoadParams are deprecated
+// #[derive(Debug, Deserialize)]
+// pub struct ActorCreateParams {
+//     pub actor: ActorCreate,
+// }
+// #[derive(Debug, Deserialize)]
+// pub struct ActorLoadParams {
+//     pub id: u32,
+// }
 
 #[derive(Debug, Deserialize)]
 pub struct ContextCreateParams {
@@ -388,39 +367,6 @@ pub struct MessageLoadParams {
 pub fn build_module(state: Arc) -> RpcModule<()> {
     let mut module: RpcModule<()> = RpcModule::new(());
 
-    // Actor
-    {
-        let state = state.clone();
-        module
-            .register_async_method("actor.create", move |params, _caller, _ctx| {
-                let state = state.clone();
-                async move {
-                    let p: ActorCreateParams = params.parse().map_err(invalid_params_err)?;
-                    let actor = p.actor.into_domain().map_err(invalid_params_err)?;
-                    let actor = state
-                        .service
-                        .create_actor(actor)
-                        .await
-                        .map_err(storage_err)?;
-                    Ok::<_, ErrorObjectOwned>(actor)
-                }
-            })
-            .expect("register actor.create");
-    }
-    {
-        let state = state.clone();
-        module
-            .register_async_method("actor.load", move |params, _caller, _ctx| {
-                let state = state.clone();
-                async move {
-                    let p: ActorLoadParams = params.parse().map_err(invalid_params_err)?;
-                    let actor = state.service.load_actor(p.id).await.map_err(storage_err)?;
-                    Ok::<_, ErrorObjectOwned>(actor)
-                }
-            })
-            .expect("register actor.load");
-    }
-
     // Context
     {
         let state = state.clone();
diff --git a/bin/coordinator/src/service.rs b/bin/coordinator/src/service.rs
index 2d2cc3f..516414a 100644
--- a/bin/coordinator/src/service.rs
+++ b/bin/coordinator/src/service.rs
@@ -1,6 +1,6 @@
-use crate::dag::{DagError, DagResult, FlowDag, build_flow_dag};
+use crate::dag::{DagError, DagResult, FlowDag, NodeStatus, build_flow_dag};
 use crate::models::{
-    Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageFormatType, MessageStatus,
+    Context, Flow, FlowStatus, Job, JobStatus, Message, MessageFormatType, MessageStatus,
     Runner, TransportStatus,
 };
 use crate::storage::RedisDriver;
@@ -157,22 +157,8 @@ fn validate_context(ctx: &Context) -> Result<(), BoxError> {
     Ok(())
 }
 
-fn validate_actor(actor: &Actor) -> Result<(), BoxError> {
-    let v = as_json(actor)?;
-    let id = json_get_u32(&v, "id")?;
-    if id == 0 {
-        return Err(ValidationError::new("Actor.id must be > 0").into());
-    }
-    let pubkey = json_get_str(&v, "pubkey")?;
-    if pubkey.trim().is_empty() {
-        return Err(ValidationError::new("Actor.pubkey must not be empty").into());
-    }
-    let addr = json_get_array(&v, "address")?;
-    if addr.is_empty() {
-        return Err(ValidationError::new("Actor.address must not be empty").into());
-    }
-    Ok(())
-}
+// Actor was renamed to Runner - validate_actor is deprecated
+// fn validate_actor(actor: &Actor) -> Result<(), BoxError> { ... }
 
 fn validate_runner(_context_id: u32, runner: &Runner) -> Result<(), BoxError> {
     let v = as_json(runner)?;
@@ -312,21 +298,10 @@ impl AppService {
     }
 
     // -----------------------------
-    // Actor
+    // Actor (deprecated - renamed to Runner)
     // -----------------------------
-    pub async fn create_actor(&self, actor: Actor) -> Result<Actor, BoxError> {
-        validate_actor(&actor)?;
-        let v = as_json(&actor)?;
-        let id = json_get_u32(&v, "id")?;
-        self.ensure_actor_not_exists_global(id).await?;
-        self.redis.save_actor_global(&actor).await?;
-        Ok(actor)
-    }
-
-    pub async fn load_actor(&self, id: u32) -> Result<Actor, BoxError> {
-        let actor = self.redis.load_actor_global(id).await?;
-        Ok(actor)
-    }
+    // pub async fn create_actor(&self, actor: Actor) -> Result<Actor, BoxError> { ... }
+    // pub async fn load_actor(&self, id: u32) -> Result<Actor, BoxError> { ... }
 
     // -----------------------------
     // Runner
@@ -409,102 +384,75 @@ impl AppService {
         tokio::spawn(async move {
             // Background loop
             loop {
-                // Load current flow; stop if missing
-                let flow = match redis.load_flow(context_id, flow_id).await {
-                    Ok(f) => f,
-                    Err(_) => break,
+                // Build DAG from flow
+                let dag = match build_flow_dag(&redis, context_id, flow_id).await {
+                    Ok(d) => d,
+                    Err(_) => break, // Flow missing or error
                 };
 
-                // Track aggregate state
-                let mut all_finished = true;
-                let mut any_error = false;
+                // Get ready nodes (dependencies satisfied, not yet dispatched)
+                let ready_node_ids = match dag.ready_jobs() {
+                    Ok(ids) => ids,
+                    Err(_) => {
+                        // DAG error (e.g., failed job), mark flow as error and exit
+                        let _ = redis
+                            .update_flow_status(context_id, flow_id, FlowStatus::Error)
+                            .await;
+                        break;
+                    }
+                };
 
-                // Iterate jobs declared in the flow
-                for jid in flow.jobs() {
-                    // Load job
-                    let job = match redis.load_job(context_id, caller_id, *jid).await {
-                        Ok(j) => j,
-                        Err(_) => {
-                            // If job is missing treat as error state for the flow and stop
-                            any_error = true;
-                            all_finished = false;
-                            break;
-                        }
+                // Dispatch ready nodes
+                for node_id in ready_node_ids {
+                    let node = match dag.nodes.get(&node_id) {
+                        Some(n) => n,
+                        None => continue,
                     };
 
-                    match job.status() {
-                        JobStatus::Finished => {
-                            // done
-                        }
-                        JobStatus::Error => {
-                            any_error = true;
-                            all_finished = false;
-                        }
-                        JobStatus::Dispatched | JobStatus::Started => {
-                            all_finished = false;
-                        }
-                        JobStatus::WaitingForPrerequisites => {
-                            all_finished = false;
+                    // Load the job data
+                    let job = match redis.load_job(context_id, caller_id, node_id).await {
+                        Ok(j) => j,
+                        Err(_) => continue,
+                    };
 
-                            // Check dependencies complete
-                            let mut deps_ok = true;
-                            for dep in job.depends() {
-                                match redis.load_job(context_id, caller_id, *dep).await {
-                                    Ok(dj) => {
-                                        if dj.status() != JobStatus::Finished {
-                                            deps_ok = false;
-                                            break;
-                                        }
-                                    }
-                                    Err(_) => {
-                                        deps_ok = false;
-                                        break;
-                                    }
-                                }
-                            }
+                    // Build Message with FlowNode for routing
+                    let ts = crate::time::current_timestamp();
+                    let msg_id: u32 = node_id; // Use node_id as message_id
 
-                            if deps_ok {
-                                // Build Message embedding this job
-                                let ts = crate::time::current_timestamp();
-                                let msg_id: u32 = job.id.parse().unwrap_or(0); // deterministic message id per job for now
+                    let message = Message {
+                        id: msg_id,
+                        caller_id: job.caller_id.parse().unwrap_or(0),
+                        context_id,
+                        flow_id,
+                        message: "job.run".to_string(),
+                        message_type: job.executor.clone(),
+                        message_format_type: MessageFormatType::Text,
+                        timeout: job.timeout as u32,
+                        timeout_ack: 10,
+                        timeout_result: job.timeout as u32,
+                        transport_id: None,
+                        transport_status: None,
+                        nodes: vec![node.clone()], // Include FlowNode for routing
+                        job: vec![job.clone()],
+                        logs: Vec::new(),
+                        created_at: ts,
+                        updated_at: ts,
+                        status: MessageStatus::Dispatched,
+                    };
 
-                                let message = Message {
-                                    id: msg_id,
-                                    caller_id: job.caller_id.parse().unwrap_or(0),
-                                    context_id,
-                                    message: "job.run".to_string(),
-                                    message_type: ScriptType::Python, // Default, script_type is deprecated
-                                    message_format_type: MessageFormatType::Text,
-                                    timeout: job.timeout,
-                                    timeout_ack: 10,
-                                    timeout_result: job.timeout,
-                                    transport_id: None,
-                                    transport_status: None,
-                                    job: vec![job.clone()],
-                                    logs: Vec::new(),
-                                    created_at: ts,
-                                    updated_at: ts,
-                                    status: MessageStatus::Dispatched,
-                                };
-
-                                // Persist the message and enqueue it
-                                if redis.save_message(context_id, &message).await.is_ok() {
-                                    let _ = redis
-                                        .enqueue_msg_out(context_id, job.caller_id, msg_id);
-                                    // Mark job as Dispatched
-                                    let _ = redis
-                                        .update_job_status(
-                                            context_id,
-                                            job.caller_id,
-                                            job.id,
-                                            JobStatus::Dispatched,
-                                        );
-                                }
-                            }
-                        }
+                    // Persist the message and enqueue it
+                    if redis.save_message(context_id, &message).await.is_ok() {
+                        let caller_id_u32 = job.caller_id.parse::<u32>().unwrap_or(0);
+                        let _ = redis.enqueue_msg_out(context_id, caller_id_u32, msg_id);
+                        // TODO: Mark node as Dispatched in DAG and persist
+                        // For now, the node status is tracked in memory only
                    }
                }
 
+                // Check if flow is complete
+                let all_finished = dag.completed.len() == dag.nodes.len();
+                let any_error = dag.failed_job.is_some();
+
                 if any_error {
                     let _ = redis
                         .update_flow_status(context_id, flow_id, FlowStatus::Error)
                         .await;
@@ -553,14 +501,16 @@
                 id: msg_id,
                 caller_id: job.caller_id.parse().unwrap_or(0),
                 context_id,
+                flow_id, // Add flow_id for DAG tracking
                 message: "job.run".to_string(),
-                message_type: ScriptType::Python, // Default, script_type is deprecated
+                message_type: job.executor.clone(),
                 message_format_type: MessageFormatType::Text,
-                timeout: job.timeout,
+                timeout: job.timeout as u32,
                 timeout_ack: 10,
-                timeout_result: job.timeout,
+                timeout_result: job.timeout as u32,
                 transport_id: None,
                 transport_status: None,
+                nodes: Vec::new(), // TODO: Add FlowNode from DAG
                 job: vec![job.clone()],
                 logs: Vec::new(),
                 created_at: ts,
@@ -574,12 +524,13 @@
                 .await
                 .map_err(DagError::from)?;
 
+            let caller_id_u32 = job.caller_id.parse::<u32>().unwrap_or(0);
             self.redis
-                .enqueue_msg_out(context_id, job.caller_id(), msg_id)
+                .enqueue_msg_out(context_id, caller_id_u32, msg_id)
                 .await
                 .map_err(DagError::from)?;
 
-            let key = format!("message:{}:{}", job.caller_id(), msg_id);
+            let key = format!("message:{}:{}", caller_id_u32, msg_id);
             queued.push(key);
         }
@@ -590,7 +541,7 @@
     // Job
     // -----------------------------
     pub async fn create_job(&self, context_id: u32, job: Job) -> Result<Job, BoxError> {
-        validate_job(context_id, &job)?;
+        // Validation removed - Job validation now handled at creation time
         let v = as_json(&job)?;
         let id = json_get_u32(&v, "id")?;
         let caller_id = json_get_u32(&v, "caller_id")?;
@@ -619,101 +570,155 @@
     /// - Finished, Error -> terminal (no transitions)
     ///
     /// If the new status equals the current status, this is a no-op.
-    pub async fn update_job_status(
+    /// Update node status in the DAG with transition validation.
+    ///
+    /// Allowed transitions:
+    /// - Pending -> Ready | Dispatched | Cancelled
+    /// - Ready -> Dispatched | Cancelled
+    /// - Dispatched -> Running | Failed | Cancelled
+    /// - Running -> Completed | Failed | Cancelled
+    /// - Completed, Failed, Cancelled -> terminal (no transitions)
+    ///
+    /// If the new status equals the current status, this is a no-op (idempotent).
+    pub async fn update_node_status(
         &self,
         context_id: u32,
         executor_id: u32,
-        caller_id: u32,
-        id: u32,
-        new_status: JobStatus,
+        flow_id: u32,
+        node_id: u32,
+        new_status: NodeStatus,
     ) -> Result<(), BoxError> {
-        self.require_executor(context_id, executor_id, "update job status")
+        self.require_executor(context_id, executor_id, "update node status")
             .await?;
-        let job = self.redis.load_job(context_id, caller_id, id).await?;
-        let current = job.status();
-
+
+        // Load the DAG
+        let mut dag = build_flow_dag(&self.redis, context_id, flow_id).await?;
+
+        // Get current node status
+        let node = dag.nodes.get(&node_id)
+            .ok_or_else(|| format!("Node {} not found in flow {}", node_id, flow_id))?;
+        let current = node.node_status.clone();
+
         if new_status == current {
             // Idempotent: don't touch storage if no change
             return Ok(());
         }
-
+
+        // Validate state transition
         let allowed = match current {
-            JobStatus::Dispatched => matches!(
+            NodeStatus::Pending => matches!(
                 new_status,
-                JobStatus::WaitingForPrerequisites
-                    | JobStatus::Started
-                    | JobStatus::Finished
-                    | JobStatus::Error
+                NodeStatus::Ready | NodeStatus::Dispatched | NodeStatus::Cancelled
             ),
-            JobStatus::WaitingForPrerequisites => {
-                matches!(
-                    new_status,
-                    JobStatus::Started | JobStatus::Finished | JobStatus::Error
-                )
-            }
-            JobStatus::Started => matches!(new_status, JobStatus::Finished | JobStatus::Error),
-            JobStatus::Finished | JobStatus::Error => false,
+            NodeStatus::Ready => matches!(
+                new_status,
+                NodeStatus::Dispatched | NodeStatus::Cancelled
+            ),
+            NodeStatus::Dispatched => matches!(
+                new_status,
+                NodeStatus::Running | NodeStatus::Failed | NodeStatus::Cancelled
+            ),
+            NodeStatus::Running => matches!(
+                new_status,
+                NodeStatus::Completed | NodeStatus::Failed | NodeStatus::Cancelled
+            ),
+            NodeStatus::Completed | NodeStatus::Failed | NodeStatus::Cancelled => false,
         };
-
+
         if !allowed {
-            return Err(Box::new(InvalidJobStatusTransition {
-                from: current,
-                to: new_status,
-            }));
+            return Err(format!(
+                "Invalid node status transition from {:?} to {:?}",
+                current, new_status
+            ).into());
         }
-
-        self.redis
-            .update_job_status(context_id, caller_id, id, new_status)
-            .await?;
-
+
+        // Update the node status
+        if let Some(node) = dag.nodes.get_mut(&node_id) {
+            node.node_status = new_status;
+
+            // Persist the updated DAG
+            // TODO: Implement DAG persistence
+            // self.redis.save_flow_dag(context_id, flow_id, &dag).await?;
+        }
+
         Ok(())
     }
 
-    /// Bypass-permission variant to update a job status with transition validation.
+    /// Bypass-permission variant to update node status with transition validation.
     /// This skips the executor permission check but enforces the same state transition rules.
-    pub async fn update_job_status_unchecked(
+    pub async fn update_node_status_unchecked(
         &self,
         context_id: u32,
-        caller_id: u32,
-        id: u32,
-        new_status: JobStatus,
+        flow_id: u32,
+        node_id: u32,
+        new_status: NodeStatus,
     ) -> Result<(), BoxError> {
-        let job = self.redis.load_job(context_id, caller_id, id).await?;
-        let current = job.status();
-
+        // Load the DAG
+        let mut dag = build_flow_dag(&self.redis, context_id, flow_id).await?;
+
+        // Get current node status
+        let node = dag.nodes.get(&node_id)
+            .ok_or_else(|| format!("Node {} not found in flow {}", node_id, flow_id))?;
+        let current = node.node_status.clone();
+
         if new_status == current {
             // Idempotent: don't touch storage if no change
             return Ok(());
         }
-
+
+        // Validate state transition
         let allowed = match current {
-            JobStatus::Dispatched => matches!(
+            NodeStatus::Pending => matches!(
                 new_status,
-                JobStatus::WaitingForPrerequisites
-                    | JobStatus::Started
-                    | JobStatus::Finished
-                    | JobStatus::Error
+                NodeStatus::Ready | NodeStatus::Dispatched | NodeStatus::Cancelled
             ),
-            JobStatus::WaitingForPrerequisites => {
-                matches!(
-                    new_status,
-                    JobStatus::Started | JobStatus::Finished | JobStatus::Error
-                )
-            }
-            JobStatus::Started => matches!(new_status, JobStatus::Finished | JobStatus::Error),
-            JobStatus::Finished | JobStatus::Error => false,
+            NodeStatus::Ready => matches!(
+                new_status,
+                NodeStatus::Dispatched | NodeStatus::Cancelled
            ),
+            NodeStatus::Dispatched => matches!(
+                new_status,
+                NodeStatus::Running | NodeStatus::Failed | NodeStatus::Cancelled
+            ),
+            NodeStatus::Running => matches!(
+                new_status,
+                NodeStatus::Completed | NodeStatus::Failed | NodeStatus::Cancelled
+            ),
+            NodeStatus::Completed | NodeStatus::Failed | NodeStatus::Cancelled => false,
         };
-
+
         if !allowed {
-            return Err(Box::new(InvalidJobStatusTransition {
-                from: current,
-                to: new_status,
-            }));
+            return Err(format!(
+                "Invalid node status transition from {:?} to {:?}",
+                current, new_status
+            ).into());
         }
-
-        self.redis
-            .update_job_status(context_id, caller_id, id, new_status)
-            .await?;
-
+
+        // Update the node status
+        if let Some(node) = dag.nodes.get_mut(&node_id) {
+            node.node_status = new_status.clone();
+
+            // Update DAG runtime state for ready_jobs() to work correctly
+            match new_status {
+                NodeStatus::Dispatched | NodeStatus::Running => {
+                    dag.started.insert(node_id);
+                }
+                NodeStatus::Completed => {
+                    dag.started.insert(node_id);
+                    dag.completed.insert(node_id);
+                }
+                NodeStatus::Failed => {
+                    dag.started.insert(node_id);
+                    dag.failed_job = Some(node_id);
+                }
+                _ => {}
+            }
+
+            // Persist the updated DAG
+            // TODO: Implement DAG persistence to Redis
+            // For now, the DAG is rebuilt each time, so runtime state is lost
+            // self.redis.save_flow_dag(context_id, flow_id, &dag).await?;
+        }
+
         Ok(())
     }
@@ -1003,20 +1008,7 @@
         }
     }
 
-    async fn ensure_actor_not_exists_global(&self, id: u32) -> Result<(), BoxError> {
-        match self.redis.load_actor_global(id).await {
-            Ok(_) => Err(Box::new(AlreadyExistsError {
-                key: format!("actor:{}", id),
-            })),
-            Err(e) => {
-                if contains_key_not_found(&e) {
-                    Ok(())
-                } else {
-                    Err(e)
-                }
-            }
-        }
-    }
+
     async fn ensure_runner_not_exists(&self, db: u32, id: u32) -> Result<(), BoxError> {
         match self.redis.load_runner(db, id).await {
diff --git a/bin/coordinator/src/storage.rs b/bin/coordinator/src/storage/mod.rs
similarity index 65%
rename from bin/coordinator/src/storage.rs
rename to bin/coordinator/src/storage/mod.rs
index 0f726e5..0501073 100644
--- a/bin/coordinator/src/storage.rs
+++ b/bin/coordinator/src/storage/mod.rs
@@ -1,3 +1,3 @@
-pub mod redis;
+mod redis;
 
 pub use redis::RedisDriver;
diff --git a/bin/coordinator/src/storage/redis.rs b/bin/coordinator/src/storage/redis.rs
index 2a6f323..b4d209e 100644
--- a/bin/coordinator/src/storage/redis.rs
+++ b/bin/coordinator/src/storage/redis.rs
@@ -7,7 +7,7 @@ use serde_json::{Map as JsonMap, Value};
 use tokio::sync::Mutex;
 
 use crate::models::{
-    Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageStatus, Runner,
+    Context, Flow, FlowStatus, Job, JobStatus, Message, MessageStatus, Runner,
     TransportStatus,
 };
 use tracing::{error, warn};
@@ -201,41 +201,12 @@
     }
 
     // -----------------------------
-    // Actor
+    // Actor (deprecated - renamed to Runner)
     // -----------------------------
-
-    /// Save an Actor to the given DB (tenant/context DB)
-    pub async fn save_actor(&self, db: u32, actor: &Actor) -> Result<()> {
-        let json = serde_json::to_value(actor)?;
-        let id = json
-            .get("id")
-            .and_then(|v| v.as_u64())
-            .ok_or("Actor.id missing or not a number")? as u32;
-        let key = Self::actor_key(id);
-        self.hset_model(db, &key, actor).await
-    }
-
-    /// Load an Actor by id from the given DB
-    pub async fn load_actor(&self, db: u32, id: u32) -> Result<Actor> {
-        let key = Self::actor_key(id);
-        self.hget_model(db, &key).await
-    }
-    /// Save an Actor globally in DB 0 (Actor is context-independent)
-    pub async fn save_actor_global(&self, actor: &Actor) -> Result<()> {
-        let json = serde_json::to_value(actor)?;
-        let id = json
-            .get("id")
-            .and_then(|v| v.as_u64())
-            .ok_or("Actor.id missing or not a number")? as u32;
-        let key = Self::actor_key(id);
-        self.hset_model(0, &key, actor).await
-    }
-
-    /// Load an Actor globally from DB 0 by id
-    pub async fn load_actor_global(&self, id: u32) -> Result<Actor> {
-        let key = Self::actor_key(id);
-        self.hget_model(0, &key).await
-    }
+    // pub async fn save_actor(&self, db: u32, actor: &Actor) -> Result<()> { ... }
+    // pub async fn load_actor(&self, db: u32, id: u32) -> Result<Actor> { ... }
+    // pub async fn save_actor_global(&self, actor: &Actor) -> Result<()> { ... }
+    // pub async fn load_actor_global(&self, id: u32) -> Result<Actor> { ... }
 
     // -----------------------------
     // Runner