//! End-to-End Demo: Supervisor + Runner + Client //! //! This example demonstrates the complete workflow: //! 1. Starts a supervisor with Mycelium integration //! 2. Starts an OSIS runner //! 3. Uses the supervisor client to run jobs //! 4. Shows both job.run (blocking) and job.start (non-blocking) modes //! //! Prerequisites: //! - Redis running on localhost:6379 //! //! Usage: //! ```bash //! RUST_LOG=info cargo run --example end_to_end_demo //! ``` use anyhow::{Result, Context}; use log::{info, error}; use std::process::{Command, Child, Stdio}; use std::time::Duration; use tokio::time::sleep; use hero_supervisor_openrpc_client::{SupervisorClient, JobBuilder}; /// Configuration for the demo struct DemoConfig { redis_url: String, supervisor_port: u16, runner_id: String, db_path: String, } impl Default for DemoConfig { fn default() -> Self { Self { redis_url: "redis://localhost:6379".to_string(), supervisor_port: 3030, runner_id: "example_runner".to_string(), db_path: "/tmp/example_runner.db".to_string(), } } } /// Supervisor process wrapper struct SupervisorProcess { child: Child, } impl SupervisorProcess { fn start(config: &DemoConfig) -> Result { info!("šŸš€ Starting supervisor on port {}...", config.supervisor_port); let child = Command::new("cargo") .args(&[ "run", "--bin", "hero-supervisor", "--", "--redis-url", &config.redis_url, "--port", &config.supervisor_port.to_string(), ]) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() .context("Failed to start supervisor")?; Ok(Self { child }) } } impl Drop for SupervisorProcess { fn drop(&mut self) { info!("šŸ›‘ Stopping supervisor..."); let _ = self.child.kill(); let _ = self.child.wait(); } } /// Runner process wrapper struct RunnerProcess { child: Child, } impl RunnerProcess { fn start(config: &DemoConfig) -> Result { info!("šŸ¤– Starting OSIS runner '{}'...", config.runner_id); let child = Command::new("cargo") .args(&[ "run", "--bin", "runner_osis", "--", &config.runner_id, "--db-path", &config.db_path, "--redis-url", &config.redis_url, ]) .env("RUST_LOG", "info") .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() .context("Failed to start runner")?; Ok(Self { child }) } } impl Drop for RunnerProcess { fn drop(&mut self) { info!("šŸ›‘ Stopping runner..."); let _ = self.child.kill(); let _ = self.child.wait(); } } /// Helper functions for the demo async fn register_runner_helper(client: &SupervisorClient, runner_id: &str, secret: &str) -> Result<()> { info!("šŸ“ Registering runner '{}'...", runner_id); let queue = format!("hero:q:work:type:osis:group:default:inst:{}", runner_id); client.register_runner(secret, runner_id, &queue).await?; info!("āœ… Runner registered successfully"); Ok(()) } async fn run_job_helper(client: &SupervisorClient, job: runner_rust::job::Job, secret: &str, timeout: u64) -> Result { info!("šŸš€ Running job {} (blocking)...", job.id); let response = client.job_run(secret, job, Some(timeout)).await?; let result = response.result .ok_or_else(|| anyhow::anyhow!("No result in response"))?; info!("āœ… Job completed with result: {}", result); Ok(result) } async fn start_job_helper(client: &SupervisorClient, job: runner_rust::job::Job, secret: &str) -> Result { info!("šŸš€ Starting job {} (non-blocking)...", job.id); let response = client.job_start(secret, job).await?; info!("āœ… Job queued with ID: {}", response.job_id); Ok(response.job_id) } #[tokio::main] async fn main() -> Result<()> { // Initialize logging env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); println!("\n╔════════════════════════════════════════════════════════════╗"); println!("ā•‘ End-to-End Demo: Supervisor + Runner + Client ā•‘"); println!("ā•šā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•ā•\n"); let config = DemoConfig::default(); // Step 1: Start supervisor println!("šŸ“‹ Step 1: Starting Supervisor"); println!("─────────────────────────────────────────────────────────────"); let _supervisor = SupervisorProcess::start(&config)?; sleep(Duration::from_secs(3)).await; println!("āœ… Supervisor started on port {}\n", config.supervisor_port); // Step 2: Start runner println!("šŸ“‹ Step 2: Starting OSIS Runner"); println!("─────────────────────────────────────────────────────────────"); let _runner = RunnerProcess::start(&config)?; sleep(Duration::from_secs(3)).await; println!("āœ… Runner '{}' started\n", config.runner_id); // Step 3: Create client and register runner println!("šŸ“‹ Step 3: Registering Runner with Supervisor"); println!("─────────────────────────────────────────────────────────────"); let client = SupervisorClient::new(&format!("http://localhost:{}", config.supervisor_port))?; register_runner_helper(&client, &config.runner_id, "admin_secret").await?; println!("āœ… Runner registered\n"); sleep(Duration::from_secs(2)).await; // Step 4: Run blocking jobs (job.run) println!("šŸ“‹ Step 4: Running Blocking Jobs (job.run)"); println!("─────────────────────────────────────────────────────────────"); // Job 1: Simple calculation println!("\nšŸ”¹ Job 1: Simple Calculation"); let job1 = JobBuilder::new() .caller_id("demo_client") .context_id("demo_context") .payload("let result = 2 + 2; to_json(result)") .runner(&config.runner_id) .executor("rhai") .timeout(30) .build()?; let result1 = run_job_helper(&client, job1, "admin_secret", 30).await?; println!(" Result: {}", result1); // Job 2: String manipulation println!("\nšŸ”¹ Job 2: String Manipulation"); let job2 = JobBuilder::new() .caller_id("demo_client") .context_id("demo_context") .payload(r#"let msg = "Hello from OSIS Runner!"; to_json(msg)"#) .runner(&config.runner_id) .executor("rhai") .timeout(30) .build()?; let result2 = run_job_helper(&client, job2, "admin_secret", 30).await?; println!(" Result: {}", result2); // Job 3: Array operations println!("\nšŸ”¹ Job 3: Array Operations"); let job3 = JobBuilder::new() .caller_id("demo_client") .context_id("demo_context") .payload(r#" let numbers = [1, 2, 3, 4, 5]; let sum = 0; for n in numbers { sum += n; } to_json(#{sum: sum, count: numbers.len()}) "#) .runner(&config.runner_id) .executor("rhai") .timeout(30) .build()?; let result3 = run_job_helper(&client, job3, "admin_secret", 30).await?; println!(" Result: {}", result3); println!("\nāœ… All blocking jobs completed successfully\n"); // Step 5: Start non-blocking jobs (job.start) println!("šŸ“‹ Step 5: Starting Non-Blocking Jobs (job.start)"); println!("─────────────────────────────────────────────────────────────"); println!("\nšŸ”¹ Job 4: Background Task"); let job4 = JobBuilder::new() .caller_id("demo_client") .context_id("demo_context") .payload(r#" let result = "Background task completed"; to_json(result) "#) .runner(&config.runner_id) .executor("rhai") .timeout(30) .build()?; let job4_id = start_job_helper(&client, job4, "admin_secret").await?; println!(" Job ID: {} (running in background)", job4_id); println!("\nāœ… Non-blocking job started\n"); // Step 6: Summary println!("šŸ“‹ Step 6: Demo Summary"); println!("─────────────────────────────────────────────────────────────"); println!("āœ… Supervisor: Running on port {}", config.supervisor_port); println!("āœ… Runner: '{}' registered and processing jobs", config.runner_id); println!("āœ… Blocking jobs: 3 completed successfully"); println!("āœ… Non-blocking jobs: 1 started"); println!("\nšŸŽ‰ Demo completed successfully!"); // Keep processes running for a bit to see logs println!("\nā³ Keeping processes running for 5 seconds..."); sleep(Duration::from_secs(5)).await; println!("\nšŸ›‘ Shutting down..."); Ok(()) }