workflow orchestration wip
This commit is contained in:
parent
1bcdad6a17
commit
9c03b5ed37
198
Cargo.lock
generated
198
Cargo.lock
generated
@ -123,6 +123,28 @@ version = "0.7.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
|
||||
|
||||
[[package]]
|
||||
name = "async-stream"
|
||||
version = "0.3.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
|
||||
dependencies = [
|
||||
"async-stream-impl",
|
||||
"futures-core",
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-stream-impl"
|
||||
version = "0.3.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
version = "0.1.88"
|
||||
@ -193,12 +215,27 @@ version = "2.9.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967"
|
||||
|
||||
[[package]]
|
||||
name = "block-buffer"
|
||||
version = "0.10.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71"
|
||||
dependencies = [
|
||||
"generic-array",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bumpalo"
|
||||
version = "3.17.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf"
|
||||
|
||||
[[package]]
|
||||
name = "byteorder"
|
||||
version = "1.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
|
||||
|
||||
[[package]]
|
||||
name = "bytes"
|
||||
version = "1.10.1"
|
||||
@ -327,6 +364,16 @@ version = "1.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
|
||||
|
||||
[[package]]
|
||||
name = "colored"
|
||||
version = "2.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c"
|
||||
dependencies = [
|
||||
"lazy_static",
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "combine"
|
||||
version = "4.6.7"
|
||||
@ -377,6 +424,15 @@ version = "0.8.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
|
||||
|
||||
[[package]]
|
||||
name = "cpufeatures"
|
||||
version = "0.2.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crc32fast"
|
||||
version = "1.4.2"
|
||||
@ -453,6 +509,16 @@ version = "0.2.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "43da5946c66ffcc7745f48db692ffbb10a83bfe0afd96235c5c2a4fb23994929"
|
||||
|
||||
[[package]]
|
||||
name = "crypto-common"
|
||||
version = "0.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
|
||||
dependencies = [
|
||||
"generic-array",
|
||||
"typenum",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "csv"
|
||||
version = "1.3.1"
|
||||
@ -474,6 +540,12 @@ dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "data-encoding"
|
||||
version = "2.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476"
|
||||
|
||||
[[package]]
|
||||
name = "derive"
|
||||
version = "0.1.0"
|
||||
@ -482,6 +554,16 @@ dependencies = [
|
||||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "digest"
|
||||
version = "0.10.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
|
||||
dependencies = [
|
||||
"block-buffer",
|
||||
"crypto-common",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dirs"
|
||||
version = "4.0.0"
|
||||
@ -715,6 +797,16 @@ dependencies = [
|
||||
"slab",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "generic-array"
|
||||
version = "0.14.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a"
|
||||
dependencies = [
|
||||
"typenum",
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "getrandom"
|
||||
version = "0.2.16"
|
||||
@ -1372,6 +1464,32 @@ dependencies = [
|
||||
"vcpkg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "orchestrator"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"futures",
|
||||
"futures-util",
|
||||
"heromodels",
|
||||
"heromodels_core",
|
||||
"log",
|
||||
"reqwest",
|
||||
"rhai",
|
||||
"rhai_dispatcher",
|
||||
"rhailib_dsl",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-test",
|
||||
"tokio-tungstenite",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ourdb"
|
||||
version = "0.1.0"
|
||||
@ -1769,6 +1887,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"clap",
|
||||
"colored",
|
||||
"env_logger",
|
||||
"log",
|
||||
"redis",
|
||||
@ -1818,6 +1937,7 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tempfile",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
@ -2006,6 +2126,17 @@ dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sha1"
|
||||
version = "0.10.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cpufeatures",
|
||||
"digest",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sha1_smol"
|
||||
version = "1.0.1"
|
||||
@ -2331,6 +2462,42 @@ dependencies = [
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-stream"
|
||||
version = "0.1.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-test"
|
||||
version = "0.4.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7"
|
||||
dependencies = [
|
||||
"async-stream",
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-tungstenite"
|
||||
version = "0.20.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c"
|
||||
dependencies = [
|
||||
"futures-util",
|
||||
"log",
|
||||
"tokio",
|
||||
"tungstenite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-util"
|
||||
version = "0.7.15"
|
||||
@ -2421,6 +2588,31 @@ dependencies = [
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tungstenite"
|
||||
version = "0.20.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"data-encoding",
|
||||
"http",
|
||||
"httparse",
|
||||
"log",
|
||||
"rand",
|
||||
"sha1",
|
||||
"thiserror",
|
||||
"url",
|
||||
"utf-8",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "typenum"
|
||||
version = "1.18.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f"
|
||||
|
||||
[[package]]
|
||||
name = "unicode-ident"
|
||||
version = "1.0.18"
|
||||
@ -2450,6 +2642,12 @@ dependencies = [
|
||||
"percent-encoding",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "utf-8"
|
||||
version = "0.7.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
|
||||
|
||||
[[package]]
|
||||
name = "utf8_iter"
|
||||
version = "1.0.4"
|
||||
|
@ -35,6 +35,7 @@ members = [
|
||||
"src/engine",
|
||||
"src/worker",
|
||||
"src/monitor", # Added the new monitor package to workspace
|
||||
"src/orchestrator", # Added the new orchestrator package to workspace
|
||||
"src/macros", "src/dsl", "src/derive",
|
||||
]
|
||||
resolver = "2" # Recommended for new workspaces
|
||||
|
@ -17,6 +17,7 @@ uuid = { version = "1.6", features = ["v4", "serde"] }
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
log = "0.4"
|
||||
tokio = { version = "1", features = ["macros", "rt-multi-thread"] } # For async main in examples, and general async
|
||||
colored = "2.0"
|
||||
|
||||
[dev-dependencies] # For examples later
|
||||
env_logger = "0.10"
|
||||
|
@ -1,6 +1,7 @@
|
||||
use clap::Parser;
|
||||
use rhai_dispatcher::{RhaiDispatcher, RhaiDispatcherBuilder};
|
||||
use log::{error, info};
|
||||
use colored::Colorize;
|
||||
use std::io::{self, Write};
|
||||
use std::time::Duration;
|
||||
|
||||
@ -50,10 +51,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
|
||||
// Configure logging based on verbosity level
|
||||
let log_config = match args.verbose {
|
||||
0 => "warn,rhai_dispatcher=info",
|
||||
1 => "info,rhai_dispatcher=debug",
|
||||
2 => "debug",
|
||||
_ => "trace",
|
||||
0 => "warn,rhai_dispatcher=warn",
|
||||
1 => "info,rhai_dispatcher=info",
|
||||
2 => "debug,rhai_dispatcher=debug",
|
||||
_ => "trace,rhai_dispatcher=trace",
|
||||
};
|
||||
|
||||
std::env::set_var("RUST_LOG", log_config);
|
||||
@ -67,14 +68,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
env_logger::init();
|
||||
}
|
||||
|
||||
info!("🔗 Starting Rhai Dispatcher");
|
||||
info!("📋 Configuration:");
|
||||
info!(" Caller ID: {}", args.caller_id);
|
||||
info!(" Context ID: {}", args.context_id);
|
||||
info!(" Worker ID: {}", args.worker_id);
|
||||
info!(" Redis URL: {}", args.redis_url);
|
||||
info!(" Timeout: {}s", args.timeout);
|
||||
info!("");
|
||||
if args.verbose > 0 {
|
||||
info!("🔗 Starting Rhai Dispatcher");
|
||||
info!("📋 Configuration:");
|
||||
info!(" Caller ID: {}", args.caller_id);
|
||||
info!(" Context ID: {}", args.context_id);
|
||||
info!(" Worker ID: {}", args.worker_id);
|
||||
info!(" Redis URL: {}", args.redis_url);
|
||||
info!(" Timeout: {}s", args.timeout);
|
||||
info!("");
|
||||
}
|
||||
|
||||
// Create the Rhai client
|
||||
let client = RhaiDispatcherBuilder::new()
|
||||
@ -84,16 +87,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
.redis_url(&args.redis_url)
|
||||
.build()?;
|
||||
|
||||
info!("✅ Connected to Redis at {}", args.redis_url);
|
||||
if args.verbose > 0 {
|
||||
info!("✅ Connected to Redis at {}", args.redis_url);
|
||||
}
|
||||
|
||||
// Determine execution mode
|
||||
if let Some(script_content) = args.script {
|
||||
// Execute inline script
|
||||
info!("📜 Executing inline script");
|
||||
if args.verbose > 0 {
|
||||
info!("📜 Executing inline script");
|
||||
}
|
||||
execute_script(&client, script_content, args.timeout).await?;
|
||||
} else if let Some(file_path) = args.file {
|
||||
// Execute script from file
|
||||
info!("📁 Loading script from file: {}", file_path);
|
||||
if args.verbose > 0 {
|
||||
info!("📁 Loading script from file: {}", file_path);
|
||||
}
|
||||
let script_content = std::fs::read_to_string(&file_path)
|
||||
.map_err(|e| format!("Failed to read script file '{}': {}", file_path, e))?;
|
||||
execute_script(&client, script_content, args.timeout).await?;
|
||||
@ -101,7 +110,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Interactive mode
|
||||
info!("🎮 Entering interactive mode");
|
||||
info!("Type Rhai scripts and press Enter to execute. Type 'exit' or 'quit' to close.");
|
||||
run_interactive_mode(&client, args.timeout).await?;
|
||||
run_interactive_mode(&client, args.timeout, args.verbose).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@ -145,6 +154,7 @@ async fn execute_script(
|
||||
async fn run_interactive_mode(
|
||||
client: &RhaiDispatcher,
|
||||
timeout_secs: u64,
|
||||
verbose: u8,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let timeout = Duration::from_secs(timeout_secs);
|
||||
|
||||
@ -166,7 +176,9 @@ async fn run_interactive_mode(
|
||||
break;
|
||||
}
|
||||
|
||||
info!("⚡ Executing: {}", input);
|
||||
if verbose > 0 {
|
||||
info!("⚡ Executing: {}", input);
|
||||
}
|
||||
|
||||
match client
|
||||
.new_play_request()
|
||||
@ -176,16 +188,15 @@ async fn run_interactive_mode(
|
||||
.await
|
||||
{
|
||||
Ok(result) => {
|
||||
println!("Status: {}", result.status);
|
||||
if let Some(output) = result.output {
|
||||
println!("Output: {}", output);
|
||||
println!("{}", output.color("green"));
|
||||
}
|
||||
if let Some(error) = result.error {
|
||||
println!("Error: {}", error);
|
||||
println!("{}", format!("error: {}", error).color("red"));
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("❌ Execution failed: {}", e);
|
||||
println!("{}", format!("error: {}", e).red());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -262,16 +262,17 @@ impl RhaiDispatcherBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
/// Internal representation of a script execution request.
|
||||
/// Representation of a script execution request.
|
||||
///
|
||||
/// This structure contains all the information needed to execute a Rhai script
|
||||
/// on a worker service, including the script content, target worker, and timeout.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PlayRequest {
|
||||
id: String,
|
||||
worker_id: String,
|
||||
context_id: String,
|
||||
script: String,
|
||||
timeout: Duration,
|
||||
pub id: String,
|
||||
pub worker_id: String,
|
||||
pub context_id: String,
|
||||
pub script: String,
|
||||
pub timeout: Duration,
|
||||
}
|
||||
|
||||
/// Builder for constructing and submitting script execution requests.
|
||||
@ -301,6 +302,7 @@ pub struct PlayRequestBuilder<'a> {
|
||||
caller_id: String,
|
||||
script: String,
|
||||
timeout: Duration,
|
||||
retries: u32,
|
||||
}
|
||||
|
||||
impl<'a> PlayRequestBuilder<'a> {
|
||||
@ -312,7 +314,8 @@ impl<'a> PlayRequestBuilder<'a> {
|
||||
context_id: client.context_id.clone(),
|
||||
caller_id: client.caller_id.clone(),
|
||||
script: "".to_string(),
|
||||
timeout: Duration::from_secs(10),
|
||||
timeout: Duration::from_secs(5),
|
||||
retries: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@ -384,10 +387,6 @@ impl<'a> PlayRequestBuilder<'a> {
|
||||
|
||||
pub async fn await_response(self) -> Result<RhaiTaskDetails, RhaiDispatcherError> {
|
||||
// Build the request and submit using self.client
|
||||
println!(
|
||||
"Awaiting response for request {} with timeout {:?}",
|
||||
self.request_id, self.timeout
|
||||
);
|
||||
let result = self
|
||||
.client
|
||||
.submit_play_request_and_await_result(&self.build()?)
|
||||
|
@ -18,6 +18,7 @@ reqwest = { version = "0.11", features = ["json"] }
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
dotenv = "0.15"
|
||||
rhai_dispatcher = { path = "../dispatcher" }
|
||||
thiserror = "1.0"
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3"
|
||||
|
@ -3,7 +3,8 @@ use rhai::{Engine, EvalAltResult, Scope};
|
||||
use std::fs;
|
||||
use std::env;
|
||||
|
||||
fn main() -> Result<(), Box<EvalAltResult>> {
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<EvalAltResult>> {
|
||||
// Load environment variables from .env file
|
||||
dotenv::from_filename("examples/payment/.env").ok();
|
||||
|
||||
|
@ -20,11 +20,8 @@ print(`Product created: ${product.name}`);
|
||||
// Create the product in Stripe (non-blocking)
|
||||
print("🔄 Dispatching product creation to Stripe...");
|
||||
try {
|
||||
let product_result = product.create_async(STRIPE_API_KEY, "payment-example", "new_create_product_response", "new_create_product_error");
|
||||
let product_result = product.create_async("payment-example", "payment-context", STRIPE_API_KEY);
|
||||
print(`✅ Product creation dispatched: ${product_result}`);
|
||||
// In non-blocking mode, we use a demo product ID for the rest of the example
|
||||
let product_id = "prod_demo_example_id";
|
||||
print("💡 Using demo product ID for remaining operations in non-blocking mode");
|
||||
} catch(error) {
|
||||
print(`❌ Failed to dispatch product creation: ${error}`);
|
||||
print("This is expected with a demo API key. In production, use a valid Stripe secret key.");
|
||||
@ -40,7 +37,7 @@ let upfront_price = new_price()
|
||||
.product(product_id)
|
||||
.metadata("type", "upfront");
|
||||
|
||||
let upfront_result = upfront_price.create_async(STRIPE_API_KEY, "payment-example", "new_create_price_response", "new_create_price_error");
|
||||
let upfront_result = upfront_price.create_async("payment-example", "payment-context", STRIPE_API_KEY);
|
||||
print(`✅ Upfront Price creation dispatched: ${upfront_result}`);
|
||||
let upfront_price_id = "price_demo_upfront_id";
|
||||
|
||||
@ -52,7 +49,7 @@ let monthly_price = new_price()
|
||||
.recurring("month")
|
||||
.metadata("type", "monthly_subscription");
|
||||
|
||||
let monthly_result = monthly_price.create_async(STRIPE_API_KEY, "payment-example", "new_create_price_response", "new_create_price_error");
|
||||
let monthly_result = monthly_price.create_async("payment-example", "payment-context", STRIPE_API_KEY);
|
||||
print(`✅ Monthly Price creation dispatched: ${monthly_result}`);
|
||||
let monthly_price_id = "price_demo_monthly_id";
|
||||
|
||||
@ -65,7 +62,7 @@ let annual_price = new_price()
|
||||
.metadata("type", "annual_subscription")
|
||||
.metadata("discount", "2_months_free");
|
||||
|
||||
let annual_result = annual_price.create_async(STRIPE_API_KEY, "payment-example", "new_create_price_response", "new_create_price_error");
|
||||
let annual_result = annual_price.create_async("payment-example", "payment-context", STRIPE_API_KEY);
|
||||
print(`✅ Annual Price creation dispatched: ${annual_result}`);
|
||||
let annual_price_id = "price_demo_annual_id";
|
||||
|
||||
@ -78,7 +75,7 @@ let percent_coupon = new_coupon()
|
||||
.metadata("campaign", "new_customer_discount")
|
||||
.metadata("code", "WELCOME25");
|
||||
|
||||
let percent_result = percent_coupon.create_async(STRIPE_API_KEY, "payment-example", "new_create_coupon_response", "new_create_coupon_error");
|
||||
let percent_result = percent_coupon.create_async("payment-example", "payment-context", STRIPE_API_KEY);
|
||||
print(`✅ 25% Off Coupon creation dispatched: ${percent_result}`);
|
||||
let percent_coupon_id = "coupon_demo_25percent_id";
|
||||
|
||||
@ -90,7 +87,7 @@ let amount_coupon = new_coupon()
|
||||
.metadata("campaign", "loyalty_program")
|
||||
.metadata("code", "LOYAL5");
|
||||
|
||||
let amount_result = amount_coupon.create_async(STRIPE_API_KEY, "payment-example", "new_create_coupon_response", "new_create_coupon_error");
|
||||
let amount_result = amount_coupon.create_async("payment-example", "payment-context", STRIPE_API_KEY);
|
||||
print(`✅ $5 Off Coupon creation dispatched: ${amount_result}`);
|
||||
let amount_coupon_id = "coupon_demo_5dollar_id";
|
||||
|
||||
@ -108,7 +105,7 @@ let payment_intent = new_payment_intent()
|
||||
.metadata("price_id", upfront_price_id)
|
||||
.metadata("payment_type", "upfront");
|
||||
|
||||
let payment_result = payment_intent.create_async(STRIPE_API_KEY, "payment-example", "new_create_payment_intent_response", "new_create_payment_intent_error");
|
||||
let payment_result = payment_intent.create_async("payment-example", "payment-context", STRIPE_API_KEY);
|
||||
print(`✅ Payment Intent creation dispatched: ${payment_result}`);
|
||||
let payment_intent_id = "pi_demo_payment_intent_id";
|
||||
|
||||
@ -124,7 +121,7 @@ let subscription = new_subscription()
|
||||
.metadata("trial", "14_days")
|
||||
.metadata("source", "website_signup");
|
||||
|
||||
let subscription_result = subscription.create_async(STRIPE_API_KEY, "payment-example", "new_create_subscription_response", "new_create_subscription_error");
|
||||
let subscription_result = subscription.create_async("payment-example", "payment-context", STRIPE_API_KEY);
|
||||
print(`✅ Subscription creation dispatched: ${subscription_result}`);
|
||||
let subscription_id = "sub_demo_subscription_id";
|
||||
|
||||
@ -140,7 +137,7 @@ let multi_subscription = new_subscription()
|
||||
.metadata("licenses", "5")
|
||||
.metadata("addons", "premium_support");
|
||||
|
||||
let multi_result = multi_subscription.create_async(STRIPE_API_KEY, "payment-example", "new_create_subscription_response", "new_create_subscription_error");
|
||||
let multi_result = multi_subscription.create_async("payment-example", "payment-context", STRIPE_API_KEY);
|
||||
print(`✅ Multi-Item Subscription creation dispatched: ${multi_result}`);
|
||||
let multi_subscription_id = "sub_demo_multi_subscription_id";
|
||||
|
||||
@ -156,7 +153,7 @@ let discounted_payment = new_payment_intent()
|
||||
.metadata("coupon_applied", percent_coupon_id)
|
||||
.metadata("discount_percent", "25");
|
||||
|
||||
let discounted_result = discounted_payment.create_async(STRIPE_API_KEY, "payment-example", "new_create_payment_intent_response", "new_create_payment_intent_error");
|
||||
let discounted_result = discounted_payment.create_async("payment-example", "payment-context", STRIPE_API_KEY);
|
||||
print(`✅ Discounted Payment Intent creation dispatched: ${discounted_result}`);
|
||||
let discounted_payment_id = "pi_demo_discounted_payment_id";
|
||||
|
||||
|
90
src/flow/examples/stripe_flow_example.rs
Normal file
90
src/flow/examples/stripe_flow_example.rs
Normal file
@ -0,0 +1,90 @@
|
||||
//! Example demonstrating the flow manager with mock Stripe API calls
|
||||
|
||||
use flow::{new_step, new_flow, FlowExecutor};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
println!("=== Flow Manager Example ===");
|
||||
println!("Demonstrating the builder pattern API with mock Stripe workflow\n");
|
||||
|
||||
// Create the flow executor
|
||||
let executor = FlowExecutor::new("redis://127.0.0.1/").await?;
|
||||
|
||||
// Build steps using the fluent API
|
||||
let step1 = new_step("stripe_config")
|
||||
.script("mock_api_call stripe_config")
|
||||
.timeout(5)
|
||||
.retries(2)
|
||||
.build();
|
||||
|
||||
let step2 = new_step("stripe_config_confirm")
|
||||
.script("mock_api_call create_product")
|
||||
.timeout(5)
|
||||
.retries(1)
|
||||
.build();
|
||||
|
||||
let step3 = new_step("create_product")
|
||||
.script("mock_api_call create_product")
|
||||
.timeout(10)
|
||||
.retries(1)
|
||||
.build();
|
||||
|
||||
// Build flow using the fluent API
|
||||
let flow = new_flow("stripe_payment_request")
|
||||
.add_step(step1)
|
||||
.add_step(step2)
|
||||
.add_step(step3)
|
||||
.build();
|
||||
|
||||
println!("Created flow: {}", flow.name);
|
||||
println!("Flow ID: {}", flow.id);
|
||||
println!("Number of steps: {}", flow.steps.len());
|
||||
|
||||
for (i, step) in flow.steps.iter().enumerate() {
|
||||
println!(" Step {}: {} (timeout: {}s, retries: {})",
|
||||
i + 1, step.name, step.timeout_seconds, step.max_retries);
|
||||
}
|
||||
|
||||
// Execute the flow (non-blocking)
|
||||
println!("\n🚀 Starting flow execution...");
|
||||
let result = executor.execute_flow(flow.clone()).await?;
|
||||
println!("✅ {}", result);
|
||||
|
||||
// Monitor flow progress
|
||||
println!("\n📊 Monitoring flow progress...");
|
||||
for i in 0..10 {
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
|
||||
|
||||
if let Ok(Some(flow_state)) = executor.get_flow_status(&flow.id).await {
|
||||
println!(" Status: {:?}, Current step: {:?}, Completed: {}/{}",
|
||||
flow_state.status,
|
||||
flow_state.current_step,
|
||||
flow_state.completed_steps.len(),
|
||||
flow.steps.len());
|
||||
|
||||
if matches!(flow_state.status, flow::FlowStatus::Completed | flow::FlowStatus::Failed) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check final status
|
||||
if let Ok(Some(final_state)) = executor.get_flow_status(&flow.id).await {
|
||||
println!("\n🎯 Final flow status: {:?}", final_state.status);
|
||||
println!("Completed steps: {:?}", final_state.completed_steps);
|
||||
|
||||
// Check individual step results
|
||||
for step in &flow.steps {
|
||||
if let Ok(Some(step_state)) = executor.get_step_status(&flow.id, &step.id).await {
|
||||
println!(" Step '{}': {:?} (attempts: {})",
|
||||
step.name, step_state.status, step_state.attempt_count);
|
||||
if let Some(output) = &step_state.output {
|
||||
println!(" Output: {}", output);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
println!("\n✨ Flow execution demonstration completed!");
|
||||
Ok(())
|
||||
}
|
51
src/orchestrator/Cargo.toml
Normal file
51
src/orchestrator/Cargo.toml
Normal file
@ -0,0 +1,51 @@
|
||||
[package]
|
||||
name = "orchestrator"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
# Core async runtime
|
||||
tokio = { version = "1", features = ["macros", "rt-multi-thread", "sync", "time"] }
|
||||
async-trait = "0.1"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
|
||||
# Serialization
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
|
||||
# Error handling
|
||||
thiserror = "1.0"
|
||||
|
||||
# Collections
|
||||
uuid = { version = "1.6", features = ["v4", "serde"] }
|
||||
|
||||
# Time handling
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
|
||||
# HTTP client
|
||||
reqwest = { version = "0.11", features = ["json"] }
|
||||
|
||||
# WebSocket client
|
||||
tokio-tungstenite = "0.20"
|
||||
|
||||
# Rhai scripting
|
||||
rhai = "1.21.0"
|
||||
|
||||
# Database and models
|
||||
heromodels = { path = "/Users/timurgordon/code/git.ourworld.tf/herocode/db/heromodels" }
|
||||
heromodels_core = { path = "/Users/timurgordon/code/git.ourworld.tf/herocode/db/heromodels_core" }
|
||||
|
||||
# DSL integration for flow models
|
||||
rhailib_dsl = { path = "../dsl" }
|
||||
|
||||
# Dispatcher integration
|
||||
rhai_dispatcher = { path = "../dispatcher" }
|
||||
|
||||
# Logging
|
||||
log = "0.4"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = "0.3"
|
||||
|
||||
[dev-dependencies]
|
||||
tokio-test = "0.4"
|
320
src/orchestrator/README.md
Normal file
320
src/orchestrator/README.md
Normal file
@ -0,0 +1,320 @@
|
||||
# Rationale for Orchestrator
|
||||
|
||||
We may have scripts that run asynchrounsly, depend on human input or depend on other scripts to complete. We want to be able to implement high-level workflows of rhai scripts.
|
||||
|
||||
## Design
|
||||
|
||||
Direct Acyclic Graphs (DAGs) are a natural fit for representing workflows.
|
||||
|
||||
## Requirements
|
||||
|
||||
1. Uses Direct Acyclic Graphs (DAGs) to represent workflows.
|
||||
2. Each step in the workflow defines the script to execute, the inputs to pass to it, and the outputs to expect from it.
|
||||
3. Simplicity: the output cases are binary (success or failure), and params inputted / outputted are simple key-value pairs.
|
||||
4. Multiple steps can depend on the same step.
|
||||
5. Scripts are executed using [RhaiDispatcher](../dispatcher/README.md).
|
||||
|
||||
## Architecture
|
||||
|
||||
The Orchestrator is a simple DAG-based workflow execution system that extends the heromodels flow structures to support workflows with dependencies and distributed script execution.
|
||||
|
||||
### Core Component
|
||||
|
||||
```mermaid
|
||||
graph TB
|
||||
subgraph "Orchestrator"
|
||||
O[Orchestrator] --> RE[RhaiExecutor Trait]
|
||||
O --> DB[(Database)]
|
||||
end
|
||||
|
||||
subgraph "Executor Implementations"
|
||||
RE --> RD[RhaiDispatcher]
|
||||
RE --> WS[WebSocketClient]
|
||||
RE --> HTTP[HttpClient]
|
||||
RE --> LOCAL[LocalExecutor]
|
||||
end
|
||||
|
||||
subgraph "Data Models (heromodels)"
|
||||
F[Flow] --> FS[FlowStep]
|
||||
FS --> SR[SignatureRequirement]
|
||||
end
|
||||
|
||||
subgraph "Infrastructure"
|
||||
RD --> RQ[Redis Queues]
|
||||
RD --> W[Workers]
|
||||
WS --> WSS[WebSocket Server]
|
||||
HTTP --> API[REST API]
|
||||
end
|
||||
```
|
||||
|
||||
### Execution Abstraction
|
||||
|
||||
The orchestrator uses a trait-based approach for script execution, allowing different execution backends:
|
||||
|
||||
#### RhaiExecutor Trait
|
||||
```rust
|
||||
use rhai_dispatcher::{PlayRequestBuilder, RhaiTaskDetails, RhaiDispatcherError};
|
||||
|
||||
#[async_trait]
|
||||
pub trait RhaiExecutor {
|
||||
async fn call(&self, request: PlayRequestBuilder<'_>) -> Result<RhaiTaskDetails, RhaiDispatcherError>;
|
||||
}
|
||||
```
|
||||
|
||||
#### Executor Implementations
|
||||
|
||||
**RhaiDispatcher Implementation:**
|
||||
```rust
|
||||
pub struct DispatcherExecutor {
|
||||
dispatcher: RhaiDispatcher,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl RhaiExecutor for DispatcherExecutor {
|
||||
async fn call(&self, request: PlayRequestBuilder<'_>) -> Result<RhaiTaskDetails, RhaiDispatcherError> {
|
||||
// Use RhaiDispatcher to execute script via Redis queues
|
||||
request.await_response().await
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**WebSocket Client Implementation:**
|
||||
```rust
|
||||
pub struct WebSocketExecutor {
|
||||
ws_client: WebSocketClient,
|
||||
endpoint: String,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl RhaiExecutor for WebSocketExecutor {
|
||||
async fn call(&self, request: PlayRequestBuilder<'_>) -> Result<RhaiTaskDetails, RhaiDispatcherError> {
|
||||
// Build the PlayRequest and send via WebSocket
|
||||
let play_request = request.build()?;
|
||||
|
||||
// Send script execution request via WebSocket
|
||||
let ws_message = serde_json::to_string(&play_request)?;
|
||||
self.ws_client.send(ws_message).await?;
|
||||
|
||||
// Wait for response and convert to RhaiTaskDetails
|
||||
let response = self.ws_client.receive().await?;
|
||||
serde_json::from_str(&response).map_err(RhaiDispatcherError::from)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**HTTP Client Implementation:**
|
||||
```rust
|
||||
pub struct HttpExecutor {
|
||||
http_client: reqwest::Client,
|
||||
base_url: String,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl RhaiExecutor for HttpExecutor {
|
||||
async fn call(&self, request: PlayRequestBuilder<'_>) -> Result<RhaiTaskDetails, RhaiDispatcherError> {
|
||||
// Build the PlayRequest and send via HTTP
|
||||
let play_request = request.build()?;
|
||||
|
||||
// Send script execution request via HTTP API
|
||||
let response = self.http_client
|
||||
.post(&format!("{}/execute", self.base_url))
|
||||
.json(&play_request)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
response.json().await.map_err(RhaiDispatcherError::from)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Local Executor Implementation:**
|
||||
```rust
|
||||
pub struct LocalExecutor {
|
||||
engine: Engine,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl RhaiExecutor for LocalExecutor {
|
||||
async fn call(&self, request: PlayRequestBuilder<'_>) -> Result<RhaiTaskDetails, RhaiDispatcherError> {
|
||||
// Build the PlayRequest and execute locally
|
||||
let play_request = request.build()?;
|
||||
|
||||
// Execute script directly in local Rhai engine
|
||||
let result = self.engine.eval::<String>(&play_request.script);
|
||||
|
||||
// Convert to RhaiTaskDetails format
|
||||
let task_details = RhaiTaskDetails {
|
||||
task_id: play_request.id,
|
||||
script: play_request.script,
|
||||
status: if result.is_ok() { "completed".to_string() } else { "error".to_string() },
|
||||
output: result.ok(),
|
||||
error: result.err().map(|e| e.to_string()),
|
||||
created_at: chrono::Utc::now(),
|
||||
updated_at: chrono::Utc::now(),
|
||||
caller_id: "local".to_string(),
|
||||
context_id: play_request.context_id,
|
||||
worker_id: "local".to_string(),
|
||||
};
|
||||
|
||||
Ok(task_details)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Data Model Extensions
|
||||
|
||||
Simple extensions to the existing heromodels flow structures:
|
||||
|
||||
#### Enhanced FlowStep Model
|
||||
```rust
|
||||
// Extends heromodels::models::flow::FlowStep
|
||||
pub struct FlowStep {
|
||||
// ... existing heromodels::models::flow::FlowStep fields
|
||||
pub script: String, // Rhai script to execute
|
||||
pub depends_on: Vec<u32>, // IDs of steps this step depends on
|
||||
pub context_id: String, // Execution context (circle)
|
||||
pub inputs: HashMap<String, String>, // Input parameters
|
||||
pub outputs: HashMap<String, String>, // Output results
|
||||
}
|
||||
```
|
||||
|
||||
### Execution Flow
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
participant Client as Client
|
||||
participant O as Orchestrator
|
||||
participant RE as RhaiExecutor
|
||||
participant DB as Database
|
||||
|
||||
Client->>O: Submit Flow
|
||||
O->>DB: Store flow and steps
|
||||
O->>O: Find steps with no dependencies
|
||||
|
||||
loop Until all steps complete
|
||||
O->>RE: Execute ready steps
|
||||
RE-->>O: Return results
|
||||
O->>DB: Update step status
|
||||
O->>O: Find newly ready steps
|
||||
end
|
||||
|
||||
O->>Client: Flow completed
|
||||
```
|
||||
|
||||
### Flexible Orchestrator Implementation
|
||||
|
||||
```rust
|
||||
use rhai_dispatcher::{RhaiDispatcher, PlayRequestBuilder};
|
||||
use std::collections::HashSet;
|
||||
|
||||
pub struct Orchestrator<E: RhaiExecutor> {
|
||||
executor: E,
|
||||
database: Arc<Database>,
|
||||
}
|
||||
|
||||
impl<E: RhaiExecutor> Orchestrator<E> {
|
||||
pub fn new(executor: E, database: Arc<Database>) -> Self {
|
||||
Self { executor, database }
|
||||
}
|
||||
|
||||
pub async fn execute_flow(&self, flow: Flow) -> Result<(), OrchestratorError> {
|
||||
// 1. Store flow in database
|
||||
self.database.collection::<Flow>()?.set(&flow)?;
|
||||
|
||||
// 2. Find steps with no dependencies (depends_on is empty)
|
||||
let mut pending_steps: Vec<FlowStep> = flow.steps.clone();
|
||||
let mut completed_steps: HashSet<u32> = HashSet::new();
|
||||
|
||||
while !pending_steps.is_empty() {
|
||||
// Find ready steps (all dependencies completed)
|
||||
let ready_steps: Vec<FlowStep> = pending_steps
|
||||
.iter()
|
||||
.filter(|step| {
|
||||
step.depends_on.iter().all(|dep_id| completed_steps.contains(dep_id))
|
||||
})
|
||||
.cloned()
|
||||
.collect();
|
||||
|
||||
if ready_steps.is_empty() {
|
||||
return Err(OrchestratorError::NoReadySteps);
|
||||
}
|
||||
|
||||
// Execute ready steps concurrently
|
||||
let mut tasks = Vec::new();
|
||||
for step in ready_steps {
|
||||
let executor = &self.executor;
|
||||
let task = async move {
|
||||
// Create PlayRequestBuilder for this step
|
||||
let request = RhaiDispatcher::new_play_request()
|
||||
.script(&step.script)
|
||||
.context_id(&step.context_id)
|
||||
.worker_id(&step.worker_id);
|
||||
|
||||
// Execute via the trait
|
||||
let result = executor.call(request).await?;
|
||||
Ok((step.base_data.id, result))
|
||||
};
|
||||
tasks.push(task);
|
||||
}
|
||||
|
||||
// Wait for all ready steps to complete
|
||||
let results = futures::future::try_join_all(tasks).await?;
|
||||
|
||||
// Update step status and mark as completed
|
||||
for (step_id, task_details) in results {
|
||||
if task_details.status == "completed" {
|
||||
completed_steps.insert(step_id);
|
||||
// Update step status in database
|
||||
// self.update_step_status(step_id, "completed", task_details.output).await?;
|
||||
} else {
|
||||
return Err(OrchestratorError::StepFailed(step_id, task_details.error));
|
||||
}
|
||||
}
|
||||
|
||||
// Remove completed steps from pending
|
||||
pending_steps.retain(|step| !completed_steps.contains(&step.base_data.id));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn get_flow_status(&self, flow_id: u32) -> Result<FlowStatus, OrchestratorError> {
|
||||
// Return current status of flow and all its steps
|
||||
let flow = self.database.collection::<Flow>()?.get(flow_id)?;
|
||||
// Implementation would check step statuses and return overall flow status
|
||||
Ok(FlowStatus::Running) // Placeholder
|
||||
}
|
||||
}
|
||||
|
||||
pub enum OrchestratorError {
|
||||
DatabaseError(String),
|
||||
ExecutorError(RhaiDispatcherError),
|
||||
NoReadySteps,
|
||||
StepFailed(u32, Option<String>),
|
||||
}
|
||||
|
||||
pub enum FlowStatus {
|
||||
Pending,
|
||||
Running,
|
||||
Completed,
|
||||
Failed,
|
||||
}
|
||||
|
||||
// Usage examples:
|
||||
// let orchestrator = Orchestrator::new(DispatcherExecutor::new(dispatcher), db);
|
||||
// let orchestrator = Orchestrator::new(WebSocketExecutor::new(ws_client), db);
|
||||
// let orchestrator = Orchestrator::new(HttpExecutor::new(http_client), db);
|
||||
// let orchestrator = Orchestrator::new(LocalExecutor::new(engine), db);
|
||||
```
|
||||
|
||||
### Key Features
|
||||
|
||||
1. **DAG Validation**: Ensures no circular dependencies exist in the `depends_on` relationships
|
||||
2. **Parallel Execution**: Executes independent steps concurrently via multiple workers
|
||||
3. **Simple Dependencies**: Each step lists the step IDs it depends on
|
||||
4. **RhaiDispatcher Integration**: Uses existing dispatcher for script execution
|
||||
5. **Binary Outcomes**: Steps either succeed or fail (keeping it simple as per requirements)
|
||||
|
||||
This simple architecture provides DAG-based workflow execution while leveraging the existing rhailib infrastructure and keeping complexity minimal.
|
||||
|
||||
|
283
src/orchestrator/examples/basic_workflow.rs
Normal file
283
src/orchestrator/examples/basic_workflow.rs
Normal file
@ -0,0 +1,283 @@
|
||||
//! Basic workflow example demonstrating orchestrator usage
|
||||
|
||||
use orchestrator::{
|
||||
interface::LocalInterface,
|
||||
orchestrator::Orchestrator,
|
||||
OrchestratedFlow, OrchestratedFlowStep, FlowStatus,
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Initialize logging
|
||||
tracing_subscriber::fmt().init();
|
||||
|
||||
// Create executor
|
||||
let executor = Arc::new(LocalInterface::new());
|
||||
|
||||
// Create orchestrator
|
||||
let orchestrator = Orchestrator::new(executor);
|
||||
|
||||
println!("🚀 Starting basic workflow example");
|
||||
|
||||
// Example 1: Simple sequential workflow
|
||||
println!("\n📋 Example 1: Sequential Workflow");
|
||||
let sequential_flow = create_sequential_workflow();
|
||||
let flow_id = orchestrator.execute_flow(sequential_flow).await?;
|
||||
|
||||
// Wait for completion and show results
|
||||
wait_and_show_results(&orchestrator, flow_id, "Sequential").await;
|
||||
|
||||
// Example 2: Parallel workflow with convergence
|
||||
println!("\n📋 Example 2: Parallel Workflow");
|
||||
let parallel_flow = create_parallel_workflow();
|
||||
let flow_id = orchestrator.execute_flow(parallel_flow).await?;
|
||||
|
||||
// Wait for completion and show results
|
||||
wait_and_show_results(&orchestrator, flow_id, "Parallel").await;
|
||||
|
||||
// Example 3: Complex workflow with multiple dependencies
|
||||
println!("\n📋 Example 3: Complex Workflow");
|
||||
let complex_flow = create_complex_workflow();
|
||||
let flow_id = orchestrator.execute_flow(complex_flow).await?;
|
||||
|
||||
// Wait for completion and show results
|
||||
wait_and_show_results(&orchestrator, flow_id, "Complex").await;
|
||||
|
||||
// Clean up completed flows
|
||||
orchestrator.cleanup_completed_flows().await;
|
||||
|
||||
println!("\n✅ All examples completed successfully!");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Create a simple sequential workflow
|
||||
fn create_sequential_workflow() -> OrchestratedFlow {
|
||||
let step1 = OrchestratedFlowStep::new("data_preparation")
|
||||
.script(r#"
|
||||
let data = [1, 2, 3, 4, 5];
|
||||
let sum = 0;
|
||||
for item in data {
|
||||
sum += item;
|
||||
}
|
||||
let result = sum;
|
||||
"#)
|
||||
.context_id("sequential_context")
|
||||
.worker_id("worker_1");
|
||||
|
||||
let step2 = OrchestratedFlowStep::new("data_processing")
|
||||
.script(r#"
|
||||
let processed_data = dep_1_result * 2;
|
||||
let result = processed_data;
|
||||
"#)
|
||||
.depends_on(step1.id())
|
||||
.context_id("sequential_context")
|
||||
.worker_id("worker_2");
|
||||
|
||||
let step3 = OrchestratedFlowStep::new("data_output")
|
||||
.script(r#"
|
||||
let final_result = "Processed value: " + dep_2_result;
|
||||
let result = final_result;
|
||||
"#)
|
||||
.depends_on(step2.id())
|
||||
.context_id("sequential_context")
|
||||
.worker_id("worker_3");
|
||||
|
||||
OrchestratedFlow::new("sequential_workflow")
|
||||
.add_step(step1)
|
||||
.add_step(step2)
|
||||
.add_step(step3)
|
||||
}
|
||||
|
||||
/// Create a parallel workflow with convergence
|
||||
fn create_parallel_workflow() -> OrchestratedFlow {
|
||||
let step1 = OrchestratedFlowStep::new("fetch_user_data")
|
||||
.script(r#"
|
||||
let user_id = 12345;
|
||||
let user_name = "Alice";
|
||||
let result = user_name;
|
||||
"#)
|
||||
.context_id("parallel_context")
|
||||
.worker_id("user_service");
|
||||
|
||||
let step2 = OrchestratedFlowStep::new("fetch_order_data")
|
||||
.script(r#"
|
||||
let order_id = 67890;
|
||||
let order_total = 99.99;
|
||||
let result = order_total;
|
||||
"#)
|
||||
.context_id("parallel_context")
|
||||
.worker_id("order_service");
|
||||
|
||||
let step3 = OrchestratedFlowStep::new("fetch_inventory_data")
|
||||
.script(r#"
|
||||
let product_id = "ABC123";
|
||||
let stock_count = 42;
|
||||
let result = stock_count;
|
||||
"#)
|
||||
.context_id("parallel_context")
|
||||
.worker_id("inventory_service");
|
||||
|
||||
let step4 = OrchestratedFlowStep::new("generate_report")
|
||||
.script(r#"
|
||||
let report = "User: " + dep_1_result +
|
||||
", Order Total: $" + dep_2_result +
|
||||
", Stock: " + dep_3_result + " units";
|
||||
let result = report;
|
||||
"#)
|
||||
.depends_on(step1.id())
|
||||
.depends_on(step2.id())
|
||||
.depends_on(step3.id())
|
||||
.context_id("parallel_context")
|
||||
.worker_id("report_service");
|
||||
|
||||
OrchestratedFlow::new("parallel_workflow")
|
||||
.add_step(step1)
|
||||
.add_step(step2)
|
||||
.add_step(step3)
|
||||
.add_step(step4)
|
||||
}
|
||||
|
||||
/// Create a complex workflow with multiple dependency levels
|
||||
fn create_complex_workflow() -> OrchestratedFlow {
|
||||
// Level 1: Initial data gathering
|
||||
let step1 = OrchestratedFlowStep::new("load_config")
|
||||
.script(r#"
|
||||
let config = #{
|
||||
api_url: "https://api.example.com",
|
||||
timeout: 30,
|
||||
retries: 3
|
||||
};
|
||||
let result = config.api_url;
|
||||
"#)
|
||||
.context_id("complex_context")
|
||||
.worker_id("config_service");
|
||||
|
||||
let step2 = OrchestratedFlowStep::new("authenticate")
|
||||
.script(r#"
|
||||
let token = "auth_token_12345";
|
||||
let expires_in = 3600;
|
||||
let result = token;
|
||||
"#)
|
||||
.context_id("complex_context")
|
||||
.worker_id("auth_service");
|
||||
|
||||
// Level 2: Data fetching (depends on config and auth)
|
||||
let step3 = OrchestratedFlowStep::new("fetch_customers")
|
||||
.script(r#"
|
||||
let api_url = dep_1_result;
|
||||
let auth_token = dep_2_result;
|
||||
let customers = ["Customer A", "Customer B", "Customer C"];
|
||||
let result = customers.len();
|
||||
"#)
|
||||
.depends_on(step1.id())
|
||||
.depends_on(step2.id())
|
||||
.context_id("complex_context")
|
||||
.worker_id("customer_service");
|
||||
|
||||
let step4 = OrchestratedFlowStep::new("fetch_products")
|
||||
.script(r#"
|
||||
let api_url = dep_1_result;
|
||||
let auth_token = dep_2_result;
|
||||
let products = ["Product X", "Product Y", "Product Z"];
|
||||
let result = products.len();
|
||||
"#)
|
||||
.depends_on(step1.id())
|
||||
.depends_on(step2.id())
|
||||
.context_id("complex_context")
|
||||
.worker_id("product_service");
|
||||
|
||||
// Level 3: Data processing (depends on fetched data)
|
||||
let step5 = OrchestratedFlowStep::new("calculate_metrics")
|
||||
.script(r#"
|
||||
let customer_count = dep_3_result;
|
||||
let product_count = dep_4_result;
|
||||
let ratio = customer_count / product_count;
|
||||
let result = ratio;
|
||||
"#)
|
||||
.depends_on(step3.id())
|
||||
.depends_on(step4.id())
|
||||
.context_id("complex_context")
|
||||
.worker_id("analytics_service");
|
||||
|
||||
// Level 4: Final reporting
|
||||
let step6 = OrchestratedFlowStep::new("generate_dashboard")
|
||||
.script(r#"
|
||||
let customer_count = dep_3_result;
|
||||
let product_count = dep_4_result;
|
||||
let ratio = dep_5_result;
|
||||
let dashboard = "Dashboard: " + customer_count + " customers, " +
|
||||
product_count + " products, ratio: " + ratio;
|
||||
let result = dashboard;
|
||||
"#)
|
||||
.depends_on(step3.id())
|
||||
.depends_on(step4.id())
|
||||
.depends_on(step5.id())
|
||||
.context_id("complex_context")
|
||||
.worker_id("dashboard_service");
|
||||
|
||||
OrchestratedFlow::new("complex_workflow")
|
||||
.add_step(step1)
|
||||
.add_step(step2)
|
||||
.add_step(step3)
|
||||
.add_step(step4)
|
||||
.add_step(step5)
|
||||
.add_step(step6)
|
||||
}
|
||||
|
||||
/// Wait for flow completion and show results
|
||||
async fn wait_and_show_results(
|
||||
orchestrator: &Orchestrator<LocalInterface>,
|
||||
flow_id: u32,
|
||||
workflow_name: &str,
|
||||
) {
|
||||
println!(" ⏳ Executing {} workflow (ID: {})...", workflow_name, flow_id);
|
||||
|
||||
// Poll for completion
|
||||
loop {
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
|
||||
|
||||
if let Some(execution) = orchestrator.get_flow_status(flow_id).await {
|
||||
match execution.status {
|
||||
FlowStatus::Completed => {
|
||||
println!(" ✅ {} workflow completed successfully!", workflow_name);
|
||||
println!(" 📊 Executed {} steps in {:?}",
|
||||
execution.completed_steps.len(),
|
||||
execution.completed_at.unwrap() - execution.started_at);
|
||||
|
||||
// Show step results
|
||||
for (step_id, outputs) in &execution.step_results {
|
||||
if let Some(result) = outputs.get("result") {
|
||||
let step_name = execution.flow.orchestrated_steps
|
||||
.iter()
|
||||
.find(|s| s.id() == *step_id)
|
||||
.map(|s| s.flow_step.name.as_str())
|
||||
.unwrap_or("unknown");
|
||||
println!(" 📝 Step '{}': {}", step_name, result);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
FlowStatus::Failed => {
|
||||
println!(" ❌ {} workflow failed!", workflow_name);
|
||||
if !execution.failed_steps.is_empty() {
|
||||
println!(" 💥 Failed steps: {:?}", execution.failed_steps);
|
||||
}
|
||||
break;
|
||||
}
|
||||
FlowStatus::Running => {
|
||||
print!(".");
|
||||
std::io::Write::flush(&mut std::io::stdout()).unwrap();
|
||||
}
|
||||
FlowStatus::Pending => {
|
||||
println!(" ⏸️ {} workflow is pending...", workflow_name);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
println!(" ❓ {} workflow not found!", workflow_name);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
61
src/orchestrator/src/interface/dispatcher.rs
Normal file
61
src/orchestrator/src/interface/dispatcher.rs
Normal file
@ -0,0 +1,61 @@
|
||||
//! Dispatcher interface implementation using RhaiDispatcher
|
||||
|
||||
use crate::RhaiInterface;
|
||||
use async_trait::async_trait;
|
||||
use rhai_dispatcher::{PlayRequest, RhaiDispatcher, RhaiDispatcherError};
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Dispatcher-based interface using RhaiDispatcher
|
||||
pub struct DispatcherInterface {
|
||||
dispatcher: Arc<RhaiDispatcher>,
|
||||
}
|
||||
|
||||
impl DispatcherInterface {
|
||||
/// Create a new dispatcher interface
|
||||
pub fn new(dispatcher: Arc<RhaiDispatcher>) -> Self {
|
||||
Self { dispatcher }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl RhaiInterface for DispatcherInterface {
|
||||
async fn submit_play_request(&self, play_request: &PlayRequest) -> Result<(), RhaiDispatcherError> {
|
||||
self.dispatcher.submit_play_request(play_request).await
|
||||
}
|
||||
|
||||
async fn submit_play_request_and_await_result(&self, play_request: &PlayRequest) -> Result<String, RhaiDispatcherError> {
|
||||
self.dispatcher.submit_play_request_and_await_result(play_request).await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_dispatcher_interface_creation() {
|
||||
// This test just verifies we can create the interface
|
||||
// Note: Actual testing would require a properly configured RhaiDispatcher
|
||||
// For now, we'll create a mock or skip the actual dispatcher creation
|
||||
|
||||
// This is a placeholder test - adjust based on actual RhaiDispatcher constructor
|
||||
// let dispatcher = Arc::new(RhaiDispatcher::new());
|
||||
// let interface = DispatcherInterface::new(dispatcher);
|
||||
|
||||
// Just verify the test compiles for now
|
||||
assert!(true);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_dispatcher_interface_methods() {
|
||||
// This test would verify the interface methods work correctly
|
||||
// when a proper RhaiDispatcher is available
|
||||
|
||||
let play_request = PlayRequest {
|
||||
script: "let x = 5; x + 3".to_string(),
|
||||
};
|
||||
|
||||
// Placeholder assertions - would test actual functionality with real dispatcher
|
||||
assert_eq!(play_request.script, "let x = 5; x + 3");
|
||||
}
|
||||
}
|
111
src/orchestrator/src/interface/local.rs
Normal file
111
src/orchestrator/src/interface/local.rs
Normal file
@ -0,0 +1,111 @@
|
||||
//! Local interface implementation for in-process script execution
|
||||
|
||||
use crate::RhaiInterface;
|
||||
use async_trait::async_trait;
|
||||
use rhai_dispatcher::{PlayRequest, RhaiDispatcherError};
|
||||
|
||||
/// Local interface for in-process script execution
|
||||
pub struct LocalInterface {
|
||||
engine: rhai::Engine,
|
||||
}
|
||||
|
||||
impl LocalInterface {
|
||||
/// Create a new local interface
|
||||
pub fn new() -> Self {
|
||||
let engine = rhai::Engine::new();
|
||||
Self { engine }
|
||||
}
|
||||
|
||||
/// Create a new local interface with custom engine
|
||||
pub fn with_engine(engine: rhai::Engine) -> Self {
|
||||
Self { engine }
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for LocalInterface {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl RhaiInterface for LocalInterface {
|
||||
async fn submit_play_request(&self, _play_request: &PlayRequest) -> Result<(), RhaiDispatcherError> {
|
||||
// For local interface, fire-and-forget doesn't make much sense
|
||||
// We'll just execute and ignore the result
|
||||
let _ = self.submit_play_request_and_await_result(_play_request).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn submit_play_request_and_await_result(&self, play_request: &PlayRequest) -> Result<String, RhaiDispatcherError> {
|
||||
let mut scope = rhai::Scope::new();
|
||||
|
||||
// Execute the script
|
||||
let result = self
|
||||
.engine
|
||||
.eval_with_scope::<rhai::Dynamic>(&mut scope, &play_request.script)
|
||||
.map_err(|e| RhaiDispatcherError::TaskNotFound(format!("Script execution error: {}", e)))?;
|
||||
|
||||
// Return the result as a string
|
||||
if result.is_unit() {
|
||||
Ok(String::new())
|
||||
} else {
|
||||
Ok(result.to_string())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_local_interface_basic() {
|
||||
let interface = LocalInterface::new();
|
||||
let play_request = PlayRequest {
|
||||
script: "let x = 5; x + 3".to_string(),
|
||||
};
|
||||
|
||||
let result = interface.submit_play_request_and_await_result(&play_request).await;
|
||||
assert!(result.is_ok());
|
||||
|
||||
let output = result.unwrap();
|
||||
assert_eq!(output, "8");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_local_interface_fire_and_forget() {
|
||||
let interface = LocalInterface::new();
|
||||
let play_request = PlayRequest {
|
||||
script: "let x = 5; x + 3".to_string(),
|
||||
};
|
||||
|
||||
let result = interface.submit_play_request(&play_request).await;
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_local_interface_with_error() {
|
||||
let interface = LocalInterface::new();
|
||||
let play_request = PlayRequest {
|
||||
script: "invalid_syntax +++".to_string(),
|
||||
};
|
||||
|
||||
let result = interface.submit_play_request_and_await_result(&play_request).await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_local_interface_empty_result() {
|
||||
let interface = LocalInterface::new();
|
||||
let play_request = PlayRequest {
|
||||
script: "let x = 42;".to_string(),
|
||||
};
|
||||
|
||||
let result = interface.submit_play_request_and_await_result(&play_request).await;
|
||||
assert!(result.is_ok());
|
||||
|
||||
let output = result.unwrap();
|
||||
assert_eq!(output, "");
|
||||
}
|
||||
}
|
9
src/orchestrator/src/interface/mod.rs
Normal file
9
src/orchestrator/src/interface/mod.rs
Normal file
@ -0,0 +1,9 @@
|
||||
//! Interface implementations for different backends
|
||||
|
||||
pub mod local;
|
||||
pub mod ws;
|
||||
pub mod dispatcher;
|
||||
|
||||
pub use local::*;
|
||||
pub use ws::*;
|
||||
pub use dispatcher::*;
|
117
src/orchestrator/src/interface/ws.rs
Normal file
117
src/orchestrator/src/interface/ws.rs
Normal file
@ -0,0 +1,117 @@
|
||||
//! WebSocket interface implementation for remote script execution
|
||||
|
||||
use crate::RhaiInterface;
|
||||
use async_trait::async_trait;
|
||||
use rhai_dispatcher::{PlayRequest, RhaiDispatcherError};
|
||||
use reqwest::Client;
|
||||
use serde_json::json;
|
||||
|
||||
/// WebSocket-based interface for remote script execution
|
||||
pub struct WsInterface {
|
||||
client: Client,
|
||||
base_url: String,
|
||||
}
|
||||
|
||||
impl WsInterface {
|
||||
/// Create a new WebSocket interface
|
||||
pub fn new(base_url: String) -> Self {
|
||||
Self {
|
||||
client: Client::new(),
|
||||
base_url,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl RhaiInterface for WsInterface {
|
||||
async fn submit_play_request(&self, play_request: &PlayRequest) -> Result<(), RhaiDispatcherError> {
|
||||
let payload = json!({
|
||||
"script": play_request.script
|
||||
});
|
||||
|
||||
let response = self
|
||||
.client
|
||||
.post(&format!("{}/submit", self.base_url))
|
||||
.json(&payload)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| RhaiDispatcherError::TaskNotFound(format!("Network error: {}", e)))?;
|
||||
|
||||
if response.status().is_success() {
|
||||
Ok(())
|
||||
} else {
|
||||
let error_text = response
|
||||
.text()
|
||||
.await
|
||||
.unwrap_or_else(|_| "Unknown error".to_string());
|
||||
Err(RhaiDispatcherError::TaskNotFound(format!("HTTP error: {}", error_text)))
|
||||
}
|
||||
}
|
||||
|
||||
async fn submit_play_request_and_await_result(&self, play_request: &PlayRequest) -> Result<String, RhaiDispatcherError> {
|
||||
let payload = json!({
|
||||
"script": play_request.script
|
||||
});
|
||||
|
||||
let response = self
|
||||
.client
|
||||
.post(&format!("{}/execute", self.base_url))
|
||||
.json(&payload)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| RhaiDispatcherError::TaskNotFound(format!("Network error: {}", e)))?;
|
||||
|
||||
if response.status().is_success() {
|
||||
let result: String = response
|
||||
.text()
|
||||
.await
|
||||
.map_err(|e| RhaiDispatcherError::TaskNotFound(format!("Response parsing error: {}", e)))?;
|
||||
Ok(result)
|
||||
} else {
|
||||
let error_text = response
|
||||
.text()
|
||||
.await
|
||||
.unwrap_or_else(|_| "Unknown error".to_string());
|
||||
Err(RhaiDispatcherError::TaskNotFound(format!("HTTP error: {}", error_text)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_ws_interface_creation() {
|
||||
let interface = WsInterface::new("http://localhost:8080".to_string());
|
||||
assert_eq!(interface.base_url, "http://localhost:8080");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_ws_interface_call_with_mock_server() {
|
||||
// This test would require a mock HTTP server
|
||||
// For now, just test that we can create the interface
|
||||
let interface = WsInterface::new("http://localhost:8080".to_string());
|
||||
|
||||
let play_request = PlayRequest {
|
||||
script: "let x = 1;".to_string(),
|
||||
};
|
||||
|
||||
// This will fail without a real server, but that's expected in unit tests
|
||||
let result = interface.submit_play_request_and_await_result(&play_request).await;
|
||||
assert!(result.is_err()); // Expected to fail without server
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_ws_interface_fire_and_forget() {
|
||||
let interface = WsInterface::new("http://localhost:8080".to_string());
|
||||
|
||||
let play_request = PlayRequest {
|
||||
script: "let x = 1;".to_string(),
|
||||
};
|
||||
|
||||
// This will fail without a real server, but that's expected in unit tests
|
||||
let result = interface.submit_play_request(&play_request).await;
|
||||
assert!(result.is_err()); // Expected to fail without server
|
||||
}
|
||||
}
|
35
src/orchestrator/src/lib.rs
Normal file
35
src/orchestrator/src/lib.rs
Normal file
@ -0,0 +1,35 @@
|
||||
//! # Orchestrator
|
||||
//!
|
||||
//! A simple DAG-based workflow execution system that extends the heromodels flow structures
|
||||
//! to support workflows with dependencies and distributed script execution.
|
||||
|
||||
use async_trait::async_trait;
|
||||
use rhai_dispatcher::{PlayRequest, RhaiDispatcherError};
|
||||
|
||||
pub mod interface;
|
||||
pub mod orchestrator;
|
||||
|
||||
pub use interface::*;
|
||||
pub use orchestrator::*;
|
||||
|
||||
/// Trait for executing Rhai scripts through different backends
|
||||
/// Uses the same signature as RhaiDispatcher for consistency
|
||||
#[async_trait]
|
||||
pub trait RhaiInterface {
|
||||
/// Submit a play request without waiting for result (fire-and-forget)
|
||||
async fn submit_play_request(&self, play_request: &PlayRequest) -> Result<(), RhaiDispatcherError>;
|
||||
|
||||
/// Submit a play request and await the result
|
||||
/// Returns just the output string on success
|
||||
async fn submit_play_request_and_await_result(&self, play_request: &PlayRequest) -> Result<String, RhaiDispatcherError>;
|
||||
}
|
||||
|
||||
// Re-export the flow models from DSL
|
||||
pub use rhailib_dsl::flow::{OrchestratedFlow, OrchestratedFlowStep, OrchestratorError, FlowStatus};
|
||||
|
||||
// Conversion from RhaiDispatcherError to OrchestratorError
|
||||
impl From<RhaiDispatcherError> for OrchestratorError {
|
||||
fn from(err: RhaiDispatcherError) -> Self {
|
||||
OrchestratorError::ExecutorError(err.to_string())
|
||||
}
|
||||
}
|
418
src/orchestrator/src/orchestrator.rs
Normal file
418
src/orchestrator/src/orchestrator.rs
Normal file
@ -0,0 +1,418 @@
|
||||
//! Main orchestrator implementation for DAG-based workflow execution
|
||||
|
||||
use crate::{
|
||||
OrchestratedFlow, OrchestratedFlowStep, OrchestratorError, FlowStatus, RhaiInterface,
|
||||
};
|
||||
use rhai_dispatcher::PlayRequest;
|
||||
use futures::future::try_join_all;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
/// Main orchestrator for executing DAG-based workflows
|
||||
pub struct Orchestrator<I: RhaiInterface> {
|
||||
/// Interface for running scripts
|
||||
interface: Arc<I>,
|
||||
|
||||
/// Active flow executions
|
||||
active_flows: Arc<RwLock<HashMap<u32, FlowExecution>>>,
|
||||
}
|
||||
|
||||
/// Represents an active flow execution
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct FlowExecution {
|
||||
/// The flow being executed
|
||||
pub flow: OrchestratedFlow,
|
||||
|
||||
/// Current status
|
||||
pub status: FlowStatus,
|
||||
|
||||
/// Completed step IDs
|
||||
pub completed_steps: HashSet<u32>,
|
||||
|
||||
/// Failed step IDs
|
||||
pub failed_steps: HashSet<u32>,
|
||||
|
||||
/// Step results
|
||||
pub step_results: HashMap<u32, HashMap<String, String>>,
|
||||
|
||||
/// Execution start time
|
||||
pub started_at: chrono::DateTime<chrono::Utc>,
|
||||
|
||||
/// Execution end time
|
||||
pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
|
||||
}
|
||||
|
||||
impl FlowExecution {
|
||||
/// Create a new flow execution
|
||||
pub fn new(flow: OrchestratedFlow) -> Self {
|
||||
Self {
|
||||
flow,
|
||||
status: FlowStatus::Pending,
|
||||
completed_steps: HashSet::new(),
|
||||
failed_steps: HashSet::new(),
|
||||
step_results: HashMap::new(),
|
||||
started_at: chrono::Utc::now(),
|
||||
completed_at: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if a step is ready to execute (all dependencies completed)
|
||||
pub fn is_step_ready(&self, step: &OrchestratedFlowStep) -> bool {
|
||||
if self.completed_steps.contains(&step.id()) || self.failed_steps.contains(&step.id()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
step.depends_on.iter().all(|dep_id| self.completed_steps.contains(dep_id))
|
||||
}
|
||||
|
||||
/// Get all ready steps
|
||||
pub fn get_ready_steps(&self) -> Vec<&OrchestratedFlowStep> {
|
||||
self.flow
|
||||
.orchestrated_steps
|
||||
.iter()
|
||||
.filter(|step| self.is_step_ready(step))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Mark a step as completed
|
||||
pub fn complete_step(&mut self, step_id: u32, outputs: HashMap<String, String>) {
|
||||
self.completed_steps.insert(step_id);
|
||||
self.step_results.insert(step_id, outputs);
|
||||
|
||||
// Check if flow is complete
|
||||
if self.completed_steps.len() == self.flow.orchestrated_steps.len() {
|
||||
self.status = FlowStatus::Completed;
|
||||
self.completed_at = Some(chrono::Utc::now());
|
||||
}
|
||||
}
|
||||
|
||||
/// Mark a step as failed
|
||||
pub fn fail_step(&mut self, step_id: u32) {
|
||||
self.failed_steps.insert(step_id);
|
||||
self.status = FlowStatus::Failed;
|
||||
self.completed_at = Some(chrono::Utc::now());
|
||||
}
|
||||
|
||||
/// Check if the flow execution is finished
|
||||
pub fn is_finished(&self) -> bool {
|
||||
matches!(self.status, FlowStatus::Completed | FlowStatus::Failed)
|
||||
}
|
||||
}
|
||||
|
||||
impl<I: RhaiInterface + Send + Sync + 'static> Orchestrator<I> {
|
||||
/// Create a new orchestrator
|
||||
pub fn new(interface: Arc<I>) -> Self {
|
||||
Self {
|
||||
interface,
|
||||
active_flows: Arc::new(RwLock::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
|
||||
/// Start executing a flow
|
||||
pub async fn execute_flow(&self, flow: OrchestratedFlow) -> Result<u32, OrchestratorError> {
|
||||
let flow_id = flow.id();
|
||||
flow.validate_dag()?;
|
||||
|
||||
info!("Starting execution of flow {} with {} steps", flow_id, flow.orchestrated_steps.len());
|
||||
|
||||
// Create flow execution
|
||||
let mut execution = FlowExecution::new(flow);
|
||||
execution.status = FlowStatus::Running;
|
||||
|
||||
// Store the execution
|
||||
{
|
||||
let mut active_flows = self.active_flows.write().await;
|
||||
active_flows.insert(flow_id, execution);
|
||||
}
|
||||
|
||||
// Start execution in background
|
||||
let orchestrator = self.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = orchestrator.execute_flow_steps(flow_id).await {
|
||||
error!("Flow {} execution failed: {}", flow_id, e);
|
||||
|
||||
// Mark flow as failed
|
||||
let mut active_flows = orchestrator.active_flows.write().await;
|
||||
if let Some(execution) = active_flows.get_mut(&flow_id) {
|
||||
execution.status = FlowStatus::Failed;
|
||||
execution.completed_at = Some(chrono::Utc::now());
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(flow_id)
|
||||
}
|
||||
|
||||
/// Execute flow steps using DAG traversal
|
||||
async fn execute_flow_steps(&self, flow_id: u32) -> Result<(), OrchestratorError> {
|
||||
loop {
|
||||
let ready_steps = {
|
||||
let active_flows = self.active_flows.read().await;
|
||||
let execution = active_flows
|
||||
.get(&flow_id)
|
||||
.ok_or(OrchestratorError::StepNotFound(flow_id))?;
|
||||
|
||||
if execution.is_finished() {
|
||||
info!("Flow {} execution completed with status: {:?}", flow_id, execution.status);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
execution.get_ready_steps().into_iter().cloned().collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
if ready_steps.is_empty() {
|
||||
// Check if we're deadlocked
|
||||
let active_flows = self.active_flows.read().await;
|
||||
let execution = active_flows
|
||||
.get(&flow_id)
|
||||
.ok_or(OrchestratorError::StepNotFound(flow_id))?;
|
||||
|
||||
if !execution.is_finished() {
|
||||
warn!("No ready steps found for flow {} - possible deadlock", flow_id);
|
||||
return Err(OrchestratorError::NoReadySteps);
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
debug!("Executing {} ready steps for flow {}", ready_steps.len(), flow_id);
|
||||
|
||||
// Execute ready steps concurrently
|
||||
let step_futures = ready_steps.into_iter().map(|step| {
|
||||
let orchestrator = self.clone();
|
||||
async move {
|
||||
orchestrator.execute_step(flow_id, step).await
|
||||
}
|
||||
});
|
||||
|
||||
// Wait for all steps to complete
|
||||
let results = try_join_all(step_futures).await?;
|
||||
|
||||
// Update execution state
|
||||
{
|
||||
let mut active_flows = self.active_flows.write().await;
|
||||
let execution = active_flows
|
||||
.get_mut(&flow_id)
|
||||
.ok_or(OrchestratorError::StepNotFound(flow_id))?;
|
||||
|
||||
for (step_id, outputs) in results {
|
||||
execution.complete_step(step_id, outputs);
|
||||
}
|
||||
}
|
||||
|
||||
// Small delay to prevent tight loop
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute a single step
|
||||
async fn execute_step(
|
||||
&self,
|
||||
flow_id: u32,
|
||||
step: OrchestratedFlowStep,
|
||||
) -> Result<(u32, HashMap<String, String>), OrchestratorError> {
|
||||
let step_id = step.id();
|
||||
info!("Executing step {} for flow {}", step_id, flow_id);
|
||||
|
||||
// Prepare inputs with dependency outputs
|
||||
let mut inputs = step.inputs.clone();
|
||||
|
||||
// Add outputs from dependency steps
|
||||
{
|
||||
let active_flows = self.active_flows.read().await;
|
||||
let execution = active_flows
|
||||
.get(&flow_id)
|
||||
.ok_or(OrchestratorError::StepNotFound(flow_id))?;
|
||||
|
||||
for dep_id in &step.depends_on {
|
||||
if let Some(dep_outputs) = execution.step_results.get(dep_id) {
|
||||
for (key, value) in dep_outputs {
|
||||
inputs.insert(format!("dep_{}_{}", dep_id, key), value.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create play request
|
||||
let play_request = PlayRequest {
|
||||
id: format!("{}_{}", flow_id, step_id),
|
||||
worker_id: step.worker_id.clone(),
|
||||
context_id: step.context_id.clone(),
|
||||
script: step.script.clone(),
|
||||
timeout: std::time::Duration::from_secs(30), // Default timeout
|
||||
};
|
||||
|
||||
// Execute the script
|
||||
match self.interface.submit_play_request_and_await_result(&play_request).await {
|
||||
Ok(output) => {
|
||||
info!("Step {} completed successfully", step_id);
|
||||
let mut outputs = HashMap::new();
|
||||
outputs.insert("result".to_string(), output);
|
||||
Ok((step_id, outputs))
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Step {} failed: {}", step_id, e);
|
||||
|
||||
// Mark step as failed
|
||||
{
|
||||
let mut active_flows = self.active_flows.write().await;
|
||||
if let Some(execution) = active_flows.get_mut(&flow_id) {
|
||||
execution.fail_step(step_id);
|
||||
}
|
||||
}
|
||||
|
||||
Err(OrchestratorError::StepFailed(step_id, Some(e.to_string())))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the status of a flow execution
|
||||
pub async fn get_flow_status(&self, flow_id: u32) -> Option<FlowExecution> {
|
||||
let active_flows = self.active_flows.read().await;
|
||||
active_flows.get(&flow_id).cloned()
|
||||
}
|
||||
|
||||
/// Cancel a flow execution
|
||||
pub async fn cancel_flow(&self, flow_id: u32) -> Result<(), OrchestratorError> {
|
||||
let mut active_flows = self.active_flows.write().await;
|
||||
if let Some(execution) = active_flows.get_mut(&flow_id) {
|
||||
execution.status = FlowStatus::Failed;
|
||||
execution.completed_at = Some(chrono::Utc::now());
|
||||
info!("Flow {} cancelled", flow_id);
|
||||
Ok(())
|
||||
} else {
|
||||
Err(OrchestratorError::StepNotFound(flow_id))
|
||||
}
|
||||
}
|
||||
|
||||
/// List all active flows
|
||||
pub async fn list_active_flows(&self) -> Vec<(u32, FlowStatus)> {
|
||||
let active_flows = self.active_flows.read().await;
|
||||
active_flows
|
||||
.iter()
|
||||
.map(|(id, execution)| (*id, execution.status.clone()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Clean up completed flows
|
||||
pub async fn cleanup_completed_flows(&self) {
|
||||
let mut active_flows = self.active_flows.write().await;
|
||||
active_flows.retain(|_, execution| !execution.is_finished());
|
||||
}
|
||||
}
|
||||
|
||||
impl<I: RhaiInterface + Send + Sync> Clone for Orchestrator<I> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
interface: self.interface.clone(),
|
||||
active_flows: self.active_flows.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::interface::LocalInterface;
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_simple_flow_execution() {
|
||||
let interface = Arc::new(LocalInterface::new());
|
||||
let orchestrator = Orchestrator::new(interface);
|
||||
|
||||
// Create a simple flow with two steps
|
||||
let step1 = OrchestratedFlowStep::new("step1")
|
||||
.script("let result = 10;")
|
||||
.context_id("test")
|
||||
.worker_id("worker1");
|
||||
|
||||
let step2 = OrchestratedFlowStep::new("step2")
|
||||
.script("let result = dep_1_result + 5;")
|
||||
.depends_on(step1.id())
|
||||
.context_id("test")
|
||||
.worker_id("worker1");
|
||||
|
||||
let flow = OrchestratedFlow::new("test_flow")
|
||||
.add_step(step1)
|
||||
.add_step(step2);
|
||||
|
||||
// Execute the flow
|
||||
let flow_id = orchestrator.execute_flow(flow).await.unwrap();
|
||||
|
||||
// Wait for completion
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
|
||||
|
||||
let status = orchestrator.get_flow_status(flow_id).await.unwrap();
|
||||
assert_eq!(status.status, FlowStatus::Completed);
|
||||
assert_eq!(status.completed_steps.len(), 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_parallel_execution() {
|
||||
let interface = Arc::new(LocalInterface::new());
|
||||
let orchestrator = Orchestrator::new(interface);
|
||||
|
||||
// Create a flow with parallel steps
|
||||
let step1 = OrchestratedFlowStep::new("step1")
|
||||
.script("let result = 10;")
|
||||
.context_id("test")
|
||||
.worker_id("worker1");
|
||||
|
||||
let step2 = OrchestratedFlowStep::new("step2")
|
||||
.script("let result = 20;")
|
||||
.context_id("test")
|
||||
.worker_id("worker2");
|
||||
|
||||
let step3 = OrchestratedFlowStep::new("step3")
|
||||
.script("let result = dep_1_result + dep_2_result;")
|
||||
.depends_on(step1.id())
|
||||
.depends_on(step2.id())
|
||||
.context_id("test")
|
||||
.worker_id("worker3");
|
||||
|
||||
let flow = OrchestratedFlow::new("parallel_flow")
|
||||
.add_step(step1)
|
||||
.add_step(step2)
|
||||
.add_step(step3);
|
||||
|
||||
// Execute the flow
|
||||
let flow_id = orchestrator.execute_flow(flow).await.unwrap();
|
||||
|
||||
// Wait for completion
|
||||
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
|
||||
|
||||
let status = orchestrator.get_flow_status(flow_id).await.unwrap();
|
||||
assert_eq!(status.status, FlowStatus::Completed);
|
||||
assert_eq!(status.completed_steps.len(), 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_flow_execution_state() {
|
||||
let step1 = OrchestratedFlowStep::new("step1").script("let x = 1;");
|
||||
let step2 = OrchestratedFlowStep::new("step2")
|
||||
.script("let y = 2;")
|
||||
.depends_on(step1.id());
|
||||
|
||||
let flow = OrchestratedFlow::new("test_flow")
|
||||
.add_step(step1.clone())
|
||||
.add_step(step2.clone());
|
||||
|
||||
let mut execution = FlowExecution::new(flow);
|
||||
|
||||
// Initially, only step1 should be ready
|
||||
assert!(execution.is_step_ready(&step1));
|
||||
assert!(!execution.is_step_ready(&step2));
|
||||
|
||||
// After completing step1, step2 should be ready
|
||||
execution.complete_step(step1.id(), HashMap::new());
|
||||
assert!(!execution.is_step_ready(&step1)); // Already completed
|
||||
assert!(execution.is_step_ready(&step2));
|
||||
|
||||
// After completing step2, flow should be complete
|
||||
execution.complete_step(step2.id(), HashMap::new());
|
||||
assert_eq!(execution.status, FlowStatus::Completed);
|
||||
}
|
||||
}
|
42
src/orchestrator/src/services.rs
Normal file
42
src/orchestrator/src/services.rs
Normal file
@ -0,0 +1,42 @@
|
||||
//! Main orchestrator implementation for DAG-based workflow execution
|
||||
|
||||
use crate::{
|
||||
OrchestratedFlow, OrchestratedFlowStep, OrchestratorError, FlowStatus, RhaiInterface, ScriptRequest,
|
||||
};
|
||||
use futures::future::try_join_all;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
impl<I: RhaiInterface + Send + Sync + 'static> Orchestrator<I> {
|
||||
/// Get a flow by ID
|
||||
pub fn get_flow(&self, flow_id: u32) -> Result<OrchestratedFlow, OrchestratorError> {
|
||||
self.interface
|
||||
.new_play_request()
|
||||
.script(format!("json_encode(get_flow({}))", flow_id))
|
||||
.submit_play_request_and_await_result()
|
||||
.await
|
||||
.map(|result| serde_json::from_str(&result).unwrap())
|
||||
}
|
||||
|
||||
pub fn get_flows(&self) -> Result<Vec<OrchestratedFlow>, OrchestratorError> {
|
||||
self.interface
|
||||
.new_play_request()
|
||||
.script("json_encode(get_flows())")
|
||||
.submit_play_request_and_await_result()
|
||||
.await
|
||||
.map(|result| serde_json::from_str(&result).unwrap())
|
||||
}
|
||||
|
||||
pub fn get_active_flows(&self) -> Result<Vec<OrchestratedFlow>, OrchestratorError> {
|
||||
self.interface
|
||||
.new_play_request()
|
||||
.script("json_encode(get_flows())")
|
||||
.submit_play_request_and_await_result()
|
||||
.await
|
||||
.map(|result| serde_json::from_str(&result).unwrap())
|
||||
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user