Compare commits
2 Commits
eb69a44039
...
1939a3d09d
Author | SHA1 | Date | |
---|---|---|---|
|
1939a3d09d
|
||
|
ec91a15131
|
@@ -241,6 +241,41 @@
|
|||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"name": "flow.dag",
|
||||||
|
"summary": "Compute and return the execution DAG for a Flow",
|
||||||
|
"params": [
|
||||||
|
{
|
||||||
|
"name": "params",
|
||||||
|
"schema": {
|
||||||
|
"$ref": "#/components/schemas/FlowLoadParams"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"result": {
|
||||||
|
"name": "result",
|
||||||
|
"schema": {
|
||||||
|
"$ref": "#/components/schemas/FlowDag"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"errors": [
|
||||||
|
{
|
||||||
|
"$ref": "#/components/errors/InvalidParams"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$ref": "#/components/errors/NotFound"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$ref": "#/components/errors/StorageError"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$ref": "#/components/errors/DagMissingDependency"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"$ref": "#/components/errors/DagCycleDetected"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"name": "job.create",
|
"name": "job.create",
|
||||||
"summary": "Create/Upsert Job in a context",
|
"summary": "Create/Upsert Job in a context",
|
||||||
@@ -747,6 +782,125 @@
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"JobSummary": {
|
||||||
|
"type": "object",
|
||||||
|
"required": [
|
||||||
|
"id",
|
||||||
|
"depends",
|
||||||
|
"prerequisites",
|
||||||
|
"script_type"
|
||||||
|
],
|
||||||
|
"properties": {
|
||||||
|
"id": {
|
||||||
|
"type": "integer",
|
||||||
|
"format": "uint32"
|
||||||
|
},
|
||||||
|
"depends": {
|
||||||
|
"type": "array",
|
||||||
|
"items": {
|
||||||
|
"type": "integer",
|
||||||
|
"format": "uint32"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"prerequisites": {
|
||||||
|
"type": "array",
|
||||||
|
"items": {
|
||||||
|
"type": "string"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"script_type": {
|
||||||
|
"$ref": "#/components/schemas/ScriptType"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"EdgeTuple": {
|
||||||
|
"type": "array",
|
||||||
|
"items": [
|
||||||
|
{
|
||||||
|
"type": "integer",
|
||||||
|
"format": "uint32"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "integer",
|
||||||
|
"format": "uint32"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"minItems": 2,
|
||||||
|
"maxItems": 2,
|
||||||
|
"description": "Tuple [from, to] representing a directed edge"
|
||||||
|
},
|
||||||
|
"FlowDag": {
|
||||||
|
"type": "object",
|
||||||
|
"required": [
|
||||||
|
"flow_id",
|
||||||
|
"caller_id",
|
||||||
|
"context_id",
|
||||||
|
"nodes",
|
||||||
|
"edges",
|
||||||
|
"reverse_edges",
|
||||||
|
"roots",
|
||||||
|
"leaves",
|
||||||
|
"levels"
|
||||||
|
],
|
||||||
|
"properties": {
|
||||||
|
"flow_id": {
|
||||||
|
"type": "integer",
|
||||||
|
"format": "uint32"
|
||||||
|
},
|
||||||
|
"caller_id": {
|
||||||
|
"type": "integer",
|
||||||
|
"format": "uint32"
|
||||||
|
},
|
||||||
|
"context_id": {
|
||||||
|
"type": "integer",
|
||||||
|
"format": "uint32"
|
||||||
|
},
|
||||||
|
"nodes": {
|
||||||
|
"type": "object",
|
||||||
|
"additionalProperties": {
|
||||||
|
"$ref": "#/components/schemas/JobSummary"
|
||||||
|
},
|
||||||
|
"description": "Map keyed by job id (serialized as string in JSON)"
|
||||||
|
},
|
||||||
|
"edges": {
|
||||||
|
"type": "array",
|
||||||
|
"items": {
|
||||||
|
"$ref": "#/components/schemas/EdgeTuple"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"reverse_edges": {
|
||||||
|
"type": "array",
|
||||||
|
"items": {
|
||||||
|
"$ref": "#/components/schemas/EdgeTuple"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"roots": {
|
||||||
|
"type": "array",
|
||||||
|
"items": {
|
||||||
|
"type": "integer",
|
||||||
|
"format": "uint32"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"leaves": {
|
||||||
|
"type": "array",
|
||||||
|
"items": {
|
||||||
|
"type": "integer",
|
||||||
|
"format": "uint32"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"levels": {
|
||||||
|
"type": "array",
|
||||||
|
"items": {
|
||||||
|
"type": "array",
|
||||||
|
"items": {
|
||||||
|
"type": "integer",
|
||||||
|
"format": "uint32"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"description": "Topological execution layers (parallelizable batches)"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
"ActorCreate": {
|
"ActorCreate": {
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"required": [
|
"required": [
|
||||||
@@ -1219,6 +1373,14 @@
|
|||||||
"StorageError": {
|
"StorageError": {
|
||||||
"code": -32010,
|
"code": -32010,
|
||||||
"message": "Storage Error"
|
"message": "Storage Error"
|
||||||
|
},
|
||||||
|
"DagMissingDependency": {
|
||||||
|
"code": -32020,
|
||||||
|
"message": "DAG Missing Dependency"
|
||||||
|
},
|
||||||
|
"DagCycleDetected": {
|
||||||
|
"code": -32021,
|
||||||
|
"message": "DAG Cycle Detected"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
207
src/dag.rs
Normal file
207
src/dag.rs
Normal file
@@ -0,0 +1,207 @@
|
|||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::collections::{HashMap, HashSet, VecDeque};
|
||||||
|
use std::fmt;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
models::{Flow, Job, ScriptType},
|
||||||
|
storage::RedisDriver,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub type DagResult<T> = Result<T, DagError>;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum DagError {
|
||||||
|
Storage(Box<dyn std::error::Error + Send + Sync>),
|
||||||
|
MissingDependency { job: u32, depends_on: u32 },
|
||||||
|
CycleDetected { remaining: Vec<u32> },
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for DagError {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
DagError::Storage(e) => write!(f, "Storage error: {}", e),
|
||||||
|
DagError::MissingDependency { job, depends_on } => write!(
|
||||||
|
f,
|
||||||
|
"Job {} depends on {}, which is not part of the flow.jobs list",
|
||||||
|
job, depends_on
|
||||||
|
),
|
||||||
|
DagError::CycleDetected { remaining } => {
|
||||||
|
write!(f, "Cycle detected; unresolved nodes: {:?}", remaining)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::error::Error for DagError {}
|
||||||
|
|
||||||
|
impl From<Box<dyn std::error::Error + Send + Sync>> for DagError {
|
||||||
|
fn from(e: Box<dyn std::error::Error + Send + Sync>) -> Self {
|
||||||
|
DagError::Storage(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct JobSummary {
|
||||||
|
pub id: u32,
|
||||||
|
pub depends: Vec<u32>,
|
||||||
|
pub prerequisites: Vec<String>,
|
||||||
|
pub script_type: ScriptType,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct FlowDag {
|
||||||
|
pub flow_id: u32,
|
||||||
|
pub caller_id: u32,
|
||||||
|
pub context_id: u32,
|
||||||
|
pub nodes: HashMap<u32, JobSummary>,
|
||||||
|
pub edges: Vec<(u32, u32)>, // (from prerequisite, to job)
|
||||||
|
pub reverse_edges: Vec<(u32, u32)>, // (from job, to prerequisite)
|
||||||
|
pub roots: Vec<u32>, // in_degree == 0
|
||||||
|
pub leaves: Vec<u32>, // out_degree == 0
|
||||||
|
pub levels: Vec<Vec<u32>>, // topological layers for parallel execution
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn build_flow_dag(
|
||||||
|
redis: &RedisDriver,
|
||||||
|
context_id: u32,
|
||||||
|
flow_id: u32,
|
||||||
|
) -> DagResult<FlowDag> {
|
||||||
|
// Load flow
|
||||||
|
let flow: Flow = redis
|
||||||
|
.load_flow(context_id, flow_id)
|
||||||
|
.await
|
||||||
|
.map_err(DagError::from)?;
|
||||||
|
let caller_id = flow.caller_id();
|
||||||
|
let flow_job_ids = flow.jobs().clone();
|
||||||
|
|
||||||
|
// Build a set for faster membership tests
|
||||||
|
let job_id_set: HashSet<u32> = flow_job_ids.iter().copied().collect();
|
||||||
|
|
||||||
|
// Load all jobs
|
||||||
|
let mut jobs: HashMap<u32, Job> = HashMap::with_capacity(flow_job_ids.len());
|
||||||
|
for jid in &flow_job_ids {
|
||||||
|
let job = redis
|
||||||
|
.load_job(context_id, caller_id, *jid)
|
||||||
|
.await
|
||||||
|
.map_err(DagError::from)?;
|
||||||
|
jobs.insert(*jid, job);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate dependencies and construct adjacency
|
||||||
|
let mut edges: Vec<(u32, u32)> = Vec::new();
|
||||||
|
let mut reverse_edges: Vec<(u32, u32)> = Vec::new();
|
||||||
|
let mut adj: HashMap<u32, Vec<u32>> = HashMap::with_capacity(jobs.len());
|
||||||
|
let mut rev_adj: HashMap<u32, Vec<u32>> = HashMap::with_capacity(jobs.len());
|
||||||
|
let mut in_degree: HashMap<u32, usize> = HashMap::with_capacity(jobs.len());
|
||||||
|
|
||||||
|
for &jid in &flow_job_ids {
|
||||||
|
adj.entry(jid).or_default();
|
||||||
|
rev_adj.entry(jid).or_default();
|
||||||
|
in_degree.entry(jid).or_insert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (&jid, job) in &jobs {
|
||||||
|
for &dep in job.depends() {
|
||||||
|
if !job_id_set.contains(&dep) {
|
||||||
|
return Err(DagError::MissingDependency {
|
||||||
|
job: jid,
|
||||||
|
depends_on: dep,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
// edge: dep -> jid
|
||||||
|
edges.push((dep, jid));
|
||||||
|
reverse_edges.push((jid, dep));
|
||||||
|
adj.get_mut(&dep).unwrap().push(jid);
|
||||||
|
rev_adj.get_mut(&jid).unwrap().push(dep);
|
||||||
|
*in_degree.get_mut(&jid).unwrap() += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Kahn's algorithm for topological sorting, with level construction
|
||||||
|
let mut zero_in: VecDeque<u32> = in_degree
|
||||||
|
.iter()
|
||||||
|
.filter_map(|(k, v)| if *v == 0 { Some(*k) } else { None })
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let mut processed_count = 0usize;
|
||||||
|
let mut levels: Vec<Vec<u32>> = Vec::new();
|
||||||
|
|
||||||
|
// To make deterministic, sort initial zero_in
|
||||||
|
{
|
||||||
|
let mut tmp: Vec<u32> = zero_in.iter().copied().collect();
|
||||||
|
tmp.sort_unstable();
|
||||||
|
zero_in = tmp.into_iter().collect();
|
||||||
|
}
|
||||||
|
|
||||||
|
while !zero_in.is_empty() {
|
||||||
|
let mut level: Vec<u32> = Vec::new();
|
||||||
|
// drain current frontier
|
||||||
|
let mut next_zero: Vec<u32> = Vec::new();
|
||||||
|
let mut current_frontier: Vec<u32> = zero_in.drain(..).collect();
|
||||||
|
current_frontier.sort_unstable();
|
||||||
|
for u in current_frontier {
|
||||||
|
level.push(u);
|
||||||
|
processed_count += 1;
|
||||||
|
if let Some(children) = adj.get(&u) {
|
||||||
|
let mut sorted_children = children.clone();
|
||||||
|
sorted_children.sort_unstable();
|
||||||
|
for &v in &sorted_children {
|
||||||
|
let d = in_degree.get_mut(&v).unwrap();
|
||||||
|
*d -= 1;
|
||||||
|
if *d == 0 {
|
||||||
|
next_zero.push(v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
next_zero.sort_unstable();
|
||||||
|
zero_in = next_zero.into_iter().collect();
|
||||||
|
levels.push(level);
|
||||||
|
}
|
||||||
|
|
||||||
|
if processed_count != jobs.len() {
|
||||||
|
let remaining: Vec<u32> = in_degree
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|(k, v)| if v > 0 { Some(k) } else { None })
|
||||||
|
.collect();
|
||||||
|
return Err(DagError::CycleDetected { remaining });
|
||||||
|
}
|
||||||
|
|
||||||
|
// Roots and leaves
|
||||||
|
let roots: Vec<u32> = levels.first().cloned().unwrap_or_default();
|
||||||
|
let leaves: Vec<u32> = adj
|
||||||
|
.iter()
|
||||||
|
.filter_map(|(k, v)| if v.is_empty() { Some(*k) } else { None })
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// Nodes map (JobSummary)
|
||||||
|
let mut nodes: HashMap<u32, JobSummary> = HashMap::with_capacity(jobs.len());
|
||||||
|
for (&jid, job) in &jobs {
|
||||||
|
let summary = JobSummary {
|
||||||
|
id: jid,
|
||||||
|
depends: job.depends().to_vec(),
|
||||||
|
prerequisites: job.prerequisites().to_vec(),
|
||||||
|
script_type: job.script_type(),
|
||||||
|
};
|
||||||
|
nodes.insert(jid, summary);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sort edges deterministically
|
||||||
|
edges.sort_unstable();
|
||||||
|
reverse_edges.sort_unstable();
|
||||||
|
|
||||||
|
let dag = FlowDag {
|
||||||
|
flow_id,
|
||||||
|
caller_id,
|
||||||
|
context_id,
|
||||||
|
nodes,
|
||||||
|
edges,
|
||||||
|
reverse_edges,
|
||||||
|
roots,
|
||||||
|
leaves,
|
||||||
|
levels,
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(dag)
|
||||||
|
}
|
||||||
|
|
@@ -1,4 +1,5 @@
|
|||||||
pub mod models;
|
pub mod models;
|
||||||
pub mod storage;
|
pub mod storage;
|
||||||
mod time;
|
mod time;
|
||||||
|
pub mod dag;
|
||||||
pub mod rpc;
|
pub mod rpc;
|
||||||
|
@@ -31,3 +31,18 @@ pub enum FlowStatus {
|
|||||||
Error,
|
Error,
|
||||||
Finished,
|
Finished,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Flow {
|
||||||
|
pub fn id(&self) -> u32 {
|
||||||
|
self.id
|
||||||
|
}
|
||||||
|
pub fn caller_id(&self) -> u32 {
|
||||||
|
self.caller_id
|
||||||
|
}
|
||||||
|
pub fn context_id(&self) -> u32 {
|
||||||
|
self.context_id
|
||||||
|
}
|
||||||
|
pub fn jobs(&self) -> &Vec<u32> {
|
||||||
|
&self.jobs
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@@ -36,3 +36,24 @@ pub enum JobStatus {
|
|||||||
Error,
|
Error,
|
||||||
Finished,
|
Finished,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Job {
|
||||||
|
pub fn id(&self) -> u32 {
|
||||||
|
self.id
|
||||||
|
}
|
||||||
|
pub fn caller_id(&self) -> u32 {
|
||||||
|
self.caller_id
|
||||||
|
}
|
||||||
|
pub fn context_id(&self) -> u32 {
|
||||||
|
self.context_id
|
||||||
|
}
|
||||||
|
pub fn depends(&self) -> &Vec<u32> {
|
||||||
|
&self.depends
|
||||||
|
}
|
||||||
|
pub fn prerequisites(&self) -> &Vec<String> {
|
||||||
|
&self.prerequisites
|
||||||
|
}
|
||||||
|
pub fn script_type(&self) -> ScriptType {
|
||||||
|
self.script_type.clone()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
34
src/rpc.rs
34
src/rpc.rs
@@ -9,10 +9,11 @@ use jsonrpsee::{
|
|||||||
server::{ServerBuilder, ServerHandle},
|
server::{ServerBuilder, ServerHandle},
|
||||||
types::error::ErrorObjectOwned,
|
types::error::ErrorObjectOwned,
|
||||||
};
|
};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::Deserialize;
|
||||||
use serde_json::{Value, json};
|
use serde_json::{Value, json};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
dag::{DagError, FlowDag, build_flow_dag},
|
||||||
models::{Actor, Context, Flow, Job, Message, MessageFormatType, Runner, ScriptType},
|
models::{Actor, Context, Flow, Job, Message, MessageFormatType, Runner, ScriptType},
|
||||||
storage::RedisDriver,
|
storage::RedisDriver,
|
||||||
time::current_timestamp,
|
time::current_timestamp,
|
||||||
@@ -45,6 +46,22 @@ fn storage_err(e: Box<dyn std::error::Error + Send + Sync>) -> ErrorObjectOwned
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn dag_err(e: DagError) -> ErrorObjectOwned {
|
||||||
|
match e {
|
||||||
|
DagError::Storage(inner) => storage_err(inner),
|
||||||
|
DagError::MissingDependency { .. } => ErrorObjectOwned::owned(
|
||||||
|
-32020,
|
||||||
|
"DAG Missing Dependency",
|
||||||
|
Some(Value::String(e.to_string())),
|
||||||
|
),
|
||||||
|
DagError::CycleDetected { .. } => ErrorObjectOwned::owned(
|
||||||
|
-32021,
|
||||||
|
"DAG Cycle Detected",
|
||||||
|
Some(Value::String(e.to_string())),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// -----------------------------
|
// -----------------------------
|
||||||
// Create DTOs and Param wrappers
|
// Create DTOs and Param wrappers
|
||||||
// -----------------------------
|
// -----------------------------
|
||||||
@@ -447,6 +464,21 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
|
|||||||
})
|
})
|
||||||
.expect("register flow.load");
|
.expect("register flow.load");
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
let state = state.clone();
|
||||||
|
module
|
||||||
|
.register_async_method("flow.dag", move |params, _caller, _ctx| {
|
||||||
|
let state = state.clone();
|
||||||
|
async move {
|
||||||
|
let p: FlowLoadParams = params.parse().map_err(invalid_params_err)?;
|
||||||
|
let dag: FlowDag = build_flow_dag(&state.redis, p.context_id, p.id)
|
||||||
|
.await
|
||||||
|
.map_err(dag_err)?;
|
||||||
|
Ok::<_, ErrorObjectOwned>(dag)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.expect("register flow.dag");
|
||||||
|
}
|
||||||
|
|
||||||
// Job
|
// Job
|
||||||
{
|
{
|
||||||
|
Reference in New Issue
Block a user