diff --git a/Cargo.lock b/Cargo.lock index 3b49281..7b503f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -183,6 +183,12 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "bincode" version = "2.0.1" @@ -562,6 +568,7 @@ checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", "crypto-common", + "subtle", ] [[package]] @@ -672,6 +679,24 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "ethnum" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca81e6b4777c89fd810c25a4be2b1bd93ea034fbe58e6a75216a34c6b82c539b" + +[[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + +[[package]] +name = "fast-float2" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8eb564c5c7423d25c886fb561d1e4ee69f72354d16918afa32c08811f6b6a55" + [[package]] name = "fastrand" version = "2.3.0" @@ -889,12 +914,14 @@ version = "0.1.0" dependencies = [ "bincode", "chrono", - "derive", "heromodels-derive", "heromodels_core", + "jsonb", "ourdb", + "postgres", + "r2d2", + "r2d2_postgres", "rhai", - "rhai_client_macros", "serde", "serde_json", "strum", @@ -920,6 +947,15 @@ dependencies = [ "serde", ] +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + [[package]] name = "home" version = "0.5.11" @@ -1194,6 +1230,47 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "jiff" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be1f93b8b1eb69c77f24bbb0afdf66f54b632ee39af40ca21c4365a1d7347e49" +dependencies = [ + "jiff-static", + "jiff-tzdb-platform", + "log", + "portable-atomic", + "portable-atomic-util", + "serde", + "windows-sys 0.59.0", +] + +[[package]] +name = "jiff-static" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03343451ff899767262ec32146f6d559dd759fdadf42ff0e227c7c48f72594b4" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + +[[package]] +name = "jiff-tzdb" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1283705eb0a21404d2bfd6eef2a7593d240bc42a0bdb39db0ad6fa2ec026524" + +[[package]] +name = "jiff-tzdb-platform" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "875a5a69ac2bab1a891711cf5eccbec1ce0341ea805560dcd90b7a2e925132e8" +dependencies = [ + "jiff-tzdb", +] + [[package]] name = "js-sys" version = "0.3.77" @@ -1204,6 +1281,26 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonb" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96cbb4fba292867a2d86ed83dbe5f9d036f423bf6a491b7d884058b2fde42fcd" +dependencies = [ + "byteorder", + "ethnum", + "fast-float2", + "itoa", + "jiff", + "nom 8.0.0", + "num-traits", + "ordered-float", + "rand 0.9.2", + "ryu", + "serde", + "serde_json", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -1270,6 +1367,16 @@ dependencies = [ "serde", ] +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "memchr" version = "2.7.4" @@ -1371,6 +1478,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nom" +version = "8.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df9761775871bdef83bee530e60050f7e54b1105350d6884eb0fb4f46c2f9405" +dependencies = [ + "memchr", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1490,13 +1606,22 @@ dependencies = [ "uuid", ] +[[package]] +name = "ordered-float" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2c1f9f56e534ac6a9b8a4600bdf0f530fb393b5f393e7b4d03489c3cf0c3f01" +dependencies = [ + "num-traits", +] + [[package]] name = "ourdb" version = "0.1.0" dependencies = [ "crc32fast", "log", - "rand", + "rand 0.8.5", "thiserror", ] @@ -1561,7 +1686,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d" dependencies = [ "phf_shared", - "rand", + "rand 0.8.5", ] [[package]] @@ -1625,6 +1750,60 @@ version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e" +[[package]] +name = "portable-atomic-util" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507" +dependencies = [ + "portable-atomic", +] + +[[package]] +name = "postgres" +version = "0.19.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "363e6dfbdd780d3aa3597b6eb430db76bb315fa9bad7fae595bb8def808b8470" +dependencies = [ + "bytes", + "fallible-iterator", + "futures-util", + "log", + "tokio", + "tokio-postgres", +] + +[[package]] +name = "postgres-protocol" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ff0abab4a9b844b93ef7b81f1efc0a366062aaef2cd702c76256b5dc075c54" +dependencies = [ + "base64 0.22.1", + "byteorder", + "bytes", + "fallible-iterator", + "hmac", + "md-5", + "memchr", + "rand 0.9.2", + "sha2", + "stringprep", +] + +[[package]] +name = "postgres-types" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613283563cd90e1dfc3518d548caee47e0e725455ed619881f5cf21f36de4b48" +dependencies = [ + "bytes", + "fallible-iterator", + "postgres-protocol", + "serde", + "serde_json", +] + [[package]] name = "potential_utf" version = "0.1.2" @@ -1681,6 +1860,27 @@ version = "5.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" +[[package]] +name = "r2d2" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" +dependencies = [ + "log", + "parking_lot", + "scheduled-thread-pool", +] + +[[package]] +name = "r2d2_postgres" +version = "0.18.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efd4b47636dbca581cd057e2f27a5d39be741ea4f85fd3c29e415c55f71c7595" +dependencies = [ + "postgres", + "r2d2", +] + [[package]] name = "rand" version = "0.8.5" @@ -1688,8 +1888,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.3", ] [[package]] @@ -1699,7 +1909,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.3", ] [[package]] @@ -1711,6 +1931,15 @@ dependencies = [ "getrandom 0.2.16", ] +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.3", +] + [[package]] name = "rayon" version = "1.10.0" @@ -1807,7 +2036,7 @@ version = "0.11.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" dependencies = [ - "base64", + "base64 0.21.7", "bytes", "encoding_rs", "futures-core", @@ -1860,16 +2089,6 @@ dependencies = [ "thin-vec", ] -[[package]] -name = "rhai_client_macros" -version = "0.1.0" -dependencies = [ - "proc-macro2", - "quote", - "rhai", - "syn 2.0.101", -] - [[package]] name = "rhai_codegen" version = "2.2.0" @@ -2020,7 +2239,7 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" dependencies = [ - "base64", + "base64 0.21.7", ] [[package]] @@ -2053,6 +2272,15 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "scheduled-thread-pool" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19" +dependencies = [ + "parking_lot", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -2108,6 +2336,7 @@ version = "1.0.140" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" dependencies = [ + "indexmap", "itoa", "memchr", "ryu", @@ -2143,6 +2372,17 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" +[[package]] +name = "sha2" +version = "0.10.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -2227,6 +2467,17 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "stringprep" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", + "unicode-properties", +] + [[package]] name = "strsim" version = "0.11.1" @@ -2252,6 +2503,12 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "subtle" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" + [[package]] name = "syn" version = "1.0.109" @@ -2353,7 +2610,7 @@ checksum = "666cd3a6681775d22b200409aad3b089c5b99fb11ecdd8a204d9d62f8148498f" dependencies = [ "dirs", "fnv", - "nom", + "nom 7.1.3", "phf", "phf_codegen", ] @@ -2423,6 +2680,21 @@ dependencies = [ "serde_json", ] +[[package]] +name = "tinyvec" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09b3661f17e86524eccd4371ab0429194e0d7c008abb45f7a7495b1719463c71" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.45.1" @@ -2462,6 +2734,32 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-postgres" +version = "0.7.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c95d533c83082bb6490e0189acaa0bbeef9084e60471b696ca6988cd0541fb0" +dependencies = [ + "async-trait", + "byteorder", + "bytes", + "fallible-iterator", + "futures-channel", + "futures-util", + "log", + "parking_lot", + "percent-encoding", + "phf", + "pin-project-lite", + "postgres-protocol", + "postgres-types", + "rand 0.9.2", + "socket2", + "tokio", + "tokio-util", + "whoami", +] + [[package]] name = "tokio-stream" version = "0.1.17" @@ -2600,7 +2898,7 @@ dependencies = [ "http", "httparse", "log", - "rand", + "rand 0.8.5", "sha1", "thiserror", "url", @@ -2613,12 +2911,33 @@ version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f" +[[package]] +name = "unicode-bidi" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5" + [[package]] name = "unicode-ident" version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" +[[package]] +name = "unicode-normalization" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956" +dependencies = [ + "tinyvec", +] + +[[package]] +name = "unicode-properties" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0" + [[package]] name = "unicode-width" version = "0.1.14" @@ -2730,6 +3049,12 @@ dependencies = [ "wit-bindgen-rt", ] +[[package]] +name = "wasite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" + [[package]] name = "wasm-bindgen" version = "0.2.100" @@ -2823,6 +3148,17 @@ dependencies = [ "rustix 0.38.44", ] +[[package]] +name = "whoami" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6994d13118ab492c3c80c1f81928718159254c53c472bf9ce36f8dae4add02a7" +dependencies = [ + "redox_syscall", + "wasite", + "web-sys", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/src/dsl/src/flow/flow.rs b/_archive/flow/flow.rs similarity index 90% rename from src/dsl/src/flow/flow.rs rename to _archive/flow/flow.rs index 96b0a53..66452b0 100644 --- a/src/dsl/src/flow/flow.rs +++ b/_archive/flow/flow.rs @@ -21,8 +21,8 @@ mod rhai_flow_module { use super::{Array, Dynamic, RhaiFlow, RhaiFlowStep, INT}; #[rhai_fn(name = "new_flow", return_raw)] - pub fn new_flow(flow_uuid: String) -> Result> { - Ok(Flow::new(flow_uuid)) + pub fn new_flow() -> Result> { + Ok(Flow::new()) } // --- Setters --- @@ -55,10 +55,7 @@ mod rhai_flow_module { pub fn get_id(f: &mut RhaiFlow) -> INT { f.base_data.id as INT } - #[rhai_fn(get = "flow_uuid", pure)] - pub fn get_flow_uuid(f: &mut RhaiFlow) -> String { - f.flow_uuid.clone() - } + #[rhai_fn(get = "name", pure)] pub fn get_name(f: &mut RhaiFlow) -> String { f.name.clone() @@ -97,5 +94,4 @@ pub fn register_flow_rhai_module(engine: &mut Engine) { ); engine.register_global_module(module.into()); - println!("Successfully registered flow Rhai module."); } diff --git a/src/dsl/src/flow/flow_step.rs b/_archive/flow/flow_step.rs similarity index 83% rename from src/dsl/src/flow/flow_step.rs rename to _archive/flow/flow_step.rs index d050a81..ffa7ddb 100644 --- a/src/dsl/src/flow/flow_step.rs +++ b/_archive/flow/flow_step.rs @@ -34,17 +34,6 @@ mod rhai_flow_step_module { Ok(step.clone()) } - #[rhai_fn(name = "step_order", return_raw)] - pub fn set_step_order( - step: &mut RhaiFlowStep, - step_order: INT, - ) -> Result> { - let mut owned = std::mem::take(step); - owned.step_order = step_order as u32; - *step = owned; - Ok(step.clone()) - } - #[rhai_fn(name = "status", return_raw)] pub fn set_status( step: &mut RhaiFlowStep, @@ -64,10 +53,6 @@ mod rhai_flow_step_module { pub fn get_description(s: &mut RhaiFlowStep) -> Option { s.description.clone() } - #[rhai_fn(get = "step_order", pure)] - pub fn get_step_order(s: &mut RhaiFlowStep) -> INT { - s.step_order as INT - } #[rhai_fn(get = "status", pure)] pub fn get_status(s: &mut RhaiFlowStep) -> String { s.status.clone() @@ -98,5 +83,4 @@ pub fn register_flow_step_rhai_module(engine: &mut Engine) { ); engine.register_global_module(module.into()); - println!("Successfully registered flow_step Rhai module."); } diff --git a/src/dsl/src/flow/mod.rs b/_archive/flow/mod.rs similarity index 56% rename from src/dsl/src/flow/mod.rs rename to _archive/flow/mod.rs index de9e654..c8d51f7 100644 --- a/src/dsl/src/flow/mod.rs +++ b/_archive/flow/mod.rs @@ -3,10 +3,15 @@ use rhai::Engine; pub mod flow; pub mod flow_step; pub mod signature_requirement; +pub mod orchestrated_flow; +pub mod orchestrated_flow_step; + +// Re-export the orchestrated models for easy access +pub use orchestrated_flow::{OrchestratedFlow, OrchestratorError, FlowStatus}; +pub use orchestrated_flow_step::OrchestratedFlowStep; pub fn register_flow_rhai_modules(engine: &mut Engine) { flow::register_flow_rhai_module(engine); flow_step::register_flow_step_rhai_module(engine); signature_requirement::register_signature_requirement_rhai_module(engine); - println!("Successfully registered flow Rhai modules."); } diff --git a/_archive/flow/orchestrated_flow.rs b/_archive/flow/orchestrated_flow.rs new file mode 100644 index 0000000..754203f --- /dev/null +++ b/_archive/flow/orchestrated_flow.rs @@ -0,0 +1,154 @@ +//! Orchestrated Flow model for DAG-based workflow execution + +use heromodels_core::BaseModelData; +use serde::{Deserialize, Serialize}; +use std::collections::HashSet; +use thiserror::Error; + +use super::orchestrated_flow_step::OrchestratedFlowStep; + +/// Extended Flow with orchestrator-specific steps +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OrchestratedFlow { + /// Base model data (id, created_at, updated_at) + pub base_data: BaseModelData, + + /// Name of the flow + pub name: String, + + /// Orchestrated steps with dependencies + pub orchestrated_steps: Vec, +} + +impl OrchestratedFlow { + /// Create a new orchestrated flow + pub fn new(name: &str) -> Self { + Self { + base_data: BaseModelData::new(), + name: name.to_string(), + orchestrated_steps: Vec::new(), + } + } + + /// Add a step to the flow + pub fn add_step(mut self, step: OrchestratedFlowStep) -> Self { + self.orchestrated_steps.push(step); + self + } + + /// Get the flow ID + pub fn id(&self) -> u32 { + self.base_data.id + } + + /// Validate the DAG structure (no cycles) + pub fn validate_dag(&self) -> Result<(), OrchestratorError> { + let mut visited = HashSet::new(); + let mut rec_stack = HashSet::new(); + + for step in &self.orchestrated_steps { + if !visited.contains(&step.id()) { + if self.has_cycle(step.id(), &mut visited, &mut rec_stack)? { + return Err(OrchestratorError::CyclicDependency); + } + } + } + + Ok(()) + } + + /// Check for cycles in the dependency graph + fn has_cycle( + &self, + step_id: u32, + visited: &mut HashSet, + rec_stack: &mut HashSet, + ) -> Result { + visited.insert(step_id); + rec_stack.insert(step_id); + + let step = self.orchestrated_steps + .iter() + .find(|s| s.id() == step_id) + .ok_or(OrchestratorError::StepNotFound(step_id))?; + + for &dep_id in &step.depends_on { + if !visited.contains(&dep_id) { + if self.has_cycle(dep_id, visited, rec_stack)? { + return Ok(true); + } + } else if rec_stack.contains(&dep_id) { + return Ok(true); + } + } + + rec_stack.remove(&step_id); + Ok(false) + } +} + +/// Orchestrator errors +#[derive(Error, Debug)] +pub enum OrchestratorError { + #[error("Database error: {0}")] + DatabaseError(String), + + #[error("Executor error: {0}")] + ExecutorError(String), + + #[error("No ready steps found - possible deadlock")] + NoReadySteps, + + #[error("Step {0} failed: {1:?}")] + StepFailed(u32, Option), + + #[error("Cyclic dependency detected in workflow")] + CyclicDependency, + + #[error("Step {0} not found")] + StepNotFound(u32), + + #[error("Invalid dependency: step {0} depends on non-existent step {1}")] + InvalidDependency(u32, u32), +} + +/// Flow execution status +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum FlowStatus { + Pending, + Running, + Completed, + Failed, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_orchestrated_flow_builder() { + let step1 = OrchestratedFlowStep::new("step1").script("let x = 1;"); + let step2 = OrchestratedFlowStep::new("step2").script("let y = 2;"); + + let flow = OrchestratedFlow::new("test_flow") + .add_step(step1) + .add_step(step2); + + assert_eq!(flow.name, "test_flow"); + assert_eq!(flow.orchestrated_steps.len(), 2); + } + + #[test] + fn test_dag_validation_no_cycle() { + let step1 = OrchestratedFlowStep::new("step1").script("let x = 1;"); + let step2 = OrchestratedFlowStep::new("step2") + .script("let y = 2;") + .depends_on(step1.id()); + + let flow = OrchestratedFlow::new("test_flow") + .add_step(step1) + .add_step(step2); + + assert!(flow.validate_dag().is_ok()); + } +} \ No newline at end of file diff --git a/_archive/flow/orchestrated_flow_step.rs b/_archive/flow/orchestrated_flow_step.rs new file mode 100644 index 0000000..dc3fcfc --- /dev/null +++ b/_archive/flow/orchestrated_flow_step.rs @@ -0,0 +1,124 @@ +//! Orchestrated Flow Step model for DAG-based workflow execution + +use heromodels_core::BaseModelData; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +/// Extended FlowStep with orchestrator-specific fields +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OrchestratedFlowStep { + /// Base model data (id, created_at, updated_at) + pub base_data: BaseModelData, + + /// Name of the flow step + pub name: String, + + /// Rhai script to execute + pub script: String, + + /// IDs of steps this step depends on + pub depends_on: Vec, + + /// Execution context (circle) + pub context_id: String, + + /// Target worker for execution + pub worker_id: String, + + /// Input parameters + pub inputs: HashMap, + + /// Output results + pub outputs: HashMap, +} + +impl OrchestratedFlowStep { + /// Create a new orchestrated flow step + pub fn new(name: &str) -> Self { + Self { + base_data: BaseModelData::new(), + name: name.to_string(), + script: String::new(), + depends_on: Vec::new(), + context_id: String::new(), + worker_id: String::new(), + inputs: HashMap::new(), + outputs: HashMap::new(), + } + } + + /// Set the script content + pub fn script(mut self, script: &str) -> Self { + self.script = script.to_string(); + self + } + + /// Add a dependency on another step + pub fn depends_on(mut self, step_id: u32) -> Self { + self.depends_on.push(step_id); + self + } + + /// Set the context ID + pub fn context_id(mut self, context_id: &str) -> Self { + self.context_id = context_id.to_string(); + self + } + + /// Set the worker ID + pub fn worker_id(mut self, worker_id: &str) -> Self { + self.worker_id = worker_id.to_string(); + self + } + + /// Add an input parameter + pub fn input(mut self, key: &str, value: &str) -> Self { + self.inputs.insert(key.to_string(), value.to_string()); + self + } + + /// Get the step ID + pub fn id(&self) -> u32 { + self.base_data.id + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_orchestrated_flow_step_builder() { + let step = OrchestratedFlowStep::new("test_step") + .script("let x = 1;") + .context_id("test_context") + .worker_id("test_worker") + .input("key1", "value1"); + + assert_eq!(step.name, "test_step"); + assert_eq!(step.script, "let x = 1;"); + assert_eq!(step.context_id, "test_context"); + assert_eq!(step.worker_id, "test_worker"); + assert_eq!(step.inputs.get("key1"), Some(&"value1".to_string())); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_orchestrated_flow_step_builder() { + let step = OrchestratedFlowStep::new("test_step") + .script("let x = 1;") + .context_id("test_context") + .worker_id("test_worker") + .input("key1", "value1"); + + assert_eq!(step.flow_step.name, "test_step"); + assert_eq!(step.script, "let x = 1;"); + assert_eq!(step.context_id, "test_context"); + assert_eq!(step.worker_id, "test_worker"); + assert_eq!(step.inputs.get("key1"), Some(&"value1".to_string())); + } +} \ No newline at end of file diff --git a/src/dsl/src/flow/signature_requirement.rs b/_archive/flow/signature_requirement.rs similarity index 98% rename from src/dsl/src/flow/signature_requirement.rs rename to _archive/flow/signature_requirement.rs index 96f344b..91ef242 100644 --- a/src/dsl/src/flow/signature_requirement.rs +++ b/_archive/flow/signature_requirement.rs @@ -142,5 +142,4 @@ pub fn register_signature_requirement_rhai_module(engine: &mut Engine) { ); engine.register_global_module(module.into()); - println!("Successfully registered signature_requirement Rhai module."); } diff --git a/src/dsl/src/access.rs b/src/dsl/src/access.rs index 23e64be..d2d0f8f 100644 --- a/src/dsl/src/access.rs +++ b/src/dsl/src/access.rs @@ -147,6 +147,4 @@ pub fn register_access_rhai_module(engine: &mut Engine) { ); engine.register_global_module(module.into()); - - println!("Successfully registered access Rhai module using export_module approach."); } diff --git a/src/dsl/src/biz/company.rs b/src/dsl/src/biz/company.rs index f2bf104..6f219cd 100644 --- a/src/dsl/src/biz/company.rs +++ b/src/dsl/src/biz/company.rs @@ -247,5 +247,4 @@ pub fn register_company_rhai_module(engine: &mut Engine) { ); engine.register_global_module(module.into()); - println!("Successfully registered company Rhai module."); } diff --git a/src/dsl/src/biz/mod.rs b/src/dsl/src/biz/mod.rs index f386180..a7b7fff 100644 --- a/src/dsl/src/biz/mod.rs +++ b/src/dsl/src/biz/mod.rs @@ -10,5 +10,4 @@ pub fn register_biz_rhai_module(engine: &mut Engine) { product::register_product_rhai_module(engine); sale::register_sale_rhai_module(engine); shareholder::register_shareholder_rhai_module(engine); - println!("Successfully registered biz Rhai module."); } diff --git a/src/dsl/src/biz/product.rs b/src/dsl/src/biz/product.rs index 82cbed4..2d09303 100644 --- a/src/dsl/src/biz/product.rs +++ b/src/dsl/src/biz/product.rs @@ -314,5 +314,4 @@ pub fn register_product_rhai_module(engine: &mut Engine) { ); engine.register_global_module(product_module.into()); - println!("Successfully registered product Rhai module."); } diff --git a/src/dsl/src/biz/sale.rs b/src/dsl/src/biz/sale.rs index dbdb75f..4c1fcc4 100644 --- a/src/dsl/src/biz/sale.rs +++ b/src/dsl/src/biz/sale.rs @@ -310,5 +310,4 @@ pub fn register_sale_rhai_module(engine: &mut Engine) { ); engine.register_global_module(sale_module.into()); - println!("Successfully registered sale Rhai module."); } diff --git a/src/dsl/src/biz/shareholder.rs b/src/dsl/src/biz/shareholder.rs index 11c2f4e..017bbfd 100644 --- a/src/dsl/src/biz/shareholder.rs +++ b/src/dsl/src/biz/shareholder.rs @@ -166,5 +166,4 @@ pub fn register_shareholder_rhai_module(engine: &mut Engine) { ); engine.register_global_module(shareholder_module.into()); - println!("Successfully registered shareholder Rhai module."); } diff --git a/src/dsl/src/calendar.rs b/src/dsl/src/calendar.rs index 867160f..9461447 100644 --- a/src/dsl/src/calendar.rs +++ b/src/dsl/src/calendar.rs @@ -245,5 +245,4 @@ pub fn register_calendar_rhai_module(engine: &mut Engine) { engine.register_type_with_name::("Attendee"); engine.register_type_with_name::("Event"); engine.register_global_module(module.into()); - println!("Successfully registered calendar Rhai module."); } diff --git a/src/dsl/src/circle.rs b/src/dsl/src/circle.rs index 9e155da..4dce592 100644 --- a/src/dsl/src/circle.rs +++ b/src/dsl/src/circle.rs @@ -153,5 +153,4 @@ pub fn register_circle_rhai_module(engine: &mut Engine) { ); engine.register_global_module(module.into()); - println!("Successfully registered circle Rhai module."); } diff --git a/src/dsl/src/company.rs b/src/dsl/src/company.rs index 942e087..f2aade7 100644 --- a/src/dsl/src/company.rs +++ b/src/dsl/src/company.rs @@ -295,5 +295,4 @@ pub fn register_company_rhai_module(engine: &mut Engine) { ); engine.register_global_module(module.into()); - println!("Successfully registered company Rhai module."); } diff --git a/src/dsl/src/contact.rs b/src/dsl/src/contact.rs index 1551dee..650b973 100644 --- a/src/dsl/src/contact.rs +++ b/src/dsl/src/contact.rs @@ -230,5 +230,4 @@ pub fn register_contact_rhai_module(engine: &mut Engine) { ); engine.register_global_module(module.into()); - println!("Successfully registered contact Rhai module."); } diff --git a/src/dsl/src/core/comment.rs b/src/dsl/src/core/comment.rs index 8eb0759..3a92883 100644 --- a/src/dsl/src/core/comment.rs +++ b/src/dsl/src/core/comment.rs @@ -99,5 +99,4 @@ pub fn register_comment_rhai_module(engine: &mut Engine) { ); engine.register_global_module(module.into()); - println!("Successfully registered comment Rhai module."); } diff --git a/src/dsl/src/finance/account.rs b/src/dsl/src/finance/account.rs index 341b5a6..3e76ab2 100644 --- a/src/dsl/src/finance/account.rs +++ b/src/dsl/src/finance/account.rs @@ -156,5 +156,4 @@ pub fn register_account_rhai_module(engine: &mut Engine) { ); engine.register_global_module(account_module.into()); - println!("Successfully registered account Rhai module."); } diff --git a/src/dsl/src/finance/asset.rs b/src/dsl/src/finance/asset.rs index 82fc637..ed18935 100644 --- a/src/dsl/src/finance/asset.rs +++ b/src/dsl/src/finance/asset.rs @@ -165,5 +165,4 @@ pub fn register_asset_rhai_module(engine: &mut Engine) { ); engine.register_global_module(module.into()); - println!("Successfully registered asset Rhai module."); } diff --git a/src/dsl/src/finance/marketplace.rs b/src/dsl/src/finance/marketplace.rs index 1cf7d9d..8581351 100644 --- a/src/dsl/src/finance/marketplace.rs +++ b/src/dsl/src/finance/marketplace.rs @@ -526,5 +526,4 @@ pub fn register_marketplace_rhai_module(engine: &mut Engine) { ); engine.register_global_module(listing_module.into()); - println!("Successfully registered marketplace Rhai module."); } diff --git a/src/dsl/src/flow_manager.rs b/src/dsl/src/flow_manager.rs deleted file mode 100644 index e69de29..0000000 diff --git a/src/dsl/src/lib.rs b/src/dsl/src/lib.rs index 14d8d08..670097a 100644 --- a/src/dsl/src/lib.rs +++ b/src/dsl/src/lib.rs @@ -48,7 +48,7 @@ pub mod company; pub mod contact; pub mod core; pub mod finance; -pub mod flow; +// pub mod flow; pub mod library; pub mod object; pub mod payment; @@ -107,7 +107,7 @@ pub fn register_dsl_modules(engine: &mut Engine) { contact::register_contact_rhai_module(engine); core::register_core_rhai_module(engine); finance::register_finance_rhai_modules(engine); - flow::register_flow_rhai_modules(engine); + // flow::register_flow_rhai_modules(engine); library::register_library_rhai_module(engine); object::register_object_fns(engine); payment::register_payment_rhai_module(engine); diff --git a/src/dsl/src/object.rs b/src/dsl/src/object.rs index 7ce9707..482d399 100644 --- a/src/dsl/src/object.rs +++ b/src/dsl/src/object.rs @@ -1,6 +1,5 @@ use heromodels::db::hero::OurDB; use heromodels::db::{Collection, Db}; -use heromodels::models::object::object::object_rhai_dsl::generated_rhai_module; use heromodels::models::object::Object; use macros::{register_authorized_create_by_id_fn, register_authorized_get_by_id_fn}; use rhai::{exported_module, Engine, EvalAltResult, FuncRegistration, Module}; @@ -8,7 +7,6 @@ use std::sync::Arc; pub fn register_object_fns(engine: &mut Engine) { let mut module = Module::new(); - module.merge(&exported_module!(generated_rhai_module)); register_authorized_get_by_id_fn!( module: &mut module, diff --git a/src/dsl/src/payment.rs b/src/dsl/src/payment.rs index 82dbdae..3b0955e 100644 --- a/src/dsl/src/payment.rs +++ b/src/dsl/src/payment.rs @@ -981,5 +981,4 @@ pub fn register_payment_rhai_module(engine: &mut Engine) { engine.register_type_with_name::("Coupon"); engine.register_global_module(module.into()); - println!("Successfully registered payment Rhai module."); } \ No newline at end of file diff --git a/src/dsl/src/product.rs b/src/dsl/src/product.rs index 629c158..53054fd 100644 --- a/src/dsl/src/product.rs +++ b/src/dsl/src/product.rs @@ -173,5 +173,4 @@ pub fn register_product_rhai_module(engine: &mut Engine) { engine.register_type_with_name::("ProductComponent"); engine.register_global_module(module.into()); - println!("Successfully registered product Rhai module."); } diff --git a/src/dsl/src/sale.rs b/src/dsl/src/sale.rs index 3691bb5..9161dfa 100644 --- a/src/dsl/src/sale.rs +++ b/src/dsl/src/sale.rs @@ -177,5 +177,4 @@ pub fn register_sale_rhai_module(engine: &mut Engine) { engine.register_type_with_name::("SaleItem"); engine.register_global_module(module.into()); - println!("Successfully registered sale Rhai module."); } diff --git a/src/dsl/src/shareholder.rs b/src/dsl/src/shareholder.rs index 27c635f..089df10 100644 --- a/src/dsl/src/shareholder.rs +++ b/src/dsl/src/shareholder.rs @@ -109,5 +109,4 @@ pub fn register_shareholder_rhai_module(engine: &mut Engine) { ); engine.register_global_module(module.into()); - println!("Successfully registered shareholder Rhai module."); } diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml new file mode 100644 index 0000000..310cfd7 --- /dev/null +++ b/src/flow/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "flow" +version = "0.1.0" +edition = "2021" +description = "Simple flow manager for Rhai scripts" + +[dependencies] +rhai = { version = "=1.21.0", features = ["std", "sync"] } +rhai_dispatcher = { path = "../dispatcher" } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tokio = { version = "1", features = ["full"] } +redis = { version = "0.23", features = ["tokio-comp"] } +uuid = { version = "1.0", features = ["v4"] } + +[dev-dependencies] +tempfile = "3" \ No newline at end of file diff --git a/src/flow/README.md b/src/flow/README.md new file mode 100644 index 0000000..4477e94 --- /dev/null +++ b/src/flow/README.md @@ -0,0 +1,110 @@ +# Flow Manager + +A simple, generic flow manager for Rhai scripts with builder pattern API and non-blocking execution. + +## Features + +- **Builder Pattern API**: Fluent interface for creating steps and flows +- **Non-blocking Execution**: Uses `tokio::spawn` for async step execution +- **Simple State Management**: Redis-based state tracking +- **Retry Logic**: Configurable timeouts and retry attempts +- **Mock API Support**: Built-in mock API for testing different scenarios +- **RhaiDispatcher Integration**: Seamless integration with existing Rhai execution system + +## Quick Start + +```rust +use flow::{new_step, new_flow, FlowExecutor}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Create executor + let executor = FlowExecutor::new("redis://127.0.0.1/").await?; + + // Build steps using fluent API + let step1 = new_step("stripe_config") + .script("stripe_config_script") + .timeout(5) + .retries(2) + .build(); + + let step2 = new_step("stripe_config_confirm") + .script("script that looks up stripe config confirmation in db") + .timeout(5) + .build(); + + let step3 = new_step("create_product") + .script("create_product_script") + .timeout(10) + .retries(1) + .build(); + + // Build flow using fluent API + let flow = new_flow("stripe_payment_request") + .add_step(step1) + .add_step(step2) + .add_step(step3) + .build(); + + // Execute flow (non-blocking) + let result = executor.execute_flow(flow).await?; + println!("Flow started: {}", result); + + Ok(()) +} +``` + +## Architecture + +### Core Components + +- **Types** (`types.rs`): Core data structures (Flow, Step, Status enums) +- **Builder** (`builder.rs`): Fluent API for constructing flows and steps +- **State** (`state.rs`): Simple Redis-based state management +- **Executor** (`executor.rs`): Non-blocking flow execution engine +- **Mock API** (`mock_api.rs`): Testing utilities for different response scenarios + +### State Management + +The system tracks minimal state: + +**Flow State:** +- `flow_id: String` - unique identifier +- `status: FlowStatus` (Created, Running, Completed, Failed) +- `current_step: Option` - currently executing step +- `completed_steps: Vec` - list of finished steps + +**Step State:** +- `step_id: String` - unique identifier +- `status: StepStatus` (Pending, Running, Completed, Failed) +- `attempt_count: u32` - for retry logic +- `output: Option` - result from script execution + +**Storage:** +- Redis key-value pairs: `flow:{flow_id}` and `step:{flow_id}:{step_id}` + +## Examples + +Run the example: + +```bash +cd ../rhailib/src/flow +cargo run --example stripe_flow_example +``` + +## Testing + +```bash +cargo test +``` + +Note: Some tests require Redis to be running. Set `SKIP_REDIS_TESTS=1` to skip Redis-dependent tests. + +## Integration + +The flow manager integrates with: +- **RhaiDispatcher**: For executing Rhai scripts +- **Redis**: For state persistence +- **tokio**: For non-blocking async execution + +This provides a simple, reliable foundation for orchestrating complex workflows while maintaining the non-blocking execution pattern established in the payment system. \ No newline at end of file diff --git a/src/flow/src/builder.rs b/src/flow/src/builder.rs new file mode 100644 index 0000000..6b1f80f --- /dev/null +++ b/src/flow/src/builder.rs @@ -0,0 +1,108 @@ +//! Builder patterns for steps and flows + +use crate::types::{Step, Flow}; + +/// Builder for creating steps with fluent API +pub struct StepBuilder { + step: Step, +} + +impl StepBuilder { + pub fn new(name: &str) -> Self { + Self { + step: Step::new(name), + } + } + + /// Set the script content for this step + pub fn script(mut self, script: &str) -> Self { + self.step.script = script.to_string(); + self + } + + /// Set timeout in seconds + pub fn timeout(mut self, seconds: u64) -> Self { + self.step.timeout_seconds = seconds; + self + } + + /// Set maximum retry attempts + pub fn retries(mut self, count: u32) -> Self { + self.step.max_retries = count; + self + } + + /// Build the final step + pub fn build(self) -> Step { + self.step + } +} + +/// Builder for creating flows with fluent API +pub struct FlowBuilder { + flow: Flow, +} + +impl FlowBuilder { + pub fn new(name: &str) -> Self { + Self { + flow: Flow::new(name), + } + } + + /// Add a step to this flow + pub fn add_step(mut self, step: Step) -> Self { + self.flow.steps.push(step); + self + } + + /// Build the final flow + pub fn build(self) -> Flow { + self.flow + } +} + +/// Convenience function to create a new step builder +pub fn new_step(name: &str) -> StepBuilder { + StepBuilder::new(name) +} + +/// Convenience function to create a new flow builder +pub fn new_flow(name: &str) -> FlowBuilder { + FlowBuilder::new(name) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_step_builder() { + let step = new_step("test_step") + .script("print('hello world');") + .timeout(10) + .retries(3) + .build(); + + assert_eq!(step.name, "test_step"); + assert_eq!(step.script, "print('hello world');"); + assert_eq!(step.timeout_seconds, 10); + assert_eq!(step.max_retries, 3); + } + + #[test] + fn test_flow_builder() { + let step1 = new_step("step1").script("let x = 1;").build(); + let step2 = new_step("step2").script("let y = 2;").build(); + + let flow = new_flow("test_flow") + .add_step(step1) + .add_step(step2) + .build(); + + assert_eq!(flow.name, "test_flow"); + assert_eq!(flow.steps.len(), 2); + assert_eq!(flow.steps[0].name, "step1"); + assert_eq!(flow.steps[1].name, "step2"); + } +} \ No newline at end of file diff --git a/src/flow/src/executor.rs b/src/flow/src/executor.rs new file mode 100644 index 0000000..9c1e2e5 --- /dev/null +++ b/src/flow/src/executor.rs @@ -0,0 +1,243 @@ +//! Simple flow executor with non-blocking step execution + +use crate::types::{Flow, Step, FlowStatus, StepStatus}; +use crate::state::{FlowState, StepState, StateManager}; +use crate::mock_api::MockAPI; +use rhai_dispatcher::RhaiDispatcherBuilder; +use std::sync::Arc; +use tokio::time::{timeout, Duration}; + +/// Simple flow executor +pub struct FlowExecutor { + state_manager: Arc, + mock_api: Arc, + redis_url: String, +} + +impl FlowExecutor { + pub async fn new(redis_url: &str) -> Result> { + let state_manager = Arc::new(StateManager::new(redis_url).await?); + let mock_api = Arc::new(MockAPI::default()); + + Ok(Self { + state_manager, + mock_api, + redis_url: redis_url.to_string(), + }) + } + + /// Execute a flow non-blocking + pub async fn execute_flow(&self, flow: Flow) -> Result> { + // Initialize flow state + let mut flow_state = FlowState::new(flow.id.clone()); + flow_state.status = FlowStatus::Running; + self.state_manager.save_flow_state(&flow_state).await?; + + // Initialize step states + for step in &flow.steps { + let step_state = StepState::new(step.id.clone()); + self.state_manager.save_step_state(&flow.id, &step_state).await?; + } + + // Spawn flow execution in background + let flow_id = flow.id.clone(); + let state_manager = self.state_manager.clone(); + let mock_api = self.mock_api.clone(); + let redis_url = self.redis_url.clone(); + + tokio::spawn(async move { + if let Err(e) = Self::execute_flow_steps(flow, state_manager, mock_api, redis_url).await { + eprintln!("Flow execution error: {}", e); + } + }); + + Ok(format!("flow_execution_started:{}", flow_id)) + } + + /// Execute all steps in a flow + async fn execute_flow_steps( + flow: Flow, + state_manager: Arc, + mock_api: Arc, + redis_url: String, + ) -> Result<(), Box> { + let mut flow_state = state_manager.load_flow_state(&flow.id).await? + .ok_or("Flow state not found")?; + + // Execute steps sequentially + for step in &flow.steps { + flow_state.current_step = Some(step.id.clone()); + state_manager.save_flow_state(&flow_state).await?; + + match Self::execute_step_with_retries( + step, + &flow.id, + state_manager.clone(), + mock_api.clone(), + redis_url.clone(), + ).await { + Ok(_) => { + flow_state.completed_steps.push(step.id.clone()); + } + Err(e) => { + eprintln!("Step {} failed: {}", step.name, e); + flow_state.status = FlowStatus::Failed; + state_manager.save_flow_state(&flow_state).await?; + return Err(e); + } + } + } + + // Mark flow as completed + flow_state.status = FlowStatus::Completed; + flow_state.current_step = None; + state_manager.save_flow_state(&flow_state).await?; + + Ok(()) + } + + /// Execute a single step with retry logic + async fn execute_step_with_retries( + step: &Step, + flow_id: &str, + state_manager: Arc, + mock_api: Arc, + redis_url: String, + ) -> Result<(), Box> { + let mut step_state = state_manager.load_step_state(flow_id, &step.id).await? + .ok_or("Step state not found")?; + + let max_attempts = step.max_retries + 1; + + for attempt in 0..max_attempts { + step_state.attempt_count = attempt + 1; + step_state.status = StepStatus::Running; + state_manager.save_step_state(flow_id, &step_state).await?; + + match Self::execute_single_step(step, &mock_api, &redis_url).await { + Ok(output) => { + step_state.status = StepStatus::Completed; + step_state.output = Some(output); + state_manager.save_step_state(flow_id, &step_state).await?; + return Ok(()); + } + Err(e) => { + if attempt + 1 >= max_attempts { + step_state.status = StepStatus::Failed; + state_manager.save_step_state(flow_id, &step_state).await?; + return Err(e); + } + // Wait before retry + tokio::time::sleep(Duration::from_millis(1000)).await; + } + } + } + + Err("Max retries exceeded".into()) + } + + /// Execute a single step + async fn execute_single_step( + step: &Step, + mock_api: &MockAPI, + redis_url: &str, + ) -> Result> { + // Execute with timeout + let result = timeout(step.timeout(), async { + // For demo, we'll use mock API calls instead of real Rhai execution + // In real implementation, this would execute the Rhai script + if step.script.contains("mock_api_call") { + // Extract endpoint from script (simple parsing) + let endpoint = if step.script.contains("stripe_config") { + "stripe_config" + } else if step.script.contains("create_product") { + "create_product" + } else { + "default_endpoint" + }; + + mock_api.call(endpoint).await + } else { + // For non-mock scripts, simulate Rhai execution via dispatcher + Self::execute_rhai_script(&step.script, redis_url).await + } + }).await; + + match result { + Ok(Ok(output)) => Ok(output), + Ok(Err(e)) => Err(e.into()), + Err(_) => Err("Step execution timed out".into()), + } + } + + /// Execute Rhai script using dispatcher (simplified) + async fn execute_rhai_script( + script: &str, + redis_url: &str, + ) -> Result> { + let dispatcher = RhaiDispatcherBuilder::new() + .caller_id("flow_executor") + .redis_url(redis_url) + .build()?; + + let result = dispatcher + .new_play_request() + .worker_id("flow_worker") + .script(script) + .timeout(Duration::from_secs(30)) + .await_response() + .await; + + match result { + Ok(task_details) => { + if task_details.status == "completed" { + Ok(task_details.output.unwrap_or_default()) + } else { + Err(format!("Script execution failed: {:?}", task_details.error).into()) + } + } + Err(e) => Err(format!("Dispatcher error: {}", e).into()), + } + } + + /// Get flow status + pub async fn get_flow_status(&self, flow_id: &str) -> Result, Box> { + self.state_manager.load_flow_state(flow_id).await + } + + /// Get step status + pub async fn get_step_status(&self, flow_id: &str, step_id: &str) -> Result, Box> { + self.state_manager.load_step_state(flow_id, step_id).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::builder::{new_step, new_flow}; + + #[tokio::test] + async fn test_flow_execution() { + // This test requires Redis to be running + // Skip if Redis is not available + if std::env::var("SKIP_REDIS_TESTS").is_ok() { + return; + } + + let executor = FlowExecutor::new("redis://127.0.0.1/").await.unwrap(); + + let step1 = new_step("test_step") + .script("mock_api_call stripe_config") + .timeout(5) + .retries(1) + .build(); + + let flow = new_flow("test_flow") + .add_step(step1) + .build(); + + let result = executor.execute_flow(flow).await; + assert!(result.is_ok()); + assert!(result.unwrap().starts_with("flow_execution_started:")); + } +} \ No newline at end of file diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs new file mode 100644 index 0000000..7a69a1e --- /dev/null +++ b/src/flow/src/lib.rs @@ -0,0 +1,20 @@ +//! Simple Flow Manager for Rhai Scripts +//! +//! Provides a minimal flow execution system with builder patterns: +//! - `new_step("name").script("script").timeout(5).retries(2)` +//! - `new_flow("name").add_step(step1).add_step(step2)` + +pub mod types; +pub mod builder; +pub mod executor; +pub mod state; +pub mod mock_api; + +pub use types::{Flow, Step, FlowStatus, StepStatus}; +pub use builder::{StepBuilder, FlowBuilder, new_step, new_flow}; +pub use executor::FlowExecutor; +pub use state::{FlowState, StepState, StateManager}; +pub use mock_api::MockAPI; + +// Re-export for convenience +pub use rhai_dispatcher::RhaiDispatcherBuilder; \ No newline at end of file diff --git a/src/flow/src/mock_api.rs b/src/flow/src/mock_api.rs new file mode 100644 index 0000000..44f6051 --- /dev/null +++ b/src/flow/src/mock_api.rs @@ -0,0 +1,144 @@ +//! Simple mock API for testing different response types and durations + +use serde::{Serialize, Deserialize}; +use std::time::Duration; +use std::collections::HashMap; + +/// Mock API response types +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum MockResponseType { + Success, + Failure, + Timeout, +} + +/// Mock API scenario configuration +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MockScenario { + pub response_type: MockResponseType, + pub delay_ms: u64, + pub response_data: String, +} + +impl MockScenario { + pub fn success(delay_ms: u64, data: &str) -> Self { + Self { + response_type: MockResponseType::Success, + delay_ms, + response_data: data.to_string(), + } + } + + pub fn failure(delay_ms: u64, error: &str) -> Self { + Self { + response_type: MockResponseType::Failure, + delay_ms, + response_data: error.to_string(), + } + } + + pub fn timeout(delay_ms: u64) -> Self { + Self { + response_type: MockResponseType::Timeout, + delay_ms, + response_data: "Request timed out".to_string(), + } + } +} + +/// Simple mock API for testing +pub struct MockAPI { + scenarios: HashMap, +} + +impl MockAPI { + pub fn new() -> Self { + Self { + scenarios: HashMap::new(), + } + } + + /// Add a mock scenario for an endpoint + pub fn add_scenario(&mut self, endpoint: &str, scenario: MockScenario) { + self.scenarios.insert(endpoint.to_string(), scenario); + } + + /// Call a mock endpoint + pub async fn call(&self, endpoint: &str) -> Result { + match self.scenarios.get(endpoint) { + Some(scenario) => { + // Simulate delay + tokio::time::sleep(Duration::from_millis(scenario.delay_ms)).await; + + match scenario.response_type { + MockResponseType::Success => Ok(scenario.response_data.clone()), + MockResponseType::Failure => Err(scenario.response_data.clone()), + MockResponseType::Timeout => { + // For timeout, we just return an error after the delay + Err("Request timed out".to_string()) + } + } + } + None => Err(format!("Unknown endpoint: {}", endpoint)), + } + } + + /// Setup common test scenarios + pub fn setup_test_scenarios(&mut self) { + // Fast success + self.add_scenario("stripe_config", MockScenario::success(100, r#"{"status": "configured"}"#)); + + // Slow success + self.add_scenario("create_product", MockScenario::success(2000, r#"{"id": "prod_123", "name": "Test Product"}"#)); + + // Fast failure + self.add_scenario("invalid_endpoint", MockScenario::failure(50, "Invalid API key")); + + // Timeout scenario + self.add_scenario("slow_endpoint", MockScenario::timeout(5000)); + + // Variable responses for testing retries + self.add_scenario("flaky_endpoint", MockScenario::failure(500, "Temporary server error")); + } +} + +impl Default for MockAPI { + fn default() -> Self { + let mut api = Self::new(); + api.setup_test_scenarios(); + api + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_mock_api_success() { + let mut api = MockAPI::new(); + api.add_scenario("test", MockScenario::success(10, "success")); + + let result = api.call("test").await; + assert!(result.is_ok()); + assert_eq!(result.unwrap(), "success"); + } + + #[tokio::test] + async fn test_mock_api_failure() { + let mut api = MockAPI::new(); + api.add_scenario("test", MockScenario::failure(10, "error")); + + let result = api.call("test").await; + assert!(result.is_err()); + assert_eq!(result.unwrap_err(), "error"); + } + + #[tokio::test] + async fn test_mock_api_unknown_endpoint() { + let api = MockAPI::new(); + let result = api.call("unknown").await; + assert!(result.is_err()); + assert!(result.unwrap_err().contains("Unknown endpoint")); + } +} \ No newline at end of file diff --git a/src/flow/src/state.rs b/src/flow/src/state.rs new file mode 100644 index 0000000..57d1e41 --- /dev/null +++ b/src/flow/src/state.rs @@ -0,0 +1,100 @@ +//! Simple state management for flows and steps + +use serde::{Serialize, Deserialize}; +use crate::types::{FlowStatus, StepStatus}; + +/// Minimal flow state tracking +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FlowState { + pub flow_id: String, + pub status: FlowStatus, + pub current_step: Option, + pub completed_steps: Vec, +} + +impl FlowState { + pub fn new(flow_id: String) -> Self { + Self { + flow_id, + status: FlowStatus::Created, + current_step: None, + completed_steps: Vec::new(), + } + } +} + +/// Minimal step state tracking +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StepState { + pub step_id: String, + pub status: StepStatus, + pub attempt_count: u32, + pub output: Option, +} + +impl StepState { + pub fn new(step_id: String) -> Self { + Self { + step_id, + status: StepStatus::Pending, + attempt_count: 0, + output: None, + } + } +} + +/// Simple Redis-based state manager +pub struct StateManager { + redis_client: redis::Client, +} + +impl StateManager { + pub async fn new(redis_url: &str) -> Result> { + let client = redis::Client::open(redis_url)?; + Ok(Self { + redis_client: client, + }) + } + + /// Save flow state to Redis + pub async fn save_flow_state(&self, state: &FlowState) -> Result<(), Box> { + let mut conn = self.redis_client.get_async_connection().await?; + let key = format!("flow:{}", state.flow_id); + let json = serde_json::to_string(state)?; + redis::cmd("SET").arg(&key).arg(&json).query_async(&mut conn).await?; + Ok(()) + } + + /// Load flow state from Redis + pub async fn load_flow_state(&self, flow_id: &str) -> Result, Box> { + let mut conn = self.redis_client.get_async_connection().await?; + let key = format!("flow:{}", flow_id); + let result: Option = redis::cmd("GET").arg(&key).query_async(&mut conn).await?; + + match result { + Some(json) => Ok(Some(serde_json::from_str(&json)?)), + None => Ok(None), + } + } + + /// Save step state to Redis + pub async fn save_step_state(&self, flow_id: &str, state: &StepState) -> Result<(), Box> { + let mut conn = self.redis_client.get_async_connection().await?; + let key = format!("step:{}:{}", flow_id, state.step_id); + let json = serde_json::to_string(state)?; + redis::cmd("SET").arg(&key).arg(&json).query_async(&mut conn).await?; + Ok(()) + } + + /// Load step state from Redis + pub async fn load_step_state(&self, flow_id: &str, step_id: &str) -> Result, Box> { + let mut conn = self.redis_client.get_async_connection().await?; + let key = format!("step:{}:{}", flow_id, step_id); + let result: Option = redis::cmd("GET").arg(&key).query_async(&mut conn).await?; + + match result { + Some(json) => Ok(Some(serde_json::from_str(&json)?)), + None => Ok(None), + } + } +} \ No newline at end of file diff --git a/src/flow/src/types.rs b/src/flow/src/types.rs new file mode 100644 index 0000000..cd7bd39 --- /dev/null +++ b/src/flow/src/types.rs @@ -0,0 +1,66 @@ +//! Core types for the flow manager + +use serde::{Serialize, Deserialize}; +use std::time::Duration; + +/// Simple flow status enumeration +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum FlowStatus { + Created, + Running, + Completed, + Failed, +} + +/// Simple step status enumeration +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum StepStatus { + Pending, + Running, + Completed, + Failed, +} + +/// A single step in a flow +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Step { + pub id: String, + pub name: String, + pub script: String, + pub timeout_seconds: u64, + pub max_retries: u32, +} + +impl Step { + pub fn new(name: &str) -> Self { + Self { + id: uuid::Uuid::new_v4().to_string(), + name: name.to_string(), + script: String::new(), + timeout_seconds: 30, // default 30 seconds + max_retries: 0, // default no retries + } + } + + pub fn timeout(&self) -> Duration { + Duration::from_secs(self.timeout_seconds) + } +} + +/// A flow containing multiple steps +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Flow { + pub id: String, + pub name: String, + pub steps: Vec, +} + +impl Flow { + pub fn new(name: &str) -> Self { + Self { + id: uuid::Uuid::new_v4().to_string(), + name: name.to_string(), + steps: Vec::new(), + } + } +} \ No newline at end of file