From 629d59f7db05debecfc3b906a0158220f7ffcb23 Mon Sep 17 00:00:00 2001 From: Timur Gordon <31495328+timurgordon@users.noreply.github.com> Date: Tue, 9 Sep 2025 15:42:20 +0200 Subject: [PATCH] merge runners into single project --- Cargo.lock | 552 +---------------------------- Cargo.toml | 22 +- cmd/async_actor.rs | 71 ++++ cmd/async_actor_tui.rs | 149 ++++++++ cmd/sync_actor.rs | 70 ++++ cmd/sync_actor_tui.rs | 153 ++++++++ src/async_runner.rs | 270 ++++++++++++++ src/bin/runner_osis/README.md | 118 ++++++ src/bin/runner_osis/engine.rs | 123 +++++++ src/bin/runner_osis/main.rs | 79 +++++ src/bin/runner_sal/README.md | 87 +++++ src/{ => bin/runner_sal}/engine.rs | 24 +- src/bin/runner_sal/main.rs | 81 +++++ src/engine/mod.rs | 19 + src/engine/osis.rs | 143 ++++++++ src/engine/system.rs | 124 +++++++ src/lib.rs | 404 ++++----------------- src/runner_trait.rs | 267 ++++++++++++++ src/sync_runner.rs | 124 +++++++ test_sync_runner.rs | 47 +++ 20 files changed, 2033 insertions(+), 894 deletions(-) create mode 100644 cmd/async_actor.rs create mode 100644 cmd/async_actor_tui.rs create mode 100644 cmd/sync_actor.rs create mode 100644 cmd/sync_actor_tui.rs create mode 100644 src/async_runner.rs create mode 100644 src/bin/runner_osis/README.md create mode 100644 src/bin/runner_osis/engine.rs create mode 100644 src/bin/runner_osis/main.rs create mode 100644 src/bin/runner_sal/README.md rename src/{ => bin/runner_sal}/engine.rs (88%) create mode 100644 src/bin/runner_sal/main.rs create mode 100644 src/engine/mod.rs create mode 100644 src/engine/osis.rs create mode 100644 src/engine/system.rs create mode 100644 src/runner_trait.rs create mode 100644 src/sync_runner.rs create mode 100644 test_sync_runner.rs diff --git a/Cargo.lock b/Cargo.lock index 40e8066..a81c407 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,16 +8,17 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", - "baobab_actor", "chrono", "clap", + "crossterm", "env_logger", - "hero_job", + "hero-job", "hero_logger", "heromodels", "heromodels-derive", "heromodels_core", "log", + "ratatui", "redis 0.25.4", "rhai", "rhailib_dsl", @@ -40,6 +41,7 @@ dependencies = [ "thiserror 1.0.69", "tokio", "toml", + "tracing", "uuid", ] @@ -234,34 +236,6 @@ dependencies = [ "windows-targets 0.52.6", ] -[[package]] -name = "baobab_actor" -version = "0.1.0" -source = "git+https://git.ourworld.tf/herocode/baobab.git#337ec2f660dd7d2e888a43520b7c29a7654b50f6" -dependencies = [ - "anyhow", - "async-trait", - "chrono", - "clap", - "crossterm", - "env_logger", - "hero_job", - "hero_supervisor", - "heromodels", - "heromodels-derive", - "heromodels_core", - "log", - "ratatui", - "redis 0.25.4", - "rhai", - "serde", - "serde_json", - "thiserror 1.0.69", - "tokio", - "toml", - "uuid", -] - [[package]] name = "base16ct" version = "0.2.0" @@ -379,12 +353,6 @@ dependencies = [ "shlex", ] -[[package]] -name = "cesu8" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" - [[package]] name = "cfg-if" version = "1.0.1" @@ -879,12 +847,6 @@ dependencies = [ "syn 2.0.104", ] -[[package]] -name = "doctest-file" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aac81fa3e28d21450aa4d2ac065992ba96a1d7303efbce51a95f4fd175b67562" - [[package]] name = "dotenv" version = "0.15.0" @@ -1127,10 +1089,6 @@ name = "futures-timer" version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" -dependencies = [ - "gloo-timers", - "send_wrapper", -] [[package]] name = "futures-util" @@ -1224,52 +1182,6 @@ dependencies = [ "walkdir", ] -[[package]] -name = "gloo-net" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06f627b1a58ca3d42b45d6104bf1e1a03799df472df00988b6ba21accc10580" -dependencies = [ - "futures-channel", - "futures-core", - "futures-sink", - "gloo-utils", - "http 1.3.1", - "js-sys", - "pin-project", - "serde", - "serde_json", - "thiserror 1.0.69", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", -] - -[[package]] -name = "gloo-timers" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" -dependencies = [ - "futures-channel", - "futures-core", - "js-sys", - "wasm-bindgen", -] - -[[package]] -name = "gloo-utils" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b5555354113b18c547c1d3a98fbf7fb32a9ff4f6fa112ce823a21641a0ba3aa" -dependencies = [ - "js-sys", - "serde", - "serde_json", - "wasm-bindgen", - "web-sys", -] - [[package]] name = "governor" version = "0.6.3" @@ -1393,9 +1305,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" [[package]] -name = "hero_job" +name = "hero-job" version = "0.1.0" -source = "git+https://git.ourworld.tf/herocode/baobab.git#337ec2f660dd7d2e888a43520b7c29a7654b50f6" dependencies = [ "chrono", "log", @@ -1403,7 +1314,6 @@ dependencies = [ "serde", "serde_json", "thiserror 1.0.69", - "tokio", "uuid", ] @@ -1424,29 +1334,6 @@ dependencies = [ "tracing-subscriber", ] -[[package]] -name = "hero_supervisor" -version = "0.1.0" -source = "git+https://git.ourworld.tf/herocode/baobab.git#337ec2f660dd7d2e888a43520b7c29a7654b50f6" -dependencies = [ - "anyhow", - "chrono", - "clap", - "colored", - "crossterm", - "env_logger", - "hero_job", - "log", - "ratatui", - "redis 0.25.4", - "serde", - "serde_json", - "tokio", - "toml", - "uuid", - "zinit-client 0.1.0", -] - [[package]] name = "heromodels" version = "0.1.0" @@ -1636,7 +1523,6 @@ dependencies = [ "http 1.3.1", "http-body 1.0.1", "httparse", - "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -1950,21 +1836,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "interprocess" -version = "2.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d941b405bd2322993887859a8ee6ac9134945a24ec5ec763a8a962fc64dfec2d" -dependencies = [ - "doctest-file", - "futures-core", - "libc", - "recvmsg", - "tokio", - "widestring", - "windows-sys 0.52.0", -] - [[package]] name = "io-uring" version = "0.7.9" @@ -2065,28 +1936,6 @@ dependencies = [ "jiff-tzdb", ] -[[package]] -name = "jni" -version = "0.21.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a87aa2bb7d2af34197c04845522473242e1aa17c12f4935d5856491a7fb8c97" -dependencies = [ - "cesu8", - "cfg-if", - "combine", - "jni-sys", - "log", - "thiserror 1.0.69", - "walkdir", - "windows-sys 0.45.0", -] - -[[package]] -name = "jni-sys" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" - [[package]] name = "js-sys" version = "0.3.77" @@ -2132,178 +1981,6 @@ dependencies = [ "thiserror 1.0.69", ] -[[package]] -name = "jsonrpsee" -version = "0.25.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fba77a59c4c644fd48732367624d1bcf6f409f9c9a286fbc71d2f1fc0b2ea16" -dependencies = [ - "jsonrpsee-client-transport", - "jsonrpsee-core", - "jsonrpsee-http-client", - "jsonrpsee-proc-macros", - "jsonrpsee-server", - "jsonrpsee-types", - "jsonrpsee-wasm-client", - "jsonrpsee-ws-client", - "tokio", - "tracing", -] - -[[package]] -name = "jsonrpsee-client-transport" -version = "0.25.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2a320a3f1464e4094f780c4d48413acd786ce5627aaaecfac9e9c7431d13ae1" -dependencies = [ - "base64 0.22.1", - "futures-channel", - "futures-util", - "gloo-net", - "http 1.3.1", - "jsonrpsee-core", - "pin-project", - "rustls", - "rustls-pki-types", - "rustls-platform-verifier", - "soketto", - "thiserror 2.0.12", - "tokio", - "tokio-rustls", - "tokio-util", - "tracing", - "url", -] - -[[package]] -name = "jsonrpsee-core" -version = "0.25.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "693c93cbb7db25f4108ed121304b671a36002c2db67dff2ee4391a688c738547" -dependencies = [ - "async-trait", - "bytes", - "futures-timer", - "futures-util", - "http 1.3.1", - "http-body 1.0.1", - "http-body-util", - "jsonrpsee-types", - "parking_lot", - "pin-project", - "rand 0.9.2", - "rustc-hash", - "serde", - "serde_json", - "thiserror 2.0.12", - "tokio", - "tokio-stream", - "tower 0.5.2", - "tracing", - "wasm-bindgen-futures", -] - -[[package]] -name = "jsonrpsee-http-client" -version = "0.25.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6962d2bd295f75e97dd328891e58fce166894b974c1f7ce2e7597f02eeceb791" -dependencies = [ - "base64 0.22.1", - "http-body 1.0.1", - "hyper 1.6.0", - "hyper-rustls", - "hyper-util", - "jsonrpsee-core", - "jsonrpsee-types", - "rustls", - "rustls-platform-verifier", - "serde", - "serde_json", - "thiserror 2.0.12", - "tokio", - "tower 0.5.2", - "url", -] - -[[package]] -name = "jsonrpsee-proc-macros" -version = "0.25.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fa4f5daed39f982a1bb9d15449a28347490ad42b212f8eaa2a2a344a0dce9e9" -dependencies = [ - "heck", - "proc-macro-crate", - "proc-macro2", - "quote", - "syn 2.0.104", -] - -[[package]] -name = "jsonrpsee-server" -version = "0.25.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d38b0bcf407ac68d241f90e2d46041e6a06988f97fe1721fb80b91c42584fae6" -dependencies = [ - "futures-util", - "http 1.3.1", - "http-body 1.0.1", - "http-body-util", - "hyper 1.6.0", - "hyper-util", - "jsonrpsee-core", - "jsonrpsee-types", - "pin-project", - "route-recognizer", - "serde", - "serde_json", - "soketto", - "thiserror 2.0.12", - "tokio", - "tokio-stream", - "tokio-util", - "tower 0.5.2", - "tracing", -] - -[[package]] -name = "jsonrpsee-types" -version = "0.25.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66df7256371c45621b3b7d2fb23aea923d577616b9c0e9c0b950a6ea5c2be0ca" -dependencies = [ - "http 1.3.1", - "serde", - "serde_json", - "thiserror 2.0.12", -] - -[[package]] -name = "jsonrpsee-wasm-client" -version = "0.25.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b67695cbcf4653f39f8f8738925547e0e23fd9fe315bccf951097b9f6a38781" -dependencies = [ - "jsonrpsee-client-transport", - "jsonrpsee-core", - "jsonrpsee-types", - "tower 0.5.2", -] - -[[package]] -name = "jsonrpsee-ws-client" -version = "0.25.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2da2694c9ff271a9d3ebfe520f6b36820e85133a51be77a3cb549fd615095261" -dependencies = [ - "http 1.3.1", - "jsonrpsee-client-transport", - "jsonrpsee-core", - "jsonrpsee-types", - "tower 0.5.2", - "url", -] - [[package]] name = "k256" version = "0.13.4" @@ -3073,15 +2750,6 @@ dependencies = [ "unicode-width", ] -[[package]] -name = "proc-macro-crate" -version = "3.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edce586971a4dfaa28950c6f18ed55e0406c1ab88bbce2c6f6293a7aaba73d35" -dependencies = [ - "toml_edit", -] - [[package]] name = "proc-macro2" version = "1.0.95" @@ -3240,12 +2908,6 @@ dependencies = [ "bitflags 2.9.1", ] -[[package]] -name = "recvmsg" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3edd4d5d42c92f0a659926464d4cce56b562761267ecf0f469d85b7de384175" - [[package]] name = "redis" version = "0.25.4" @@ -3440,26 +3102,6 @@ dependencies = [ "web-sys", ] -[[package]] -name = "reth-ipc" -version = "1.6.0" -source = "git+https://github.com/paradigmxyz/reth#59e4a5556fa54f1c210e45412b6a91f2351bea19" -dependencies = [ - "bytes", - "futures", - "futures-util", - "interprocess", - "jsonrpsee", - "pin-project", - "serde_json", - "thiserror 2.0.12", - "tokio", - "tokio-stream", - "tokio-util", - "tower 0.5.2", - "tracing", -] - [[package]] name = "rfc6979" version = "0.4.0" @@ -3553,12 +3195,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "route-recognizer" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afab94fb28594581f62d981211a9a4d53cc8130bbcbbb89a0440d9b8e81a7746" - [[package]] name = "rust_decimal" version = "1.37.2" @@ -3575,12 +3211,6 @@ version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" -[[package]] -name = "rustc-hash" -version = "2.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" - [[package]] name = "rustix" version = "0.38.44" @@ -3674,33 +3304,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "rustls-platform-verifier" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19787cda76408ec5404443dc8b31795c87cd8fec49762dc75fa727740d34acc1" -dependencies = [ - "core-foundation 0.10.1", - "core-foundation-sys", - "jni", - "log", - "once_cell", - "rustls", - "rustls-native-certs 0.8.1", - "rustls-platform-verifier-android", - "rustls-webpki", - "security-framework 3.3.0", - "security-framework-sys", - "webpki-root-certs 0.26.11", - "windows-sys 0.59.0", -] - -[[package]] -name = "rustls-platform-verifier-android" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" - [[package]] name = "rustls-webpki" version = "0.103.4" @@ -3868,7 +3471,7 @@ dependencies = [ "serde_json", "thiserror 1.0.69", "tokio", - "zinit-client 0.4.0", + "zinit-client", ] [[package]] @@ -3926,7 +3529,7 @@ dependencies = [ "serde_json", "thiserror 2.0.12", "tokio", - "zinit-client 0.4.0", + "zinit-client", ] [[package]] @@ -4046,12 +3649,6 @@ dependencies = [ "libc", ] -[[package]] -name = "send_wrapper" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f638d531eccd6e23b980caf34876660d38e265409d8e99b397ab71eb3612fad0" - [[package]] name = "serde" version = "1.0.219" @@ -4286,22 +3883,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "soketto" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e859df029d160cb88608f5d7df7fb4753fd20fdfb4de5644f3d8b8440841721" -dependencies = [ - "base64 0.22.1", - "bytes", - "futures", - "http 1.3.1", - "httparse", - "log", - "rand 0.8.5", - "sha1", -] - [[package]] name = "spin" version = "0.5.2" @@ -4740,18 +4321,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-stream" -version = "0.1.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" -dependencies = [ - "futures-core", - "pin-project-lite", - "tokio", - "tokio-util", -] - [[package]] name = "tokio-util" version = "0.7.16" @@ -4760,7 +4329,6 @@ checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5" dependencies = [ "bytes", "futures-core", - "futures-io", "futures-sink", "pin-project-lite", "tokio", @@ -5318,24 +4886,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "webpki-root-certs" -version = "0.26.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75c7f0ef91146ebfb530314f5f1d24528d7f0767efbfd31dce919275413e393e" -dependencies = [ - "webpki-root-certs 1.0.2", -] - -[[package]] -name = "webpki-root-certs" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e4ffd8df1c57e87c325000a3d6ef93db75279dc3a231125aac571650f22b12a" -dependencies = [ - "rustls-pki-types", -] - [[package]] name = "whoami" version = "1.6.0" @@ -5347,12 +4897,6 @@ dependencies = [ "web-sys", ] -[[package]] -name = "widestring" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd7cf3379ca1aac9eea11fba24fd7e315d621f8dfe35c8d7d2be8b793726e07d" - [[package]] name = "winapi" version = "0.3.9" @@ -5497,15 +5041,6 @@ dependencies = [ "windows-link", ] -[[package]] -name = "windows-sys" -version = "0.45.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" -dependencies = [ - "windows-targets 0.42.2", -] - [[package]] name = "windows-sys" version = "0.48.0" @@ -5542,21 +5077,6 @@ dependencies = [ "windows-targets 0.53.3", ] -[[package]] -name = "windows-targets" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" -dependencies = [ - "windows_aarch64_gnullvm 0.42.2", - "windows_aarch64_msvc 0.42.2", - "windows_i686_gnu 0.42.2", - "windows_i686_msvc 0.42.2", - "windows_x86_64_gnu 0.42.2", - "windows_x86_64_gnullvm 0.42.2", - "windows_x86_64_msvc 0.42.2", -] - [[package]] name = "windows-targets" version = "0.48.5" @@ -5614,12 +5134,6 @@ dependencies = [ "windows-link", ] -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" - [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -5638,12 +5152,6 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86b8d5f90ddd19cb4a147a5fa63ca848db3df085e25fee3cc10b39b6eebae764" -[[package]] -name = "windows_aarch64_msvc" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" - [[package]] name = "windows_aarch64_msvc" version = "0.48.5" @@ -5662,12 +5170,6 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7651a1f62a11b8cbd5e0d42526e55f2c99886c77e007179efff86c2b137e66c" -[[package]] -name = "windows_i686_gnu" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" - [[package]] name = "windows_i686_gnu" version = "0.48.5" @@ -5698,12 +5200,6 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ce6ccbdedbf6d6354471319e781c0dfef054c81fbc7cf83f338a4296c0cae11" -[[package]] -name = "windows_i686_msvc" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" - [[package]] name = "windows_i686_msvc" version = "0.48.5" @@ -5722,12 +5218,6 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "581fee95406bb13382d2f65cd4a908ca7b1e4c2f1917f143ba16efe98a589b5d" -[[package]] -name = "windows_x86_64_gnu" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" - [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -5746,12 +5236,6 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e55b5ac9ea33f2fc1716d1742db15574fd6fc8dadc51caab1c16a3d3b4190ba" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" - [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" @@ -5770,12 +5254,6 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0a6e035dd0599267ce1ee132e51c27dd29437f63325753051e71dd9e42406c57" -[[package]] -name = "windows_x86_64_msvc" -version = "0.42.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" - [[package]] name = "windows_x86_64_msvc" version = "0.48.5" @@ -5932,22 +5410,6 @@ dependencies = [ "syn 2.0.104", ] -[[package]] -name = "zinit-client" -version = "0.1.0" -source = "git+https://github.com/threefoldtech/zinit?branch=master#1b76c062fe31d552d1b7b23484ce163995a81482" -dependencies = [ - "anyhow", - "async-trait", - "jsonrpsee", - "log", - "reth-ipc", - "serde", - "serde_json", - "thiserror 1.0.69", - "tokio", -] - [[package]] name = "zinit-client" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index e4f67cd..ec121ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,12 +8,16 @@ name = "actor_system" # Can be different from package name, or same path = "src/lib.rs" [[bin]] -name = "actor_system" -path = "cmd/actor.rs" +name = "runner_sal" +path = "src/bin/runner_sal/main.rs" [[bin]] -name = "actor_system_tui" -path = "cmd/terminal_ui.rs" +name = "runner_osis" +path = "src/bin/runner_osis/main.rs" + +[[bin]] +name = "test_sync_runner" +path = "test_sync_runner.rs" [[example]] name = "engine" @@ -40,13 +44,15 @@ chrono = { version = "0.4", features = ["serde"] } toml = "0.8" thiserror = "1.0" async-trait = "0.1" -hero_job = { git = "https://git.ourworld.tf/herocode/baobab.git"} -baobab_actor = { git = "https://git.ourworld.tf/herocode/baobab.git"} +# Core hero dependencies +hero-job = { path = "../job" } +#hero-job = { git = "https://git.ourworld.tf/herocode/job.git" } heromodels = { git = "https://git.ourworld.tf/herocode/db.git" } heromodels_core = { git = "https://git.ourworld.tf/herocode/db.git" } heromodels-derive = { git = "https://git.ourworld.tf/herocode/db.git" } rhailib_dsl = { git = "https://git.ourworld.tf/herocode/rhailib.git" } hero_logger = { git = "https://git.ourworld.tf/herocode/baobab.git", branch = "logger" } +# SAL modules for system engine sal-os = { git = "https://git.ourworld.tf/herocode/herolib_rust.git" } sal-redisclient = { git = "https://git.ourworld.tf/herocode/herolib_rust.git" } sal-postgresclient = { git = "https://git.ourworld.tf/herocode/herolib_rust.git" } @@ -61,6 +67,10 @@ sal-kubernetes = { git = "https://git.ourworld.tf/herocode/herolib_rust.git" } sal-service-manager = { git = "https://git.ourworld.tf/herocode/herolib_rust.git" } sal-vault = { git = "https://git.ourworld.tf/herocode/herolib_rust.git" } sal-hetzner = { git = "https://git.ourworld.tf/herocode/herolib_rust.git" } +# TUI dependencies (from actor_osis) +tracing = "0.1.41" +ratatui = "0.28" +crossterm = "0.28" [features] default = ["calendar", "finance"] diff --git a/cmd/async_actor.rs b/cmd/async_actor.rs new file mode 100644 index 0000000..cb6d70b --- /dev/null +++ b/cmd/async_actor.rs @@ -0,0 +1,71 @@ +use clap::Parser; +use log::info; +use std::time::Duration; +use tokio::sync::mpsc; + +use actor_system::{spawn_async_actor}; + +#[derive(Parser, Debug)] +#[command(name = "async_actor")] +#[command(about = "Async Actor - processes jobs concurrently with SAL modules")] +struct Args { + /// Actor ID for this instance + #[arg(short, long)] + actor_id: String, + + /// Database path + #[arg(short, long, default_value = "/tmp/actor_db")] + db_path: String, + + /// Redis URL + #[arg(short, long, default_value = "redis://localhost:6379")] + redis_url: String, + + /// Default timeout in seconds for job execution + #[arg(short, long, default_value = "300")] + timeout: u64, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + + let args = Args::parse(); + + info!("Starting Async Actor with ID: {}", args.actor_id); + info!("Database path: {}", args.db_path); + info!("Redis URL: {}", args.redis_url); + info!("Default timeout: {}s", args.timeout); + + // Create shutdown channel + let (_shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); + + // Spawn the async actor + let handle = spawn_async_actor( + args.actor_id, + args.db_path, + args.redis_url, + shutdown_rx, + Duration::from_secs(args.timeout), + ); + + // Wait for the actor to complete + match handle.await { + Ok(result) => { + match result { + Ok(()) => { + info!("Async Actor completed successfully"); + Ok(()) + } + Err(e) => { + eprintln!("Async Actor error: {}", e); + Err(e) + } + } + } + Err(e) => { + eprintln!("Failed to join async actor task: {}", e); + Err(Box::new(e)) + } + } +} diff --git a/cmd/async_actor_tui.rs b/cmd/async_actor_tui.rs new file mode 100644 index 0000000..987176e --- /dev/null +++ b/cmd/async_actor_tui.rs @@ -0,0 +1,149 @@ +//! Terminal UI for Async Actor - Monitor and dispatch jobs to async actor with SAL modules +//! +//! This binary provides a TUI interface for monitoring and dispatching jobs to the async actor. + +use anyhow::{Result, Context}; +use clap::Parser; +use log::{info, warn, error}; +use std::path::PathBuf; +use std::process::{Child, Command}; +use tokio::signal; + +#[derive(Parser)] +#[command(name = "async-actor-tui")] +#[command(about = "Terminal UI for Async Actor - Monitor and dispatch jobs with SAL modules")] +struct Args { + /// Actor ID for this instance + #[arg(short, long, default_value = "async_sal")] + actor_id: String, + + /// Redis URL for job queue + #[arg(short, long, default_value = "redis://localhost:6379")] + redis_url: String, + + /// Database path + #[arg(short, long, default_value = "/tmp/actor_db")] + db_path: String, + + /// Default timeout in seconds for job execution + #[arg(short, long, default_value = "300")] + timeout: u64, + + /// Enable verbose logging + #[arg(short, long)] + verbose: bool, +} + +/// Initialize logging based on verbosity level +fn init_logging(verbose: bool) { + if verbose { + env_logger::Builder::from_default_env() + .filter_level(log::LevelFilter::Debug) + .init(); + } else { + env_logger::Builder::from_default_env() + .filter_level(log::LevelFilter::Info) + .init(); + } +} + +/// Spawn the async actor binary as a background process +fn spawn_actor_process(args: &Args) -> Result { + // Get the crate root directory + let crate_root = std::env::var("CARGO_MANIFEST_DIR") + .unwrap_or_else(|_| ".".to_string()); + let actor_path = PathBuf::from(crate_root).join("target/debug/async_actor"); + info!("๐ŸŽฌ Spawning async actor process: {}", actor_path.display()); + + let mut cmd = Command::new(&actor_path); + + // Add command line arguments + cmd.args(&[ + "--actor-id", &args.actor_id, + "--db-path", &args.db_path, + "--redis-url", &args.redis_url, + "--timeout", &args.timeout.to_string(), + ]); + + // Redirect stdout and stderr to null to prevent logs from interfering with TUI + cmd.stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()); + + // Spawn the process + let child = cmd + .spawn() + .with_context(|| format!("Failed to spawn async actor process: {}", actor_path.display()))?; + + info!("โœ… Async actor process spawned with PID: {}", child.id()); + Ok(child) +} + +/// Cleanup function to terminate actor process +fn cleanup_actor_process(mut actor_process: Child) { + info!("๐Ÿงน Cleaning up async actor process..."); + + match actor_process.try_wait() { + Ok(Some(status)) => { + info!("Async actor process already exited with status: {}", status); + } + Ok(None) => { + info!("Terminating async actor process..."); + if let Err(e) = actor_process.kill() { + error!("Failed to kill async actor process: {}", e); + } else { + match actor_process.wait() { + Ok(status) => info!("Async actor process terminated with status: {}", status), + Err(e) => error!("Failed to wait for async actor process: {}", e), + } + } + } + Err(e) => { + error!("Failed to check async actor process status: {}", e); + } + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + + // Initialize logging + init_logging(args.verbose); + + let crate_root = std::env::var("CARGO_MANIFEST_DIR") + .unwrap_or_else(|_| ".".to_string()); + + info!("๐Ÿš€ Starting Async Actor TUI..."); + info!("Actor ID: {}", args.actor_id); + info!("Actor Path: {}/target/debug/async_actor", crate_root); + info!("Redis URL: {}", args.redis_url); + info!("Database Path: {}", args.db_path); + info!("Default Timeout: {}s", args.timeout); + info!("Script Type: SAL (System Abstraction Layer)"); + + // Spawn the actor process first + let actor_process = spawn_actor_process(&args)?; + + // Give the actor a moment to start up + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + info!("๐Ÿ“‹ Async Actor TUI is running. The actor processes jobs concurrently."); + info!("๐Ÿ’ก Use Redis CLI or job submission tools to send jobs to queue: actor_queue:{}", args.actor_id); + info!("๐Ÿ”„ Jobs will be processed with SAL modules (system operations)"); + info!("โฑ๏ธ Each job has a timeout of {}s", args.timeout); + info!("๐Ÿ“Š Multiple jobs can run simultaneously"); + info!("Press Ctrl+C to exit..."); + + // Wait for Ctrl+C + let result = tokio::select! { + _ = signal::ctrl_c() => { + info!("Received Ctrl+C, shutting down..."); + Ok(()) + } + }; + + // Clean up the actor process + cleanup_actor_process(actor_process); + + result +} diff --git a/cmd/sync_actor.rs b/cmd/sync_actor.rs new file mode 100644 index 0000000..1725e73 --- /dev/null +++ b/cmd/sync_actor.rs @@ -0,0 +1,70 @@ +use clap::Parser; +use log::info; +use tokio::sync::mpsc; + +use actor_system::{spawn_sync_actor}; + +#[derive(Parser, Debug)] +#[command(name = "sync_actor")] +#[command(about = "Sync Actor - processes jobs sequentially with DSL modules")] +struct Args { + /// Actor ID for this instance + #[arg(short, long)] + actor_id: String, + + /// Database path + #[arg(short, long, default_value = "/tmp/actor_db")] + db_path: String, + + /// Redis URL + #[arg(short, long, default_value = "redis://localhost:6379")] + redis_url: String, + + /// Preserve completed tasks in Redis (don't delete them) + #[arg(short, long, default_value = "false")] + preserve_tasks: bool, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + + let args = Args::parse(); + + info!("Starting Sync Actor with ID: {}", args.actor_id); + info!("Database path: {}", args.db_path); + info!("Redis URL: {}", args.redis_url); + info!("Preserve tasks: {}", args.preserve_tasks); + + // Create shutdown channel + let (_shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); + + // Spawn the sync actor + let handle = spawn_sync_actor( + args.actor_id, + args.db_path, + args.redis_url, + shutdown_rx, + args.preserve_tasks, + ); + + // Wait for the actor to complete + match handle.await { + Ok(result) => { + match result { + Ok(()) => { + info!("Sync Actor completed successfully"); + Ok(()) + } + Err(e) => { + eprintln!("Sync Actor error: {}", e); + Err(e) + } + } + } + Err(e) => { + eprintln!("Failed to join sync actor task: {}", e); + Err(Box::new(e)) + } + } +} diff --git a/cmd/sync_actor_tui.rs b/cmd/sync_actor_tui.rs new file mode 100644 index 0000000..11d88a8 --- /dev/null +++ b/cmd/sync_actor_tui.rs @@ -0,0 +1,153 @@ +//! Terminal UI for Sync Actor - Monitor and dispatch jobs to sync actor with DSL modules +//! +//! This binary provides a TUI interface for monitoring and dispatching jobs to the sync actor. + +use anyhow::{Result, Context}; +use clap::Parser; +use log::{info, warn, error}; +use std::path::PathBuf; +use std::process::{Child, Command}; +use tokio::signal; + +#[derive(Parser)] +#[command(name = "sync-actor-tui")] +#[command(about = "Terminal UI for Sync Actor - Monitor and dispatch jobs with DSL modules")] +struct Args { + /// Actor ID for this instance + #[arg(short, long, default_value = "sync_osis")] + actor_id: String, + + /// Redis URL for job queue + #[arg(short, long, default_value = "redis://localhost:6379")] + redis_url: String, + + /// Database path + #[arg(short, long, default_value = "/tmp/actor_db")] + db_path: String, + + /// Preserve completed tasks in Redis (don't delete them) + #[arg(short, long, default_value = "false")] + preserve_tasks: bool, + + /// Enable verbose logging + #[arg(short, long)] + verbose: bool, +} + +/// Initialize logging based on verbosity level +fn init_logging(verbose: bool) { + if verbose { + env_logger::Builder::from_default_env() + .filter_level(log::LevelFilter::Debug) + .init(); + } else { + env_logger::Builder::from_default_env() + .filter_level(log::LevelFilter::Info) + .init(); + } +} + +/// Spawn the sync actor binary as a background process +fn spawn_actor_process(args: &Args) -> Result { + // Get the crate root directory + let crate_root = std::env::var("CARGO_MANIFEST_DIR") + .unwrap_or_else(|_| ".".to_string()); + let actor_path = PathBuf::from(crate_root).join("target/debug/sync_actor"); + info!("๐ŸŽฌ Spawning sync actor process: {}", actor_path.display()); + + let mut cmd = Command::new(&actor_path); + + // Add command line arguments + cmd.args(&[ + "--actor-id", &args.actor_id, + "--db-path", &args.db_path, + "--redis-url", &args.redis_url, + "--preserve-tasks", &args.preserve_tasks.to_string(), + ]); + + // Redirect stdout and stderr to null to prevent logs from interfering with TUI + cmd.stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()); + + // Spawn the process + let child = cmd + .spawn() + .with_context(|| format!("Failed to spawn sync actor process: {}", actor_path.display()))?; + + info!("โœ… Sync actor process spawned with PID: {}", child.id()); + Ok(child) +} + +/// Cleanup function to terminate actor process +fn cleanup_actor_process(mut actor_process: Child) { + info!("๐Ÿงน Cleaning up sync actor process..."); + + match actor_process.try_wait() { + Ok(Some(status)) => { + info!("Sync actor process already exited with status: {}", status); + } + Ok(None) => { + info!("Terminating sync actor process..."); + if let Err(e) = actor_process.kill() { + error!("Failed to kill sync actor process: {}", e); + } else { + match actor_process.wait() { + Ok(status) => info!("Sync actor process terminated with status: {}", status), + Err(e) => error!("Failed to wait for sync actor process: {}", e), + } + } + } + Err(e) => { + error!("Failed to check sync actor process status: {}", e); + } + } +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + + // Initialize logging + init_logging(args.verbose); + + let crate_root = std::env::var("CARGO_MANIFEST_DIR") + .unwrap_or_else(|_| ".".to_string()); + + info!("๐Ÿš€ Starting Sync Actor TUI..."); + info!("Actor ID: {}", args.actor_id); + info!("Actor Path: {}/target/debug/sync_actor", crate_root); + info!("Redis URL: {}", args.redis_url); + info!("Database Path: {}", args.db_path); + info!("Preserve Tasks: {}", args.preserve_tasks); + info!("Script Type: DSL (Domain Specific Language)"); + + // Spawn the actor process first + let actor_process = spawn_actor_process(&args)?; + + // Give the actor a moment to start up + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + info!("๐Ÿ“‹ Sync Actor TUI is running. The actor processes jobs sequentially."); + info!("๐Ÿ’ก Use Redis CLI or job submission tools to send jobs to queue: actor_queue:{}", args.actor_id); + info!("๐Ÿ”„ Jobs will be processed with DSL modules (business operations)"); + info!("๐Ÿ“Š Jobs are processed one at a time in order"); + if args.preserve_tasks { + info!("๐Ÿ’พ Completed tasks will be preserved in Redis"); + } else { + info!("๐Ÿ—‘๏ธ Completed tasks will be cleaned up from Redis"); + } + info!("Press Ctrl+C to exit..."); + + // Wait for Ctrl+C + let result = tokio::select! { + _ = signal::ctrl_c() => { + info!("Received Ctrl+C, shutting down..."); + Ok(()) + } + }; + + // Clean up the actor process + cleanup_actor_process(actor_process); + + result +} diff --git a/src/async_runner.rs b/src/async_runner.rs new file mode 100644 index 0000000..d77fb2d --- /dev/null +++ b/src/async_runner.rs @@ -0,0 +1,270 @@ +use hero_job::Job; +use log::{debug, error, info}; +use rhai::Engine; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::{mpsc, Mutex}; +use tokio::task::JoinHandle; + +use crate::runner_trait::Runner; + +/// Represents a running job with its handle and metadata +struct RunningJob { + job_id: String, + handle: JoinHandle>>, + started_at: std::time::Instant, +} + +/// Builder for AsyncRunner +#[derive(Default)] +pub struct AsyncRunnerBuilder { + runner_id: Option, + db_path: Option, + redis_url: Option, + default_timeout: Option, + engine: Option Engine + Send + Sync>>, +} + +impl AsyncRunnerBuilder { + pub fn new() -> Self { + Self::default() + } + + pub fn runner_id>(mut self, runner_id: S) -> Self { + self.runner_id = Some(runner_id.into()); + self + } + + pub fn db_path>(mut self, db_path: S) -> Self { + self.db_path = Some(db_path.into()); + self + } + + pub fn redis_url>(mut self, redis_url: S) -> Self { + self.redis_url = Some(redis_url.into()); + self + } + + pub fn default_timeout(mut self, timeout: Duration) -> Self { + self.default_timeout = Some(timeout); + self + } + + pub fn engine_factory(mut self, factory: F) -> Self + where + F: Fn() -> Engine + Send + Sync + 'static, + { + self.engine = Some(Arc::new(factory)); + self + } + + pub fn build(self) -> Result { + Ok(AsyncRunner { + runner_id: self.runner_id.ok_or("runner_id is required")?, + db_path: self.db_path.ok_or("db_path is required")?, + redis_url: self.redis_url.ok_or("redis_url is required")?, + default_timeout: self.default_timeout.unwrap_or(Duration::from_secs(300)), + engine_factory: self.engine.ok_or("engine factory is required")?, + running_jobs: Arc::new(Mutex::new(HashMap::new())), + }) + } +} + +/// Asynchronous runner that processes jobs concurrently +pub struct AsyncRunner { + pub runner_id: String, + pub db_path: String, + pub redis_url: String, + pub default_timeout: Duration, + pub engine_factory: Arc Engine + Send + Sync>, + running_jobs: Arc>>, +} + +impl AsyncRunner { + /// Create a new AsyncRunnerBuilder + pub fn builder() -> AsyncRunnerBuilder { + AsyncRunnerBuilder::new() + } + + /// Add a running job to the tracking map + async fn add_running_job(&self, job_id: String, handle: JoinHandle>>) { + let running_job = RunningJob { + job_id: job_id.clone(), + handle, + started_at: std::time::Instant::now(), + }; + + let mut jobs = self.running_jobs.lock().await; + jobs.insert(job_id.clone(), running_job); + debug!("Async Runner: Added running job '{}'. Total running: {}", + job_id, jobs.len()); + } + + /// Remove a completed job from the tracking map + async fn remove_running_job(&self, job_id: &str) { + let mut jobs = self.running_jobs.lock().await; + if let Some(job) = jobs.remove(job_id) { + let duration = job.started_at.elapsed(); + debug!("Async Runner: Removed completed job '{}' after {:?}. Remaining: {}", + job_id, duration, jobs.len()); + } + } + + /// Get the count of currently running jobs + pub async fn running_job_count(&self) -> usize { + let jobs = self.running_jobs.lock().await; + jobs.len() + } + + /// Cleanup any finished jobs from the running jobs map + async fn cleanup_finished_jobs(&self) { + let mut jobs = self.running_jobs.lock().await; + let mut to_remove = Vec::new(); + + for (job_id, running_job) in jobs.iter() { + if running_job.handle.is_finished() { + to_remove.push(job_id.clone()); + } + } + + for job_id in to_remove { + if let Some(job) = jobs.remove(&job_id) { + let duration = job.started_at.elapsed(); + debug!("Async Runner: Cleaned up finished job '{}' after {:?}", + job_id, duration); + } + } + } + +} + + +impl Runner for AsyncRunner { + fn process_job(&self, job: Job) -> Result> { + let job_id = job.id.clone(); + let runner_id = &self.runner_id; + + // Determine timeout (use job-specific timeout if available, otherwise default) + let job_timeout = if job.timeout > 0 { + Duration::from_secs(job.timeout) + } else { + self.default_timeout + }; + + info!("Async Runner '{}', Job {}: Spawning job execution task with timeout {:?}", + runner_id, job_id, job_timeout); + + // Clone necessary data for the spawned task + let job_id_clone = job_id.clone(); + let runner_id_clone = runner_id.clone(); + let runner_id_debug = runner_id.clone(); + let job_id_debug = job_id.clone(); + let _redis_url_clone = self.redis_url.clone(); + let running_jobs_clone = Arc::clone(&self.running_jobs); + let engine_factory = Arc::clone(&self.engine_factory); + let db_path_clone = self.db_path.clone(); + + // Spawn the job execution task + let job_handle = tokio::spawn(async move { + // Create a new engine instance for this job + let mut engine = engine_factory(); + let mut db_config = rhai::Map::new(); + db_config.insert("DB_PATH".into(), db_path_clone.into()); + db_config.insert("CALLER_ID".into(), job.caller_id.clone().into()); + db_config.insert("CONTEXT_ID".into(), job.context_id.clone().into()); + engine.set_default_tag(rhai::Dynamic::from(db_config)); + + // Execute the Rhai script + let result = match engine.eval::(&job.payload) { + Ok(result) => { + let result_str = if result.is::() { + result.into_string().unwrap() + } else { + result.to_string() + }; + info!("Async Runner '{}', Job {}: Script executed successfully. Result: {}", + runner_id_clone, job_id_clone, result_str); + Ok(result_str) + } + Err(e) => { + let error_msg = format!("Script execution error: {}", e); + error!("Async Runner '{}', Job {}: {}", runner_id_clone, job_id_clone, error_msg); + Err(Box::new(e) as Box) + } + }; + + // Remove this job from the running jobs map when it completes + let mut jobs = running_jobs_clone.lock().await; + if let Some(running_job) = jobs.remove(&job_id_clone) { + let duration = running_job.started_at.elapsed(); + debug!("Async Runner '{}': Removed completed job '{}' after {:?}", + runner_id_debug, job_id_debug, duration); + } + + result + }); + + // Add the job to the running jobs map + let running_job = RunningJob { + job_id: job_id.clone(), + handle: job_handle, + started_at: std::time::Instant::now(), + }; + + let running_jobs_clone = Arc::clone(&self.running_jobs); + let job_id_for_map = job_id.clone(); + tokio::spawn(async move { + let mut jobs = running_jobs_clone.lock().await; + jobs.insert(job_id_for_map, running_job); + debug!("Async Runner: Added running job '{}'. Total running: {}", + job_id, jobs.len()); + }); + + // For async runners, we return immediately with a placeholder + // The actual result will be handled by the spawned task + Ok("Job spawned for async processing".to_string()) + } + + fn runner_type(&self) -> &'static str { + "Async" + } + + fn runner_id(&self) -> &str { + &self.runner_id + } + + fn redis_url(&self) -> &str { + &self.redis_url + } +} + +/// Convenience function to spawn an asynchronous runner using the trait interface +/// +/// This function provides a clean interface for the new async runner implementation +/// with timeout support. +pub fn spawn_async_runner( + runner_id: String, + db_path: String, + redis_url: String, + shutdown_rx: mpsc::Receiver<()>, + default_timeout: std::time::Duration, + engine_factory: F, +) -> JoinHandle>> +where + F: Fn() -> Engine + Send + Sync + 'static, +{ + use std::sync::Arc; + + let runner = Arc::new( + AsyncRunner::builder() + .runner_id(runner_id) + .db_path(db_path) + .redis_url(redis_url) + .default_timeout(default_timeout) + .engine_factory(engine_factory) + .build() + .expect("Failed to build AsyncRunner") + ); + crate::runner_trait::spawn_runner(runner, shutdown_rx) +} diff --git a/src/bin/runner_osis/README.md b/src/bin/runner_osis/README.md new file mode 100644 index 0000000..119274a --- /dev/null +++ b/src/bin/runner_osis/README.md @@ -0,0 +1,118 @@ +# OSIS Runner + +The OSIS (Object Storage Information System) Runner is a synchronous job processing engine that executes Rhai scripts with access to OSIS-specific operations and data management capabilities. + +## Features + +- **Synchronous Processing**: Processes jobs sequentially, ensuring deterministic execution order +- **Redis Integration**: Uses Redis for job queue management and coordination +- **OSIS Operations**: Access to object storage, metadata management, and information system operations +- **Task Persistence**: Optional task preservation for debugging and audit purposes +- **Graceful Shutdown**: Responds to SIGINT (Ctrl+C) for clean termination +- **SQLite Database**: Local database storage for job state and metadata + +## Usage + +```bash +cargo run --bin runner_osis -- [OPTIONS] +``` + +### Arguments + +- ``: Unique identifier for this runner instance (required, positional) + +### Options + +- `-d, --db-path `: SQLite database file path (default: `/tmp/osis.db`) +- `-r, --redis-url `: Redis connection URL (default: `redis://localhost:6379`) +- `-p, --preserve-tasks`: Preserve completed tasks in database for debugging (default: `false`) + +### Examples + +```bash +# Basic usage with default settings +cargo run --bin runner_osis -- myrunner + +# Custom Redis URL and database path +cargo run --bin runner_osis -- osis-prod -r redis://prod-redis:6379 -d /var/lib/osis.db + +# Enable task preservation for debugging +cargo run --bin runner_osis -- debug-runner -p +``` + +## Available OSIS Modules + +The OSIS runner provides access to specialized modules for information system operations: + +- **Object Storage**: File and object management operations +- **Metadata Management**: Information indexing and retrieval +- **Data Processing**: Content analysis and transformation +- **System Integration**: Interface with external information systems +- **Audit and Logging**: Comprehensive operation tracking + +## Architecture + +The OSIS runner uses a synchronous architecture that: + +1. Connects to Redis for job queue management +2. Initializes SQLite database for local state management +3. Creates a Rhai engine with OSIS modules registered +4. Processes jobs sequentially in FIFO order +5. Optionally preserves task history for debugging +6. Handles graceful shutdown on SIGINT + +## Synchronous vs Asynchronous + +Unlike the SAL runner, the OSIS runner processes jobs synchronously: + +- **Sequential Processing**: Jobs are processed one at a time +- **Deterministic Order**: Ensures predictable execution sequence +- **Resource Safety**: Prevents resource conflicts in data operations +- **Debugging Friendly**: Easier to trace and debug job execution + +## Database Schema + +The runner maintains a SQLite database with the following structure: + +- **Jobs Table**: Active and completed job records +- **Task History**: Optional preservation of task execution details +- **Metadata**: Runner configuration and state information + +## Error Handling + +The runner provides detailed error messages for: + +- Redis connection failures +- Database initialization and access problems +- Script execution errors +- Resource cleanup issues +- Shutdown sequence problems + +## Logging + +Set the `RUST_LOG` environment variable to control logging levels: + +```bash +RUST_LOG=info cargo run --bin runner_osis -- myrunner +``` + +Available log levels: `error`, `warn`, `info`, `debug`, `trace` + +## Task Preservation + +When `--preserve-tasks` is enabled: + +- Completed tasks remain in the database +- Useful for debugging and audit trails +- May require periodic cleanup for long-running instances +- Increases database size over time + +## Use Cases + +The OSIS runner is ideal for: + +- Data processing pipelines requiring strict ordering +- Information system operations with dependencies +- Batch processing jobs that must complete sequentially +- Debugging scenarios where task history is important +- Operations requiring transactional consistency diff --git a/src/bin/runner_osis/engine.rs b/src/bin/runner_osis/engine.rs new file mode 100644 index 0000000..c6e5be5 --- /dev/null +++ b/src/bin/runner_osis/engine.rs @@ -0,0 +1,123 @@ +use rhai::Engine; +use rhailib_dsl; +use std::sync::{Arc, OnceLock}; + +/// Engine factory for creating and sharing Rhai engines with DSL modules. +pub struct EngineFactory { + engine: Arc, +} + +impl EngineFactory { + /// Create a new engine factory with a configured Rhai engine. + pub fn new() -> Self { + let mut engine = Engine::new(); + register_dsl_modules(&mut engine); + // Logger + hero_logger::rhai_integration::configure_rhai_logging(&mut engine, "osis_runner"); + + Self { + engine: Arc::new(engine), + } + } + + /// Get a shared reference to the engine. + pub fn get_engine(&self) -> Arc { + Arc::clone(&self.engine) + } + + /// Get the global singleton engine factory. + pub fn global() -> &'static EngineFactory { + static FACTORY: OnceLock = OnceLock::new(); + FACTORY.get_or_init(|| EngineFactory::new()) + } +} + +/// Register basic object functions directly in the engine. +/// This provides object functionality without relying on the problematic rhailib_dsl object module. +fn register_object_functions(engine: &mut Engine) { + use heromodels::models::object::Object; + + // Register the Object type + engine.register_type_with_name::("Object"); + + // Register constructor function + engine.register_fn("new_object", || Object::new()); + + // Register setter functions + engine.register_fn("object_title", |obj: &mut Object, title: String| { + obj.title = title; + obj.clone() + }); + + engine.register_fn( + "object_description", + |obj: &mut Object, description: String| { + obj.description = description; + obj.clone() + }, + ); + + // Register getter functions + engine.register_fn("get_object_id", |obj: &mut Object| obj.id() as i64); + engine.register_fn("get_object_title", |obj: &mut Object| obj.title.clone()); + engine.register_fn("get_object_description", |obj: &mut Object| { + obj.description.clone() + }); +} + +/// Registers all DSL modules with the provided Rhai engine. +/// +/// This function is the main entry point for integrating the rhailib DSL with a Rhai engine. +/// It registers all business domain modules, making their functions available to Rhai scripts. +/// +/// # Arguments +/// +/// * `engine` - A mutable reference to the Rhai engine to register modules with +/// +/// # Registered Modules +/// +/// This function registers the following domain modules: +/// - Access control functions +/// - Business operation functions (companies, products, sales, shareholders) +/// - Calendar and scheduling functions +/// - Circle and community management functions +/// - Company management functions +/// - Contact management functions +/// - Core utility functions +/// - Financial operation functions (accounts, assets, marketplace) +/// - Workflow management functions (flows, steps, signatures) +/// - Library and content management functions +/// - Generic object manipulation functions (custom implementation) +pub fn register_dsl_modules(engine: &mut Engine) { + rhailib_dsl::access::register_access_rhai_module(engine); + rhailib_dsl::biz::register_biz_rhai_module(engine); + rhailib_dsl::calendar::register_calendar_rhai_module(engine); + rhailib_dsl::circle::register_circle_rhai_module(engine); + rhailib_dsl::company::register_company_rhai_module(engine); + rhailib_dsl::contact::register_contact_rhai_module(engine); + rhailib_dsl::core::register_core_rhai_module(engine); + rhailib_dsl::finance::register_finance_rhai_modules(engine); + // rhailib_dsl::flow::register_flow_rhai_modules(engine); + rhailib_dsl::library::register_library_rhai_module(engine); + // Skip problematic object module for now - can be implemented separately if needed + // rhailib_dsl::object::register_object_fns(engine); + rhailib_dsl::payment::register_payment_rhai_module(engine); + + // Register basic object functionality directly + register_object_functions(engine); + + println!("Rhailib Domain Specific Language modules registered successfully."); +} + +/// Create a new osis engine instance. +pub fn create_osis_engine() -> Engine { + let mut engine = Engine::new(); + register_dsl_modules(&mut engine); + hero_logger::rhai_integration::configure_rhai_logging(&mut engine, "osis_runner"); + engine +} + +/// Create a shared osis engine using the factory. +pub fn create_shared_osis_engine() -> Arc { + EngineFactory::global().get_engine() +} diff --git a/src/bin/runner_osis/main.rs b/src/bin/runner_osis/main.rs new file mode 100644 index 0000000..e81b079 --- /dev/null +++ b/src/bin/runner_osis/main.rs @@ -0,0 +1,79 @@ +use actor_system::spawn_sync_runner; +use clap::Parser; +use log::{error, info}; +use tokio::sync::mpsc; + +mod engine; +use engine::create_osis_engine; + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Runner ID + runner_id: String, + + /// Database path + #[arg(short, long, default_value = "/tmp/osis.db")] + db_path: String, + + /// Redis URL + #[arg(short = 'r', long, default_value = "redis://localhost:6379")] + redis_url: String, + + /// Preserve tasks after completion + #[arg(short, long, default_value_t = false)] + preserve_tasks: bool, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Initialize logging + env_logger::init(); + + let args = Args::parse(); + + info!("Starting OSIS Sync Runner with ID: {}", args.runner_id); + info!("Database path: {}", args.db_path); + info!("Redis URL: {}", args.redis_url); + info!("Preserve tasks: {}", args.preserve_tasks); + + // Create shutdown channel + let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); + + // Setup signal handling for graceful shutdown + let shutdown_tx_clone = shutdown_tx.clone(); + tokio::spawn(async move { + tokio::signal::ctrl_c().await.expect("Failed to listen for ctrl+c"); + info!("Received Ctrl+C, initiating shutdown..."); + let _ = shutdown_tx_clone.send(()).await; + }); + + // Spawn the sync runner with engine factory + let runner_handle = spawn_sync_runner( + args.runner_id.clone(), + args.db_path, + args.redis_url, + shutdown_rx, + args.preserve_tasks, + create_osis_engine, + ); + + info!("OSIS Sync Runner '{}' started successfully", args.runner_id); + + // Wait for the runner to complete + match runner_handle.await { + Ok(Ok(())) => { + info!("OSIS Sync Runner '{}' shut down successfully", args.runner_id); + } + Ok(Err(e)) => { + error!("OSIS Sync Runner '{}' encountered an error: {}", args.runner_id, e); + return Err(e); + } + Err(e) => { + error!("Failed to join OSIS Sync Runner '{}' task: {}", args.runner_id, e); + return Err(Box::new(e)); + } + } + + Ok(()) +} diff --git a/src/bin/runner_sal/README.md b/src/bin/runner_sal/README.md new file mode 100644 index 0000000..3d0a694 --- /dev/null +++ b/src/bin/runner_sal/README.md @@ -0,0 +1,87 @@ +# SAL Runner + +The SAL (System Abstraction Layer) Runner is an asynchronous job processing engine that executes Rhai scripts with access to system-level operations and infrastructure management capabilities. + +## Features + +- **Asynchronous Processing**: Handles multiple jobs concurrently with configurable timeouts +- **Redis Integration**: Uses Redis for job queue management and coordination +- **System Operations**: Full access to SAL modules including OS, networking, containers, and cloud services +- **Graceful Shutdown**: Responds to SIGINT (Ctrl+C) for clean termination +- **Comprehensive Logging**: Detailed logging for monitoring and debugging + +## Usage + +```bash +cargo run --bin runner_sal -- [OPTIONS] +``` + +### Arguments + +- ``: Unique identifier for this runner instance (required, positional) + +### Options + +- `-d, --db-path `: Database file path (default: `/tmp/sal.db`) +- `-r, --redis-url `: Redis connection URL (default: `redis://localhost:6379`) +- `-t, --timeout `: Default job timeout in seconds (default: `300`) + +### Examples + +```bash +# Basic usage with default settings +cargo run --bin runner_sal -- myrunner + +# Custom Redis URL and database path +cargo run --bin runner_sal -- production-runner -r redis://prod-redis:6379 -d /var/lib/sal.db + +# Custom timeout for long-running jobs +cargo run --bin runner_sal -- batch-runner -t 3600 +``` + +## Available SAL Modules + +The SAL runner provides access to the following system modules through Rhai scripts: + +- **OS Operations**: File system, process management, system information +- **Redis Client**: Redis database operations and caching +- **PostgreSQL Client**: Database connectivity and queries +- **Process Management**: System process control and monitoring +- **Virtualization**: Container and VM management +- **Git Operations**: Version control system integration +- **Zinit Client**: Service management and initialization +- **Mycelium**: Networking and mesh connectivity +- **Text Processing**: String manipulation and text utilities +- **Network Operations**: HTTP requests, network utilities +- **Kubernetes**: Container orchestration and cluster management +- **Hetzner Cloud**: Cloud infrastructure management + +## Architecture + +The SAL runner uses an asynchronous architecture that: + +1. Connects to Redis for job queue management +2. Creates a Rhai engine with all SAL modules registered +3. Processes jobs concurrently with configurable timeouts +4. Handles graceful shutdown on SIGINT +5. Provides comprehensive error handling and logging + +## Error Handling + +The runner provides detailed error messages for common issues: + +- Redis connection failures +- Database access problems +- Script execution errors +- Timeout handling +- Resource cleanup on shutdown + +## Logging + +Set the `RUST_LOG` environment variable to control logging levels: + +```bash +RUST_LOG=debug cargo run --bin runner_sal -- myrunner +``` + +Available log levels: `error`, `warn`, `info`, `debug`, `trace` diff --git a/src/engine.rs b/src/bin/runner_sal/engine.rs similarity index 88% rename from src/engine.rs rename to src/bin/runner_sal/engine.rs index 127bc74..5e41ac7 100644 --- a/src/engine.rs +++ b/src/bin/runner_sal/engine.rs @@ -71,7 +71,7 @@ pub use sal_kubernetes::KubernetesManager; pub use sal_os::rhai::copy as os_copy; pub use sal_hetzner::rhai::register_hetzner_module; -/// Engine factory for creating and sharing Rhai engines. +/// Engine factory for creating and sharing Rhai engines with SAL modules. pub struct EngineFactory { engine: Arc, } @@ -82,7 +82,7 @@ impl EngineFactory { let mut engine = Engine::new(); register_sal_modules(&mut engine); // Logger - hero_logger::rhai_integration::configure_rhai_logging(&mut engine, "osis_actor"); + hero_logger::rhai_integration::configure_rhai_logging(&mut engine, "sal_runner"); Self { engine: Arc::new(engine), @@ -118,15 +118,15 @@ pub fn register_sal_modules(engine: &mut Engine) { println!("SAL modules registered successfully."); } -/// Create a shared heromodels engine using the factory. -pub fn create_system_engine() -> Arc { - EngineFactory::global().get_engine() +/// Create a new SAL engine instance. +pub fn create_sal_engine() -> Engine { + let mut engine = Engine::new(); + register_sal_modules(&mut engine); + hero_logger::rhai_integration::configure_rhai_logging(&mut engine, "sal_runner"); + engine } -/// Evaluate a Rhai script string. -pub fn eval_script( - engine: &Engine, - script: &str, -) -> Result> { - engine.eval(script) -} \ No newline at end of file +/// Create a shared system engine using the factory. +pub fn create_shared_sal_engine() -> Arc { + EngineFactory::global().get_engine() +} diff --git a/src/bin/runner_sal/main.rs b/src/bin/runner_sal/main.rs new file mode 100644 index 0000000..3f34ee5 --- /dev/null +++ b/src/bin/runner_sal/main.rs @@ -0,0 +1,81 @@ +use actor_system::{spawn_async_runner, AsyncRunner}; +use clap::Parser; +use log::{error, info}; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::mpsc; + +mod engine; +use engine::create_sal_engine; + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Runner ID + runner_id: String, + + /// Database path + #[arg(short, long, default_value = "/tmp/sal.db")] + db_path: String, + + /// Redis URL + #[arg(short = 'r', long, default_value = "redis://localhost:6379")] + redis_url: String, + + /// Default timeout for jobs in seconds + #[arg(short, long, default_value_t = 300)] + timeout: u64, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Initialize logging + env_logger::init(); + + let args = Args::parse(); + + info!("Starting SAL Async Runner with ID: {}", args.runner_id); + info!("Database path: {}", args.db_path); + info!("Redis URL: {}", args.redis_url); + info!("Default timeout: {} seconds", args.timeout); + + // Create shutdown channel + let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); + + // Setup signal handling for graceful shutdown + let shutdown_tx_clone = shutdown_tx.clone(); + tokio::spawn(async move { + tokio::signal::ctrl_c().await.expect("Failed to listen for ctrl+c"); + info!("Received Ctrl+C, initiating shutdown..."); + let _ = shutdown_tx_clone.send(()).await; + }); + + // Spawn the async runner with engine factory + let runner_handle = spawn_async_runner( + args.runner_id.clone(), + args.db_path, + args.redis_url, + shutdown_rx, + Duration::from_secs(args.timeout), + create_sal_engine, + ); + + info!("SAL Async Runner '{}' started successfully", args.runner_id); + + // Wait for the runner to complete + match runner_handle.await { + Ok(Ok(())) => { + info!("SAL Async Runner '{}' shut down successfully", args.runner_id); + } + Ok(Err(e)) => { + error!("SAL Async Runner '{}' encountered an error: {}", args.runner_id, e); + return Err(e); + } + Err(e) => { + error!("Failed to join SAL Async Runner '{}' task: {}", args.runner_id, e); + return Err(Box::new(e)); + } + } + + Ok(()) +} diff --git a/src/engine/mod.rs b/src/engine/mod.rs new file mode 100644 index 0000000..3e6fee3 --- /dev/null +++ b/src/engine/mod.rs @@ -0,0 +1,19 @@ +/// Engine module for Rhai script execution +/// +/// This module provides two different engine configurations: +/// - `system`: SAL modules for system operations (async worker) +/// - `osis`: DSL modules for business operations (sync worker) + +pub mod system; +pub mod osis; + +// Re-export common Rhai types for convenience +pub use rhai::{Array, Dynamic, Engine, EvalAltResult, Map}; + +/// Evaluate a Rhai script string with any engine +pub fn eval_script( + engine: &Engine, + script: &str, +) -> Result> { + engine.eval(script) +} diff --git a/src/engine/osis.rs b/src/engine/osis.rs new file mode 100644 index 0000000..d407075 --- /dev/null +++ b/src/engine/osis.rs @@ -0,0 +1,143 @@ +// use heromodels::models::heroledger::rhai::register_heroledger_rhai_modules; +use rhai::Engine; +use rhailib_dsl; +use std::sync::{Arc, OnceLock}; + +/// Engine factory for creating and sharing Rhai engines with DSL modules. +pub struct EngineFactory { + engine: Arc, +} + +impl EngineFactory { + /// Create a new engine factory with a configured Rhai engine. + pub fn new() -> Self { + let mut engine = Engine::new(); + register_dsl_modules(&mut engine); + // Logger + hero_logger::rhai_integration::configure_rhai_logging(&mut engine, "osis_actor"); + + Self { + engine: Arc::new(engine), + } + } + + /// Get a shared reference to the engine. + pub fn get_engine(&self) -> Arc { + Arc::clone(&self.engine) + } + + /// Get the global singleton engine factory. + pub fn global() -> &'static EngineFactory { + static FACTORY: OnceLock = OnceLock::new(); + FACTORY.get_or_init(|| EngineFactory::new()) + } +} + +/// Register basic object functions directly in the engine. +/// This provides object functionality without relying on the problematic rhailib_dsl object module. +fn register_object_functions(engine: &mut Engine) { + use heromodels::models::object::Object; + + // Register the Object type + engine.register_type_with_name::("Object"); + + // Register constructor function + engine.register_fn("new_object", || Object::new()); + + // Register setter functions + engine.register_fn("object_title", |obj: &mut Object, title: String| { + obj.title = title; + obj.clone() + }); + + engine.register_fn( + "object_description", + |obj: &mut Object, description: String| { + obj.description = description; + obj.clone() + }, + ); + + // Register getter functions + engine.register_fn("get_object_id", |obj: &mut Object| obj.id() as i64); + engine.register_fn("get_object_title", |obj: &mut Object| obj.title.clone()); + engine.register_fn("get_object_description", |obj: &mut Object| { + obj.description.clone() + }); +} + +/// Registers all DSL modules with the provided Rhai engine. +/// +/// This function is the main entry point for integrating the rhailib DSL with a Rhai engine. +/// It registers all business domain modules, making their functions available to Rhai scripts. +/// +/// # Arguments +/// +/// * `engine` - A mutable reference to the Rhai engine to register modules with +/// +/// # Example +/// +/// ```rust +/// use rhai::Engine; +/// use actor_system::engine::osis::register_dsl_modules; +/// +/// let mut engine = Engine::new(); +/// register_dsl_modules(&mut engine); +/// +/// // Engine now has access to all DSL functions +/// let result = engine.eval::(r#" +/// let company = new_company().name("Test Corp"); +/// company.name +/// "#).unwrap(); +/// assert_eq!(result, "Test Corp"); +/// ``` +/// +/// # Registered Modules +/// +/// This function registers the following domain modules: +/// - Access control functions +/// - Business operation functions (companies, products, sales, shareholders) +/// - Calendar and scheduling functions +/// - Circle and community management functions +/// - Company management functions +/// - Contact management functions +/// - Core utility functions +/// - Financial operation functions (accounts, assets, marketplace) +/// - Workflow management functions (flows, steps, signatures) +/// - Library and content management functions +/// - Generic object manipulation functions (custom implementation) +pub fn register_dsl_modules(engine: &mut Engine) { + rhailib_dsl::access::register_access_rhai_module(engine); + rhailib_dsl::biz::register_biz_rhai_module(engine); + rhailib_dsl::calendar::register_calendar_rhai_module(engine); + rhailib_dsl::circle::register_circle_rhai_module(engine); + rhailib_dsl::company::register_company_rhai_module(engine); + rhailib_dsl::contact::register_contact_rhai_module(engine); + rhailib_dsl::core::register_core_rhai_module(engine); + rhailib_dsl::finance::register_finance_rhai_modules(engine); + // rhailib_dsl::flow::register_flow_rhai_modules(engine); + rhailib_dsl::library::register_library_rhai_module(engine); + // Skip problematic object module for now - can be implemented separately if needed + // rhailib_dsl::object::register_object_fns(engine); + rhailib_dsl::payment::register_payment_rhai_module(engine); + + // Register basic object functionality directly + register_object_functions(engine); + // heromodels::heroledger::rhai::register_heroledger_rhai_modules(&mut engine); + + + println!("Rhailib Domain Specific Language modules registered successfully."); +} + +/// Create a new osis engine instance. +pub fn create_osis_engine() -> Engine { + let mut engine = Engine::new(); + register_dsl_modules(&mut engine); + hero_logger::rhai_integration::configure_rhai_logging(&mut engine, "osis_actor"); + engine +} + +/// Create a shared osis engine using the factory. +pub fn create_shared_osis_engine() -> Arc { + EngineFactory::global().get_engine() +} diff --git a/src/engine/system.rs b/src/engine/system.rs new file mode 100644 index 0000000..9e163a0 --- /dev/null +++ b/src/engine/system.rs @@ -0,0 +1,124 @@ +use std::sync::{Arc, OnceLock}; +// Re-export common Rhai types for convenience +pub use rhai::{Array, Dynamic, Engine, EvalAltResult, Map}; + +// Re-export specific functions from sal-os package +pub use sal_os::rhai::{ + delete, + // Download functions + download, + download_install, + // File system functions + exist, + file_size, + find_dir, + find_dirs, + find_file, + find_files, + mkdir, + register_os_module, + rsync, +}; + +// Re-export Redis client module registration function +pub use sal_redisclient::rhai::register_redisclient_module; + +// Re-export PostgreSQL client module registration function +pub use sal_postgresclient::rhai::register_postgresclient_module; + +pub use sal_process::rhai::{ + kill, + process_get, + process_list, + register_process_module, + // Run functions + // Process management functions + which, +}; + +// Re-export virt functions from sal-virt package +pub use sal_virt::rhai::nerdctl::{ + nerdctl_copy, + nerdctl_exec, + nerdctl_image_build, + nerdctl_image_commit, + nerdctl_image_pull, + nerdctl_image_push, + nerdctl_image_remove, + nerdctl_image_tag, + // Image functions + nerdctl_images, + nerdctl_list, + nerdctl_remove, + // Container functions + nerdctl_run, + nerdctl_run_with_name, + nerdctl_run_with_port, + nerdctl_stop, +}; +pub use sal_virt::rhai::{ + bah_new, register_bah_module, register_nerdctl_module, register_rfs_module, +}; + +pub use sal_git::rhai::register_git_module; +pub use sal_git::{GitRepo, GitTree}; +pub use sal_zinit_client::rhai::register_zinit_module; +pub use sal_mycelium::rhai::register_mycelium_module; +pub use sal_text::rhai::register_text_module; +pub use sal_net::rhai::register_net_module; +pub use sal_kubernetes::rhai::register_kubernetes_module; +pub use sal_kubernetes::KubernetesManager; +pub use sal_os::rhai::copy as os_copy; +pub use sal_hetzner::rhai::register_hetzner_module; + +/// Engine factory for creating and sharing Rhai engines with SAL modules. +pub struct EngineFactory { + engine: Arc, +} + +impl EngineFactory { + /// Create a new engine factory with a configured Rhai engine. + pub fn new() -> Self { + let mut engine = Engine::new(); + register_sal_modules(&mut engine); + // Logger + hero_logger::rhai_integration::configure_rhai_logging(&mut engine, "system_actor"); + + Self { + engine: Arc::new(engine), + } + } + + /// Get a shared reference to the engine. + pub fn get_engine(&self) -> Arc { + Arc::clone(&self.engine) + } + + /// Get the global singleton engine factory. + pub fn global() -> &'static EngineFactory { + static FACTORY: OnceLock = OnceLock::new(); + FACTORY.get_or_init(|| EngineFactory::new()) + } +} + +pub fn register_sal_modules(engine: &mut Engine) { + let _ = sal_os::rhai::register_os_module(engine); + let _ = sal_redisclient::rhai::register_redisclient_module(engine); + let _ = sal_postgresclient::rhai::register_postgresclient_module(engine); + let _ = sal_process::rhai::register_process_module(engine); + let _ = sal_virt::rhai::register_virt_module(engine); + let _ = sal_git::rhai::register_git_module(engine); + let _ = sal_zinit_client::rhai::register_zinit_module(engine); + let _ = sal_mycelium::rhai::register_mycelium_module(engine); + let _ = sal_text::rhai::register_text_module(engine); + let _ = sal_net::rhai::register_net_module(engine); + let _ = sal_kubernetes::rhai::register_kubernetes_module(engine); + let _ = sal_hetzner::rhai::register_hetzner_module(engine); + + println!("SAL modules registered successfully."); +} + +/// Create a shared system engine using the factory. +pub fn create_system_engine() -> Arc { + EngineFactory::global().get_engine() +} diff --git a/src/lib.rs b/src/lib.rs index a8de2ba..823100d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,340 +1,82 @@ -mod engine; +// Core modules +pub mod runner_trait; +pub mod async_runner; +pub mod sync_runner; +pub mod engine; -// Public exports -pub use engine::register_sal_modules; -pub use engine::create_system_engine; +// Public exports for convenience +pub use runner_trait::{Runner, RunnerConfig, spawn_runner}; +pub use async_runner::{AsyncRunner, spawn_async_runner}; +pub use sync_runner::{SyncRunner, spawn_sync_runner}; +pub use engine::system::{register_sal_modules, create_system_engine}; +pub use engine::osis::{register_dsl_modules, create_osis_engine, create_shared_osis_engine}; -use async_trait::async_trait; -use hero_job::{Job, JobStatus}; -use log::{debug, error, info, warn}; -use rhai::Engine; -use std::collections::HashMap; -use std::sync::Arc; -use std::time::Duration; -use tokio::sync::{mpsc, Mutex}; -use tokio::task::JoinHandle; -use tokio::time::timeout; -use baobab_actor::{actor_trait::Actor, spawn_actor, initialize_redis_connection}; +// Re-export job types from hero-job crate +pub use hero_job::{Job, JobStatus, JobError, JobBuilder}; +pub use redis::AsyncCommands; +use log::{debug, error, info}; -/// Represents a running job with its handle and metadata -#[derive(Debug)] -struct RunningJob { - job_id: String, - handle: JoinHandle<()>, - started_at: std::time::Instant, +const NAMESPACE_PREFIX: &str = "hero:job:"; +const BLPOP_TIMEOUT_SECONDS: usize = 5; + +/// Initialize Redis connection for the runner +pub async fn initialize_redis_connection( + runner_id: &str, + redis_url: &str, +) -> Result> { + let redis_client = redis::Client::open(redis_url) + .map_err(|e| { + error!("Runner for Runner ID '{}': Failed to open Redis client: {}", runner_id, e); + e + })?; + + let redis_conn = redis_client.get_multiplexed_async_connection().await + .map_err(|e| { + error!("Runner for Runner ID '{}': Failed to get Redis connection: {}", runner_id, e); + e + })?; + + info!("Runner for Runner ID '{}' successfully connected to Redis.", runner_id); + Ok(redis_conn) } -/// Builder for AsyncWorker -#[derive(Debug, Default)] -pub struct AsyncWorkerBuilder { - actor_id: Option, - db_path: Option, - redis_url: Option, - default_timeout: Option, -} - -impl AsyncWorkerBuilder { - pub fn new() -> Self { - Self::default() - } - - pub fn actor_id>(mut self, actor_id: S) -> Self { - self.actor_id = Some(actor_id.into()); - self - } - - pub fn db_path>(mut self, db_path: S) -> Self { - self.db_path = Some(db_path.into()); - self - } - - pub fn redis_url>(mut self, redis_url: S) -> Self { - self.redis_url = Some(redis_url.into()); - self - } - - pub fn default_timeout(mut self, timeout: Duration) -> Self { - self.default_timeout = Some(timeout); - self - } - - pub fn build(self) -> Result { - Ok(AsyncWorker { - actor_id: self.actor_id.ok_or("actor_id is required")?, - db_path: self.db_path.ok_or("db_path is required")?, - redis_url: self.redis_url.ok_or("redis_url is required")?, - default_timeout: self.default_timeout.unwrap_or(Duration::from_secs(300)), - running_jobs: Arc::new(Mutex::new(HashMap::new())), - }) - } -} - -/// Asynchronous actor that processes jobs concurrently -#[derive(Debug, Clone)] -pub struct AsyncWorker { - pub actor_id: String, - pub db_path: String, - pub redis_url: String, - pub default_timeout: Duration, - running_jobs: Arc>>, -} - -impl AsyncWorker { - /// Create a new AsyncWorkerBuilder - pub fn builder() -> AsyncWorkerBuilder { - AsyncWorkerBuilder::new() - } - - /// Add a running job to the tracking map - async fn add_running_job(&self, job_id: String, handle: JoinHandle<()>) { - let running_job = RunningJob { - job_id: job_id.clone(), - handle, - started_at: std::time::Instant::now(), - }; - - let mut jobs = self.running_jobs.lock().await; - jobs.insert(job_id.clone(), running_job); - debug!("Async Worker: Added running job '{}'. Total running: {}", - job_id, jobs.len()); - } - - /// Remove a completed job from the tracking map - async fn remove_running_job(&self, job_id: &str) { - let mut jobs = self.running_jobs.lock().await; - if let Some(job) = jobs.remove(job_id) { - let duration = job.started_at.elapsed(); - debug!("Async Worker: Removed completed job '{}' after {:?}. Remaining: {}", - job_id, duration, jobs.len()); - } - } - - /// Get the count of currently running jobs - pub async fn running_job_count(&self) -> usize { - let jobs = self.running_jobs.lock().await; - jobs.len() - } - - /// Cleanup any finished jobs from the running jobs map - async fn cleanup_finished_jobs(&self) { - let mut jobs = self.running_jobs.lock().await; - let mut to_remove = Vec::new(); - - for (job_id, running_job) in jobs.iter() { - if running_job.handle.is_finished() { - to_remove.push(job_id.clone()); - } - } - - for job_id in to_remove { - if let Some(job) = jobs.remove(&job_id) { - let duration = job.started_at.elapsed(); - debug!("Async Worker: Cleaned up finished job '{}' after {:?}", - job_id, duration); - } - } - } - - /// Execute a single job asynchronously with timeout support - async fn execute_job_with_timeout( - job: Job, - engine: Engine, - actor_id: String, - redis_url: String, - job_timeout: Duration, - ) { - let job_id = job.id.clone(); - info!("Async Worker '{}', Job {}: Starting execution with timeout {:?}", - actor_id, job_id, job_timeout); - - // Create a new Redis connection for this job - let mut redis_conn = match initialize_redis_connection(&actor_id, &redis_url).await { - Ok(conn) => conn, - Err(e) => { - error!("Async Worker '{}', Job {}: Failed to initialize Redis connection: {}", - actor_id, job_id, e); - return; - } - }; - - // Update job status to Started - if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Started).await { - error!("Async Worker '{}', Job {}: Failed to update status to Started: {}", - actor_id, job_id, e); - return; - } - - // Create the script execution task - let script_task = async { - // Execute the Rhai script - match engine.eval::(&job.script) { - Ok(result) => { - let result_str = format!("{:?}", result); - info!("Async Worker '{}', Job {}: Script executed successfully. Result: {}", - actor_id, job_id, result_str); - - // Update job with success result - if let Err(e) = Job::set_result(&mut redis_conn, &job_id, &result_str).await { - error!("Async Worker '{}', Job {}: Failed to set result: {}", - actor_id, job_id, e); - return; - } - - if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Finished).await { - error!("Async Worker '{}', Job {}: Failed to update status to Finished: {}", - actor_id, job_id, e); - } - } - Err(e) => { - let error_msg = format!("Script execution error: {}", e); - error!("Async Worker '{}', Job {}: {}", actor_id, job_id, error_msg); - - // Update job with error - if let Err(e) = Job::set_error(&mut redis_conn, &job_id, &error_msg).await { - error!("Async Worker '{}', Job {}: Failed to set error: {}", - actor_id, job_id, e); - return; - } - - if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Error).await { - error!("Async Worker '{}', Job {}: Failed to update status to Error: {}", - actor_id, job_id, e); - } - } - } - }; - - // Execute the script with timeout - match timeout(job_timeout, script_task).await { - Ok(()) => { - info!("Async Worker '{}', Job {}: Completed within timeout", actor_id, job_id); - } - Err(_) => { - warn!("Async Worker '{}', Job {}: Timed out after {:?}, marking as error", - actor_id, job_id, job_timeout); - - let timeout_msg = format!("Job timed out after {:?}", job_timeout); - if let Err(e) = Job::set_error(&mut redis_conn, &job_id, &timeout_msg).await { - error!("Async Worker '{}', Job {}: Failed to set timeout error: {}", - actor_id, job_id, e); - } - - if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Error).await { - error!("Async Worker '{}', Job {}: Failed to update status to Error after timeout: {}", - actor_id, job_id, e); - } - } - } - - info!("Async Worker '{}', Job {}: Job processing completed", actor_id, job_id); - } -} - -impl Default for AsyncWorker { - fn default() -> Self { - // Default AsyncWorker with placeholder values - // In practice, use the builder pattern instead - Self { - actor_id: "default_async_actor".to_string(), - db_path: "/tmp".to_string(), - redis_url: "redis://localhost:6379".to_string(), - default_timeout: Duration::from_secs(300), - running_jobs: Arc::new(Mutex::new(HashMap::new())), - } - } -} - -#[async_trait] -impl Actor for AsyncWorker { - async fn process_job(&self, job: hero_job::Job, _redis_conn: &mut redis::aio::MultiplexedConnection) { - let job_id = job.id.clone(); - let actor_id = &self.actor_id.clone(); - - // Determine timeout (use job-specific timeout if available, otherwise default) - let job_timeout = if job.timeout.as_secs() > 0 { - job.timeout - } else { - self.default_timeout // Use actor's default timeout - }; - - info!("Async Worker '{}', Job {}: Spawning job execution task with timeout {:?}", - actor_id, job_id, job_timeout); - - // Clone necessary data for the spawned task - let job_id_clone = job_id.clone(); - let actor_id_clone = actor_id.clone(); - let actor_id_debug = actor_id.clone(); // Additional clone for debug statement - let job_id_debug = job_id.clone(); // Additional clone for debug statement - let redis_url_clone = self.redis_url.clone(); - let running_jobs_clone = Arc::clone(&self.running_jobs); - - // Spawn the job execution task - let job_handle = tokio::spawn(async move { - // Create engine for this job - we need to get it from somewhere - // For now, let's assume we need to create a new engine instance - let mut engine = rhai::Engine::new(); - engine::register_sal_modules(&mut engine); - - Self::execute_job_with_timeout( - job, - engine, - actor_id_clone, - redis_url_clone, - job_timeout, - ).await; - - // Remove this job from the running jobs map when it completes - let mut jobs = running_jobs_clone.lock().await; - if let Some(running_job) = jobs.remove(&job_id_clone) { - let duration = running_job.started_at.elapsed(); - debug!("Async Worker '{}': Removed completed job '{}' after {:?}", - actor_id_debug, job_id_debug, duration); - } - }); - - // Add the job to the running jobs map - self.add_running_job(job_id, job_handle).await; - - // Cleanup finished jobs periodically - self.cleanup_finished_jobs().await; - } - - fn actor_type(&self) -> &'static str { - "Async" +/// Load job from Redis using the supervisor's Job API +pub async fn load_job_from_redis( + redis_conn: &mut redis::aio::MultiplexedConnection, + job_id: &str, + runner_id: &str, +) -> Result { + debug!("Runner '{}', Job {}: Loading job from Redis", runner_id, job_id); + + // Load job data from Redis hash + let job_key = format!("hero:job:{}", job_id); + let job_data: std::collections::HashMap = redis_conn.hgetall(&job_key).await + .map_err(JobError::Redis)?; + + if job_data.is_empty() { + return Err(JobError::NotFound(job_id.to_string())); } - fn actor_id(&self) -> &str { - &self.actor_id - } + // Parse job from hash data using the supervisor's Job struct + let job = Job { + id: job_id.to_string(), + caller_id: job_data.get("caller_id").unwrap_or(&"".to_string()).clone(), + context_id: job_data.get("context_id").unwrap_or(&"".to_string()).clone(), + payload: job_data.get("payload").unwrap_or(&"".to_string()).clone(), + runner: job_data.get("runner").unwrap_or(&"default".to_string()).clone(), + executor: job_data.get("executor").unwrap_or(&"rhai".to_string()).clone(), + timeout: job_data.get("timeout").and_then(|s| s.parse().ok()).unwrap_or(300), + env_vars: serde_json::from_str(job_data.get("env_vars").unwrap_or(&"{}".to_string())) + .map_err(JobError::Serialization)?, + created_at: job_data.get("created_at") + .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) + .map(|dt| dt.with_timezone(&chrono::Utc)) + .unwrap_or_else(chrono::Utc::now), + updated_at: job_data.get("updated_at") + .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) + .map(|dt| dt.with_timezone(&chrono::Utc)) + .unwrap_or_else(chrono::Utc::now), + }; - fn redis_url(&self) -> &str { - &self.redis_url - } -} - - -/// Convenience function to spawn an asynchronous actor using the trait interface -/// -/// This function provides a clean interface for the new async actor implementation -/// with timeout support. -pub fn spawn_async_actor( - actor_id: String, - db_path: String, - engine: rhai::Engine, - redis_url: String, - shutdown_rx: mpsc::Receiver<()>, - default_timeout: std::time::Duration, -) -> JoinHandle>> { - use std::sync::Arc; - - let actor = Arc::new( - AsyncWorker::builder() - .actor_id(actor_id) - .db_path(db_path) - .redis_url(redis_url) - .default_timeout(default_timeout) - .build() - .expect("Failed to build AsyncActor") - ); - spawn_actor(actor, shutdown_rx) + Ok(job) } \ No newline at end of file diff --git a/src/runner_trait.rs b/src/runner_trait.rs new file mode 100644 index 0000000..f0be836 --- /dev/null +++ b/src/runner_trait.rs @@ -0,0 +1,267 @@ +//! # Runner Trait Abstraction +//! +//! This module provides a trait-based abstraction for Rhai runners that eliminates +//! code duplication between synchronous and asynchronous runner implementations. +//! +//! The `Runner` trait defines the common interface and behavior, while specific +//! implementations handle job processing differently (sync vs async). +//! +//! ## Architecture +//! +//! ```text +//! โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” +//! โ”‚ SyncRunner โ”‚ โ”‚ AsyncRunner โ”‚ +//! โ”‚ โ”‚ โ”‚ โ”‚ +//! โ”‚ process_job() โ”‚ โ”‚ process_job() โ”‚ +//! โ”‚ (sequential) โ”‚ โ”‚ (concurrent) โ”‚ +//! โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ +//! โ”‚ โ”‚ +//! โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ +//! โ”‚ +//! โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” +//! โ”‚ Runner Trait โ”‚ +//! โ”‚ โ”‚ +//! โ”‚ spawn() โ”‚ +//! โ”‚ config โ”‚ +//! โ”‚ common loop โ”‚ +//! โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ +//! ``` + +use hero_job::{Job, JobStatus}; +use log::{debug, error, info}; +use redis::AsyncCommands; + +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; + +use crate::{initialize_redis_connection, BLPOP_TIMEOUT_SECONDS}; + +/// Configuration for runner instances +#[derive(Debug, Clone)] +pub struct RunnerConfig { + pub runner_id: String, + pub db_path: String, + pub redis_url: String, + pub default_timeout: Option, // Only used by async runners +} + +impl RunnerConfig { + /// Create a new runner configuration + pub fn new( + runner_id: String, + db_path: String, + redis_url: String, + ) -> Self { + Self { + runner_id, + db_path, + redis_url, + default_timeout: None, + } + } + + /// Set default timeout for async runners + pub fn with_default_timeout(mut self, timeout: Duration) -> Self { + self.default_timeout = Some(timeout); + self + } +} + +/// Trait defining the common interface for Rhai runners +/// +/// This trait abstracts the common functionality between synchronous and +/// asynchronous runners, allowing them to share the same spawn logic and +/// Redis polling loop while implementing different job processing strategies. +pub trait Runner: Send + Sync + 'static { + /// Process a single job + /// + /// This is the core method that differentiates runner implementations: + /// - Sync runners process jobs sequentially, one at a time + /// - Async runners spawn concurrent tasks for each job + /// + /// # Arguments + /// + /// * `job` - The job to process + /// + /// Note: The engine is now owned by the runner implementation as a field + /// For sync runners, this should be a blocking operation + /// For async runners, this can spawn tasks and return immediately + fn process_job(&self, job: Job) -> Result>; + + /// Get the runner type name for logging + fn runner_type(&self) -> &'static str; + + /// Get runner ID for this runner instance + fn runner_id(&self) -> &str; + + /// Get Redis URL for this runner instance + fn redis_url(&self) -> &str; + + /// Spawn the runner + /// + /// This method provides the common runner loop implementation that both + /// sync and async runners can use. It handles: + /// - Redis connection setup + /// - Job polling from Redis queue + /// - Shutdown signal handling + /// - Delegating job processing to the implementation + /// + /// Note: The engine is now owned by the runner implementation as a field + fn spawn( + self: Arc, + mut shutdown_rx: mpsc::Receiver<()>, + ) -> JoinHandle>> { + tokio::spawn(async move { + let runner_id = self.runner_id(); + let redis_url = self.redis_url(); + // Canonical work queue based on runner_id + let queue_key = format!("runner_queue:{}", runner_id); + info!( + "{} Runner '{}' starting. Connecting to Redis at {}. Listening on queue: {}", + self.runner_type(), + runner_id, + redis_url, + queue_key + ); + + let mut redis_conn = initialize_redis_connection(runner_id, redis_url).await?; + + loop { + let blpop_keys = vec![queue_key.clone()]; + tokio::select! { + // Listen for shutdown signal + _ = shutdown_rx.recv() => { + info!("{} Runner '{}': Shutdown signal received. Terminating loop.", + self.runner_type(), runner_id); + break; + } + // Listen for tasks from Redis + blpop_result = redis_conn.blpop(&blpop_keys, BLPOP_TIMEOUT_SECONDS as f64) => { + debug!("{} Runner '{}': Attempting BLPOP on queue: {}", + self.runner_type(), runner_id, queue_key); + + let response: Option<(String, String)> = match blpop_result { + Ok(resp) => resp, + Err(e) => { + error!("{} Runner '{}': Redis BLPOP error on queue {}: {}. Runner for this circle might stop.", + self.runner_type(), runner_id, queue_key, e); + return Err(Box::new(e) as Box); + } + }; + + if let Some((_queue_name_recv, job_id)) = response { + info!("{} Runner '{}' received job_id: {} from queue: {}", + self.runner_type(), runner_id, job_id, _queue_name_recv); + + // Load the job from Redis + match crate::load_job_from_redis(&mut redis_conn, &job_id, runner_id).await { + Ok(job) => { + // Check for ping job and handle it directly + if job.payload.trim() == "ping" { + info!("{} Runner '{}': Received ping job '{}', responding with pong", + self.runner_type(), runner_id, job_id); + + // Update job status to started + if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Started).await { + error!("{} Runner '{}': Failed to update ping job '{}' status to Started: {}", + self.runner_type(), runner_id, job_id, e); + } + + // Set result to "pong" and mark as finished + if let Err(e) = Job::set_result(&mut redis_conn, &job_id, "pong").await { + error!("{} Runner '{}': Failed to set ping job '{}' result: {}", + self.runner_type(), runner_id, job_id, e); + } + + if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Finished).await { + error!("{} Runner '{}': Failed to update ping job '{}' status to Finished: {}", + self.runner_type(), runner_id, job_id, e); + } + + info!("{} Runner '{}': Successfully responded to ping job '{}' with pong", + self.runner_type(), runner_id, job_id); + } else { + // Update job status to started + if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Started).await { + error!("{} Runner '{}': Failed to update job '{}' status to Started: {}", + self.runner_type(), runner_id, job_id, e); + } + + // Delegate job processing to the implementation + match self.process_job(job) { + Ok(result) => { + // Set result and mark as finished + if let Err(e) = Job::set_result(&mut redis_conn, &job_id, &result).await { + error!("{} Runner '{}': Failed to set job '{}' result: {}", + self.runner_type(), runner_id, job_id, e); + } + + if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Finished).await { + error!("{} Runner '{}': Failed to update job '{}' status to Finished: {}", + self.runner_type(), runner_id, job_id, e); + } + } + Err(e) => { + let error_str = format!("{:?}", e); + error!("{} Runner '{}': Job '{}' processing failed: {}", + self.runner_type(), runner_id, job_id, error_str); + + // Set error and mark as error + if let Err(e) = Job::set_error(&mut redis_conn, &job_id, &error_str).await { + error!("{} Runner '{}': Failed to set job '{}' error: {}", + self.runner_type(), runner_id, job_id, e); + } + + if let Err(e) = Job::update_status(&mut redis_conn, &job_id, JobStatus::Error).await { + error!("{} Runner '{}': Failed to update job '{}' status to Error: {}", + self.runner_type(), runner_id, job_id, e); + } + } + } + } + } + Err(e) => { + error!("{} Runner '{}': Failed to load job '{}': {}", + self.runner_type(), runner_id, job_id, e); + } + } + } else { + debug!("{} Runner '{}': BLPOP timed out on queue {}. No new tasks.", + self.runner_type(), runner_id, queue_key); + } + } + } + } + + info!("{} Runner '{}' has shut down.", self.runner_type(), runner_id); + Ok(()) + }) + } +} + +/// Convenience function to spawn a runner with the trait-based interface +/// +/// This function provides a unified interface for spawning any runner implementation +/// that implements the Runner trait. +/// +/// # Arguments +/// +/// * `runner` - The runner implementation to spawn +/// * `shutdown_rx` - Channel receiver for shutdown signals +/// +/// # Returns +/// +/// Returns a `JoinHandle` that can be awaited to wait for runner shutdown. +pub fn spawn_runner( + runner: Arc, + shutdown_rx: mpsc::Receiver<()>, +) -> JoinHandle>> { + runner.spawn(shutdown_rx) +} + +/// Helper to derive queue name from runner_id +fn derive_queue_from_runner_id(runner_id: &str) -> String { + format!("runner_queue:{}", runner_id) +} diff --git a/src/sync_runner.rs b/src/sync_runner.rs new file mode 100644 index 0000000..5c3972d --- /dev/null +++ b/src/sync_runner.rs @@ -0,0 +1,124 @@ +use hero_job::Job; +use log::{debug, error, info}; +use rhai::{Dynamic, Engine}; +use std::sync::Arc; + +use crate::runner_trait::Runner; + +/// Configuration for sync runner instances +#[derive(Debug, Clone)] +pub struct SyncRunnerConfig { + pub runner_id: String, + pub db_path: String, + pub redis_url: String, + pub preserve_tasks: bool, +} + +/// Synchronous runner that processes jobs sequentially +pub struct SyncRunner { + pub config: SyncRunnerConfig, + pub engine_factory: Arc Engine + Send + Sync>, +} + +impl SyncRunner { + /// Create a new SyncRunner with the provided engine factory + pub fn new(config: SyncRunnerConfig, engine_factory: F) -> Self + where + F: Fn() -> Engine + Send + Sync + 'static, + { + Self { + config, + engine_factory: Arc::new(engine_factory), + } + } + + /// Execute a job with the given engine, setting proper job context + /// + /// This function sets up the engine with job context (DB_PATH, CALLER_ID, CONTEXT_ID) + /// and evaluates the script. It returns the result or error. + fn execute_job_with_engine( + engine: &mut Engine, + job: &Job, + db_path: &str, + ) -> Result> { + // Set up job context in the engine + let mut db_config = rhai::Map::new(); + db_config.insert("DB_PATH".into(), db_path.to_string().into()); + db_config.insert("CALLER_ID".into(), job.caller_id.clone().into()); + db_config.insert("CONTEXT_ID".into(), job.context_id.clone().into()); + engine.set_default_tag(Dynamic::from(db_config)); + + debug!("Sync Runner for Context ID '{}': Evaluating script with Rhai engine (job context set).", job.context_id); + + // Execute the script with the configured engine + engine.eval::(&job.payload) + } + + +} + +impl Runner for SyncRunner { + fn process_job(&self, job: Job) -> Result> { + let job_id = &job.id; + let runner_id = &self.config.runner_id; + + debug!("Sync Runner '{}', Job {}: Processing started.", runner_id, job_id); + info!("Sync Runner '{}' processing job_id: {}. Script: {:.50}...", job.context_id, job_id, job.payload); + + // Create a new engine instance for this job execution + let mut engine = (self.engine_factory)(); + + // Execute the script + match Self::execute_job_with_engine(&mut engine, &job, &self.config.db_path) { + Ok(result) => { + let output_str = if result.is::() { + result.into_string().unwrap() + } else { + result.to_string() + }; + info!("Sync Runner for Context ID '{}' job {} completed. Output: {}", job.context_id, job.id, output_str); + Ok(output_str) + } + Err(e) => { + let error_str = format!("{:?}", *e); + error!("Sync Runner for Context ID '{}' job {} script evaluation failed. Error: {}", job.context_id, job.id, error_str); + Err(Box::new(e) as Box) + } + } + } + + fn runner_type(&self) -> &'static str { + "Sync" + } + + fn runner_id(&self) -> &str { + &self.config.runner_id + } + + fn redis_url(&self) -> &str { + &self.config.redis_url + } +} + +/// Convenience function to spawn a synchronous runner using the trait interface +pub fn spawn_sync_runner( + runner_id: String, + db_path: String, + redis_url: String, + shutdown_rx: tokio::sync::mpsc::Receiver<()>, + preserve_tasks: bool, + engine_factory: F, +) -> tokio::task::JoinHandle>> +where + F: Fn() -> Engine + Send + Sync + 'static, +{ + let config = SyncRunnerConfig { + runner_id, + db_path, + redis_url, + preserve_tasks, + }; + + let runner = Arc::new(SyncRunner::new(config, engine_factory)); + crate::runner_trait::spawn_runner(runner, shutdown_rx) +} diff --git a/test_sync_runner.rs b/test_sync_runner.rs new file mode 100644 index 0000000..075d720 --- /dev/null +++ b/test_sync_runner.rs @@ -0,0 +1,47 @@ +use actor_system::spawn_sync_runner; +use actor_system::engine::osis::create_osis_engine; +use tokio::sync::mpsc; +use tokio::time::{sleep, Duration}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Initialize logging + env_logger::init(); + + // Create shutdown channel + let (shutdown_tx, shutdown_rx) = mpsc::channel(1); + + println!("Starting sync runner test..."); + + // Spawn the sync runner with proper parameters + let runner_handle = spawn_sync_runner( + "test_runner".to_string(), + "/tmp/test_runner.db".to_string(), + "redis://localhost:6379".to_string(), + shutdown_rx, + false, // preserve_tasks + || create_osis_engine(), + ); + + // Let it run for a few seconds + println!("Sync runner started, letting it run for 5 seconds..."); + sleep(Duration::from_secs(5)).await; + + // Send shutdown signal + println!("Sending shutdown signal..."); + let _ = shutdown_tx.send(()).await; + + // Wait for the runner to finish + match runner_handle.await { + Ok(result) => { + match result { + Ok(_) => println!("Sync runner completed successfully!"), + Err(e) => println!("Sync runner error: {}", e), + } + } + Err(e) => println!("Join error: {}", e), + } + + println!("Test completed!"); + Ok(()) +}