diff --git a/Cargo.lock b/Cargo.lock index 134e695..3b49281 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -123,6 +123,28 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "async-trait" version = "0.1.88" @@ -193,12 +215,27 @@ version = "2.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "bumpalo" version = "3.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.10.1" @@ -327,6 +364,16 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "colored" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c" +dependencies = [ + "lazy_static", + "windows-sys 0.59.0", +] + [[package]] name = "combine" version = "4.6.7" @@ -377,6 +424,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + [[package]] name = "crc32fast" version = "1.4.2" @@ -453,6 +509,16 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43da5946c66ffcc7745f48db692ffbb10a83bfe0afd96235c5c2a4fb23994929" +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "csv" version = "1.3.1" @@ -474,6 +540,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "data-encoding" +version = "2.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" + [[package]] name = "derive" version = "0.1.0" @@ -482,6 +554,16 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "dirs" version = "4.0.0" @@ -715,6 +797,16 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "getrandom" version = "0.2.16" @@ -1372,6 +1464,32 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "orchestrator" +version = "0.1.0" +dependencies = [ + "async-trait", + "chrono", + "futures", + "futures-util", + "heromodels", + "heromodels_core", + "log", + "reqwest", + "rhai", + "rhai_dispatcher", + "rhailib_dsl", + "serde", + "serde_json", + "thiserror", + "tokio", + "tokio-test", + "tokio-tungstenite", + "tracing", + "tracing-subscriber", + "uuid", +] + [[package]] name = "ourdb" version = "0.1.0" @@ -1769,6 +1887,7 @@ version = "0.1.0" dependencies = [ "chrono", "clap", + "colored", "env_logger", "log", "redis", @@ -1818,6 +1937,7 @@ dependencies = [ "serde", "serde_json", "tempfile", + "thiserror", "tokio", ] @@ -2006,6 +2126,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha1_smol" version = "1.0.1" @@ -2331,6 +2462,42 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-test" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.15" @@ -2421,6 +2588,31 @@ dependencies = [ "thiserror", ] +[[package]] +name = "tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + +[[package]] +name = "typenum" +version = "1.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f" + [[package]] name = "unicode-ident" version = "1.0.18" @@ -2450,6 +2642,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8_iter" version = "1.0.4" diff --git a/Cargo.toml b/Cargo.toml index 0cef7af..83bd74e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ members = [ "src/engine", "src/worker", "src/monitor", # Added the new monitor package to workspace + "src/orchestrator", # Added the new orchestrator package to workspace "src/macros", "src/dsl", "src/derive", ] resolver = "2" # Recommended for new workspaces diff --git a/src/dispatcher/Cargo.toml b/src/dispatcher/Cargo.toml index 6761a16..dab9a14 100644 --- a/src/dispatcher/Cargo.toml +++ b/src/dispatcher/Cargo.toml @@ -17,6 +17,7 @@ uuid = { version = "1.6", features = ["v4", "serde"] } chrono = { version = "0.4", features = ["serde"] } log = "0.4" tokio = { version = "1", features = ["macros", "rt-multi-thread"] } # For async main in examples, and general async +colored = "2.0" [dev-dependencies] # For examples later env_logger = "0.10" diff --git a/src/dispatcher/cmd/dispatcher.rs b/src/dispatcher/cmd/dispatcher.rs index 2f42b34..a26be04 100644 --- a/src/dispatcher/cmd/dispatcher.rs +++ b/src/dispatcher/cmd/dispatcher.rs @@ -1,6 +1,7 @@ use clap::Parser; use rhai_dispatcher::{RhaiDispatcher, RhaiDispatcherBuilder}; use log::{error, info}; +use colored::Colorize; use std::io::{self, Write}; use std::time::Duration; @@ -50,10 +51,10 @@ async fn main() -> Result<(), Box> { // Configure logging based on verbosity level let log_config = match args.verbose { - 0 => "warn,rhai_dispatcher=info", - 1 => "info,rhai_dispatcher=debug", - 2 => "debug", - _ => "trace", + 0 => "warn,rhai_dispatcher=warn", + 1 => "info,rhai_dispatcher=info", + 2 => "debug,rhai_dispatcher=debug", + _ => "trace,rhai_dispatcher=trace", }; std::env::set_var("RUST_LOG", log_config); @@ -67,14 +68,16 @@ async fn main() -> Result<(), Box> { env_logger::init(); } - info!("šŸ”— Starting Rhai Dispatcher"); - info!("šŸ“‹ Configuration:"); - info!(" Caller ID: {}", args.caller_id); - info!(" Context ID: {}", args.context_id); - info!(" Worker ID: {}", args.worker_id); - info!(" Redis URL: {}", args.redis_url); - info!(" Timeout: {}s", args.timeout); - info!(""); + if args.verbose > 0 { + info!("šŸ”— Starting Rhai Dispatcher"); + info!("šŸ“‹ Configuration:"); + info!(" Caller ID: {}", args.caller_id); + info!(" Context ID: {}", args.context_id); + info!(" Worker ID: {}", args.worker_id); + info!(" Redis URL: {}", args.redis_url); + info!(" Timeout: {}s", args.timeout); + info!(""); + } // Create the Rhai client let client = RhaiDispatcherBuilder::new() @@ -84,16 +87,22 @@ async fn main() -> Result<(), Box> { .redis_url(&args.redis_url) .build()?; - info!("āœ… Connected to Redis at {}", args.redis_url); + if args.verbose > 0 { + info!("āœ… Connected to Redis at {}", args.redis_url); + } // Determine execution mode if let Some(script_content) = args.script { // Execute inline script - info!("šŸ“œ Executing inline script"); + if args.verbose > 0 { + info!("šŸ“œ Executing inline script"); + } execute_script(&client, script_content, args.timeout).await?; } else if let Some(file_path) = args.file { // Execute script from file - info!("šŸ“ Loading script from file: {}", file_path); + if args.verbose > 0 { + info!("šŸ“ Loading script from file: {}", file_path); + } let script_content = std::fs::read_to_string(&file_path) .map_err(|e| format!("Failed to read script file '{}': {}", file_path, e))?; execute_script(&client, script_content, args.timeout).await?; @@ -101,7 +110,7 @@ async fn main() -> Result<(), Box> { // Interactive mode info!("šŸŽ® Entering interactive mode"); info!("Type Rhai scripts and press Enter to execute. Type 'exit' or 'quit' to close."); - run_interactive_mode(&client, args.timeout).await?; + run_interactive_mode(&client, args.timeout, args.verbose).await?; } Ok(()) @@ -145,6 +154,7 @@ async fn execute_script( async fn run_interactive_mode( client: &RhaiDispatcher, timeout_secs: u64, + verbose: u8, ) -> Result<(), Box> { let timeout = Duration::from_secs(timeout_secs); @@ -166,7 +176,9 @@ async fn run_interactive_mode( break; } - info!("⚔ Executing: {}", input); + if verbose > 0 { + info!("⚔ Executing: {}", input); + } match client .new_play_request() @@ -176,16 +188,15 @@ async fn run_interactive_mode( .await { Ok(result) => { - println!("Status: {}", result.status); if let Some(output) = result.output { - println!("Output: {}", output); + println!("{}", output.color("green")); } if let Some(error) = result.error { - println!("Error: {}", error); + println!("{}", format!("error: {}", error).color("red")); } } Err(e) => { - error!("āŒ Execution failed: {}", e); + println!("{}", format!("error: {}", e).red()); } } diff --git a/src/dispatcher/src/lib.rs b/src/dispatcher/src/lib.rs index 3cc33ee..2f19848 100644 --- a/src/dispatcher/src/lib.rs +++ b/src/dispatcher/src/lib.rs @@ -262,16 +262,17 @@ impl RhaiDispatcherBuilder { } } -/// Internal representation of a script execution request. +/// Representation of a script execution request. /// /// This structure contains all the information needed to execute a Rhai script /// on a worker service, including the script content, target worker, and timeout. +#[derive(Debug, Clone)] pub struct PlayRequest { - id: String, - worker_id: String, - context_id: String, - script: String, - timeout: Duration, + pub id: String, + pub worker_id: String, + pub context_id: String, + pub script: String, + pub timeout: Duration, } /// Builder for constructing and submitting script execution requests. @@ -301,6 +302,7 @@ pub struct PlayRequestBuilder<'a> { caller_id: String, script: String, timeout: Duration, + retries: u32, } impl<'a> PlayRequestBuilder<'a> { @@ -312,7 +314,8 @@ impl<'a> PlayRequestBuilder<'a> { context_id: client.context_id.clone(), caller_id: client.caller_id.clone(), script: "".to_string(), - timeout: Duration::from_secs(10), + timeout: Duration::from_secs(5), + retries: 0, } } @@ -384,10 +387,6 @@ impl<'a> PlayRequestBuilder<'a> { pub async fn await_response(self) -> Result { // Build the request and submit using self.client - println!( - "Awaiting response for request {} with timeout {:?}", - self.request_id, self.timeout - ); let result = self .client .submit_play_request_and_await_result(&self.build()?) diff --git a/src/dsl/Cargo.toml b/src/dsl/Cargo.toml index ce70eb2..2482441 100644 --- a/src/dsl/Cargo.toml +++ b/src/dsl/Cargo.toml @@ -18,6 +18,7 @@ reqwest = { version = "0.11", features = ["json"] } tokio = { version = "1", features = ["full"] } dotenv = "0.15" rhai_dispatcher = { path = "../dispatcher" } +thiserror = "1.0" [dev-dependencies] tempfile = "3" diff --git a/src/dsl/examples/payment/main.rs b/src/dsl/examples/payment/main.rs index 0483f9e..68a6406 100644 --- a/src/dsl/examples/payment/main.rs +++ b/src/dsl/examples/payment/main.rs @@ -3,7 +3,8 @@ use rhai::{Engine, EvalAltResult, Scope}; use std::fs; use std::env; -fn main() -> Result<(), Box> { +#[tokio::main] +async fn main() -> Result<(), Box> { // Load environment variables from .env file dotenv::from_filename("examples/payment/.env").ok(); diff --git a/src/dsl/examples/payment/payment.rhai b/src/dsl/examples/payment/payment.rhai index 43c3ac4..5279552 100644 --- a/src/dsl/examples/payment/payment.rhai +++ b/src/dsl/examples/payment/payment.rhai @@ -20,11 +20,8 @@ print(`Product created: ${product.name}`); // Create the product in Stripe (non-blocking) print("šŸ”„ Dispatching product creation to Stripe..."); try { - let product_result = product.create_async(STRIPE_API_KEY, "payment-example", "new_create_product_response", "new_create_product_error"); + let product_result = product.create_async("payment-example", "payment-context", STRIPE_API_KEY); print(`āœ… Product creation dispatched: ${product_result}`); - // In non-blocking mode, we use a demo product ID for the rest of the example - let product_id = "prod_demo_example_id"; - print("šŸ’” Using demo product ID for remaining operations in non-blocking mode"); } catch(error) { print(`āŒ Failed to dispatch product creation: ${error}`); print("This is expected with a demo API key. In production, use a valid Stripe secret key."); @@ -40,7 +37,7 @@ let upfront_price = new_price() .product(product_id) .metadata("type", "upfront"); -let upfront_result = upfront_price.create_async(STRIPE_API_KEY, "payment-example", "new_create_price_response", "new_create_price_error"); +let upfront_result = upfront_price.create_async("payment-example", "payment-context", STRIPE_API_KEY); print(`āœ… Upfront Price creation dispatched: ${upfront_result}`); let upfront_price_id = "price_demo_upfront_id"; @@ -52,7 +49,7 @@ let monthly_price = new_price() .recurring("month") .metadata("type", "monthly_subscription"); -let monthly_result = monthly_price.create_async(STRIPE_API_KEY, "payment-example", "new_create_price_response", "new_create_price_error"); +let monthly_result = monthly_price.create_async("payment-example", "payment-context", STRIPE_API_KEY); print(`āœ… Monthly Price creation dispatched: ${monthly_result}`); let monthly_price_id = "price_demo_monthly_id"; @@ -65,7 +62,7 @@ let annual_price = new_price() .metadata("type", "annual_subscription") .metadata("discount", "2_months_free"); -let annual_result = annual_price.create_async(STRIPE_API_KEY, "payment-example", "new_create_price_response", "new_create_price_error"); +let annual_result = annual_price.create_async("payment-example", "payment-context", STRIPE_API_KEY); print(`āœ… Annual Price creation dispatched: ${annual_result}`); let annual_price_id = "price_demo_annual_id"; @@ -78,7 +75,7 @@ let percent_coupon = new_coupon() .metadata("campaign", "new_customer_discount") .metadata("code", "WELCOME25"); -let percent_result = percent_coupon.create_async(STRIPE_API_KEY, "payment-example", "new_create_coupon_response", "new_create_coupon_error"); +let percent_result = percent_coupon.create_async("payment-example", "payment-context", STRIPE_API_KEY); print(`āœ… 25% Off Coupon creation dispatched: ${percent_result}`); let percent_coupon_id = "coupon_demo_25percent_id"; @@ -90,7 +87,7 @@ let amount_coupon = new_coupon() .metadata("campaign", "loyalty_program") .metadata("code", "LOYAL5"); -let amount_result = amount_coupon.create_async(STRIPE_API_KEY, "payment-example", "new_create_coupon_response", "new_create_coupon_error"); +let amount_result = amount_coupon.create_async("payment-example", "payment-context", STRIPE_API_KEY); print(`āœ… $5 Off Coupon creation dispatched: ${amount_result}`); let amount_coupon_id = "coupon_demo_5dollar_id"; @@ -108,7 +105,7 @@ let payment_intent = new_payment_intent() .metadata("price_id", upfront_price_id) .metadata("payment_type", "upfront"); -let payment_result = payment_intent.create_async(STRIPE_API_KEY, "payment-example", "new_create_payment_intent_response", "new_create_payment_intent_error"); +let payment_result = payment_intent.create_async("payment-example", "payment-context", STRIPE_API_KEY); print(`āœ… Payment Intent creation dispatched: ${payment_result}`); let payment_intent_id = "pi_demo_payment_intent_id"; @@ -124,7 +121,7 @@ let subscription = new_subscription() .metadata("trial", "14_days") .metadata("source", "website_signup"); -let subscription_result = subscription.create_async(STRIPE_API_KEY, "payment-example", "new_create_subscription_response", "new_create_subscription_error"); +let subscription_result = subscription.create_async("payment-example", "payment-context", STRIPE_API_KEY); print(`āœ… Subscription creation dispatched: ${subscription_result}`); let subscription_id = "sub_demo_subscription_id"; @@ -140,7 +137,7 @@ let multi_subscription = new_subscription() .metadata("licenses", "5") .metadata("addons", "premium_support"); -let multi_result = multi_subscription.create_async(STRIPE_API_KEY, "payment-example", "new_create_subscription_response", "new_create_subscription_error"); +let multi_result = multi_subscription.create_async("payment-example", "payment-context", STRIPE_API_KEY); print(`āœ… Multi-Item Subscription creation dispatched: ${multi_result}`); let multi_subscription_id = "sub_demo_multi_subscription_id"; @@ -156,7 +153,7 @@ let discounted_payment = new_payment_intent() .metadata("coupon_applied", percent_coupon_id) .metadata("discount_percent", "25"); -let discounted_result = discounted_payment.create_async(STRIPE_API_KEY, "payment-example", "new_create_payment_intent_response", "new_create_payment_intent_error"); +let discounted_result = discounted_payment.create_async("payment-example", "payment-context", STRIPE_API_KEY); print(`āœ… Discounted Payment Intent creation dispatched: ${discounted_result}`); let discounted_payment_id = "pi_demo_discounted_payment_id"; diff --git a/src/flow/examples/stripe_flow_example.rs b/src/flow/examples/stripe_flow_example.rs new file mode 100644 index 0000000..bbed26f --- /dev/null +++ b/src/flow/examples/stripe_flow_example.rs @@ -0,0 +1,90 @@ +//! Example demonstrating the flow manager with mock Stripe API calls + +use flow::{new_step, new_flow, FlowExecutor}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + println!("=== Flow Manager Example ==="); + println!("Demonstrating the builder pattern API with mock Stripe workflow\n"); + + // Create the flow executor + let executor = FlowExecutor::new("redis://127.0.0.1/").await?; + + // Build steps using the fluent API + let step1 = new_step("stripe_config") + .script("mock_api_call stripe_config") + .timeout(5) + .retries(2) + .build(); + + let step2 = new_step("stripe_config_confirm") + .script("mock_api_call create_product") + .timeout(5) + .retries(1) + .build(); + + let step3 = new_step("create_product") + .script("mock_api_call create_product") + .timeout(10) + .retries(1) + .build(); + + // Build flow using the fluent API + let flow = new_flow("stripe_payment_request") + .add_step(step1) + .add_step(step2) + .add_step(step3) + .build(); + + println!("Created flow: {}", flow.name); + println!("Flow ID: {}", flow.id); + println!("Number of steps: {}", flow.steps.len()); + + for (i, step) in flow.steps.iter().enumerate() { + println!(" Step {}: {} (timeout: {}s, retries: {})", + i + 1, step.name, step.timeout_seconds, step.max_retries); + } + + // Execute the flow (non-blocking) + println!("\nšŸš€ Starting flow execution..."); + let result = executor.execute_flow(flow.clone()).await?; + println!("āœ… {}", result); + + // Monitor flow progress + println!("\nšŸ“Š Monitoring flow progress..."); + for i in 0..10 { + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + if let Ok(Some(flow_state)) = executor.get_flow_status(&flow.id).await { + println!(" Status: {:?}, Current step: {:?}, Completed: {}/{}", + flow_state.status, + flow_state.current_step, + flow_state.completed_steps.len(), + flow.steps.len()); + + if matches!(flow_state.status, flow::FlowStatus::Completed | flow::FlowStatus::Failed) { + break; + } + } + } + + // Check final status + if let Ok(Some(final_state)) = executor.get_flow_status(&flow.id).await { + println!("\nšŸŽÆ Final flow status: {:?}", final_state.status); + println!("Completed steps: {:?}", final_state.completed_steps); + + // Check individual step results + for step in &flow.steps { + if let Ok(Some(step_state)) = executor.get_step_status(&flow.id, &step.id).await { + println!(" Step '{}': {:?} (attempts: {})", + step.name, step_state.status, step_state.attempt_count); + if let Some(output) = &step_state.output { + println!(" Output: {}", output); + } + } + } + } + + println!("\n✨ Flow execution demonstration completed!"); + Ok(()) +} \ No newline at end of file diff --git a/src/orchestrator/Cargo.toml b/src/orchestrator/Cargo.toml new file mode 100644 index 0000000..c0925ba --- /dev/null +++ b/src/orchestrator/Cargo.toml @@ -0,0 +1,51 @@ +[package] +name = "orchestrator" +version = "0.1.0" +edition = "2021" + +[dependencies] +# Core async runtime +tokio = { version = "1", features = ["macros", "rt-multi-thread", "sync", "time"] } +async-trait = "0.1" +futures = "0.3" +futures-util = "0.3" + +# Serialization +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" + +# Error handling +thiserror = "1.0" + +# Collections +uuid = { version = "1.6", features = ["v4", "serde"] } + +# Time handling +chrono = { version = "0.4", features = ["serde"] } + +# HTTP client +reqwest = { version = "0.11", features = ["json"] } + +# WebSocket client +tokio-tungstenite = "0.20" + +# Rhai scripting +rhai = "1.21.0" + +# Database and models +heromodels = { path = "/Users/timurgordon/code/git.ourworld.tf/herocode/db/heromodels" } +heromodels_core = { path = "/Users/timurgordon/code/git.ourworld.tf/herocode/db/heromodels_core" } + +# DSL integration for flow models +rhailib_dsl = { path = "../dsl" } + +# Dispatcher integration +rhai_dispatcher = { path = "../dispatcher" } + +# Logging +log = "0.4" +tracing = "0.1" +tracing-subscriber = "0.3" + +[dev-dependencies] +tokio-test = "0.4" \ No newline at end of file diff --git a/src/orchestrator/README.md b/src/orchestrator/README.md new file mode 100644 index 0000000..301659e --- /dev/null +++ b/src/orchestrator/README.md @@ -0,0 +1,320 @@ +# Rationale for Orchestrator + +We may have scripts that run asynchrounsly, depend on human input or depend on other scripts to complete. We want to be able to implement high-level workflows of rhai scripts. + +## Design + +Direct Acyclic Graphs (DAGs) are a natural fit for representing workflows. + +## Requirements + +1. Uses Direct Acyclic Graphs (DAGs) to represent workflows. +2. Each step in the workflow defines the script to execute, the inputs to pass to it, and the outputs to expect from it. +3. Simplicity: the output cases are binary (success or failure), and params inputted / outputted are simple key-value pairs. +4. Multiple steps can depend on the same step. +5. Scripts are executed using [RhaiDispatcher](../dispatcher/README.md). + +## Architecture + +The Orchestrator is a simple DAG-based workflow execution system that extends the heromodels flow structures to support workflows with dependencies and distributed script execution. + +### Core Component + +```mermaid +graph TB + subgraph "Orchestrator" + O[Orchestrator] --> RE[RhaiExecutor Trait] + O --> DB[(Database)] + end + + subgraph "Executor Implementations" + RE --> RD[RhaiDispatcher] + RE --> WS[WebSocketClient] + RE --> HTTP[HttpClient] + RE --> LOCAL[LocalExecutor] + end + + subgraph "Data Models (heromodels)" + F[Flow] --> FS[FlowStep] + FS --> SR[SignatureRequirement] + end + + subgraph "Infrastructure" + RD --> RQ[Redis Queues] + RD --> W[Workers] + WS --> WSS[WebSocket Server] + HTTP --> API[REST API] + end +``` + +### Execution Abstraction + +The orchestrator uses a trait-based approach for script execution, allowing different execution backends: + +#### RhaiExecutor Trait +```rust +use rhai_dispatcher::{PlayRequestBuilder, RhaiTaskDetails, RhaiDispatcherError}; + +#[async_trait] +pub trait RhaiExecutor { + async fn call(&self, request: PlayRequestBuilder<'_>) -> Result; +} +``` + +#### Executor Implementations + +**RhaiDispatcher Implementation:** +```rust +pub struct DispatcherExecutor { + dispatcher: RhaiDispatcher, +} + +#[async_trait] +impl RhaiExecutor for DispatcherExecutor { + async fn call(&self, request: PlayRequestBuilder<'_>) -> Result { + // Use RhaiDispatcher to execute script via Redis queues + request.await_response().await + } +} +``` + +**WebSocket Client Implementation:** +```rust +pub struct WebSocketExecutor { + ws_client: WebSocketClient, + endpoint: String, +} + +#[async_trait] +impl RhaiExecutor for WebSocketExecutor { + async fn call(&self, request: PlayRequestBuilder<'_>) -> Result { + // Build the PlayRequest and send via WebSocket + let play_request = request.build()?; + + // Send script execution request via WebSocket + let ws_message = serde_json::to_string(&play_request)?; + self.ws_client.send(ws_message).await?; + + // Wait for response and convert to RhaiTaskDetails + let response = self.ws_client.receive().await?; + serde_json::from_str(&response).map_err(RhaiDispatcherError::from) + } +} +``` + +**HTTP Client Implementation:** +```rust +pub struct HttpExecutor { + http_client: reqwest::Client, + base_url: String, +} + +#[async_trait] +impl RhaiExecutor for HttpExecutor { + async fn call(&self, request: PlayRequestBuilder<'_>) -> Result { + // Build the PlayRequest and send via HTTP + let play_request = request.build()?; + + // Send script execution request via HTTP API + let response = self.http_client + .post(&format!("{}/execute", self.base_url)) + .json(&play_request) + .send() + .await?; + + response.json().await.map_err(RhaiDispatcherError::from) + } +} +``` + +**Local Executor Implementation:** +```rust +pub struct LocalExecutor { + engine: Engine, +} + +#[async_trait] +impl RhaiExecutor for LocalExecutor { + async fn call(&self, request: PlayRequestBuilder<'_>) -> Result { + // Build the PlayRequest and execute locally + let play_request = request.build()?; + + // Execute script directly in local Rhai engine + let result = self.engine.eval::(&play_request.script); + + // Convert to RhaiTaskDetails format + let task_details = RhaiTaskDetails { + task_id: play_request.id, + script: play_request.script, + status: if result.is_ok() { "completed".to_string() } else { "error".to_string() }, + output: result.ok(), + error: result.err().map(|e| e.to_string()), + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + caller_id: "local".to_string(), + context_id: play_request.context_id, + worker_id: "local".to_string(), + }; + + Ok(task_details) + } +} +``` + +### Data Model Extensions + +Simple extensions to the existing heromodels flow structures: + +#### Enhanced FlowStep Model +```rust +// Extends heromodels::models::flow::FlowStep +pub struct FlowStep { + // ... existing heromodels::models::flow::FlowStep fields + pub script: String, // Rhai script to execute + pub depends_on: Vec, // IDs of steps this step depends on + pub context_id: String, // Execution context (circle) + pub inputs: HashMap, // Input parameters + pub outputs: HashMap, // Output results +} +``` + +### Execution Flow + +```mermaid +sequenceDiagram + participant Client as Client + participant O as Orchestrator + participant RE as RhaiExecutor + participant DB as Database + + Client->>O: Submit Flow + O->>DB: Store flow and steps + O->>O: Find steps with no dependencies + + loop Until all steps complete + O->>RE: Execute ready steps + RE-->>O: Return results + O->>DB: Update step status + O->>O: Find newly ready steps + end + + O->>Client: Flow completed +``` + +### Flexible Orchestrator Implementation + +```rust +use rhai_dispatcher::{RhaiDispatcher, PlayRequestBuilder}; +use std::collections::HashSet; + +pub struct Orchestrator { + executor: E, + database: Arc, +} + +impl Orchestrator { + pub fn new(executor: E, database: Arc) -> Self { + Self { executor, database } + } + + pub async fn execute_flow(&self, flow: Flow) -> Result<(), OrchestratorError> { + // 1. Store flow in database + self.database.collection::()?.set(&flow)?; + + // 2. Find steps with no dependencies (depends_on is empty) + let mut pending_steps: Vec = flow.steps.clone(); + let mut completed_steps: HashSet = HashSet::new(); + + while !pending_steps.is_empty() { + // Find ready steps (all dependencies completed) + let ready_steps: Vec = pending_steps + .iter() + .filter(|step| { + step.depends_on.iter().all(|dep_id| completed_steps.contains(dep_id)) + }) + .cloned() + .collect(); + + if ready_steps.is_empty() { + return Err(OrchestratorError::NoReadySteps); + } + + // Execute ready steps concurrently + let mut tasks = Vec::new(); + for step in ready_steps { + let executor = &self.executor; + let task = async move { + // Create PlayRequestBuilder for this step + let request = RhaiDispatcher::new_play_request() + .script(&step.script) + .context_id(&step.context_id) + .worker_id(&step.worker_id); + + // Execute via the trait + let result = executor.call(request).await?; + Ok((step.base_data.id, result)) + }; + tasks.push(task); + } + + // Wait for all ready steps to complete + let results = futures::future::try_join_all(tasks).await?; + + // Update step status and mark as completed + for (step_id, task_details) in results { + if task_details.status == "completed" { + completed_steps.insert(step_id); + // Update step status in database + // self.update_step_status(step_id, "completed", task_details.output).await?; + } else { + return Err(OrchestratorError::StepFailed(step_id, task_details.error)); + } + } + + // Remove completed steps from pending + pending_steps.retain(|step| !completed_steps.contains(&step.base_data.id)); + } + + Ok(()) + } + + pub async fn get_flow_status(&self, flow_id: u32) -> Result { + // Return current status of flow and all its steps + let flow = self.database.collection::()?.get(flow_id)?; + // Implementation would check step statuses and return overall flow status + Ok(FlowStatus::Running) // Placeholder + } +} + +pub enum OrchestratorError { + DatabaseError(String), + ExecutorError(RhaiDispatcherError), + NoReadySteps, + StepFailed(u32, Option), +} + +pub enum FlowStatus { + Pending, + Running, + Completed, + Failed, +} + +// Usage examples: +// let orchestrator = Orchestrator::new(DispatcherExecutor::new(dispatcher), db); +// let orchestrator = Orchestrator::new(WebSocketExecutor::new(ws_client), db); +// let orchestrator = Orchestrator::new(HttpExecutor::new(http_client), db); +// let orchestrator = Orchestrator::new(LocalExecutor::new(engine), db); +``` + +### Key Features + +1. **DAG Validation**: Ensures no circular dependencies exist in the `depends_on` relationships +2. **Parallel Execution**: Executes independent steps concurrently via multiple workers +3. **Simple Dependencies**: Each step lists the step IDs it depends on +4. **RhaiDispatcher Integration**: Uses existing dispatcher for script execution +5. **Binary Outcomes**: Steps either succeed or fail (keeping it simple as per requirements) + +This simple architecture provides DAG-based workflow execution while leveraging the existing rhailib infrastructure and keeping complexity minimal. + + diff --git a/src/orchestrator/examples/basic_workflow.rs b/src/orchestrator/examples/basic_workflow.rs new file mode 100644 index 0000000..a1e3bd9 --- /dev/null +++ b/src/orchestrator/examples/basic_workflow.rs @@ -0,0 +1,283 @@ +//! Basic workflow example demonstrating orchestrator usage + +use orchestrator::{ + interface::LocalInterface, + orchestrator::Orchestrator, + OrchestratedFlow, OrchestratedFlowStep, FlowStatus, +}; +use std::sync::Arc; +use std::collections::HashMap; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Initialize logging + tracing_subscriber::fmt().init(); + + // Create executor + let executor = Arc::new(LocalInterface::new()); + + // Create orchestrator + let orchestrator = Orchestrator::new(executor); + + println!("šŸš€ Starting basic workflow example"); + + // Example 1: Simple sequential workflow + println!("\nšŸ“‹ Example 1: Sequential Workflow"); + let sequential_flow = create_sequential_workflow(); + let flow_id = orchestrator.execute_flow(sequential_flow).await?; + + // Wait for completion and show results + wait_and_show_results(&orchestrator, flow_id, "Sequential").await; + + // Example 2: Parallel workflow with convergence + println!("\nšŸ“‹ Example 2: Parallel Workflow"); + let parallel_flow = create_parallel_workflow(); + let flow_id = orchestrator.execute_flow(parallel_flow).await?; + + // Wait for completion and show results + wait_and_show_results(&orchestrator, flow_id, "Parallel").await; + + // Example 3: Complex workflow with multiple dependencies + println!("\nšŸ“‹ Example 3: Complex Workflow"); + let complex_flow = create_complex_workflow(); + let flow_id = orchestrator.execute_flow(complex_flow).await?; + + // Wait for completion and show results + wait_and_show_results(&orchestrator, flow_id, "Complex").await; + + // Clean up completed flows + orchestrator.cleanup_completed_flows().await; + + println!("\nāœ… All examples completed successfully!"); + + Ok(()) +} + +/// Create a simple sequential workflow +fn create_sequential_workflow() -> OrchestratedFlow { + let step1 = OrchestratedFlowStep::new("data_preparation") + .script(r#" + let data = [1, 2, 3, 4, 5]; + let sum = 0; + for item in data { + sum += item; + } + let result = sum; + "#) + .context_id("sequential_context") + .worker_id("worker_1"); + + let step2 = OrchestratedFlowStep::new("data_processing") + .script(r#" + let processed_data = dep_1_result * 2; + let result = processed_data; + "#) + .depends_on(step1.id()) + .context_id("sequential_context") + .worker_id("worker_2"); + + let step3 = OrchestratedFlowStep::new("data_output") + .script(r#" + let final_result = "Processed value: " + dep_2_result; + let result = final_result; + "#) + .depends_on(step2.id()) + .context_id("sequential_context") + .worker_id("worker_3"); + + OrchestratedFlow::new("sequential_workflow") + .add_step(step1) + .add_step(step2) + .add_step(step3) +} + +/// Create a parallel workflow with convergence +fn create_parallel_workflow() -> OrchestratedFlow { + let step1 = OrchestratedFlowStep::new("fetch_user_data") + .script(r#" + let user_id = 12345; + let user_name = "Alice"; + let result = user_name; + "#) + .context_id("parallel_context") + .worker_id("user_service"); + + let step2 = OrchestratedFlowStep::new("fetch_order_data") + .script(r#" + let order_id = 67890; + let order_total = 99.99; + let result = order_total; + "#) + .context_id("parallel_context") + .worker_id("order_service"); + + let step3 = OrchestratedFlowStep::new("fetch_inventory_data") + .script(r#" + let product_id = "ABC123"; + let stock_count = 42; + let result = stock_count; + "#) + .context_id("parallel_context") + .worker_id("inventory_service"); + + let step4 = OrchestratedFlowStep::new("generate_report") + .script(r#" + let report = "User: " + dep_1_result + + ", Order Total: $" + dep_2_result + + ", Stock: " + dep_3_result + " units"; + let result = report; + "#) + .depends_on(step1.id()) + .depends_on(step2.id()) + .depends_on(step3.id()) + .context_id("parallel_context") + .worker_id("report_service"); + + OrchestratedFlow::new("parallel_workflow") + .add_step(step1) + .add_step(step2) + .add_step(step3) + .add_step(step4) +} + +/// Create a complex workflow with multiple dependency levels +fn create_complex_workflow() -> OrchestratedFlow { + // Level 1: Initial data gathering + let step1 = OrchestratedFlowStep::new("load_config") + .script(r#" + let config = #{ + api_url: "https://api.example.com", + timeout: 30, + retries: 3 + }; + let result = config.api_url; + "#) + .context_id("complex_context") + .worker_id("config_service"); + + let step2 = OrchestratedFlowStep::new("authenticate") + .script(r#" + let token = "auth_token_12345"; + let expires_in = 3600; + let result = token; + "#) + .context_id("complex_context") + .worker_id("auth_service"); + + // Level 2: Data fetching (depends on config and auth) + let step3 = OrchestratedFlowStep::new("fetch_customers") + .script(r#" + let api_url = dep_1_result; + let auth_token = dep_2_result; + let customers = ["Customer A", "Customer B", "Customer C"]; + let result = customers.len(); + "#) + .depends_on(step1.id()) + .depends_on(step2.id()) + .context_id("complex_context") + .worker_id("customer_service"); + + let step4 = OrchestratedFlowStep::new("fetch_products") + .script(r#" + let api_url = dep_1_result; + let auth_token = dep_2_result; + let products = ["Product X", "Product Y", "Product Z"]; + let result = products.len(); + "#) + .depends_on(step1.id()) + .depends_on(step2.id()) + .context_id("complex_context") + .worker_id("product_service"); + + // Level 3: Data processing (depends on fetched data) + let step5 = OrchestratedFlowStep::new("calculate_metrics") + .script(r#" + let customer_count = dep_3_result; + let product_count = dep_4_result; + let ratio = customer_count / product_count; + let result = ratio; + "#) + .depends_on(step3.id()) + .depends_on(step4.id()) + .context_id("complex_context") + .worker_id("analytics_service"); + + // Level 4: Final reporting + let step6 = OrchestratedFlowStep::new("generate_dashboard") + .script(r#" + let customer_count = dep_3_result; + let product_count = dep_4_result; + let ratio = dep_5_result; + let dashboard = "Dashboard: " + customer_count + " customers, " + + product_count + " products, ratio: " + ratio; + let result = dashboard; + "#) + .depends_on(step3.id()) + .depends_on(step4.id()) + .depends_on(step5.id()) + .context_id("complex_context") + .worker_id("dashboard_service"); + + OrchestratedFlow::new("complex_workflow") + .add_step(step1) + .add_step(step2) + .add_step(step3) + .add_step(step4) + .add_step(step5) + .add_step(step6) +} + +/// Wait for flow completion and show results +async fn wait_and_show_results( + orchestrator: &Orchestrator, + flow_id: u32, + workflow_name: &str, +) { + println!(" ā³ Executing {} workflow (ID: {})...", workflow_name, flow_id); + + // Poll for completion + loop { + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + if let Some(execution) = orchestrator.get_flow_status(flow_id).await { + match execution.status { + FlowStatus::Completed => { + println!(" āœ… {} workflow completed successfully!", workflow_name); + println!(" šŸ“Š Executed {} steps in {:?}", + execution.completed_steps.len(), + execution.completed_at.unwrap() - execution.started_at); + + // Show step results + for (step_id, outputs) in &execution.step_results { + if let Some(result) = outputs.get("result") { + let step_name = execution.flow.orchestrated_steps + .iter() + .find(|s| s.id() == *step_id) + .map(|s| s.flow_step.name.as_str()) + .unwrap_or("unknown"); + println!(" šŸ“ Step '{}': {}", step_name, result); + } + } + break; + } + FlowStatus::Failed => { + println!(" āŒ {} workflow failed!", workflow_name); + if !execution.failed_steps.is_empty() { + println!(" šŸ’„ Failed steps: {:?}", execution.failed_steps); + } + break; + } + FlowStatus::Running => { + print!("."); + std::io::Write::flush(&mut std::io::stdout()).unwrap(); + } + FlowStatus::Pending => { + println!(" āøļø {} workflow is pending...", workflow_name); + } + } + } else { + println!(" ā“ {} workflow not found!", workflow_name); + break; + } + } +} \ No newline at end of file diff --git a/src/orchestrator/src/interface/dispatcher.rs b/src/orchestrator/src/interface/dispatcher.rs new file mode 100644 index 0000000..4452396 --- /dev/null +++ b/src/orchestrator/src/interface/dispatcher.rs @@ -0,0 +1,61 @@ +//! Dispatcher interface implementation using RhaiDispatcher + +use crate::RhaiInterface; +use async_trait::async_trait; +use rhai_dispatcher::{PlayRequest, RhaiDispatcher, RhaiDispatcherError}; +use std::sync::Arc; + +/// Dispatcher-based interface using RhaiDispatcher +pub struct DispatcherInterface { + dispatcher: Arc, +} + +impl DispatcherInterface { + /// Create a new dispatcher interface + pub fn new(dispatcher: Arc) -> Self { + Self { dispatcher } + } +} + +#[async_trait] +impl RhaiInterface for DispatcherInterface { + async fn submit_play_request(&self, play_request: &PlayRequest) -> Result<(), RhaiDispatcherError> { + self.dispatcher.submit_play_request(play_request).await + } + + async fn submit_play_request_and_await_result(&self, play_request: &PlayRequest) -> Result { + self.dispatcher.submit_play_request_and_await_result(play_request).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_dispatcher_interface_creation() { + // This test just verifies we can create the interface + // Note: Actual testing would require a properly configured RhaiDispatcher + // For now, we'll create a mock or skip the actual dispatcher creation + + // This is a placeholder test - adjust based on actual RhaiDispatcher constructor + // let dispatcher = Arc::new(RhaiDispatcher::new()); + // let interface = DispatcherInterface::new(dispatcher); + + // Just verify the test compiles for now + assert!(true); + } + + #[tokio::test] + async fn test_dispatcher_interface_methods() { + // This test would verify the interface methods work correctly + // when a proper RhaiDispatcher is available + + let play_request = PlayRequest { + script: "let x = 5; x + 3".to_string(), + }; + + // Placeholder assertions - would test actual functionality with real dispatcher + assert_eq!(play_request.script, "let x = 5; x + 3"); + } +} \ No newline at end of file diff --git a/src/orchestrator/src/interface/local.rs b/src/orchestrator/src/interface/local.rs new file mode 100644 index 0000000..09ac7d3 --- /dev/null +++ b/src/orchestrator/src/interface/local.rs @@ -0,0 +1,111 @@ +//! Local interface implementation for in-process script execution + +use crate::RhaiInterface; +use async_trait::async_trait; +use rhai_dispatcher::{PlayRequest, RhaiDispatcherError}; + +/// Local interface for in-process script execution +pub struct LocalInterface { + engine: rhai::Engine, +} + +impl LocalInterface { + /// Create a new local interface + pub fn new() -> Self { + let engine = rhai::Engine::new(); + Self { engine } + } + + /// Create a new local interface with custom engine + pub fn with_engine(engine: rhai::Engine) -> Self { + Self { engine } + } +} + +impl Default for LocalInterface { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl RhaiInterface for LocalInterface { + async fn submit_play_request(&self, _play_request: &PlayRequest) -> Result<(), RhaiDispatcherError> { + // For local interface, fire-and-forget doesn't make much sense + // We'll just execute and ignore the result + let _ = self.submit_play_request_and_await_result(_play_request).await?; + Ok(()) + } + + async fn submit_play_request_and_await_result(&self, play_request: &PlayRequest) -> Result { + let mut scope = rhai::Scope::new(); + + // Execute the script + let result = self + .engine + .eval_with_scope::(&mut scope, &play_request.script) + .map_err(|e| RhaiDispatcherError::TaskNotFound(format!("Script execution error: {}", e)))?; + + // Return the result as a string + if result.is_unit() { + Ok(String::new()) + } else { + Ok(result.to_string()) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_local_interface_basic() { + let interface = LocalInterface::new(); + let play_request = PlayRequest { + script: "let x = 5; x + 3".to_string(), + }; + + let result = interface.submit_play_request_and_await_result(&play_request).await; + assert!(result.is_ok()); + + let output = result.unwrap(); + assert_eq!(output, "8"); + } + + #[tokio::test] + async fn test_local_interface_fire_and_forget() { + let interface = LocalInterface::new(); + let play_request = PlayRequest { + script: "let x = 5; x + 3".to_string(), + }; + + let result = interface.submit_play_request(&play_request).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_local_interface_with_error() { + let interface = LocalInterface::new(); + let play_request = PlayRequest { + script: "invalid_syntax +++".to_string(), + }; + + let result = interface.submit_play_request_and_await_result(&play_request).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_local_interface_empty_result() { + let interface = LocalInterface::new(); + let play_request = PlayRequest { + script: "let x = 42;".to_string(), + }; + + let result = interface.submit_play_request_and_await_result(&play_request).await; + assert!(result.is_ok()); + + let output = result.unwrap(); + assert_eq!(output, ""); + } +} \ No newline at end of file diff --git a/src/orchestrator/src/interface/mod.rs b/src/orchestrator/src/interface/mod.rs new file mode 100644 index 0000000..7111544 --- /dev/null +++ b/src/orchestrator/src/interface/mod.rs @@ -0,0 +1,9 @@ +//! Interface implementations for different backends + +pub mod local; +pub mod ws; +pub mod dispatcher; + +pub use local::*; +pub use ws::*; +pub use dispatcher::*; \ No newline at end of file diff --git a/src/orchestrator/src/interface/ws.rs b/src/orchestrator/src/interface/ws.rs new file mode 100644 index 0000000..9644db3 --- /dev/null +++ b/src/orchestrator/src/interface/ws.rs @@ -0,0 +1,117 @@ +//! WebSocket interface implementation for remote script execution + +use crate::RhaiInterface; +use async_trait::async_trait; +use rhai_dispatcher::{PlayRequest, RhaiDispatcherError}; +use reqwest::Client; +use serde_json::json; + +/// WebSocket-based interface for remote script execution +pub struct WsInterface { + client: Client, + base_url: String, +} + +impl WsInterface { + /// Create a new WebSocket interface + pub fn new(base_url: String) -> Self { + Self { + client: Client::new(), + base_url, + } + } +} + +#[async_trait] +impl RhaiInterface for WsInterface { + async fn submit_play_request(&self, play_request: &PlayRequest) -> Result<(), RhaiDispatcherError> { + let payload = json!({ + "script": play_request.script + }); + + let response = self + .client + .post(&format!("{}/submit", self.base_url)) + .json(&payload) + .send() + .await + .map_err(|e| RhaiDispatcherError::TaskNotFound(format!("Network error: {}", e)))?; + + if response.status().is_success() { + Ok(()) + } else { + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + Err(RhaiDispatcherError::TaskNotFound(format!("HTTP error: {}", error_text))) + } + } + + async fn submit_play_request_and_await_result(&self, play_request: &PlayRequest) -> Result { + let payload = json!({ + "script": play_request.script + }); + + let response = self + .client + .post(&format!("{}/execute", self.base_url)) + .json(&payload) + .send() + .await + .map_err(|e| RhaiDispatcherError::TaskNotFound(format!("Network error: {}", e)))?; + + if response.status().is_success() { + let result: String = response + .text() + .await + .map_err(|e| RhaiDispatcherError::TaskNotFound(format!("Response parsing error: {}", e)))?; + Ok(result) + } else { + let error_text = response + .text() + .await + .unwrap_or_else(|_| "Unknown error".to_string()); + Err(RhaiDispatcherError::TaskNotFound(format!("HTTP error: {}", error_text))) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_ws_interface_creation() { + let interface = WsInterface::new("http://localhost:8080".to_string()); + assert_eq!(interface.base_url, "http://localhost:8080"); + } + + #[tokio::test] + async fn test_ws_interface_call_with_mock_server() { + // This test would require a mock HTTP server + // For now, just test that we can create the interface + let interface = WsInterface::new("http://localhost:8080".to_string()); + + let play_request = PlayRequest { + script: "let x = 1;".to_string(), + }; + + // This will fail without a real server, but that's expected in unit tests + let result = interface.submit_play_request_and_await_result(&play_request).await; + assert!(result.is_err()); // Expected to fail without server + } + + #[tokio::test] + async fn test_ws_interface_fire_and_forget() { + let interface = WsInterface::new("http://localhost:8080".to_string()); + + let play_request = PlayRequest { + script: "let x = 1;".to_string(), + }; + + // This will fail without a real server, but that's expected in unit tests + let result = interface.submit_play_request(&play_request).await; + assert!(result.is_err()); // Expected to fail without server + } +} \ No newline at end of file diff --git a/src/orchestrator/src/lib.rs b/src/orchestrator/src/lib.rs new file mode 100644 index 0000000..8f19477 --- /dev/null +++ b/src/orchestrator/src/lib.rs @@ -0,0 +1,35 @@ +//! # Orchestrator +//! +//! A simple DAG-based workflow execution system that extends the heromodels flow structures +//! to support workflows with dependencies and distributed script execution. + +use async_trait::async_trait; +use rhai_dispatcher::{PlayRequest, RhaiDispatcherError}; + +pub mod interface; +pub mod orchestrator; + +pub use interface::*; +pub use orchestrator::*; + +/// Trait for executing Rhai scripts through different backends +/// Uses the same signature as RhaiDispatcher for consistency +#[async_trait] +pub trait RhaiInterface { + /// Submit a play request without waiting for result (fire-and-forget) + async fn submit_play_request(&self, play_request: &PlayRequest) -> Result<(), RhaiDispatcherError>; + + /// Submit a play request and await the result + /// Returns just the output string on success + async fn submit_play_request_and_await_result(&self, play_request: &PlayRequest) -> Result; +} + +// Re-export the flow models from DSL +pub use rhailib_dsl::flow::{OrchestratedFlow, OrchestratedFlowStep, OrchestratorError, FlowStatus}; + +// Conversion from RhaiDispatcherError to OrchestratorError +impl From for OrchestratorError { + fn from(err: RhaiDispatcherError) -> Self { + OrchestratorError::ExecutorError(err.to_string()) + } +} diff --git a/src/orchestrator/src/orchestrator.rs b/src/orchestrator/src/orchestrator.rs new file mode 100644 index 0000000..69bb0b4 --- /dev/null +++ b/src/orchestrator/src/orchestrator.rs @@ -0,0 +1,418 @@ +//! Main orchestrator implementation for DAG-based workflow execution + +use crate::{ + OrchestratedFlow, OrchestratedFlowStep, OrchestratorError, FlowStatus, RhaiInterface, +}; +use rhai_dispatcher::PlayRequest; +use futures::future::try_join_all; +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; +use tokio::sync::RwLock; +use tracing::{debug, error, info, warn}; + +/// Main orchestrator for executing DAG-based workflows +pub struct Orchestrator { + /// Interface for running scripts + interface: Arc, + + /// Active flow executions + active_flows: Arc>>, +} + +/// Represents an active flow execution +#[derive(Debug, Clone)] +pub struct FlowExecution { + /// The flow being executed + pub flow: OrchestratedFlow, + + /// Current status + pub status: FlowStatus, + + /// Completed step IDs + pub completed_steps: HashSet, + + /// Failed step IDs + pub failed_steps: HashSet, + + /// Step results + pub step_results: HashMap>, + + /// Execution start time + pub started_at: chrono::DateTime, + + /// Execution end time + pub completed_at: Option>, +} + +impl FlowExecution { + /// Create a new flow execution + pub fn new(flow: OrchestratedFlow) -> Self { + Self { + flow, + status: FlowStatus::Pending, + completed_steps: HashSet::new(), + failed_steps: HashSet::new(), + step_results: HashMap::new(), + started_at: chrono::Utc::now(), + completed_at: None, + } + } + + /// Check if a step is ready to execute (all dependencies completed) + pub fn is_step_ready(&self, step: &OrchestratedFlowStep) -> bool { + if self.completed_steps.contains(&step.id()) || self.failed_steps.contains(&step.id()) { + return false; + } + + step.depends_on.iter().all(|dep_id| self.completed_steps.contains(dep_id)) + } + + /// Get all ready steps + pub fn get_ready_steps(&self) -> Vec<&OrchestratedFlowStep> { + self.flow + .orchestrated_steps + .iter() + .filter(|step| self.is_step_ready(step)) + .collect() + } + + /// Mark a step as completed + pub fn complete_step(&mut self, step_id: u32, outputs: HashMap) { + self.completed_steps.insert(step_id); + self.step_results.insert(step_id, outputs); + + // Check if flow is complete + if self.completed_steps.len() == self.flow.orchestrated_steps.len() { + self.status = FlowStatus::Completed; + self.completed_at = Some(chrono::Utc::now()); + } + } + + /// Mark a step as failed + pub fn fail_step(&mut self, step_id: u32) { + self.failed_steps.insert(step_id); + self.status = FlowStatus::Failed; + self.completed_at = Some(chrono::Utc::now()); + } + + /// Check if the flow execution is finished + pub fn is_finished(&self) -> bool { + matches!(self.status, FlowStatus::Completed | FlowStatus::Failed) + } +} + +impl Orchestrator { + /// Create a new orchestrator + pub fn new(interface: Arc) -> Self { + Self { + interface, + active_flows: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Start executing a flow + pub async fn execute_flow(&self, flow: OrchestratedFlow) -> Result { + let flow_id = flow.id(); + flow.validate_dag()?; + + info!("Starting execution of flow {} with {} steps", flow_id, flow.orchestrated_steps.len()); + + // Create flow execution + let mut execution = FlowExecution::new(flow); + execution.status = FlowStatus::Running; + + // Store the execution + { + let mut active_flows = self.active_flows.write().await; + active_flows.insert(flow_id, execution); + } + + // Start execution in background + let orchestrator = self.clone(); + tokio::spawn(async move { + if let Err(e) = orchestrator.execute_flow_steps(flow_id).await { + error!("Flow {} execution failed: {}", flow_id, e); + + // Mark flow as failed + let mut active_flows = orchestrator.active_flows.write().await; + if let Some(execution) = active_flows.get_mut(&flow_id) { + execution.status = FlowStatus::Failed; + execution.completed_at = Some(chrono::Utc::now()); + } + } + }); + + Ok(flow_id) + } + + /// Execute flow steps using DAG traversal + async fn execute_flow_steps(&self, flow_id: u32) -> Result<(), OrchestratorError> { + loop { + let ready_steps = { + let active_flows = self.active_flows.read().await; + let execution = active_flows + .get(&flow_id) + .ok_or(OrchestratorError::StepNotFound(flow_id))?; + + if execution.is_finished() { + info!("Flow {} execution completed with status: {:?}", flow_id, execution.status); + return Ok(()); + } + + execution.get_ready_steps().into_iter().cloned().collect::>() + }; + + if ready_steps.is_empty() { + // Check if we're deadlocked + let active_flows = self.active_flows.read().await; + let execution = active_flows + .get(&flow_id) + .ok_or(OrchestratorError::StepNotFound(flow_id))?; + + if !execution.is_finished() { + warn!("No ready steps found for flow {} - possible deadlock", flow_id); + return Err(OrchestratorError::NoReadySteps); + } + + return Ok(()); + } + + debug!("Executing {} ready steps for flow {}", ready_steps.len(), flow_id); + + // Execute ready steps concurrently + let step_futures = ready_steps.into_iter().map(|step| { + let orchestrator = self.clone(); + async move { + orchestrator.execute_step(flow_id, step).await + } + }); + + // Wait for all steps to complete + let results = try_join_all(step_futures).await?; + + // Update execution state + { + let mut active_flows = self.active_flows.write().await; + let execution = active_flows + .get_mut(&flow_id) + .ok_or(OrchestratorError::StepNotFound(flow_id))?; + + for (step_id, outputs) in results { + execution.complete_step(step_id, outputs); + } + } + + // Small delay to prevent tight loop + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + } + } + + /// Execute a single step + async fn execute_step( + &self, + flow_id: u32, + step: OrchestratedFlowStep, + ) -> Result<(u32, HashMap), OrchestratorError> { + let step_id = step.id(); + info!("Executing step {} for flow {}", step_id, flow_id); + + // Prepare inputs with dependency outputs + let mut inputs = step.inputs.clone(); + + // Add outputs from dependency steps + { + let active_flows = self.active_flows.read().await; + let execution = active_flows + .get(&flow_id) + .ok_or(OrchestratorError::StepNotFound(flow_id))?; + + for dep_id in &step.depends_on { + if let Some(dep_outputs) = execution.step_results.get(dep_id) { + for (key, value) in dep_outputs { + inputs.insert(format!("dep_{}_{}", dep_id, key), value.clone()); + } + } + } + } + + // Create play request + let play_request = PlayRequest { + id: format!("{}_{}", flow_id, step_id), + worker_id: step.worker_id.clone(), + context_id: step.context_id.clone(), + script: step.script.clone(), + timeout: std::time::Duration::from_secs(30), // Default timeout + }; + + // Execute the script + match self.interface.submit_play_request_and_await_result(&play_request).await { + Ok(output) => { + info!("Step {} completed successfully", step_id); + let mut outputs = HashMap::new(); + outputs.insert("result".to_string(), output); + Ok((step_id, outputs)) + } + Err(e) => { + error!("Step {} failed: {}", step_id, e); + + // Mark step as failed + { + let mut active_flows = self.active_flows.write().await; + if let Some(execution) = active_flows.get_mut(&flow_id) { + execution.fail_step(step_id); + } + } + + Err(OrchestratorError::StepFailed(step_id, Some(e.to_string()))) + } + } + } + + /// Get the status of a flow execution + pub async fn get_flow_status(&self, flow_id: u32) -> Option { + let active_flows = self.active_flows.read().await; + active_flows.get(&flow_id).cloned() + } + + /// Cancel a flow execution + pub async fn cancel_flow(&self, flow_id: u32) -> Result<(), OrchestratorError> { + let mut active_flows = self.active_flows.write().await; + if let Some(execution) = active_flows.get_mut(&flow_id) { + execution.status = FlowStatus::Failed; + execution.completed_at = Some(chrono::Utc::now()); + info!("Flow {} cancelled", flow_id); + Ok(()) + } else { + Err(OrchestratorError::StepNotFound(flow_id)) + } + } + + /// List all active flows + pub async fn list_active_flows(&self) -> Vec<(u32, FlowStatus)> { + let active_flows = self.active_flows.read().await; + active_flows + .iter() + .map(|(id, execution)| (*id, execution.status.clone())) + .collect() + } + + /// Clean up completed flows + pub async fn cleanup_completed_flows(&self) { + let mut active_flows = self.active_flows.write().await; + active_flows.retain(|_, execution| !execution.is_finished()); + } +} + +impl Clone for Orchestrator { + fn clone(&self) -> Self { + Self { + interface: self.interface.clone(), + active_flows: self.active_flows.clone(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::interface::LocalInterface; + use std::collections::HashMap; + + #[tokio::test] + async fn test_simple_flow_execution() { + let interface = Arc::new(LocalInterface::new()); + let orchestrator = Orchestrator::new(interface); + + // Create a simple flow with two steps + let step1 = OrchestratedFlowStep::new("step1") + .script("let result = 10;") + .context_id("test") + .worker_id("worker1"); + + let step2 = OrchestratedFlowStep::new("step2") + .script("let result = dep_1_result + 5;") + .depends_on(step1.id()) + .context_id("test") + .worker_id("worker1"); + + let flow = OrchestratedFlow::new("test_flow") + .add_step(step1) + .add_step(step2); + + // Execute the flow + let flow_id = orchestrator.execute_flow(flow).await.unwrap(); + + // Wait for completion + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + let status = orchestrator.get_flow_status(flow_id).await.unwrap(); + assert_eq!(status.status, FlowStatus::Completed); + assert_eq!(status.completed_steps.len(), 2); + } + + #[tokio::test] + async fn test_parallel_execution() { + let interface = Arc::new(LocalInterface::new()); + let orchestrator = Orchestrator::new(interface); + + // Create a flow with parallel steps + let step1 = OrchestratedFlowStep::new("step1") + .script("let result = 10;") + .context_id("test") + .worker_id("worker1"); + + let step2 = OrchestratedFlowStep::new("step2") + .script("let result = 20;") + .context_id("test") + .worker_id("worker2"); + + let step3 = OrchestratedFlowStep::new("step3") + .script("let result = dep_1_result + dep_2_result;") + .depends_on(step1.id()) + .depends_on(step2.id()) + .context_id("test") + .worker_id("worker3"); + + let flow = OrchestratedFlow::new("parallel_flow") + .add_step(step1) + .add_step(step2) + .add_step(step3); + + // Execute the flow + let flow_id = orchestrator.execute_flow(flow).await.unwrap(); + + // Wait for completion + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + let status = orchestrator.get_flow_status(flow_id).await.unwrap(); + assert_eq!(status.status, FlowStatus::Completed); + assert_eq!(status.completed_steps.len(), 3); + } + + #[test] + fn test_flow_execution_state() { + let step1 = OrchestratedFlowStep::new("step1").script("let x = 1;"); + let step2 = OrchestratedFlowStep::new("step2") + .script("let y = 2;") + .depends_on(step1.id()); + + let flow = OrchestratedFlow::new("test_flow") + .add_step(step1.clone()) + .add_step(step2.clone()); + + let mut execution = FlowExecution::new(flow); + + // Initially, only step1 should be ready + assert!(execution.is_step_ready(&step1)); + assert!(!execution.is_step_ready(&step2)); + + // After completing step1, step2 should be ready + execution.complete_step(step1.id(), HashMap::new()); + assert!(!execution.is_step_ready(&step1)); // Already completed + assert!(execution.is_step_ready(&step2)); + + // After completing step2, flow should be complete + execution.complete_step(step2.id(), HashMap::new()); + assert_eq!(execution.status, FlowStatus::Completed); + } +} diff --git a/src/orchestrator/src/services.rs b/src/orchestrator/src/services.rs new file mode 100644 index 0000000..cc20137 --- /dev/null +++ b/src/orchestrator/src/services.rs @@ -0,0 +1,42 @@ +//! Main orchestrator implementation for DAG-based workflow execution + +use crate::{ + OrchestratedFlow, OrchestratedFlowStep, OrchestratorError, FlowStatus, RhaiInterface, ScriptRequest, +}; +use futures::future::try_join_all; +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; +use tokio::sync::RwLock; +use tracing::{debug, error, info, warn}; + +impl Orchestrator { + /// Get a flow by ID + pub fn get_flow(&self, flow_id: u32) -> Result { + self.interface + .new_play_request() + .script(format!("json_encode(get_flow({}))", flow_id)) + .submit_play_request_and_await_result() + .await + .map(|result| serde_json::from_str(&result).unwrap()) + } + + pub fn get_flows(&self) -> Result, OrchestratorError> { + self.interface + .new_play_request() + .script("json_encode(get_flows())") + .submit_play_request_and_await_result() + .await + .map(|result| serde_json::from_str(&result).unwrap()) + } + + pub fn get_active_flows(&self) -> Result, OrchestratorError> { + self.interface + .new_play_request() + .script("json_encode(get_flows())") + .submit_play_request_and_await_result() + .await + .map(|result| serde_json::from_str(&result).unwrap()) + + } + +}