Compare commits

...

5 Commits

Author SHA1 Message Date
Timur Gordon
02d9f5937e remove local deps paths 2025-08-06 15:09:52 +02:00
Timur Gordon
7c646106d6 remove local deps paths 2025-08-06 15:06:54 +02:00
Timur Gordon
c37be2dfcc archive / remove access control from everything 2025-08-06 14:26:51 +02:00
Timur Gordon
6d271068fc rhailib fixes 2025-08-05 16:06:48 +02:00
Timur Gordon
9c03b5ed37 workflow orchestration wip 2025-07-21 20:56:29 +02:00
62 changed files with 3706 additions and 161 deletions

664
Cargo.lock generated
View File

@@ -123,6 +123,28 @@ version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
[[package]]
name = "async-stream"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
dependencies = [
"async-stream-impl",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-stream-impl"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.101",
]
[[package]]
name = "async-trait"
version = "0.1.88"
@@ -161,6 +183,12 @@ version = "0.21.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
[[package]]
name = "base64"
version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
[[package]]
name = "bincode"
version = "2.0.1"
@@ -193,12 +221,27 @@ version = "2.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967"
[[package]]
name = "block-buffer"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71"
dependencies = [
"generic-array",
]
[[package]]
name = "bumpalo"
version = "3.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf"
[[package]]
name = "byteorder"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.10.1"
@@ -327,6 +370,16 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
[[package]]
name = "colored"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c"
dependencies = [
"lazy_static",
"windows-sys 0.59.0",
]
[[package]]
name = "combine"
version = "4.6.7"
@@ -377,6 +430,15 @@ version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "cpufeatures"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280"
dependencies = [
"libc",
]
[[package]]
name = "crc32fast"
version = "1.4.2"
@@ -453,6 +515,16 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43da5946c66ffcc7745f48db692ffbb10a83bfe0afd96235c5c2a4fb23994929"
[[package]]
name = "crypto-common"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
dependencies = [
"generic-array",
"typenum",
]
[[package]]
name = "csv"
version = "1.3.1"
@@ -474,6 +546,12 @@ dependencies = [
"memchr",
]
[[package]]
name = "data-encoding"
version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476"
[[package]]
name = "derive"
version = "0.1.0"
@@ -482,6 +560,17 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "digest"
version = "0.10.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
dependencies = [
"block-buffer",
"crypto-common",
"subtle",
]
[[package]]
name = "dirs"
version = "4.0.0"
@@ -590,6 +679,24 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "ethnum"
version = "1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca81e6b4777c89fd810c25a4be2b1bd93ea034fbe58e6a75216a34c6b82c539b"
[[package]]
name = "fallible-iterator"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
[[package]]
name = "fast-float2"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8eb564c5c7423d25c886fb561d1e4ee69f72354d16918afa32c08811f6b6a55"
[[package]]
name = "fastrand"
version = "2.3.0"
@@ -715,6 +822,16 @@ dependencies = [
"slab",
]
[[package]]
name = "generic-array"
version = "0.14.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a"
dependencies = [
"typenum",
"version_check",
]
[[package]]
name = "getrandom"
version = "0.2.16"
@@ -797,17 +914,42 @@ version = "0.1.0"
dependencies = [
"bincode",
"chrono",
"derive",
"heromodels-derive",
"heromodels_core",
"ourdb",
"heromodels-derive 0.1.0",
"heromodels_core 0.1.0",
"jsonb",
"ourdb 0.1.0",
"postgres",
"r2d2",
"r2d2_postgres",
"rhai",
"rhai_client_macros",
"serde",
"serde_json",
"strum",
"strum_macros",
"tst",
"tst 0.1.0",
"uuid",
]
[[package]]
name = "heromodels"
version = "0.1.0"
source = "git+https://git.ourworld.tf/herocode/db.git#453e86edd24d6009f0b154ac777cc66dc5f3bf76"
dependencies = [
"bincode",
"chrono",
"heromodels-derive 0.1.0 (git+https://git.ourworld.tf/herocode/db.git)",
"heromodels_core 0.1.0 (git+https://git.ourworld.tf/herocode/db.git)",
"jsonb",
"ourdb 0.1.0 (git+https://git.ourworld.tf/herocode/db.git)",
"postgres",
"r2d2",
"r2d2_postgres",
"rhai",
"serde",
"serde_json",
"strum",
"strum_macros",
"tst 0.1.0 (git+https://git.ourworld.tf/herocode/db.git)",
"uuid",
]
@@ -820,6 +962,16 @@ dependencies = [
"syn 2.0.101",
]
[[package]]
name = "heromodels-derive"
version = "0.1.0"
source = "git+https://git.ourworld.tf/herocode/db.git#453e86edd24d6009f0b154ac777cc66dc5f3bf76"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.101",
]
[[package]]
name = "heromodels_core"
version = "0.1.0"
@@ -828,6 +980,24 @@ dependencies = [
"serde",
]
[[package]]
name = "heromodels_core"
version = "0.1.0"
source = "git+https://git.ourworld.tf/herocode/db.git#453e86edd24d6009f0b154ac777cc66dc5f3bf76"
dependencies = [
"chrono",
"serde",
]
[[package]]
name = "hmac"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
dependencies = [
"digest",
]
[[package]]
name = "home"
version = "0.5.11"
@@ -1102,6 +1272,47 @@ version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"
[[package]]
name = "jiff"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be1f93b8b1eb69c77f24bbb0afdf66f54b632ee39af40ca21c4365a1d7347e49"
dependencies = [
"jiff-static",
"jiff-tzdb-platform",
"log",
"portable-atomic",
"portable-atomic-util",
"serde",
"windows-sys 0.59.0",
]
[[package]]
name = "jiff-static"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03343451ff899767262ec32146f6d559dd759fdadf42ff0e227c7c48f72594b4"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.101",
]
[[package]]
name = "jiff-tzdb"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1283705eb0a21404d2bfd6eef2a7593d240bc42a0bdb39db0ad6fa2ec026524"
[[package]]
name = "jiff-tzdb-platform"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "875a5a69ac2bab1a891711cf5eccbec1ce0341ea805560dcd90b7a2e925132e8"
dependencies = [
"jiff-tzdb",
]
[[package]]
name = "js-sys"
version = "0.3.77"
@@ -1112,6 +1323,26 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "jsonb"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96cbb4fba292867a2d86ed83dbe5f9d036f423bf6a491b7d884058b2fde42fcd"
dependencies = [
"byteorder",
"ethnum",
"fast-float2",
"itoa",
"jiff",
"nom 8.0.0",
"num-traits",
"ordered-float",
"rand 0.9.2",
"ryu",
"serde",
"serde_json",
]
[[package]]
name = "lazy_static"
version = "1.5.0"
@@ -1172,12 +1403,22 @@ checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
name = "macros"
version = "0.1.0"
dependencies = [
"heromodels",
"heromodels_core",
"heromodels 0.1.0",
"heromodels_core 0.1.0",
"rhai",
"serde",
]
[[package]]
name = "md-5"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf"
dependencies = [
"cfg-if",
"digest",
]
[[package]]
name = "memchr"
version = "2.7.4"
@@ -1279,6 +1520,15 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "nom"
version = "8.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df9761775871bdef83bee530e60050f7e54b1105350d6884eb0fb4f46c2f9405"
dependencies = [
"memchr",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
@@ -1372,13 +1622,59 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "orchestrator"
version = "0.1.0"
dependencies = [
"async-trait",
"chrono",
"futures",
"futures-util",
"heromodels 0.1.0",
"heromodels_core 0.1.0",
"log",
"reqwest",
"rhai",
"rhai_dispatcher",
"rhailib_dsl",
"serde",
"serde_json",
"thiserror",
"tokio",
"tokio-test",
"tokio-tungstenite",
"tracing",
"tracing-subscriber",
"uuid",
]
[[package]]
name = "ordered-float"
version = "5.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2c1f9f56e534ac6a9b8a4600bdf0f530fb393b5f393e7b4d03489c3cf0c3f01"
dependencies = [
"num-traits",
]
[[package]]
name = "ourdb"
version = "0.1.0"
dependencies = [
"crc32fast",
"log",
"rand",
"rand 0.8.5",
"thiserror",
]
[[package]]
name = "ourdb"
version = "0.1.0"
source = "git+https://git.ourworld.tf/herocode/db.git#453e86edd24d6009f0b154ac777cc66dc5f3bf76"
dependencies = [
"crc32fast",
"log",
"rand 0.8.5",
"thiserror",
]
@@ -1443,7 +1739,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d"
dependencies = [
"phf_shared",
"rand",
"rand 0.8.5",
]
[[package]]
@@ -1507,6 +1803,60 @@ version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "350e9b48cbc6b0e028b0473b114454c6316e57336ee184ceab6e53f72c178b3e"
[[package]]
name = "portable-atomic-util"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507"
dependencies = [
"portable-atomic",
]
[[package]]
name = "postgres"
version = "0.19.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "363e6dfbdd780d3aa3597b6eb430db76bb315fa9bad7fae595bb8def808b8470"
dependencies = [
"bytes",
"fallible-iterator",
"futures-util",
"log",
"tokio",
"tokio-postgres",
]
[[package]]
name = "postgres-protocol"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76ff0abab4a9b844b93ef7b81f1efc0a366062aaef2cd702c76256b5dc075c54"
dependencies = [
"base64 0.22.1",
"byteorder",
"bytes",
"fallible-iterator",
"hmac",
"md-5",
"memchr",
"rand 0.9.2",
"sha2",
"stringprep",
]
[[package]]
name = "postgres-types"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613283563cd90e1dfc3518d548caee47e0e725455ed619881f5cf21f36de4b48"
dependencies = [
"bytes",
"fallible-iterator",
"postgres-protocol",
"serde",
"serde_json",
]
[[package]]
name = "potential_utf"
version = "0.1.2"
@@ -1563,6 +1913,27 @@ version = "5.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5"
[[package]]
name = "r2d2"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
dependencies = [
"log",
"parking_lot",
"scheduled-thread-pool",
]
[[package]]
name = "r2d2_postgres"
version = "0.18.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "efd4b47636dbca581cd057e2f27a5d39be741ea4f85fd3c29e415c55f71c7595"
dependencies = [
"postgres",
"r2d2",
]
[[package]]
name = "rand"
version = "0.8.5"
@@ -1570,8 +1941,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
"rand_chacha 0.3.1",
"rand_core 0.6.4",
]
[[package]]
name = "rand"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1"
dependencies = [
"rand_chacha 0.9.0",
"rand_core 0.9.3",
]
[[package]]
@@ -1581,7 +1962,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
"rand_core 0.6.4",
]
[[package]]
name = "rand_chacha"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb"
dependencies = [
"ppv-lite86",
"rand_core 0.9.3",
]
[[package]]
@@ -1593,6 +1984,15 @@ dependencies = [
"getrandom 0.2.16",
]
[[package]]
name = "rand_core"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38"
dependencies = [
"getrandom 0.3.3",
]
[[package]]
name = "rayon"
version = "1.10.0"
@@ -1689,7 +2089,7 @@ version = "0.11.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62"
dependencies = [
"base64",
"base64 0.21.7",
"bytes",
"encoding_rs",
"futures-core",
@@ -1742,16 +2142,6 @@ dependencies = [
"thin-vec",
]
[[package]]
name = "rhai_client_macros"
version = "0.1.0"
dependencies = [
"proc-macro2",
"quote",
"rhai",
"syn 2.0.101",
]
[[package]]
name = "rhai_codegen"
version = "2.2.0"
@@ -1769,6 +2159,7 @@ version = "0.1.0"
dependencies = [
"chrono",
"clap",
"colored",
"env_logger",
"log",
"redis",
@@ -1808,9 +2199,9 @@ dependencies = [
"chrono",
"derive",
"dotenv",
"heromodels",
"heromodels-derive",
"heromodels_core",
"heromodels 0.1.0 (git+https://git.ourworld.tf/herocode/db.git)",
"heromodels-derive 0.1.0 (git+https://git.ourworld.tf/herocode/db.git)",
"heromodels_core 0.1.0 (git+https://git.ourworld.tf/herocode/db.git)",
"macros",
"reqwest",
"rhai",
@@ -1818,6 +2209,7 @@ dependencies = [
"serde",
"serde_json",
"tempfile",
"thiserror",
"tokio",
]
@@ -1826,9 +2218,9 @@ name = "rhailib_engine"
version = "0.1.0"
dependencies = [
"chrono",
"heromodels",
"heromodels-derive",
"heromodels_core",
"heromodels 0.1.0",
"heromodels-derive 0.1.0",
"heromodels_core 0.1.0",
"rhai",
"rhailib_dsl",
]
@@ -1840,7 +2232,7 @@ dependencies = [
"chrono",
"clap",
"env_logger",
"heromodels",
"heromodels 0.1.0",
"log",
"redis",
"rhai",
@@ -1900,7 +2292,7 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c"
dependencies = [
"base64",
"base64 0.21.7",
]
[[package]]
@@ -1933,6 +2325,15 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "scheduled-thread-pool"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
dependencies = [
"parking_lot",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
@@ -1988,6 +2389,7 @@ version = "1.0.140"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373"
dependencies = [
"indexmap",
"itoa",
"memchr",
"ryu",
@@ -2006,12 +2408,34 @@ dependencies = [
"serde",
]
[[package]]
name = "sha1"
version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
]
[[package]]
name = "sha1_smol"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d"
[[package]]
name = "sha2"
version = "0.10.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
]
[[package]]
name = "sharded-slab"
version = "0.1.7"
@@ -2096,6 +2520,17 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "stringprep"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1"
dependencies = [
"unicode-bidi",
"unicode-normalization",
"unicode-properties",
]
[[package]]
name = "strsim"
version = "0.11.1"
@@ -2121,6 +2556,12 @@ dependencies = [
"syn 2.0.101",
]
[[package]]
name = "subtle"
version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "syn"
version = "1.0.109"
@@ -2222,7 +2663,7 @@ checksum = "666cd3a6681775d22b200409aad3b089c5b99fb11ecdd8a204d9d62f8148498f"
dependencies = [
"dirs",
"fnv",
"nom",
"nom 7.1.3",
"phf",
"phf_codegen",
]
@@ -2292,6 +2733,21 @@ dependencies = [
"serde_json",
]
[[package]]
name = "tinyvec"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09b3661f17e86524eccd4371ab0429194e0d7c008abb45f7a7495b1719463c71"
dependencies = [
"tinyvec_macros",
]
[[package]]
name = "tinyvec_macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.45.1"
@@ -2331,6 +2787,68 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-postgres"
version = "0.7.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c95d533c83082bb6490e0189acaa0bbeef9084e60471b696ca6988cd0541fb0"
dependencies = [
"async-trait",
"byteorder",
"bytes",
"fallible-iterator",
"futures-channel",
"futures-util",
"log",
"parking_lot",
"percent-encoding",
"phf",
"pin-project-lite",
"postgres-protocol",
"postgres-types",
"rand 0.9.2",
"socket2",
"tokio",
"tokio-util",
"whoami",
]
[[package]]
name = "tokio-stream"
version = "0.1.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-test"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7"
dependencies = [
"async-stream",
"bytes",
"futures-core",
"tokio",
"tokio-stream",
]
[[package]]
name = "tokio-tungstenite"
version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite",
]
[[package]]
name = "tokio-util"
version = "0.7.15"
@@ -2417,16 +2935,71 @@ checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
name = "tst"
version = "0.1.0"
dependencies = [
"ourdb",
"ourdb 0.1.0",
"thiserror",
]
[[package]]
name = "tst"
version = "0.1.0"
source = "git+https://git.ourworld.tf/herocode/db.git#453e86edd24d6009f0b154ac777cc66dc5f3bf76"
dependencies = [
"ourdb 0.1.0 (git+https://git.ourworld.tf/herocode/db.git)",
"thiserror",
]
[[package]]
name = "tungstenite"
version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9"
dependencies = [
"byteorder",
"bytes",
"data-encoding",
"http",
"httparse",
"log",
"rand 0.8.5",
"sha1",
"thiserror",
"url",
"utf-8",
]
[[package]]
name = "typenum"
version = "1.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f"
[[package]]
name = "unicode-bidi"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5"
[[package]]
name = "unicode-ident"
version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512"
[[package]]
name = "unicode-normalization"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956"
dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-properties"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0"
[[package]]
name = "unicode-width"
version = "0.1.14"
@@ -2450,6 +3023,12 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "utf-8"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]]
name = "utf8_iter"
version = "1.0.4"
@@ -2532,6 +3111,12 @@ dependencies = [
"wit-bindgen-rt",
]
[[package]]
name = "wasite"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b"
[[package]]
name = "wasm-bindgen"
version = "0.2.100"
@@ -2625,6 +3210,17 @@ dependencies = [
"rustix 0.38.44",
]
[[package]]
name = "whoami"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6994d13118ab492c3c80c1f81928718159254c53c472bf9ce36f8dae4add02a7"
dependencies = [
"redox_syscall",
"wasite",
"web-sys",
]
[[package]]
name = "winapi"
version = "0.3.9"

View File

@@ -35,6 +35,7 @@ members = [
"src/engine",
"src/worker",
"src/monitor", # Added the new monitor package to workspace
"src/orchestrator", # Added the new orchestrator package to workspace
"src/macros", "src/dsl", "src/derive",
]
resolver = "2" # Recommended for new workspaces

View File

@@ -17,6 +17,7 @@ uuid = { version = "1.6", features = ["v4", "serde"] }
chrono = { version = "0.4", features = ["serde"] }
log = "0.4"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] } # For async main in examples, and general async
colored = "2.0"
[dev-dependencies] # For examples later
env_logger = "0.10"

View File

@@ -1,6 +1,7 @@
use clap::Parser;
use rhai_dispatcher::{RhaiDispatcher, RhaiDispatcherBuilder};
use log::{error, info};
use colored::Colorize;
use std::io::{self, Write};
use std::time::Duration;
@@ -50,10 +51,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Configure logging based on verbosity level
let log_config = match args.verbose {
0 => "warn,rhai_dispatcher=info",
1 => "info,rhai_dispatcher=debug",
2 => "debug",
_ => "trace",
0 => "warn,rhai_dispatcher=warn",
1 => "info,rhai_dispatcher=info",
2 => "debug,rhai_dispatcher=debug",
_ => "trace,rhai_dispatcher=trace",
};
std::env::set_var("RUST_LOG", log_config);
@@ -67,14 +68,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
}
info!("🔗 Starting Rhai Dispatcher");
info!("📋 Configuration:");
info!(" Caller ID: {}", args.caller_id);
info!(" Context ID: {}", args.context_id);
info!(" Worker ID: {}", args.worker_id);
info!(" Redis URL: {}", args.redis_url);
info!(" Timeout: {}s", args.timeout);
info!("");
if args.verbose > 0 {
info!("🔗 Starting Rhai Dispatcher");
info!("📋 Configuration:");
info!(" Caller ID: {}", args.caller_id);
info!(" Context ID: {}", args.context_id);
info!(" Worker ID: {}", args.worker_id);
info!(" Redis URL: {}", args.redis_url);
info!(" Timeout: {}s", args.timeout);
info!("");
}
// Create the Rhai client
let client = RhaiDispatcherBuilder::new()
@@ -84,16 +87,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.redis_url(&args.redis_url)
.build()?;
info!("✅ Connected to Redis at {}", args.redis_url);
if args.verbose > 0 {
info!("✅ Connected to Redis at {}", args.redis_url);
}
// Determine execution mode
if let Some(script_content) = args.script {
// Execute inline script
info!("📜 Executing inline script");
if args.verbose > 0 {
info!("📜 Executing inline script");
}
execute_script(&client, script_content, args.timeout).await?;
} else if let Some(file_path) = args.file {
// Execute script from file
info!("📁 Loading script from file: {}", file_path);
if args.verbose > 0 {
info!("📁 Loading script from file: {}", file_path);
}
let script_content = std::fs::read_to_string(&file_path)
.map_err(|e| format!("Failed to read script file '{}': {}", file_path, e))?;
execute_script(&client, script_content, args.timeout).await?;
@@ -101,7 +110,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Interactive mode
info!("🎮 Entering interactive mode");
info!("Type Rhai scripts and press Enter to execute. Type 'exit' or 'quit' to close.");
run_interactive_mode(&client, args.timeout).await?;
run_interactive_mode(&client, args.timeout, args.verbose).await?;
}
Ok(())
@@ -145,6 +154,7 @@ async fn execute_script(
async fn run_interactive_mode(
client: &RhaiDispatcher,
timeout_secs: u64,
verbose: u8,
) -> Result<(), Box<dyn std::error::Error>> {
let timeout = Duration::from_secs(timeout_secs);
@@ -166,7 +176,9 @@ async fn run_interactive_mode(
break;
}
info!("⚡ Executing: {}", input);
if verbose > 0 {
info!("⚡ Executing: {}", input);
}
match client
.new_play_request()
@@ -176,16 +188,15 @@ async fn run_interactive_mode(
.await
{
Ok(result) => {
println!("Status: {}", result.status);
if let Some(output) = result.output {
println!("Output: {}", output);
println!("{}", output.color("green"));
}
if let Some(error) = result.error {
println!("Error: {}", error);
println!("{}", format!("error: {}", error).color("red"));
}
}
Err(e) => {
error!("❌ Execution failed: {}", e);
println!("{}", format!("error: {}", e).red());
}
}

View File

@@ -262,16 +262,17 @@ impl RhaiDispatcherBuilder {
}
}
/// Internal representation of a script execution request.
/// Representation of a script execution request.
///
/// This structure contains all the information needed to execute a Rhai script
/// on a worker service, including the script content, target worker, and timeout.
#[derive(Debug, Clone)]
pub struct PlayRequest {
id: String,
worker_id: String,
context_id: String,
script: String,
timeout: Duration,
pub id: String,
pub worker_id: String,
pub context_id: String,
pub script: String,
pub timeout: Duration,
}
/// Builder for constructing and submitting script execution requests.
@@ -301,6 +302,7 @@ pub struct PlayRequestBuilder<'a> {
caller_id: String,
script: String,
timeout: Duration,
retries: u32,
}
impl<'a> PlayRequestBuilder<'a> {
@@ -312,7 +314,8 @@ impl<'a> PlayRequestBuilder<'a> {
context_id: client.context_id.clone(),
caller_id: client.caller_id.clone(),
script: "".to_string(),
timeout: Duration::from_secs(10),
timeout: Duration::from_secs(5),
retries: 0,
}
}
@@ -384,10 +387,6 @@ impl<'a> PlayRequestBuilder<'a> {
pub async fn await_response(self) -> Result<RhaiTaskDetails, RhaiDispatcherError> {
// Build the request and submit using self.client
println!(
"Awaiting response for request {} with timeout {:?}",
self.request_id, self.timeout
);
let result = self
.client
.submit_play_request_and_await_result(&self.build()?)

View File

@@ -21,8 +21,8 @@ mod rhai_flow_module {
use super::{Array, Dynamic, RhaiFlow, RhaiFlowStep, INT};
#[rhai_fn(name = "new_flow", return_raw)]
pub fn new_flow(flow_uuid: String) -> Result<RhaiFlow, Box<EvalAltResult>> {
Ok(Flow::new(flow_uuid))
pub fn new_flow() -> Result<RhaiFlow, Box<EvalAltResult>> {
Ok(Flow::new())
}
// --- Setters ---
@@ -55,10 +55,7 @@ mod rhai_flow_module {
pub fn get_id(f: &mut RhaiFlow) -> INT {
f.base_data.id as INT
}
#[rhai_fn(get = "flow_uuid", pure)]
pub fn get_flow_uuid(f: &mut RhaiFlow) -> String {
f.flow_uuid.clone()
}
#[rhai_fn(get = "name", pure)]
pub fn get_name(f: &mut RhaiFlow) -> String {
f.name.clone()
@@ -97,5 +94,4 @@ pub fn register_flow_rhai_module(engine: &mut Engine) {
);
engine.register_global_module(module.into());
println!("Successfully registered flow Rhai module.");
}

View File

@@ -34,17 +34,6 @@ mod rhai_flow_step_module {
Ok(step.clone())
}
#[rhai_fn(name = "step_order", return_raw)]
pub fn set_step_order(
step: &mut RhaiFlowStep,
step_order: INT,
) -> Result<RhaiFlowStep, Box<EvalAltResult>> {
let mut owned = std::mem::take(step);
owned.step_order = step_order as u32;
*step = owned;
Ok(step.clone())
}
#[rhai_fn(name = "status", return_raw)]
pub fn set_status(
step: &mut RhaiFlowStep,
@@ -64,10 +53,6 @@ mod rhai_flow_step_module {
pub fn get_description(s: &mut RhaiFlowStep) -> Option<String> {
s.description.clone()
}
#[rhai_fn(get = "step_order", pure)]
pub fn get_step_order(s: &mut RhaiFlowStep) -> INT {
s.step_order as INT
}
#[rhai_fn(get = "status", pure)]
pub fn get_status(s: &mut RhaiFlowStep) -> String {
s.status.clone()
@@ -98,5 +83,4 @@ pub fn register_flow_step_rhai_module(engine: &mut Engine) {
);
engine.register_global_module(module.into());
println!("Successfully registered flow_step Rhai module.");
}

View File

@@ -3,10 +3,15 @@ use rhai::Engine;
pub mod flow;
pub mod flow_step;
pub mod signature_requirement;
pub mod orchestrated_flow;
pub mod orchestrated_flow_step;
// Re-export the orchestrated models for easy access
pub use orchestrated_flow::{OrchestratedFlow, OrchestratorError, FlowStatus};
pub use orchestrated_flow_step::OrchestratedFlowStep;
pub fn register_flow_rhai_modules(engine: &mut Engine) {
flow::register_flow_rhai_module(engine);
flow_step::register_flow_step_rhai_module(engine);
signature_requirement::register_signature_requirement_rhai_module(engine);
println!("Successfully registered flow Rhai modules.");
}

View File

@@ -0,0 +1,154 @@
//! Orchestrated Flow model for DAG-based workflow execution
use heromodels_core::BaseModelData;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use thiserror::Error;
use super::orchestrated_flow_step::OrchestratedFlowStep;
/// Extended Flow with orchestrator-specific steps
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrchestratedFlow {
/// Base model data (id, created_at, updated_at)
pub base_data: BaseModelData,
/// Name of the flow
pub name: String,
/// Orchestrated steps with dependencies
pub orchestrated_steps: Vec<OrchestratedFlowStep>,
}
impl OrchestratedFlow {
/// Create a new orchestrated flow
pub fn new(name: &str) -> Self {
Self {
base_data: BaseModelData::new(),
name: name.to_string(),
orchestrated_steps: Vec::new(),
}
}
/// Add a step to the flow
pub fn add_step(mut self, step: OrchestratedFlowStep) -> Self {
self.orchestrated_steps.push(step);
self
}
/// Get the flow ID
pub fn id(&self) -> u32 {
self.base_data.id
}
/// Validate the DAG structure (no cycles)
pub fn validate_dag(&self) -> Result<(), OrchestratorError> {
let mut visited = HashSet::new();
let mut rec_stack = HashSet::new();
for step in &self.orchestrated_steps {
if !visited.contains(&step.id()) {
if self.has_cycle(step.id(), &mut visited, &mut rec_stack)? {
return Err(OrchestratorError::CyclicDependency);
}
}
}
Ok(())
}
/// Check for cycles in the dependency graph
fn has_cycle(
&self,
step_id: u32,
visited: &mut HashSet<u32>,
rec_stack: &mut HashSet<u32>,
) -> Result<bool, OrchestratorError> {
visited.insert(step_id);
rec_stack.insert(step_id);
let step = self.orchestrated_steps
.iter()
.find(|s| s.id() == step_id)
.ok_or(OrchestratorError::StepNotFound(step_id))?;
for &dep_id in &step.depends_on {
if !visited.contains(&dep_id) {
if self.has_cycle(dep_id, visited, rec_stack)? {
return Ok(true);
}
} else if rec_stack.contains(&dep_id) {
return Ok(true);
}
}
rec_stack.remove(&step_id);
Ok(false)
}
}
/// Orchestrator errors
#[derive(Error, Debug)]
pub enum OrchestratorError {
#[error("Database error: {0}")]
DatabaseError(String),
#[error("Executor error: {0}")]
ExecutorError(String),
#[error("No ready steps found - possible deadlock")]
NoReadySteps,
#[error("Step {0} failed: {1:?}")]
StepFailed(u32, Option<String>),
#[error("Cyclic dependency detected in workflow")]
CyclicDependency,
#[error("Step {0} not found")]
StepNotFound(u32),
#[error("Invalid dependency: step {0} depends on non-existent step {1}")]
InvalidDependency(u32, u32),
}
/// Flow execution status
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum FlowStatus {
Pending,
Running,
Completed,
Failed,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_orchestrated_flow_builder() {
let step1 = OrchestratedFlowStep::new("step1").script("let x = 1;");
let step2 = OrchestratedFlowStep::new("step2").script("let y = 2;");
let flow = OrchestratedFlow::new("test_flow")
.add_step(step1)
.add_step(step2);
assert_eq!(flow.name, "test_flow");
assert_eq!(flow.orchestrated_steps.len(), 2);
}
#[test]
fn test_dag_validation_no_cycle() {
let step1 = OrchestratedFlowStep::new("step1").script("let x = 1;");
let step2 = OrchestratedFlowStep::new("step2")
.script("let y = 2;")
.depends_on(step1.id());
let flow = OrchestratedFlow::new("test_flow")
.add_step(step1)
.add_step(step2);
assert!(flow.validate_dag().is_ok());
}
}

View File

@@ -0,0 +1,124 @@
//! Orchestrated Flow Step model for DAG-based workflow execution
use heromodels_core::BaseModelData;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Extended FlowStep with orchestrator-specific fields
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrchestratedFlowStep {
/// Base model data (id, created_at, updated_at)
pub base_data: BaseModelData,
/// Name of the flow step
pub name: String,
/// Rhai script to execute
pub script: String,
/// IDs of steps this step depends on
pub depends_on: Vec<u32>,
/// Execution context (circle)
pub context_id: String,
/// Target worker for execution
pub worker_id: String,
/// Input parameters
pub inputs: HashMap<String, String>,
/// Output results
pub outputs: HashMap<String, String>,
}
impl OrchestratedFlowStep {
/// Create a new orchestrated flow step
pub fn new(name: &str) -> Self {
Self {
base_data: BaseModelData::new(),
name: name.to_string(),
script: String::new(),
depends_on: Vec::new(),
context_id: String::new(),
worker_id: String::new(),
inputs: HashMap::new(),
outputs: HashMap::new(),
}
}
/// Set the script content
pub fn script(mut self, script: &str) -> Self {
self.script = script.to_string();
self
}
/// Add a dependency on another step
pub fn depends_on(mut self, step_id: u32) -> Self {
self.depends_on.push(step_id);
self
}
/// Set the context ID
pub fn context_id(mut self, context_id: &str) -> Self {
self.context_id = context_id.to_string();
self
}
/// Set the worker ID
pub fn worker_id(mut self, worker_id: &str) -> Self {
self.worker_id = worker_id.to_string();
self
}
/// Add an input parameter
pub fn input(mut self, key: &str, value: &str) -> Self {
self.inputs.insert(key.to_string(), value.to_string());
self
}
/// Get the step ID
pub fn id(&self) -> u32 {
self.base_data.id
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_orchestrated_flow_step_builder() {
let step = OrchestratedFlowStep::new("test_step")
.script("let x = 1;")
.context_id("test_context")
.worker_id("test_worker")
.input("key1", "value1");
assert_eq!(step.name, "test_step");
assert_eq!(step.script, "let x = 1;");
assert_eq!(step.context_id, "test_context");
assert_eq!(step.worker_id, "test_worker");
assert_eq!(step.inputs.get("key1"), Some(&"value1".to_string()));
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_orchestrated_flow_step_builder() {
let step = OrchestratedFlowStep::new("test_step")
.script("let x = 1;")
.context_id("test_context")
.worker_id("test_worker")
.input("key1", "value1");
assert_eq!(step.flow_step.name, "test_step");
assert_eq!(step.script, "let x = 1;");
assert_eq!(step.context_id, "test_context");
assert_eq!(step.worker_id, "test_worker");
assert_eq!(step.inputs.get("key1"), Some(&"value1".to_string()));
}
}

View File

@@ -142,5 +142,4 @@ pub fn register_signature_requirement_rhai_module(engine: &mut Engine) {
);
engine.register_global_module(module.into());
println!("Successfully registered signature_requirement Rhai module.");
}

View File

@@ -6,10 +6,10 @@ description = "Central Rhai engine for heromodels"
[dependencies]
rhai = { version = "=1.21.0", features = ["std", "sync", "decimal", "internals"] }
heromodels = { path = "../../../db/heromodels", features = ["rhai"] }
heromodels_core = { path = "../../../db/heromodels_core" }
heromodels = { git = "https://git.ourworld.tf/herocode/db.git", features = ["rhai"] }
heromodels_core = { git = "https://git.ourworld.tf/herocode/db.git" }
chrono = "0.4"
heromodels-derive = { path = "../../../db/heromodels-derive" }
heromodels-derive = { git = "https://git.ourworld.tf/herocode/db.git" }
macros = { path = "../macros"}
derive = { path = "../derive"}
serde = { version = "1.0", features = ["derive"] }
@@ -18,6 +18,7 @@ reqwest = { version = "0.11", features = ["json"] }
tokio = { version = "1", features = ["full"] }
dotenv = "0.15"
rhai_dispatcher = { path = "../dispatcher" }
thiserror = "1.0"
[dev-dependencies]
tempfile = "3"

View File

@@ -3,7 +3,8 @@ use rhai::{Engine, EvalAltResult, Scope};
use std::fs;
use std::env;
fn main() -> Result<(), Box<EvalAltResult>> {
#[tokio::main]
async fn main() -> Result<(), Box<EvalAltResult>> {
// Load environment variables from .env file
dotenv::from_filename("examples/payment/.env").ok();

View File

@@ -20,11 +20,8 @@ print(`Product created: ${product.name}`);
// Create the product in Stripe (non-blocking)
print("🔄 Dispatching product creation to Stripe...");
try {
let product_result = product.create_async(STRIPE_API_KEY, "payment-example", "new_create_product_response", "new_create_product_error");
let product_result = product.create_async("payment-example", "payment-context", STRIPE_API_KEY);
print(`✅ Product creation dispatched: ${product_result}`);
// In non-blocking mode, we use a demo product ID for the rest of the example
let product_id = "prod_demo_example_id";
print("💡 Using demo product ID for remaining operations in non-blocking mode");
} catch(error) {
print(`❌ Failed to dispatch product creation: ${error}`);
print("This is expected with a demo API key. In production, use a valid Stripe secret key.");
@@ -40,7 +37,7 @@ let upfront_price = new_price()
.product(product_id)
.metadata("type", "upfront");
let upfront_result = upfront_price.create_async(STRIPE_API_KEY, "payment-example", "new_create_price_response", "new_create_price_error");
let upfront_result = upfront_price.create_async("payment-example", "payment-context", STRIPE_API_KEY);
print(`✅ Upfront Price creation dispatched: ${upfront_result}`);
let upfront_price_id = "price_demo_upfront_id";
@@ -52,7 +49,7 @@ let monthly_price = new_price()
.recurring("month")
.metadata("type", "monthly_subscription");
let monthly_result = monthly_price.create_async(STRIPE_API_KEY, "payment-example", "new_create_price_response", "new_create_price_error");
let monthly_result = monthly_price.create_async("payment-example", "payment-context", STRIPE_API_KEY);
print(`✅ Monthly Price creation dispatched: ${monthly_result}`);
let monthly_price_id = "price_demo_monthly_id";
@@ -65,7 +62,7 @@ let annual_price = new_price()
.metadata("type", "annual_subscription")
.metadata("discount", "2_months_free");
let annual_result = annual_price.create_async(STRIPE_API_KEY, "payment-example", "new_create_price_response", "new_create_price_error");
let annual_result = annual_price.create_async("payment-example", "payment-context", STRIPE_API_KEY);
print(`✅ Annual Price creation dispatched: ${annual_result}`);
let annual_price_id = "price_demo_annual_id";
@@ -78,7 +75,7 @@ let percent_coupon = new_coupon()
.metadata("campaign", "new_customer_discount")
.metadata("code", "WELCOME25");
let percent_result = percent_coupon.create_async(STRIPE_API_KEY, "payment-example", "new_create_coupon_response", "new_create_coupon_error");
let percent_result = percent_coupon.create_async("payment-example", "payment-context", STRIPE_API_KEY);
print(`✅ 25% Off Coupon creation dispatched: ${percent_result}`);
let percent_coupon_id = "coupon_demo_25percent_id";
@@ -90,7 +87,7 @@ let amount_coupon = new_coupon()
.metadata("campaign", "loyalty_program")
.metadata("code", "LOYAL5");
let amount_result = amount_coupon.create_async(STRIPE_API_KEY, "payment-example", "new_create_coupon_response", "new_create_coupon_error");
let amount_result = amount_coupon.create_async("payment-example", "payment-context", STRIPE_API_KEY);
print(`✅ $5 Off Coupon creation dispatched: ${amount_result}`);
let amount_coupon_id = "coupon_demo_5dollar_id";
@@ -108,7 +105,7 @@ let payment_intent = new_payment_intent()
.metadata("price_id", upfront_price_id)
.metadata("payment_type", "upfront");
let payment_result = payment_intent.create_async(STRIPE_API_KEY, "payment-example", "new_create_payment_intent_response", "new_create_payment_intent_error");
let payment_result = payment_intent.create_async("payment-example", "payment-context", STRIPE_API_KEY);
print(`✅ Payment Intent creation dispatched: ${payment_result}`);
let payment_intent_id = "pi_demo_payment_intent_id";
@@ -124,7 +121,7 @@ let subscription = new_subscription()
.metadata("trial", "14_days")
.metadata("source", "website_signup");
let subscription_result = subscription.create_async(STRIPE_API_KEY, "payment-example", "new_create_subscription_response", "new_create_subscription_error");
let subscription_result = subscription.create_async("payment-example", "payment-context", STRIPE_API_KEY);
print(`✅ Subscription creation dispatched: ${subscription_result}`);
let subscription_id = "sub_demo_subscription_id";
@@ -140,7 +137,7 @@ let multi_subscription = new_subscription()
.metadata("licenses", "5")
.metadata("addons", "premium_support");
let multi_result = multi_subscription.create_async(STRIPE_API_KEY, "payment-example", "new_create_subscription_response", "new_create_subscription_error");
let multi_result = multi_subscription.create_async("payment-example", "payment-context", STRIPE_API_KEY);
print(`✅ Multi-Item Subscription creation dispatched: ${multi_result}`);
let multi_subscription_id = "sub_demo_multi_subscription_id";
@@ -156,7 +153,7 @@ let discounted_payment = new_payment_intent()
.metadata("coupon_applied", percent_coupon_id)
.metadata("discount_percent", "25");
let discounted_result = discounted_payment.create_async(STRIPE_API_KEY, "payment-example", "new_create_payment_intent_response", "new_create_payment_intent_error");
let discounted_result = discounted_payment.create_async("payment-example", "payment-context", STRIPE_API_KEY);
print(`✅ Discounted Payment Intent creation dispatched: ${discounted_result}`);
let discounted_payment_id = "pi_demo_discounted_payment_id";

View File

@@ -147,6 +147,4 @@ pub fn register_access_rhai_module(engine: &mut Engine) {
);
engine.register_global_module(module.into());
println!("Successfully registered access Rhai module using export_module approach.");
}

View File

@@ -247,5 +247,4 @@ pub fn register_company_rhai_module(engine: &mut Engine) {
);
engine.register_global_module(module.into());
println!("Successfully registered company Rhai module.");
}

View File

@@ -10,5 +10,4 @@ pub fn register_biz_rhai_module(engine: &mut Engine) {
product::register_product_rhai_module(engine);
sale::register_sale_rhai_module(engine);
shareholder::register_shareholder_rhai_module(engine);
println!("Successfully registered biz Rhai module.");
}

View File

@@ -314,5 +314,4 @@ pub fn register_product_rhai_module(engine: &mut Engine) {
);
engine.register_global_module(product_module.into());
println!("Successfully registered product Rhai module.");
}

View File

@@ -310,5 +310,4 @@ pub fn register_sale_rhai_module(engine: &mut Engine) {
);
engine.register_global_module(sale_module.into());
println!("Successfully registered sale Rhai module.");
}

View File

@@ -166,5 +166,4 @@ pub fn register_shareholder_rhai_module(engine: &mut Engine) {
);
engine.register_global_module(shareholder_module.into());
println!("Successfully registered shareholder Rhai module.");
}

View File

@@ -245,5 +245,4 @@ pub fn register_calendar_rhai_module(engine: &mut Engine) {
engine.register_type_with_name::<RhaiAttendee>("Attendee");
engine.register_type_with_name::<RhaiEvent>("Event");
engine.register_global_module(module.into());
println!("Successfully registered calendar Rhai module.");
}

View File

@@ -153,5 +153,4 @@ pub fn register_circle_rhai_module(engine: &mut Engine) {
);
engine.register_global_module(module.into());
println!("Successfully registered circle Rhai module.");
}

View File

@@ -295,5 +295,4 @@ pub fn register_company_rhai_module(engine: &mut Engine) {
);
engine.register_global_module(module.into());
println!("Successfully registered company Rhai module.");
}

View File

@@ -230,5 +230,4 @@ pub fn register_contact_rhai_module(engine: &mut Engine) {
);
engine.register_global_module(module.into());
println!("Successfully registered contact Rhai module.");
}

View File

@@ -99,5 +99,4 @@ pub fn register_comment_rhai_module(engine: &mut Engine) {
);
engine.register_global_module(module.into());
println!("Successfully registered comment Rhai module.");
}

View File

@@ -156,5 +156,4 @@ pub fn register_account_rhai_module(engine: &mut Engine) {
);
engine.register_global_module(account_module.into());
println!("Successfully registered account Rhai module.");
}

View File

@@ -165,5 +165,4 @@ pub fn register_asset_rhai_module(engine: &mut Engine) {
);
engine.register_global_module(module.into());
println!("Successfully registered asset Rhai module.");
}

View File

@@ -526,5 +526,4 @@ pub fn register_marketplace_rhai_module(engine: &mut Engine) {
);
engine.register_global_module(listing_module.into());
println!("Successfully registered marketplace Rhai module.");
}

View File

@@ -48,7 +48,7 @@ pub mod company;
pub mod contact;
pub mod core;
pub mod finance;
pub mod flow;
// pub mod flow;
pub mod library;
pub mod object;
pub mod payment;
@@ -107,7 +107,7 @@ pub fn register_dsl_modules(engine: &mut Engine) {
contact::register_contact_rhai_module(engine);
core::register_core_rhai_module(engine);
finance::register_finance_rhai_modules(engine);
flow::register_flow_rhai_modules(engine);
// flow::register_flow_rhai_modules(engine);
library::register_library_rhai_module(engine);
object::register_object_fns(engine);
payment::register_payment_rhai_module(engine);

View File

@@ -1,6 +1,5 @@
use heromodels::db::hero::OurDB;
use heromodels::db::{Collection, Db};
use heromodels::models::object::object::object_rhai_dsl::generated_rhai_module;
use heromodels::models::object::Object;
use macros::{register_authorized_create_by_id_fn, register_authorized_get_by_id_fn};
use rhai::{exported_module, Engine, EvalAltResult, FuncRegistration, Module};
@@ -8,7 +7,6 @@ use std::sync::Arc;
pub fn register_object_fns(engine: &mut Engine) {
let mut module = Module::new();
module.merge(&exported_module!(generated_rhai_module));
register_authorized_get_by_id_fn!(
module: &mut module,

View File

@@ -981,5 +981,4 @@ pub fn register_payment_rhai_module(engine: &mut Engine) {
engine.register_type_with_name::<RhaiCoupon>("Coupon");
engine.register_global_module(module.into());
println!("Successfully registered payment Rhai module.");
}

View File

@@ -173,5 +173,4 @@ pub fn register_product_rhai_module(engine: &mut Engine) {
engine.register_type_with_name::<RhaiProductComponent>("ProductComponent");
engine.register_global_module(module.into());
println!("Successfully registered product Rhai module.");
}

View File

@@ -177,5 +177,4 @@ pub fn register_sale_rhai_module(engine: &mut Engine) {
engine.register_type_with_name::<RhaiSaleItem>("SaleItem");
engine.register_global_module(module.into());
println!("Successfully registered sale Rhai module.");
}

View File

@@ -109,5 +109,4 @@ pub fn register_shareholder_rhai_module(engine: &mut Engine) {
);
engine.register_global_module(module.into());
println!("Successfully registered shareholder Rhai module.");
}

17
src/flow/Cargo.toml Normal file
View File

@@ -0,0 +1,17 @@
[package]
name = "flow"
version = "0.1.0"
edition = "2021"
description = "Simple flow manager for Rhai scripts"
[dependencies]
rhai = { version = "=1.21.0", features = ["std", "sync"] }
rhai_dispatcher = { path = "../dispatcher" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1", features = ["full"] }
redis = { version = "0.23", features = ["tokio-comp"] }
uuid = { version = "1.0", features = ["v4"] }
[dev-dependencies]
tempfile = "3"

110
src/flow/README.md Normal file
View File

@@ -0,0 +1,110 @@
# Flow Manager
A simple, generic flow manager for Rhai scripts with builder pattern API and non-blocking execution.
## Features
- **Builder Pattern API**: Fluent interface for creating steps and flows
- **Non-blocking Execution**: Uses `tokio::spawn` for async step execution
- **Simple State Management**: Redis-based state tracking
- **Retry Logic**: Configurable timeouts and retry attempts
- **Mock API Support**: Built-in mock API for testing different scenarios
- **RhaiDispatcher Integration**: Seamless integration with existing Rhai execution system
## Quick Start
```rust
use flow::{new_step, new_flow, FlowExecutor};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create executor
let executor = FlowExecutor::new("redis://127.0.0.1/").await?;
// Build steps using fluent API
let step1 = new_step("stripe_config")
.script("stripe_config_script")
.timeout(5)
.retries(2)
.build();
let step2 = new_step("stripe_config_confirm")
.script("script that looks up stripe config confirmation in db")
.timeout(5)
.build();
let step3 = new_step("create_product")
.script("create_product_script")
.timeout(10)
.retries(1)
.build();
// Build flow using fluent API
let flow = new_flow("stripe_payment_request")
.add_step(step1)
.add_step(step2)
.add_step(step3)
.build();
// Execute flow (non-blocking)
let result = executor.execute_flow(flow).await?;
println!("Flow started: {}", result);
Ok(())
}
```
## Architecture
### Core Components
- **Types** (`types.rs`): Core data structures (Flow, Step, Status enums)
- **Builder** (`builder.rs`): Fluent API for constructing flows and steps
- **State** (`state.rs`): Simple Redis-based state management
- **Executor** (`executor.rs`): Non-blocking flow execution engine
- **Mock API** (`mock_api.rs`): Testing utilities for different response scenarios
### State Management
The system tracks minimal state:
**Flow State:**
- `flow_id: String` - unique identifier
- `status: FlowStatus` (Created, Running, Completed, Failed)
- `current_step: Option<String>` - currently executing step
- `completed_steps: Vec<String>` - list of finished steps
**Step State:**
- `step_id: String` - unique identifier
- `status: StepStatus` (Pending, Running, Completed, Failed)
- `attempt_count: u32` - for retry logic
- `output: Option<String>` - result from script execution
**Storage:**
- Redis key-value pairs: `flow:{flow_id}` and `step:{flow_id}:{step_id}`
## Examples
Run the example:
```bash
cd ../rhailib/src/flow
cargo run --example stripe_flow_example
```
## Testing
```bash
cargo test
```
Note: Some tests require Redis to be running. Set `SKIP_REDIS_TESTS=1` to skip Redis-dependent tests.
## Integration
The flow manager integrates with:
- **RhaiDispatcher**: For executing Rhai scripts
- **Redis**: For state persistence
- **tokio**: For non-blocking async execution
This provides a simple, reliable foundation for orchestrating complex workflows while maintaining the non-blocking execution pattern established in the payment system.

View File

@@ -0,0 +1,90 @@
//! Example demonstrating the flow manager with mock Stripe API calls
use flow::{new_step, new_flow, FlowExecutor};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("=== Flow Manager Example ===");
println!("Demonstrating the builder pattern API with mock Stripe workflow\n");
// Create the flow executor
let executor = FlowExecutor::new("redis://127.0.0.1/").await?;
// Build steps using the fluent API
let step1 = new_step("stripe_config")
.script("mock_api_call stripe_config")
.timeout(5)
.retries(2)
.build();
let step2 = new_step("stripe_config_confirm")
.script("mock_api_call create_product")
.timeout(5)
.retries(1)
.build();
let step3 = new_step("create_product")
.script("mock_api_call create_product")
.timeout(10)
.retries(1)
.build();
// Build flow using the fluent API
let flow = new_flow("stripe_payment_request")
.add_step(step1)
.add_step(step2)
.add_step(step3)
.build();
println!("Created flow: {}", flow.name);
println!("Flow ID: {}", flow.id);
println!("Number of steps: {}", flow.steps.len());
for (i, step) in flow.steps.iter().enumerate() {
println!(" Step {}: {} (timeout: {}s, retries: {})",
i + 1, step.name, step.timeout_seconds, step.max_retries);
}
// Execute the flow (non-blocking)
println!("\n🚀 Starting flow execution...");
let result = executor.execute_flow(flow.clone()).await?;
println!("{}", result);
// Monitor flow progress
println!("\n📊 Monitoring flow progress...");
for i in 0..10 {
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
if let Ok(Some(flow_state)) = executor.get_flow_status(&flow.id).await {
println!(" Status: {:?}, Current step: {:?}, Completed: {}/{}",
flow_state.status,
flow_state.current_step,
flow_state.completed_steps.len(),
flow.steps.len());
if matches!(flow_state.status, flow::FlowStatus::Completed | flow::FlowStatus::Failed) {
break;
}
}
}
// Check final status
if let Ok(Some(final_state)) = executor.get_flow_status(&flow.id).await {
println!("\n🎯 Final flow status: {:?}", final_state.status);
println!("Completed steps: {:?}", final_state.completed_steps);
// Check individual step results
for step in &flow.steps {
if let Ok(Some(step_state)) = executor.get_step_status(&flow.id, &step.id).await {
println!(" Step '{}': {:?} (attempts: {})",
step.name, step_state.status, step_state.attempt_count);
if let Some(output) = &step_state.output {
println!(" Output: {}", output);
}
}
}
}
println!("\n✨ Flow execution demonstration completed!");
Ok(())
}

108
src/flow/src/builder.rs Normal file
View File

@@ -0,0 +1,108 @@
//! Builder patterns for steps and flows
use crate::types::{Step, Flow};
/// Builder for creating steps with fluent API
pub struct StepBuilder {
step: Step,
}
impl StepBuilder {
pub fn new(name: &str) -> Self {
Self {
step: Step::new(name),
}
}
/// Set the script content for this step
pub fn script(mut self, script: &str) -> Self {
self.step.script = script.to_string();
self
}
/// Set timeout in seconds
pub fn timeout(mut self, seconds: u64) -> Self {
self.step.timeout_seconds = seconds;
self
}
/// Set maximum retry attempts
pub fn retries(mut self, count: u32) -> Self {
self.step.max_retries = count;
self
}
/// Build the final step
pub fn build(self) -> Step {
self.step
}
}
/// Builder for creating flows with fluent API
pub struct FlowBuilder {
flow: Flow,
}
impl FlowBuilder {
pub fn new(name: &str) -> Self {
Self {
flow: Flow::new(name),
}
}
/// Add a step to this flow
pub fn add_step(mut self, step: Step) -> Self {
self.flow.steps.push(step);
self
}
/// Build the final flow
pub fn build(self) -> Flow {
self.flow
}
}
/// Convenience function to create a new step builder
pub fn new_step(name: &str) -> StepBuilder {
StepBuilder::new(name)
}
/// Convenience function to create a new flow builder
pub fn new_flow(name: &str) -> FlowBuilder {
FlowBuilder::new(name)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_step_builder() {
let step = new_step("test_step")
.script("print('hello world');")
.timeout(10)
.retries(3)
.build();
assert_eq!(step.name, "test_step");
assert_eq!(step.script, "print('hello world');");
assert_eq!(step.timeout_seconds, 10);
assert_eq!(step.max_retries, 3);
}
#[test]
fn test_flow_builder() {
let step1 = new_step("step1").script("let x = 1;").build();
let step2 = new_step("step2").script("let y = 2;").build();
let flow = new_flow("test_flow")
.add_step(step1)
.add_step(step2)
.build();
assert_eq!(flow.name, "test_flow");
assert_eq!(flow.steps.len(), 2);
assert_eq!(flow.steps[0].name, "step1");
assert_eq!(flow.steps[1].name, "step2");
}
}

243
src/flow/src/executor.rs Normal file
View File

@@ -0,0 +1,243 @@
//! Simple flow executor with non-blocking step execution
use crate::types::{Flow, Step, FlowStatus, StepStatus};
use crate::state::{FlowState, StepState, StateManager};
use crate::mock_api::MockAPI;
use rhai_dispatcher::RhaiDispatcherBuilder;
use std::sync::Arc;
use tokio::time::{timeout, Duration};
/// Simple flow executor
pub struct FlowExecutor {
state_manager: Arc<StateManager>,
mock_api: Arc<MockAPI>,
redis_url: String,
}
impl FlowExecutor {
pub async fn new(redis_url: &str) -> Result<Self, Box<dyn std::error::Error>> {
let state_manager = Arc::new(StateManager::new(redis_url).await?);
let mock_api = Arc::new(MockAPI::default());
Ok(Self {
state_manager,
mock_api,
redis_url: redis_url.to_string(),
})
}
/// Execute a flow non-blocking
pub async fn execute_flow(&self, flow: Flow) -> Result<String, Box<dyn std::error::Error>> {
// Initialize flow state
let mut flow_state = FlowState::new(flow.id.clone());
flow_state.status = FlowStatus::Running;
self.state_manager.save_flow_state(&flow_state).await?;
// Initialize step states
for step in &flow.steps {
let step_state = StepState::new(step.id.clone());
self.state_manager.save_step_state(&flow.id, &step_state).await?;
}
// Spawn flow execution in background
let flow_id = flow.id.clone();
let state_manager = self.state_manager.clone();
let mock_api = self.mock_api.clone();
let redis_url = self.redis_url.clone();
tokio::spawn(async move {
if let Err(e) = Self::execute_flow_steps(flow, state_manager, mock_api, redis_url).await {
eprintln!("Flow execution error: {}", e);
}
});
Ok(format!("flow_execution_started:{}", flow_id))
}
/// Execute all steps in a flow
async fn execute_flow_steps(
flow: Flow,
state_manager: Arc<StateManager>,
mock_api: Arc<MockAPI>,
redis_url: String,
) -> Result<(), Box<dyn std::error::Error>> {
let mut flow_state = state_manager.load_flow_state(&flow.id).await?
.ok_or("Flow state not found")?;
// Execute steps sequentially
for step in &flow.steps {
flow_state.current_step = Some(step.id.clone());
state_manager.save_flow_state(&flow_state).await?;
match Self::execute_step_with_retries(
step,
&flow.id,
state_manager.clone(),
mock_api.clone(),
redis_url.clone(),
).await {
Ok(_) => {
flow_state.completed_steps.push(step.id.clone());
}
Err(e) => {
eprintln!("Step {} failed: {}", step.name, e);
flow_state.status = FlowStatus::Failed;
state_manager.save_flow_state(&flow_state).await?;
return Err(e);
}
}
}
// Mark flow as completed
flow_state.status = FlowStatus::Completed;
flow_state.current_step = None;
state_manager.save_flow_state(&flow_state).await?;
Ok(())
}
/// Execute a single step with retry logic
async fn execute_step_with_retries(
step: &Step,
flow_id: &str,
state_manager: Arc<StateManager>,
mock_api: Arc<MockAPI>,
redis_url: String,
) -> Result<(), Box<dyn std::error::Error>> {
let mut step_state = state_manager.load_step_state(flow_id, &step.id).await?
.ok_or("Step state not found")?;
let max_attempts = step.max_retries + 1;
for attempt in 0..max_attempts {
step_state.attempt_count = attempt + 1;
step_state.status = StepStatus::Running;
state_manager.save_step_state(flow_id, &step_state).await?;
match Self::execute_single_step(step, &mock_api, &redis_url).await {
Ok(output) => {
step_state.status = StepStatus::Completed;
step_state.output = Some(output);
state_manager.save_step_state(flow_id, &step_state).await?;
return Ok(());
}
Err(e) => {
if attempt + 1 >= max_attempts {
step_state.status = StepStatus::Failed;
state_manager.save_step_state(flow_id, &step_state).await?;
return Err(e);
}
// Wait before retry
tokio::time::sleep(Duration::from_millis(1000)).await;
}
}
}
Err("Max retries exceeded".into())
}
/// Execute a single step
async fn execute_single_step(
step: &Step,
mock_api: &MockAPI,
redis_url: &str,
) -> Result<String, Box<dyn std::error::Error>> {
// Execute with timeout
let result = timeout(step.timeout(), async {
// For demo, we'll use mock API calls instead of real Rhai execution
// In real implementation, this would execute the Rhai script
if step.script.contains("mock_api_call") {
// Extract endpoint from script (simple parsing)
let endpoint = if step.script.contains("stripe_config") {
"stripe_config"
} else if step.script.contains("create_product") {
"create_product"
} else {
"default_endpoint"
};
mock_api.call(endpoint).await
} else {
// For non-mock scripts, simulate Rhai execution via dispatcher
Self::execute_rhai_script(&step.script, redis_url).await
}
}).await;
match result {
Ok(Ok(output)) => Ok(output),
Ok(Err(e)) => Err(e.into()),
Err(_) => Err("Step execution timed out".into()),
}
}
/// Execute Rhai script using dispatcher (simplified)
async fn execute_rhai_script(
script: &str,
redis_url: &str,
) -> Result<String, Box<dyn std::error::Error>> {
let dispatcher = RhaiDispatcherBuilder::new()
.caller_id("flow_executor")
.redis_url(redis_url)
.build()?;
let result = dispatcher
.new_play_request()
.worker_id("flow_worker")
.script(script)
.timeout(Duration::from_secs(30))
.await_response()
.await;
match result {
Ok(task_details) => {
if task_details.status == "completed" {
Ok(task_details.output.unwrap_or_default())
} else {
Err(format!("Script execution failed: {:?}", task_details.error).into())
}
}
Err(e) => Err(format!("Dispatcher error: {}", e).into()),
}
}
/// Get flow status
pub async fn get_flow_status(&self, flow_id: &str) -> Result<Option<FlowState>, Box<dyn std::error::Error>> {
self.state_manager.load_flow_state(flow_id).await
}
/// Get step status
pub async fn get_step_status(&self, flow_id: &str, step_id: &str) -> Result<Option<StepState>, Box<dyn std::error::Error>> {
self.state_manager.load_step_state(flow_id, step_id).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::builder::{new_step, new_flow};
#[tokio::test]
async fn test_flow_execution() {
// This test requires Redis to be running
// Skip if Redis is not available
if std::env::var("SKIP_REDIS_TESTS").is_ok() {
return;
}
let executor = FlowExecutor::new("redis://127.0.0.1/").await.unwrap();
let step1 = new_step("test_step")
.script("mock_api_call stripe_config")
.timeout(5)
.retries(1)
.build();
let flow = new_flow("test_flow")
.add_step(step1)
.build();
let result = executor.execute_flow(flow).await;
assert!(result.is_ok());
assert!(result.unwrap().starts_with("flow_execution_started:"));
}
}

20
src/flow/src/lib.rs Normal file
View File

@@ -0,0 +1,20 @@
//! Simple Flow Manager for Rhai Scripts
//!
//! Provides a minimal flow execution system with builder patterns:
//! - `new_step("name").script("script").timeout(5).retries(2)`
//! - `new_flow("name").add_step(step1).add_step(step2)`
pub mod types;
pub mod builder;
pub mod executor;
pub mod state;
pub mod mock_api;
pub use types::{Flow, Step, FlowStatus, StepStatus};
pub use builder::{StepBuilder, FlowBuilder, new_step, new_flow};
pub use executor::FlowExecutor;
pub use state::{FlowState, StepState, StateManager};
pub use mock_api::MockAPI;
// Re-export for convenience
pub use rhai_dispatcher::RhaiDispatcherBuilder;

144
src/flow/src/mock_api.rs Normal file
View File

@@ -0,0 +1,144 @@
//! Simple mock API for testing different response types and durations
use serde::{Serialize, Deserialize};
use std::time::Duration;
use std::collections::HashMap;
/// Mock API response types
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MockResponseType {
Success,
Failure,
Timeout,
}
/// Mock API scenario configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockScenario {
pub response_type: MockResponseType,
pub delay_ms: u64,
pub response_data: String,
}
impl MockScenario {
pub fn success(delay_ms: u64, data: &str) -> Self {
Self {
response_type: MockResponseType::Success,
delay_ms,
response_data: data.to_string(),
}
}
pub fn failure(delay_ms: u64, error: &str) -> Self {
Self {
response_type: MockResponseType::Failure,
delay_ms,
response_data: error.to_string(),
}
}
pub fn timeout(delay_ms: u64) -> Self {
Self {
response_type: MockResponseType::Timeout,
delay_ms,
response_data: "Request timed out".to_string(),
}
}
}
/// Simple mock API for testing
pub struct MockAPI {
scenarios: HashMap<String, MockScenario>,
}
impl MockAPI {
pub fn new() -> Self {
Self {
scenarios: HashMap::new(),
}
}
/// Add a mock scenario for an endpoint
pub fn add_scenario(&mut self, endpoint: &str, scenario: MockScenario) {
self.scenarios.insert(endpoint.to_string(), scenario);
}
/// Call a mock endpoint
pub async fn call(&self, endpoint: &str) -> Result<String, String> {
match self.scenarios.get(endpoint) {
Some(scenario) => {
// Simulate delay
tokio::time::sleep(Duration::from_millis(scenario.delay_ms)).await;
match scenario.response_type {
MockResponseType::Success => Ok(scenario.response_data.clone()),
MockResponseType::Failure => Err(scenario.response_data.clone()),
MockResponseType::Timeout => {
// For timeout, we just return an error after the delay
Err("Request timed out".to_string())
}
}
}
None => Err(format!("Unknown endpoint: {}", endpoint)),
}
}
/// Setup common test scenarios
pub fn setup_test_scenarios(&mut self) {
// Fast success
self.add_scenario("stripe_config", MockScenario::success(100, r#"{"status": "configured"}"#));
// Slow success
self.add_scenario("create_product", MockScenario::success(2000, r#"{"id": "prod_123", "name": "Test Product"}"#));
// Fast failure
self.add_scenario("invalid_endpoint", MockScenario::failure(50, "Invalid API key"));
// Timeout scenario
self.add_scenario("slow_endpoint", MockScenario::timeout(5000));
// Variable responses for testing retries
self.add_scenario("flaky_endpoint", MockScenario::failure(500, "Temporary server error"));
}
}
impl Default for MockAPI {
fn default() -> Self {
let mut api = Self::new();
api.setup_test_scenarios();
api
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_mock_api_success() {
let mut api = MockAPI::new();
api.add_scenario("test", MockScenario::success(10, "success"));
let result = api.call("test").await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), "success");
}
#[tokio::test]
async fn test_mock_api_failure() {
let mut api = MockAPI::new();
api.add_scenario("test", MockScenario::failure(10, "error"));
let result = api.call("test").await;
assert!(result.is_err());
assert_eq!(result.unwrap_err(), "error");
}
#[tokio::test]
async fn test_mock_api_unknown_endpoint() {
let api = MockAPI::new();
let result = api.call("unknown").await;
assert!(result.is_err());
assert!(result.unwrap_err().contains("Unknown endpoint"));
}
}

100
src/flow/src/state.rs Normal file
View File

@@ -0,0 +1,100 @@
//! Simple state management for flows and steps
use serde::{Serialize, Deserialize};
use crate::types::{FlowStatus, StepStatus};
/// Minimal flow state tracking
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowState {
pub flow_id: String,
pub status: FlowStatus,
pub current_step: Option<String>,
pub completed_steps: Vec<String>,
}
impl FlowState {
pub fn new(flow_id: String) -> Self {
Self {
flow_id,
status: FlowStatus::Created,
current_step: None,
completed_steps: Vec::new(),
}
}
}
/// Minimal step state tracking
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepState {
pub step_id: String,
pub status: StepStatus,
pub attempt_count: u32,
pub output: Option<String>,
}
impl StepState {
pub fn new(step_id: String) -> Self {
Self {
step_id,
status: StepStatus::Pending,
attempt_count: 0,
output: None,
}
}
}
/// Simple Redis-based state manager
pub struct StateManager {
redis_client: redis::Client,
}
impl StateManager {
pub async fn new(redis_url: &str) -> Result<Self, Box<dyn std::error::Error>> {
let client = redis::Client::open(redis_url)?;
Ok(Self {
redis_client: client,
})
}
/// Save flow state to Redis
pub async fn save_flow_state(&self, state: &FlowState) -> Result<(), Box<dyn std::error::Error>> {
let mut conn = self.redis_client.get_async_connection().await?;
let key = format!("flow:{}", state.flow_id);
let json = serde_json::to_string(state)?;
redis::cmd("SET").arg(&key).arg(&json).query_async(&mut conn).await?;
Ok(())
}
/// Load flow state from Redis
pub async fn load_flow_state(&self, flow_id: &str) -> Result<Option<FlowState>, Box<dyn std::error::Error>> {
let mut conn = self.redis_client.get_async_connection().await?;
let key = format!("flow:{}", flow_id);
let result: Option<String> = redis::cmd("GET").arg(&key).query_async(&mut conn).await?;
match result {
Some(json) => Ok(Some(serde_json::from_str(&json)?)),
None => Ok(None),
}
}
/// Save step state to Redis
pub async fn save_step_state(&self, flow_id: &str, state: &StepState) -> Result<(), Box<dyn std::error::Error>> {
let mut conn = self.redis_client.get_async_connection().await?;
let key = format!("step:{}:{}", flow_id, state.step_id);
let json = serde_json::to_string(state)?;
redis::cmd("SET").arg(&key).arg(&json).query_async(&mut conn).await?;
Ok(())
}
/// Load step state from Redis
pub async fn load_step_state(&self, flow_id: &str, step_id: &str) -> Result<Option<StepState>, Box<dyn std::error::Error>> {
let mut conn = self.redis_client.get_async_connection().await?;
let key = format!("step:{}:{}", flow_id, step_id);
let result: Option<String> = redis::cmd("GET").arg(&key).query_async(&mut conn).await?;
match result {
Some(json) => Ok(Some(serde_json::from_str(&json)?)),
None => Ok(None),
}
}
}

66
src/flow/src/types.rs Normal file
View File

@@ -0,0 +1,66 @@
//! Core types for the flow manager
use serde::{Serialize, Deserialize};
use std::time::Duration;
/// Simple flow status enumeration
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum FlowStatus {
Created,
Running,
Completed,
Failed,
}
/// Simple step status enumeration
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum StepStatus {
Pending,
Running,
Completed,
Failed,
}
/// A single step in a flow
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Step {
pub id: String,
pub name: String,
pub script: String,
pub timeout_seconds: u64,
pub max_retries: u32,
}
impl Step {
pub fn new(name: &str) -> Self {
Self {
id: uuid::Uuid::new_v4().to_string(),
name: name.to_string(),
script: String::new(),
timeout_seconds: 30, // default 30 seconds
max_retries: 0, // default no retries
}
}
pub fn timeout(&self) -> Duration {
Duration::from_secs(self.timeout_seconds)
}
}
/// A flow containing multiple steps
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Flow {
pub id: String,
pub name: String,
pub steps: Vec<Step>,
}
impl Flow {
pub fn new(name: &str) -> Self {
Self {
id: uuid::Uuid::new_v4().to_string(),
name: name.to_string(),
steps: Vec::new(),
}
}
}

View File

@@ -5,6 +5,6 @@ edition = "2024"
[dependencies]
rhai = { version = "1.21.0", features = ["std", "sync", "decimal", "internals"] }
heromodels = { path = "../../../db/heromodels" }
heromodels_core = { path = "../../../db/heromodels_core" }
heromodels = { git = "https://git.ourworld.tf/herocode/db.git" }
heromodels_core = { git = "https://git.ourworld.tf/herocode/db.git" }
serde = { version = "1.0", features = ["derive"] }

380
src/macros/_archive/lib.rs Normal file
View File

@@ -0,0 +1,380 @@
//! # Rhai Authorization Crate
//! This crate provides authorization mechanisms for Rhai functions, particularly those interacting with a database.
//! It includes helper functions for authorization checks and macros to simplify the registration
//! of authorized Rhai functions.
//! ## Features:
//! - `is_super_admin`: Checks if a caller (identified by a public key) is a super admin.
//! - `can_access_resource`: Checks if a caller has specific access rights to a resource, using a database connection.
//! - `get_caller_public_key`: Helper to extract `CALLER_ID` from the Rhai `NativeCallContext`.
//! - `id_from_i64_to_u32`: Helper to convert `i64` Rhai IDs to `u32` Rust IDs.
//! - `register_authorized_get_by_id_fn!`: Macro to register a Rhai function that retrieves a single item by ID, with authorization checks.
//! - `register_authorized_list_fn!`: Macro to register a Rhai function that lists multiple items, filtering them based on authorization.
//! ## Usage:
//! 1. Use the macros to register your Rhai functions, providing a database connection (`Arc<OurDB>`) and necessary type/name information.
//! 2. The macros internally use `can_access_resource` for authorization checks.
//! 3. Ensure `CALLER_ID` is set in the Rhai engine's scope before calling authorized functions.
use rhai::{EvalAltResult, Position};
use std::convert::TryFrom;
/// Extracts the `CALLER_ID` string constant from the Rhai `NativeCallContext`.
/// This key is used to identify the caller for authorization checks.
/// It first checks the current `Scope` and then falls back to the global constants cache.
///
/// # Arguments
/// * `context`: The Rhai `NativeCallContext` of the currently executing function.
///
/// Converts an `i64` (common Rhai integer type) to a `u32` (common Rust ID type).
///
/// # Arguments
/// * `id_i64`: The `i64` value to convert.
///
/// # Errors
/// Returns `Err(EvalAltResult::ErrorMismatchDataType)` if the `i64` value cannot be represented as a `u32`.
pub fn id_from_i64_to_u32(id_i64: i64) -> Result<u32, Box<EvalAltResult>> {
u32::try_from(id_i64).map_err(|_| {
Box::new(EvalAltResult::ErrorMismatchDataType(
"u32".to_string(),
format!("i64 value ({}) that cannot be represented as u32", id_i64),
Position::NONE,
))
})
}
/// Extracts the `CALLER_ID` string constant from the Rhai `NativeCallContext`'s tag.
/// This key is used to identify the caller for authorization checks.
/// Macro to register a Rhai function that retrieves a single resource by its ID, with authorization.
///
/// The macro handles:
/// - Argument parsing (ID).
/// - Caller identification via `CALLER_ID`.
/// - Authorization check using `AccessControlService::can_access_resource`.
/// - Database call to fetch the resource.
/// - Error handling for type mismatches, authorization failures, DB errors, and not found errors.
///
/// # Arguments
/// * `module`: Mutable reference to the Rhai `Module`.
/// * `db_clone`: Cloned `Arc<Db>` for database access.
/// * `acs_clone`: Cloned `Arc<AccessControlService>`.
/// * `rhai_fn_name`: String literal for the Rhai function name (e.g., "get_collection").
/// * `resource_type_str`: String literal for the resource type (e.g., "Collection"), used in authorization checks and error messages.
/// * `db_method_name`: Identifier for the database method to call (e.g., `get_by_id`).
/// * `id_arg_type`: Rust type of the ID argument in Rhai (e.g., `i64`).
/// * `id_rhai_type_name`: String literal for the Rhai type name of the ID (e.g., "i64"), for error messages.
/// * `id_conversion_fn`: Path to a function converting `id_arg_type` to `actual_id_type` (e.g., `id_from_i64_to_u32`).
/// * `actual_id_type`: Rust type of the ID used in the database (e.g., `u32`).
/// * `rhai_return_rust_type`: Rust type of the resource returned by the DB and Rhai function (e.g., `RhaiCollection`).
#[macro_export]
macro_rules! register_authorized_get_by_id_fn {
(
module: $module:expr,
rhai_fn_name: $rhai_fn_name:expr, // String literal for the Rhai function name (e.g., "get_collection")
resource_type_str: $resource_type_str:expr, // String literal for the resource type (e.g., "Collection")
rhai_return_rust_type: $rhai_return_rust_type:ty // Rust type of the resource returned (e.g., `RhaiCollection`)
) => {
FuncRegistration::new($rhai_fn_name).set_into_module(
$module,
move |context: rhai::NativeCallContext,
id_val: i64|
-> Result<$rhai_return_rust_type, Box<EvalAltResult>> {
let actual_id: u32 = $crate::id_from_i64_to_u32(id_val)?;
// Inlined logic to get caller public key
let tag_map = context
.tag()
.and_then(|tag| tag.read_lock::<rhai::Map>())
.ok_or_else(|| {
Box::new(EvalAltResult::ErrorRuntime(
"Context tag must be a Map.".into(),
context.position(),
))
})?;
let pk_dynamic = tag_map.get("CALLER_ID").ok_or_else(|| {
Box::new(EvalAltResult::ErrorRuntime(
"'CALLER_ID' not found in context tag Map.".into(),
context.position(),
))
})?;
let db_path = tag_map.get("DB_PATH").ok_or_else(|| {
Box::new(EvalAltResult::ErrorRuntime(
"'DB_PATH' not found in context tag Map.".into(),
context.position(),
))
})?;
let db_path = db_path.clone().into_string()?;
let circle_pk = tag_map.get("CONTEXT_ID").ok_or_else(|| {
Box::new(EvalAltResult::ErrorRuntime(
"'CONTEXT_ID' not found in context tag Map.".into(),
context.position(),
))
})?;
let circle_pk = circle_pk.clone().into_string()?;
let db_path = format!("{}/{}", db_path, circle_pk);
let db = Arc::new(OurDB::new(db_path, false).expect("Failed to create DB"));
let caller_pk_str = pk_dynamic.clone().into_string()?;
println!("Checking access for public key: {}", caller_pk_str);
if circle_pk != caller_pk_str {
// Use the standalone can_access_resource function from heromodels
let has_access = heromodels::models::access::access::can_access_resource(
db.clone(),
&caller_pk_str,
actual_id,
$resource_type_str,
);
if !has_access {
return Err(Box::new(EvalAltResult::ErrorRuntime(
format!("Access denied for public key: {}", caller_pk_str).into(),
context.position(),
)));
}
}
let result = db
.collection::<$rhai_return_rust_type>()
.unwrap()
.get_by_id(actual_id)
.map_err(|e| {
println!(
"Database error fetching {} with ID: {}",
$resource_type_str, actual_id
);
Box::new(EvalAltResult::ErrorRuntime(
format!("Database error fetching {}: {:?}", $resource_type_str, e)
.into(),
context.position(),
))
})?
.ok_or_else(|| {
Box::new(EvalAltResult::ErrorRuntime(
format!(
"Database error fetching {} with ID: {}",
$resource_type_str, actual_id
)
.into(),
context.position(),
))
})?;
Ok(result)
},
);
};
}
// Macro to register a Rhai function that retrieves a single resource by its ID, with authorization.
#[macro_export]
macro_rules! register_authorized_create_by_id_fn {
(
module: $module:expr,
rhai_fn_name: $rhai_fn_name:expr, // String literal for the Rhai function name (e.g., "get_collection")
resource_type_str: $resource_type_str:expr, // String literal for the resource type (e.g., "Collection")
rhai_return_rust_type: $rhai_return_rust_type:ty // Rust type of the resource returned (e.g., `RhaiCollection`)
) => {
FuncRegistration::new($rhai_fn_name).set_into_module(
$module,
move |context: rhai::NativeCallContext, object: $rhai_return_rust_type| -> Result<$rhai_return_rust_type, Box<EvalAltResult>> {
// Inlined logic to get caller public key
let tag_map = context
.tag()
.and_then(|tag| tag.read_lock::<rhai::Map>())
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("Context tag must be a Map.".into(), context.position())))?;
let pk_dynamic = tag_map.get("CALLER_ID")
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'CALLER_ID' not found in context tag Map.".into(), context.position())))?;
let db_path = tag_map.get("DB_PATH")
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'DB_PATH' not found in context tag Map.".into(), context.position())))?;
let db_path = db_path.clone().into_string()?;
let circle_pk = tag_map.get("CONTEXT_ID")
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'CONTEXT_ID' not found in context tag Map.".into(), context.position())))?;
let circle_pk = circle_pk.clone().into_string()?;
let db_path = format!("{}/{}", db_path, circle_pk);
let db = Arc::new(OurDB::new(db_path, false).expect("Failed to create DB"));
let caller_pk_str = pk_dynamic.clone().into_string()?;
if circle_pk != caller_pk_str {
let is_circle_member = heromodels::models::access::access::is_circle_member(
db.clone(),
&caller_pk_str,
);
if !is_circle_member {
// TODO: check if caller pk is member of circle
return Err(Box::new(EvalAltResult::ErrorRuntime(
format!("Insufficient authorization. Caller public key {} does not match circle public key {}", caller_pk_str, circle_pk).into(),
context.position(),
)));
}
}
let result = db.set(&object).map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("Database error creating {}: {:?}", $resource_type_str, e).into(),
context.position(),
))
})?;
Ok(result.1)
},
);
};
}
// Macro to register a Rhai function that retrieves a single resource by its ID, with authorization.
#[macro_export]
macro_rules! register_authorized_delete_by_id_fn {
(
module: $module:expr,
rhai_fn_name: $rhai_fn_name:expr, // String literal for the Rhai function name (e.g., "get_collection")
resource_type_str: $resource_type_str:expr, // String literal for the resource type (e.g., "Collection")
rhai_return_rust_type: $rhai_return_rust_type:ty // Rust type of the resource returned (e.g., `RhaiCollection`)
) => {
FuncRegistration::new($rhai_fn_name).set_into_module(
$module,
move |context: rhai::NativeCallContext, id_val: i64| -> Result<(), Box<EvalAltResult>> {
let actual_id: u32 = $crate::id_from_i64_to_u32(id_val)?;
// Inlined logic to get caller public key
let tag_map = context
.tag()
.and_then(|tag| tag.read_lock::<rhai::Map>())
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("Context tag must be a Map.".into(), context.position())))?;
let pk_dynamic = tag_map.get("CALLER_ID")
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'CALLER_ID' not found in context tag Map.".into(), context.position())))?;
let db_path = tag_map.get("DB_PATH")
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'DB_PATH' not found in context tag Map.".into(), context.position())))?;
let db_path = db_path.clone().into_string()?;
let circle_pk = tag_map.get("CONTEXT_ID")
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'CONTEXT_ID' not found in context tag Map.".into(), context.position())))?;
let circle_pk = circle_pk.clone().into_string()?;
let db_path = format!("{}/{}", db_path, circle_pk);
let db = Arc::new(OurDB::new(db_path, false).expect("Failed to create DB"));
let caller_pk_str = pk_dynamic.clone().into_string()?;
if circle_pk != caller_pk_str {
let is_circle_member = heromodels::models::access::access::is_circle_member(
db.clone(),
&caller_pk_str,
);
if !is_circle_member {
// TODO: check if caller pk is member of circle
return Err(Box::new(EvalAltResult::ErrorRuntime(
format!("Insufficient authorization. Caller public key {} does not match circle public key {}", caller_pk_str, circle_pk).into(),
context.position(),
)));
}
}
let result = db
.collection::<$rhai_return_rust_type>()
.unwrap()
.delete_by_id(actual_id)
.map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("Database error deleting {}: {:?}", $resource_type_str, e).into(),
context.position(),
))
})?;
Ok(())
},
);
};
}
/// Macro to register a Rhai function that lists all resources of a certain type, with authorization.
///
/// The macro handles:
/// - Caller identification via `CALLER_ID`.
/// - Fetching all items of a specific type from the database.
/// - Filtering the items based on the standalone `can_access_resource` function for each item.
/// - Wrapping the authorized items in a specified collection type (e.g., `RhaiCollectionArray`).
/// - Error handling for DB errors during fetch or authorization checks.
///
/// # Arguments
/// * `module`: Mutable reference to the Rhai `Module`.
/// * `rhai_fn_name`: String literal for the Rhai function name (e.g., "list_collections").
/// * `resource_type_str`: String literal for the resource type (e.g., "Collection"), used in authorization checks.
/// * `rhai_return_rust_type`: Rust type of the resource item (e.g., `RhaiCollection`).
/// * `item_id_accessor`: Identifier for the method on `rhai_return_rust_type` that returns its ID (e.g., `id`).
/// * `rhai_return_wrapper_type`: Rust type that wraps a `Vec` of `rhai_return_rust_type` for Rhai (e.g., `RhaiCollectionArray`).
#[macro_export]
macro_rules! register_authorized_list_fn {
(
module: $module:expr,
rhai_fn_name: $rhai_fn_name:expr,
resource_type_str: $resource_type_str:expr,
rhai_return_rust_type: $rhai_return_rust_type:ty,
rhai_return_wrapper_type: $rhai_return_wrapper_type:ty
) => {
FuncRegistration::new($rhai_fn_name).set_into_module(
$module,
move |context: rhai::NativeCallContext| -> Result<$rhai_return_wrapper_type, Box<EvalAltResult>> {
// Inlined logic to get caller public key
let tag_map = context
.tag()
.and_then(|tag| tag.read_lock::<rhai::Map>())
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("Context tag must be a Map.".into(), context.position())))?;
let pk_dynamic = tag_map.get("CALLER_ID")
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'CALLER_ID' not found in context tag Map.".into(), context.position())))?;
let caller_pk_str = pk_dynamic.clone().into_string()?;
let db_path = tag_map.get("DB_PATH")
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'DB_PATH' not found in context tag Map.".into(), context.position())))?;
let db_path = db_path.clone().into_string()?;
let circle_pk = tag_map.get("CONTEXT_ID")
.ok_or_else(|| Box::new(EvalAltResult::ErrorRuntime("'CONTEXT_ID' not found in context tag Map.".into(), context.position())))?;
let circle_pk = circle_pk.clone().into_string()?;
let db_path = format!("{}/{}", db_path, circle_pk);
let db = Arc::new(OurDB::new(db_path, false).expect("Failed to create DB"));
let all_items: Vec<$rhai_return_rust_type> = db
.collection::<$rhai_return_rust_type>()
.map_err(|e| Box::new(EvalAltResult::ErrorRuntime(format!("{:?}", e).into(), Position::NONE)))?
.get_all()
.map_err(|e| Box::new(EvalAltResult::ErrorRuntime(format!("{:?}", e).into(), Position::NONE)))?;
let authorized_items: Vec<$rhai_return_rust_type> = all_items
.into_iter()
.filter(|item| {
let resource_id = item.id();
heromodels::models::access::access::can_access_resource(
db.clone(),
&caller_pk_str,
resource_id,
$resource_type_str,
)
})
.collect();
Ok(authorized_items.into())
},
);
};
}

View File

@@ -208,20 +208,6 @@ macro_rules! register_authorized_create_by_id_fn {
let caller_pk_str = pk_dynamic.clone().into_string()?;
if circle_pk != caller_pk_str {
let is_circle_member = heromodels::models::access::access::is_circle_member(
db.clone(),
&caller_pk_str,
);
if !is_circle_member {
// TODO: check if caller pk is member of circle
return Err(Box::new(EvalAltResult::ErrorRuntime(
format!("Insufficient authorization. Caller public key {} does not match circle public key {}", caller_pk_str, circle_pk).into(),
context.position(),
)));
}
}
let result = db.set(&object).map_err(|e| {
Box::new(EvalAltResult::ErrorRuntime(
format!("Database error creating {}: {:?}", $resource_type_str, e).into(),
@@ -272,20 +258,6 @@ macro_rules! register_authorized_delete_by_id_fn {
let caller_pk_str = pk_dynamic.clone().into_string()?;
if circle_pk != caller_pk_str {
let is_circle_member = heromodels::models::access::access::is_circle_member(
db.clone(),
&caller_pk_str,
);
if !is_circle_member {
// TODO: check if caller pk is member of circle
return Err(Box::new(EvalAltResult::ErrorRuntime(
format!("Insufficient authorization. Caller public key {} does not match circle public key {}", caller_pk_str, circle_pk).into(),
context.position(),
)));
}
}
let result = db
.collection::<$rhai_return_rust_type>()
.unwrap()

View File

@@ -0,0 +1,51 @@
[package]
name = "orchestrator"
version = "0.1.0"
edition = "2021"
[dependencies]
# Core async runtime
tokio = { version = "1", features = ["macros", "rt-multi-thread", "sync", "time"] }
async-trait = "0.1"
futures = "0.3"
futures-util = "0.3"
# Serialization
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
# Error handling
thiserror = "1.0"
# Collections
uuid = { version = "1.6", features = ["v4", "serde"] }
# Time handling
chrono = { version = "0.4", features = ["serde"] }
# HTTP client
reqwest = { version = "0.11", features = ["json"] }
# WebSocket client
tokio-tungstenite = "0.20"
# Rhai scripting
rhai = "1.21.0"
# Database and models
heromodels = { path = "/Users/timurgordon/code/git.ourworld.tf/herocode/db/heromodels" }
heromodels_core = { path = "/Users/timurgordon/code/git.ourworld.tf/herocode/db/heromodels_core" }
# DSL integration for flow models
rhailib_dsl = { path = "../dsl" }
# Dispatcher integration
rhai_dispatcher = { path = "../dispatcher" }
# Logging
log = "0.4"
tracing = "0.1"
tracing-subscriber = "0.3"
[dev-dependencies]
tokio-test = "0.4"

320
src/orchestrator/README.md Normal file
View File

@@ -0,0 +1,320 @@
# Rationale for Orchestrator
We may have scripts that run asynchrounsly, depend on human input or depend on other scripts to complete. We want to be able to implement high-level workflows of rhai scripts.
## Design
Direct Acyclic Graphs (DAGs) are a natural fit for representing workflows.
## Requirements
1. Uses Direct Acyclic Graphs (DAGs) to represent workflows.
2. Each step in the workflow defines the script to execute, the inputs to pass to it, and the outputs to expect from it.
3. Simplicity: the output cases are binary (success or failure), and params inputted / outputted are simple key-value pairs.
4. Multiple steps can depend on the same step.
5. Scripts are executed using [RhaiDispatcher](../dispatcher/README.md).
## Architecture
The Orchestrator is a simple DAG-based workflow execution system that extends the heromodels flow structures to support workflows with dependencies and distributed script execution.
### Core Component
```mermaid
graph TB
subgraph "Orchestrator"
O[Orchestrator] --> RE[RhaiExecutor Trait]
O --> DB[(Database)]
end
subgraph "Executor Implementations"
RE --> RD[RhaiDispatcher]
RE --> WS[WebSocketClient]
RE --> HTTP[HttpClient]
RE --> LOCAL[LocalExecutor]
end
subgraph "Data Models (heromodels)"
F[Flow] --> FS[FlowStep]
FS --> SR[SignatureRequirement]
end
subgraph "Infrastructure"
RD --> RQ[Redis Queues]
RD --> W[Workers]
WS --> WSS[WebSocket Server]
HTTP --> API[REST API]
end
```
### Execution Abstraction
The orchestrator uses a trait-based approach for script execution, allowing different execution backends:
#### RhaiExecutor Trait
```rust
use rhai_dispatcher::{PlayRequestBuilder, RhaiTaskDetails, RhaiDispatcherError};
#[async_trait]
pub trait RhaiExecutor {
async fn call(&self, request: PlayRequestBuilder<'_>) -> Result<RhaiTaskDetails, RhaiDispatcherError>;
}
```
#### Executor Implementations
**RhaiDispatcher Implementation:**
```rust
pub struct DispatcherExecutor {
dispatcher: RhaiDispatcher,
}
#[async_trait]
impl RhaiExecutor for DispatcherExecutor {
async fn call(&self, request: PlayRequestBuilder<'_>) -> Result<RhaiTaskDetails, RhaiDispatcherError> {
// Use RhaiDispatcher to execute script via Redis queues
request.await_response().await
}
}
```
**WebSocket Client Implementation:**
```rust
pub struct WebSocketExecutor {
ws_client: WebSocketClient,
endpoint: String,
}
#[async_trait]
impl RhaiExecutor for WebSocketExecutor {
async fn call(&self, request: PlayRequestBuilder<'_>) -> Result<RhaiTaskDetails, RhaiDispatcherError> {
// Build the PlayRequest and send via WebSocket
let play_request = request.build()?;
// Send script execution request via WebSocket
let ws_message = serde_json::to_string(&play_request)?;
self.ws_client.send(ws_message).await?;
// Wait for response and convert to RhaiTaskDetails
let response = self.ws_client.receive().await?;
serde_json::from_str(&response).map_err(RhaiDispatcherError::from)
}
}
```
**HTTP Client Implementation:**
```rust
pub struct HttpExecutor {
http_client: reqwest::Client,
base_url: String,
}
#[async_trait]
impl RhaiExecutor for HttpExecutor {
async fn call(&self, request: PlayRequestBuilder<'_>) -> Result<RhaiTaskDetails, RhaiDispatcherError> {
// Build the PlayRequest and send via HTTP
let play_request = request.build()?;
// Send script execution request via HTTP API
let response = self.http_client
.post(&format!("{}/execute", self.base_url))
.json(&play_request)
.send()
.await?;
response.json().await.map_err(RhaiDispatcherError::from)
}
}
```
**Local Executor Implementation:**
```rust
pub struct LocalExecutor {
engine: Engine,
}
#[async_trait]
impl RhaiExecutor for LocalExecutor {
async fn call(&self, request: PlayRequestBuilder<'_>) -> Result<RhaiTaskDetails, RhaiDispatcherError> {
// Build the PlayRequest and execute locally
let play_request = request.build()?;
// Execute script directly in local Rhai engine
let result = self.engine.eval::<String>(&play_request.script);
// Convert to RhaiTaskDetails format
let task_details = RhaiTaskDetails {
task_id: play_request.id,
script: play_request.script,
status: if result.is_ok() { "completed".to_string() } else { "error".to_string() },
output: result.ok(),
error: result.err().map(|e| e.to_string()),
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
caller_id: "local".to_string(),
context_id: play_request.context_id,
worker_id: "local".to_string(),
};
Ok(task_details)
}
}
```
### Data Model Extensions
Simple extensions to the existing heromodels flow structures:
#### Enhanced FlowStep Model
```rust
// Extends heromodels::models::flow::FlowStep
pub struct FlowStep {
// ... existing heromodels::models::flow::FlowStep fields
pub script: String, // Rhai script to execute
pub depends_on: Vec<u32>, // IDs of steps this step depends on
pub context_id: String, // Execution context (circle)
pub inputs: HashMap<String, String>, // Input parameters
pub outputs: HashMap<String, String>, // Output results
}
```
### Execution Flow
```mermaid
sequenceDiagram
participant Client as Client
participant O as Orchestrator
participant RE as RhaiExecutor
participant DB as Database
Client->>O: Submit Flow
O->>DB: Store flow and steps
O->>O: Find steps with no dependencies
loop Until all steps complete
O->>RE: Execute ready steps
RE-->>O: Return results
O->>DB: Update step status
O->>O: Find newly ready steps
end
O->>Client: Flow completed
```
### Flexible Orchestrator Implementation
```rust
use rhai_dispatcher::{RhaiDispatcher, PlayRequestBuilder};
use std::collections::HashSet;
pub struct Orchestrator<E: RhaiExecutor> {
executor: E,
database: Arc<Database>,
}
impl<E: RhaiExecutor> Orchestrator<E> {
pub fn new(executor: E, database: Arc<Database>) -> Self {
Self { executor, database }
}
pub async fn execute_flow(&self, flow: Flow) -> Result<(), OrchestratorError> {
// 1. Store flow in database
self.database.collection::<Flow>()?.set(&flow)?;
// 2. Find steps with no dependencies (depends_on is empty)
let mut pending_steps: Vec<FlowStep> = flow.steps.clone();
let mut completed_steps: HashSet<u32> = HashSet::new();
while !pending_steps.is_empty() {
// Find ready steps (all dependencies completed)
let ready_steps: Vec<FlowStep> = pending_steps
.iter()
.filter(|step| {
step.depends_on.iter().all(|dep_id| completed_steps.contains(dep_id))
})
.cloned()
.collect();
if ready_steps.is_empty() {
return Err(OrchestratorError::NoReadySteps);
}
// Execute ready steps concurrently
let mut tasks = Vec::new();
for step in ready_steps {
let executor = &self.executor;
let task = async move {
// Create PlayRequestBuilder for this step
let request = RhaiDispatcher::new_play_request()
.script(&step.script)
.context_id(&step.context_id)
.worker_id(&step.worker_id);
// Execute via the trait
let result = executor.call(request).await?;
Ok((step.base_data.id, result))
};
tasks.push(task);
}
// Wait for all ready steps to complete
let results = futures::future::try_join_all(tasks).await?;
// Update step status and mark as completed
for (step_id, task_details) in results {
if task_details.status == "completed" {
completed_steps.insert(step_id);
// Update step status in database
// self.update_step_status(step_id, "completed", task_details.output).await?;
} else {
return Err(OrchestratorError::StepFailed(step_id, task_details.error));
}
}
// Remove completed steps from pending
pending_steps.retain(|step| !completed_steps.contains(&step.base_data.id));
}
Ok(())
}
pub async fn get_flow_status(&self, flow_id: u32) -> Result<FlowStatus, OrchestratorError> {
// Return current status of flow and all its steps
let flow = self.database.collection::<Flow>()?.get(flow_id)?;
// Implementation would check step statuses and return overall flow status
Ok(FlowStatus::Running) // Placeholder
}
}
pub enum OrchestratorError {
DatabaseError(String),
ExecutorError(RhaiDispatcherError),
NoReadySteps,
StepFailed(u32, Option<String>),
}
pub enum FlowStatus {
Pending,
Running,
Completed,
Failed,
}
// Usage examples:
// let orchestrator = Orchestrator::new(DispatcherExecutor::new(dispatcher), db);
// let orchestrator = Orchestrator::new(WebSocketExecutor::new(ws_client), db);
// let orchestrator = Orchestrator::new(HttpExecutor::new(http_client), db);
// let orchestrator = Orchestrator::new(LocalExecutor::new(engine), db);
```
### Key Features
1. **DAG Validation**: Ensures no circular dependencies exist in the `depends_on` relationships
2. **Parallel Execution**: Executes independent steps concurrently via multiple workers
3. **Simple Dependencies**: Each step lists the step IDs it depends on
4. **RhaiDispatcher Integration**: Uses existing dispatcher for script execution
5. **Binary Outcomes**: Steps either succeed or fail (keeping it simple as per requirements)
This simple architecture provides DAG-based workflow execution while leveraging the existing rhailib infrastructure and keeping complexity minimal.

View File

@@ -0,0 +1,283 @@
//! Basic workflow example demonstrating orchestrator usage
use orchestrator::{
interface::LocalInterface,
orchestrator::Orchestrator,
OrchestratedFlow, OrchestratedFlowStep, FlowStatus,
};
use std::sync::Arc;
use std::collections::HashMap;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize logging
tracing_subscriber::fmt().init();
// Create executor
let executor = Arc::new(LocalInterface::new());
// Create orchestrator
let orchestrator = Orchestrator::new(executor);
println!("🚀 Starting basic workflow example");
// Example 1: Simple sequential workflow
println!("\n📋 Example 1: Sequential Workflow");
let sequential_flow = create_sequential_workflow();
let flow_id = orchestrator.execute_flow(sequential_flow).await?;
// Wait for completion and show results
wait_and_show_results(&orchestrator, flow_id, "Sequential").await;
// Example 2: Parallel workflow with convergence
println!("\n📋 Example 2: Parallel Workflow");
let parallel_flow = create_parallel_workflow();
let flow_id = orchestrator.execute_flow(parallel_flow).await?;
// Wait for completion and show results
wait_and_show_results(&orchestrator, flow_id, "Parallel").await;
// Example 3: Complex workflow with multiple dependencies
println!("\n📋 Example 3: Complex Workflow");
let complex_flow = create_complex_workflow();
let flow_id = orchestrator.execute_flow(complex_flow).await?;
// Wait for completion and show results
wait_and_show_results(&orchestrator, flow_id, "Complex").await;
// Clean up completed flows
orchestrator.cleanup_completed_flows().await;
println!("\n✅ All examples completed successfully!");
Ok(())
}
/// Create a simple sequential workflow
fn create_sequential_workflow() -> OrchestratedFlow {
let step1 = OrchestratedFlowStep::new("data_preparation")
.script(r#"
let data = [1, 2, 3, 4, 5];
let sum = 0;
for item in data {
sum += item;
}
let result = sum;
"#)
.context_id("sequential_context")
.worker_id("worker_1");
let step2 = OrchestratedFlowStep::new("data_processing")
.script(r#"
let processed_data = dep_1_result * 2;
let result = processed_data;
"#)
.depends_on(step1.id())
.context_id("sequential_context")
.worker_id("worker_2");
let step3 = OrchestratedFlowStep::new("data_output")
.script(r#"
let final_result = "Processed value: " + dep_2_result;
let result = final_result;
"#)
.depends_on(step2.id())
.context_id("sequential_context")
.worker_id("worker_3");
OrchestratedFlow::new("sequential_workflow")
.add_step(step1)
.add_step(step2)
.add_step(step3)
}
/// Create a parallel workflow with convergence
fn create_parallel_workflow() -> OrchestratedFlow {
let step1 = OrchestratedFlowStep::new("fetch_user_data")
.script(r#"
let user_id = 12345;
let user_name = "Alice";
let result = user_name;
"#)
.context_id("parallel_context")
.worker_id("user_service");
let step2 = OrchestratedFlowStep::new("fetch_order_data")
.script(r#"
let order_id = 67890;
let order_total = 99.99;
let result = order_total;
"#)
.context_id("parallel_context")
.worker_id("order_service");
let step3 = OrchestratedFlowStep::new("fetch_inventory_data")
.script(r#"
let product_id = "ABC123";
let stock_count = 42;
let result = stock_count;
"#)
.context_id("parallel_context")
.worker_id("inventory_service");
let step4 = OrchestratedFlowStep::new("generate_report")
.script(r#"
let report = "User: " + dep_1_result +
", Order Total: $" + dep_2_result +
", Stock: " + dep_3_result + " units";
let result = report;
"#)
.depends_on(step1.id())
.depends_on(step2.id())
.depends_on(step3.id())
.context_id("parallel_context")
.worker_id("report_service");
OrchestratedFlow::new("parallel_workflow")
.add_step(step1)
.add_step(step2)
.add_step(step3)
.add_step(step4)
}
/// Create a complex workflow with multiple dependency levels
fn create_complex_workflow() -> OrchestratedFlow {
// Level 1: Initial data gathering
let step1 = OrchestratedFlowStep::new("load_config")
.script(r#"
let config = #{
api_url: "https://api.example.com",
timeout: 30,
retries: 3
};
let result = config.api_url;
"#)
.context_id("complex_context")
.worker_id("config_service");
let step2 = OrchestratedFlowStep::new("authenticate")
.script(r#"
let token = "auth_token_12345";
let expires_in = 3600;
let result = token;
"#)
.context_id("complex_context")
.worker_id("auth_service");
// Level 2: Data fetching (depends on config and auth)
let step3 = OrchestratedFlowStep::new("fetch_customers")
.script(r#"
let api_url = dep_1_result;
let auth_token = dep_2_result;
let customers = ["Customer A", "Customer B", "Customer C"];
let result = customers.len();
"#)
.depends_on(step1.id())
.depends_on(step2.id())
.context_id("complex_context")
.worker_id("customer_service");
let step4 = OrchestratedFlowStep::new("fetch_products")
.script(r#"
let api_url = dep_1_result;
let auth_token = dep_2_result;
let products = ["Product X", "Product Y", "Product Z"];
let result = products.len();
"#)
.depends_on(step1.id())
.depends_on(step2.id())
.context_id("complex_context")
.worker_id("product_service");
// Level 3: Data processing (depends on fetched data)
let step5 = OrchestratedFlowStep::new("calculate_metrics")
.script(r#"
let customer_count = dep_3_result;
let product_count = dep_4_result;
let ratio = customer_count / product_count;
let result = ratio;
"#)
.depends_on(step3.id())
.depends_on(step4.id())
.context_id("complex_context")
.worker_id("analytics_service");
// Level 4: Final reporting
let step6 = OrchestratedFlowStep::new("generate_dashboard")
.script(r#"
let customer_count = dep_3_result;
let product_count = dep_4_result;
let ratio = dep_5_result;
let dashboard = "Dashboard: " + customer_count + " customers, " +
product_count + " products, ratio: " + ratio;
let result = dashboard;
"#)
.depends_on(step3.id())
.depends_on(step4.id())
.depends_on(step5.id())
.context_id("complex_context")
.worker_id("dashboard_service");
OrchestratedFlow::new("complex_workflow")
.add_step(step1)
.add_step(step2)
.add_step(step3)
.add_step(step4)
.add_step(step5)
.add_step(step6)
}
/// Wait for flow completion and show results
async fn wait_and_show_results(
orchestrator: &Orchestrator<LocalInterface>,
flow_id: u32,
workflow_name: &str,
) {
println!(" ⏳ Executing {} workflow (ID: {})...", workflow_name, flow_id);
// Poll for completion
loop {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
if let Some(execution) = orchestrator.get_flow_status(flow_id).await {
match execution.status {
FlowStatus::Completed => {
println!("{} workflow completed successfully!", workflow_name);
println!(" 📊 Executed {} steps in {:?}",
execution.completed_steps.len(),
execution.completed_at.unwrap() - execution.started_at);
// Show step results
for (step_id, outputs) in &execution.step_results {
if let Some(result) = outputs.get("result") {
let step_name = execution.flow.orchestrated_steps
.iter()
.find(|s| s.id() == *step_id)
.map(|s| s.flow_step.name.as_str())
.unwrap_or("unknown");
println!(" 📝 Step '{}': {}", step_name, result);
}
}
break;
}
FlowStatus::Failed => {
println!("{} workflow failed!", workflow_name);
if !execution.failed_steps.is_empty() {
println!(" 💥 Failed steps: {:?}", execution.failed_steps);
}
break;
}
FlowStatus::Running => {
print!(".");
std::io::Write::flush(&mut std::io::stdout()).unwrap();
}
FlowStatus::Pending => {
println!(" ⏸️ {} workflow is pending...", workflow_name);
}
}
} else {
println!("{} workflow not found!", workflow_name);
break;
}
}
}

View File

@@ -0,0 +1,61 @@
//! Dispatcher interface implementation using RhaiDispatcher
use crate::RhaiInterface;
use async_trait::async_trait;
use rhai_dispatcher::{PlayRequest, RhaiDispatcher, RhaiDispatcherError};
use std::sync::Arc;
/// Dispatcher-based interface using RhaiDispatcher
pub struct DispatcherInterface {
dispatcher: Arc<RhaiDispatcher>,
}
impl DispatcherInterface {
/// Create a new dispatcher interface
pub fn new(dispatcher: Arc<RhaiDispatcher>) -> Self {
Self { dispatcher }
}
}
#[async_trait]
impl RhaiInterface for DispatcherInterface {
async fn submit_play_request(&self, play_request: &PlayRequest) -> Result<(), RhaiDispatcherError> {
self.dispatcher.submit_play_request(play_request).await
}
async fn submit_play_request_and_await_result(&self, play_request: &PlayRequest) -> Result<String, RhaiDispatcherError> {
self.dispatcher.submit_play_request_and_await_result(play_request).await
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_dispatcher_interface_creation() {
// This test just verifies we can create the interface
// Note: Actual testing would require a properly configured RhaiDispatcher
// For now, we'll create a mock or skip the actual dispatcher creation
// This is a placeholder test - adjust based on actual RhaiDispatcher constructor
// let dispatcher = Arc::new(RhaiDispatcher::new());
// let interface = DispatcherInterface::new(dispatcher);
// Just verify the test compiles for now
assert!(true);
}
#[tokio::test]
async fn test_dispatcher_interface_methods() {
// This test would verify the interface methods work correctly
// when a proper RhaiDispatcher is available
let play_request = PlayRequest {
script: "let x = 5; x + 3".to_string(),
};
// Placeholder assertions - would test actual functionality with real dispatcher
assert_eq!(play_request.script, "let x = 5; x + 3");
}
}

View File

@@ -0,0 +1,111 @@
//! Local interface implementation for in-process script execution
use crate::RhaiInterface;
use async_trait::async_trait;
use rhai_dispatcher::{PlayRequest, RhaiDispatcherError};
/// Local interface for in-process script execution
pub struct LocalInterface {
engine: rhai::Engine,
}
impl LocalInterface {
/// Create a new local interface
pub fn new() -> Self {
let engine = rhai::Engine::new();
Self { engine }
}
/// Create a new local interface with custom engine
pub fn with_engine(engine: rhai::Engine) -> Self {
Self { engine }
}
}
impl Default for LocalInterface {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl RhaiInterface for LocalInterface {
async fn submit_play_request(&self, _play_request: &PlayRequest) -> Result<(), RhaiDispatcherError> {
// For local interface, fire-and-forget doesn't make much sense
// We'll just execute and ignore the result
let _ = self.submit_play_request_and_await_result(_play_request).await?;
Ok(())
}
async fn submit_play_request_and_await_result(&self, play_request: &PlayRequest) -> Result<String, RhaiDispatcherError> {
let mut scope = rhai::Scope::new();
// Execute the script
let result = self
.engine
.eval_with_scope::<rhai::Dynamic>(&mut scope, &play_request.script)
.map_err(|e| RhaiDispatcherError::TaskNotFound(format!("Script execution error: {}", e)))?;
// Return the result as a string
if result.is_unit() {
Ok(String::new())
} else {
Ok(result.to_string())
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_local_interface_basic() {
let interface = LocalInterface::new();
let play_request = PlayRequest {
script: "let x = 5; x + 3".to_string(),
};
let result = interface.submit_play_request_and_await_result(&play_request).await;
assert!(result.is_ok());
let output = result.unwrap();
assert_eq!(output, "8");
}
#[tokio::test]
async fn test_local_interface_fire_and_forget() {
let interface = LocalInterface::new();
let play_request = PlayRequest {
script: "let x = 5; x + 3".to_string(),
};
let result = interface.submit_play_request(&play_request).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_local_interface_with_error() {
let interface = LocalInterface::new();
let play_request = PlayRequest {
script: "invalid_syntax +++".to_string(),
};
let result = interface.submit_play_request_and_await_result(&play_request).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_local_interface_empty_result() {
let interface = LocalInterface::new();
let play_request = PlayRequest {
script: "let x = 42;".to_string(),
};
let result = interface.submit_play_request_and_await_result(&play_request).await;
assert!(result.is_ok());
let output = result.unwrap();
assert_eq!(output, "");
}
}

View File

@@ -0,0 +1,9 @@
//! Interface implementations for different backends
pub mod local;
pub mod ws;
pub mod dispatcher;
pub use local::*;
pub use ws::*;
pub use dispatcher::*;

View File

@@ -0,0 +1,117 @@
//! WebSocket interface implementation for remote script execution
use crate::RhaiInterface;
use async_trait::async_trait;
use rhai_dispatcher::{PlayRequest, RhaiDispatcherError};
use reqwest::Client;
use serde_json::json;
/// WebSocket-based interface for remote script execution
pub struct WsInterface {
client: Client,
base_url: String,
}
impl WsInterface {
/// Create a new WebSocket interface
pub fn new(base_url: String) -> Self {
Self {
client: Client::new(),
base_url,
}
}
}
#[async_trait]
impl RhaiInterface for WsInterface {
async fn submit_play_request(&self, play_request: &PlayRequest) -> Result<(), RhaiDispatcherError> {
let payload = json!({
"script": play_request.script
});
let response = self
.client
.post(&format!("{}/submit", self.base_url))
.json(&payload)
.send()
.await
.map_err(|e| RhaiDispatcherError::TaskNotFound(format!("Network error: {}", e)))?;
if response.status().is_success() {
Ok(())
} else {
let error_text = response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());
Err(RhaiDispatcherError::TaskNotFound(format!("HTTP error: {}", error_text)))
}
}
async fn submit_play_request_and_await_result(&self, play_request: &PlayRequest) -> Result<String, RhaiDispatcherError> {
let payload = json!({
"script": play_request.script
});
let response = self
.client
.post(&format!("{}/execute", self.base_url))
.json(&payload)
.send()
.await
.map_err(|e| RhaiDispatcherError::TaskNotFound(format!("Network error: {}", e)))?;
if response.status().is_success() {
let result: String = response
.text()
.await
.map_err(|e| RhaiDispatcherError::TaskNotFound(format!("Response parsing error: {}", e)))?;
Ok(result)
} else {
let error_text = response
.text()
.await
.unwrap_or_else(|_| "Unknown error".to_string());
Err(RhaiDispatcherError::TaskNotFound(format!("HTTP error: {}", error_text)))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_ws_interface_creation() {
let interface = WsInterface::new("http://localhost:8080".to_string());
assert_eq!(interface.base_url, "http://localhost:8080");
}
#[tokio::test]
async fn test_ws_interface_call_with_mock_server() {
// This test would require a mock HTTP server
// For now, just test that we can create the interface
let interface = WsInterface::new("http://localhost:8080".to_string());
let play_request = PlayRequest {
script: "let x = 1;".to_string(),
};
// This will fail without a real server, but that's expected in unit tests
let result = interface.submit_play_request_and_await_result(&play_request).await;
assert!(result.is_err()); // Expected to fail without server
}
#[tokio::test]
async fn test_ws_interface_fire_and_forget() {
let interface = WsInterface::new("http://localhost:8080".to_string());
let play_request = PlayRequest {
script: "let x = 1;".to_string(),
};
// This will fail without a real server, but that's expected in unit tests
let result = interface.submit_play_request(&play_request).await;
assert!(result.is_err()); // Expected to fail without server
}
}

View File

@@ -0,0 +1,35 @@
//! # Orchestrator
//!
//! A simple DAG-based workflow execution system that extends the heromodels flow structures
//! to support workflows with dependencies and distributed script execution.
use async_trait::async_trait;
use rhai_dispatcher::{PlayRequest, RhaiDispatcherError};
pub mod interface;
pub mod orchestrator;
pub use interface::*;
pub use orchestrator::*;
/// Trait for executing Rhai scripts through different backends
/// Uses the same signature as RhaiDispatcher for consistency
#[async_trait]
pub trait RhaiInterface {
/// Submit a play request without waiting for result (fire-and-forget)
async fn submit_play_request(&self, play_request: &PlayRequest) -> Result<(), RhaiDispatcherError>;
/// Submit a play request and await the result
/// Returns just the output string on success
async fn submit_play_request_and_await_result(&self, play_request: &PlayRequest) -> Result<String, RhaiDispatcherError>;
}
// Re-export the flow models from DSL
pub use rhailib_dsl::flow::{OrchestratedFlow, OrchestratedFlowStep, OrchestratorError, FlowStatus};
// Conversion from RhaiDispatcherError to OrchestratorError
impl From<RhaiDispatcherError> for OrchestratorError {
fn from(err: RhaiDispatcherError) -> Self {
OrchestratorError::ExecutorError(err.to_string())
}
}

View File

@@ -0,0 +1,418 @@
//! Main orchestrator implementation for DAG-based workflow execution
use crate::{
OrchestratedFlow, OrchestratedFlowStep, OrchestratorError, FlowStatus, RhaiInterface,
};
use rhai_dispatcher::PlayRequest;
use futures::future::try_join_all;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, error, info, warn};
/// Main orchestrator for executing DAG-based workflows
pub struct Orchestrator<I: RhaiInterface> {
/// Interface for running scripts
interface: Arc<I>,
/// Active flow executions
active_flows: Arc<RwLock<HashMap<u32, FlowExecution>>>,
}
/// Represents an active flow execution
#[derive(Debug, Clone)]
pub struct FlowExecution {
/// The flow being executed
pub flow: OrchestratedFlow,
/// Current status
pub status: FlowStatus,
/// Completed step IDs
pub completed_steps: HashSet<u32>,
/// Failed step IDs
pub failed_steps: HashSet<u32>,
/// Step results
pub step_results: HashMap<u32, HashMap<String, String>>,
/// Execution start time
pub started_at: chrono::DateTime<chrono::Utc>,
/// Execution end time
pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
}
impl FlowExecution {
/// Create a new flow execution
pub fn new(flow: OrchestratedFlow) -> Self {
Self {
flow,
status: FlowStatus::Pending,
completed_steps: HashSet::new(),
failed_steps: HashSet::new(),
step_results: HashMap::new(),
started_at: chrono::Utc::now(),
completed_at: None,
}
}
/// Check if a step is ready to execute (all dependencies completed)
pub fn is_step_ready(&self, step: &OrchestratedFlowStep) -> bool {
if self.completed_steps.contains(&step.id()) || self.failed_steps.contains(&step.id()) {
return false;
}
step.depends_on.iter().all(|dep_id| self.completed_steps.contains(dep_id))
}
/// Get all ready steps
pub fn get_ready_steps(&self) -> Vec<&OrchestratedFlowStep> {
self.flow
.orchestrated_steps
.iter()
.filter(|step| self.is_step_ready(step))
.collect()
}
/// Mark a step as completed
pub fn complete_step(&mut self, step_id: u32, outputs: HashMap<String, String>) {
self.completed_steps.insert(step_id);
self.step_results.insert(step_id, outputs);
// Check if flow is complete
if self.completed_steps.len() == self.flow.orchestrated_steps.len() {
self.status = FlowStatus::Completed;
self.completed_at = Some(chrono::Utc::now());
}
}
/// Mark a step as failed
pub fn fail_step(&mut self, step_id: u32) {
self.failed_steps.insert(step_id);
self.status = FlowStatus::Failed;
self.completed_at = Some(chrono::Utc::now());
}
/// Check if the flow execution is finished
pub fn is_finished(&self) -> bool {
matches!(self.status, FlowStatus::Completed | FlowStatus::Failed)
}
}
impl<I: RhaiInterface + Send + Sync + 'static> Orchestrator<I> {
/// Create a new orchestrator
pub fn new(interface: Arc<I>) -> Self {
Self {
interface,
active_flows: Arc::new(RwLock::new(HashMap::new())),
}
}
/// Start executing a flow
pub async fn execute_flow(&self, flow: OrchestratedFlow) -> Result<u32, OrchestratorError> {
let flow_id = flow.id();
flow.validate_dag()?;
info!("Starting execution of flow {} with {} steps", flow_id, flow.orchestrated_steps.len());
// Create flow execution
let mut execution = FlowExecution::new(flow);
execution.status = FlowStatus::Running;
// Store the execution
{
let mut active_flows = self.active_flows.write().await;
active_flows.insert(flow_id, execution);
}
// Start execution in background
let orchestrator = self.clone();
tokio::spawn(async move {
if let Err(e) = orchestrator.execute_flow_steps(flow_id).await {
error!("Flow {} execution failed: {}", flow_id, e);
// Mark flow as failed
let mut active_flows = orchestrator.active_flows.write().await;
if let Some(execution) = active_flows.get_mut(&flow_id) {
execution.status = FlowStatus::Failed;
execution.completed_at = Some(chrono::Utc::now());
}
}
});
Ok(flow_id)
}
/// Execute flow steps using DAG traversal
async fn execute_flow_steps(&self, flow_id: u32) -> Result<(), OrchestratorError> {
loop {
let ready_steps = {
let active_flows = self.active_flows.read().await;
let execution = active_flows
.get(&flow_id)
.ok_or(OrchestratorError::StepNotFound(flow_id))?;
if execution.is_finished() {
info!("Flow {} execution completed with status: {:?}", flow_id, execution.status);
return Ok(());
}
execution.get_ready_steps().into_iter().cloned().collect::<Vec<_>>()
};
if ready_steps.is_empty() {
// Check if we're deadlocked
let active_flows = self.active_flows.read().await;
let execution = active_flows
.get(&flow_id)
.ok_or(OrchestratorError::StepNotFound(flow_id))?;
if !execution.is_finished() {
warn!("No ready steps found for flow {} - possible deadlock", flow_id);
return Err(OrchestratorError::NoReadySteps);
}
return Ok(());
}
debug!("Executing {} ready steps for flow {}", ready_steps.len(), flow_id);
// Execute ready steps concurrently
let step_futures = ready_steps.into_iter().map(|step| {
let orchestrator = self.clone();
async move {
orchestrator.execute_step(flow_id, step).await
}
});
// Wait for all steps to complete
let results = try_join_all(step_futures).await?;
// Update execution state
{
let mut active_flows = self.active_flows.write().await;
let execution = active_flows
.get_mut(&flow_id)
.ok_or(OrchestratorError::StepNotFound(flow_id))?;
for (step_id, outputs) in results {
execution.complete_step(step_id, outputs);
}
}
// Small delay to prevent tight loop
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
}
/// Execute a single step
async fn execute_step(
&self,
flow_id: u32,
step: OrchestratedFlowStep,
) -> Result<(u32, HashMap<String, String>), OrchestratorError> {
let step_id = step.id();
info!("Executing step {} for flow {}", step_id, flow_id);
// Prepare inputs with dependency outputs
let mut inputs = step.inputs.clone();
// Add outputs from dependency steps
{
let active_flows = self.active_flows.read().await;
let execution = active_flows
.get(&flow_id)
.ok_or(OrchestratorError::StepNotFound(flow_id))?;
for dep_id in &step.depends_on {
if let Some(dep_outputs) = execution.step_results.get(dep_id) {
for (key, value) in dep_outputs {
inputs.insert(format!("dep_{}_{}", dep_id, key), value.clone());
}
}
}
}
// Create play request
let play_request = PlayRequest {
id: format!("{}_{}", flow_id, step_id),
worker_id: step.worker_id.clone(),
context_id: step.context_id.clone(),
script: step.script.clone(),
timeout: std::time::Duration::from_secs(30), // Default timeout
};
// Execute the script
match self.interface.submit_play_request_and_await_result(&play_request).await {
Ok(output) => {
info!("Step {} completed successfully", step_id);
let mut outputs = HashMap::new();
outputs.insert("result".to_string(), output);
Ok((step_id, outputs))
}
Err(e) => {
error!("Step {} failed: {}", step_id, e);
// Mark step as failed
{
let mut active_flows = self.active_flows.write().await;
if let Some(execution) = active_flows.get_mut(&flow_id) {
execution.fail_step(step_id);
}
}
Err(OrchestratorError::StepFailed(step_id, Some(e.to_string())))
}
}
}
/// Get the status of a flow execution
pub async fn get_flow_status(&self, flow_id: u32) -> Option<FlowExecution> {
let active_flows = self.active_flows.read().await;
active_flows.get(&flow_id).cloned()
}
/// Cancel a flow execution
pub async fn cancel_flow(&self, flow_id: u32) -> Result<(), OrchestratorError> {
let mut active_flows = self.active_flows.write().await;
if let Some(execution) = active_flows.get_mut(&flow_id) {
execution.status = FlowStatus::Failed;
execution.completed_at = Some(chrono::Utc::now());
info!("Flow {} cancelled", flow_id);
Ok(())
} else {
Err(OrchestratorError::StepNotFound(flow_id))
}
}
/// List all active flows
pub async fn list_active_flows(&self) -> Vec<(u32, FlowStatus)> {
let active_flows = self.active_flows.read().await;
active_flows
.iter()
.map(|(id, execution)| (*id, execution.status.clone()))
.collect()
}
/// Clean up completed flows
pub async fn cleanup_completed_flows(&self) {
let mut active_flows = self.active_flows.write().await;
active_flows.retain(|_, execution| !execution.is_finished());
}
}
impl<I: RhaiInterface + Send + Sync> Clone for Orchestrator<I> {
fn clone(&self) -> Self {
Self {
interface: self.interface.clone(),
active_flows: self.active_flows.clone(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::interface::LocalInterface;
use std::collections::HashMap;
#[tokio::test]
async fn test_simple_flow_execution() {
let interface = Arc::new(LocalInterface::new());
let orchestrator = Orchestrator::new(interface);
// Create a simple flow with two steps
let step1 = OrchestratedFlowStep::new("step1")
.script("let result = 10;")
.context_id("test")
.worker_id("worker1");
let step2 = OrchestratedFlowStep::new("step2")
.script("let result = dep_1_result + 5;")
.depends_on(step1.id())
.context_id("test")
.worker_id("worker1");
let flow = OrchestratedFlow::new("test_flow")
.add_step(step1)
.add_step(step2);
// Execute the flow
let flow_id = orchestrator.execute_flow(flow).await.unwrap();
// Wait for completion
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let status = orchestrator.get_flow_status(flow_id).await.unwrap();
assert_eq!(status.status, FlowStatus::Completed);
assert_eq!(status.completed_steps.len(), 2);
}
#[tokio::test]
async fn test_parallel_execution() {
let interface = Arc::new(LocalInterface::new());
let orchestrator = Orchestrator::new(interface);
// Create a flow with parallel steps
let step1 = OrchestratedFlowStep::new("step1")
.script("let result = 10;")
.context_id("test")
.worker_id("worker1");
let step2 = OrchestratedFlowStep::new("step2")
.script("let result = 20;")
.context_id("test")
.worker_id("worker2");
let step3 = OrchestratedFlowStep::new("step3")
.script("let result = dep_1_result + dep_2_result;")
.depends_on(step1.id())
.depends_on(step2.id())
.context_id("test")
.worker_id("worker3");
let flow = OrchestratedFlow::new("parallel_flow")
.add_step(step1)
.add_step(step2)
.add_step(step3);
// Execute the flow
let flow_id = orchestrator.execute_flow(flow).await.unwrap();
// Wait for completion
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let status = orchestrator.get_flow_status(flow_id).await.unwrap();
assert_eq!(status.status, FlowStatus::Completed);
assert_eq!(status.completed_steps.len(), 3);
}
#[test]
fn test_flow_execution_state() {
let step1 = OrchestratedFlowStep::new("step1").script("let x = 1;");
let step2 = OrchestratedFlowStep::new("step2")
.script("let y = 2;")
.depends_on(step1.id());
let flow = OrchestratedFlow::new("test_flow")
.add_step(step1.clone())
.add_step(step2.clone());
let mut execution = FlowExecution::new(flow);
// Initially, only step1 should be ready
assert!(execution.is_step_ready(&step1));
assert!(!execution.is_step_ready(&step2));
// After completing step1, step2 should be ready
execution.complete_step(step1.id(), HashMap::new());
assert!(!execution.is_step_ready(&step1)); // Already completed
assert!(execution.is_step_ready(&step2));
// After completing step2, flow should be complete
execution.complete_step(step2.id(), HashMap::new());
assert_eq!(execution.status, FlowStatus::Completed);
}
}

View File

@@ -0,0 +1,42 @@
//! Main orchestrator implementation for DAG-based workflow execution
use crate::{
OrchestratedFlow, OrchestratedFlowStep, OrchestratorError, FlowStatus, RhaiInterface, ScriptRequest,
};
use futures::future::try_join_all;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, error, info, warn};
impl<I: RhaiInterface + Send + Sync + 'static> Orchestrator<I> {
/// Get a flow by ID
pub fn get_flow(&self, flow_id: u32) -> Result<OrchestratedFlow, OrchestratorError> {
self.interface
.new_play_request()
.script(format!("json_encode(get_flow({}))", flow_id))
.submit_play_request_and_await_result()
.await
.map(|result| serde_json::from_str(&result).unwrap())
}
pub fn get_flows(&self) -> Result<Vec<OrchestratedFlow>, OrchestratorError> {
self.interface
.new_play_request()
.script("json_encode(get_flows())")
.submit_play_request_and_await_result()
.await
.map(|result| serde_json::from_str(&result).unwrap())
}
pub fn get_active_flows(&self) -> Result<Vec<OrchestratedFlow>, OrchestratorError> {
self.interface
.new_play_request()
.script("json_encode(get_flows())")
.submit_play_request_and_await_result()
.await
.map(|result| serde_json::from_str(&result).unwrap())
}
}