- Simplified RunnerConfig to just name, command, and optional env - Removed RunnerType and ProcessManagerType enums - Removed db_path, redis_url, binary_path from config - Made runner name also serve as queue name (no separate queue param) - Added secret-based authentication to all runner management methods - Created comprehensive osiris_openrpc example - Archived old examples to _archive/ - Updated client API to match simplified supervisor interface
279 lines
9.9 KiB
Rust
279 lines
9.9 KiB
Rust
//! End-to-End Demo: Supervisor + Runner + Client
//!
//! This example demonstrates the complete workflow:
//! 1. Starts a supervisor with Mycelium integration
//! 2. Starts an OSIS runner
//! 3. Uses the supervisor client to register the runner and run jobs
//! 4. Shows both job.run (blocking) and job.start (non-blocking) modes
//!
//! Prerequisites:
//! - Redis running on localhost:6379
//!
//! Usage:
//! ```bash
//! RUST_LOG=info cargo run --example end_to_end_demo
//! ```
|
|
|
|
use anyhow::{Result, Context};
|
|
use log::{info, error};
|
|
use std::process::{Command, Child, Stdio};
|
|
use std::time::Duration;
|
|
use tokio::time::sleep;
|
|
use hero_supervisor_openrpc_client::{SupervisorClient, JobBuilder};
|
|
|
|
/// Settings shared by every step of the demo.
struct DemoConfig {
    /// Redis connection URL forwarded to both the supervisor and the runner.
    redis_url: String,
    /// Port forwarded to the supervisor; the client connects to it on localhost.
    supervisor_port: u16,
    /// Identifier given to the runner process and used to register it and route jobs.
    runner_id: String,
    /// Path forwarded to the runner as `--db-path`.
    db_path: String,
}

impl Default for DemoConfig {
    /// Returns the fixed local defaults the demo runs with.
    fn default() -> Self {
        let redis_url = String::from("redis://localhost:6379");
        let runner_id = String::from("example_runner");
        let db_path = String::from("/tmp/example_runner.db");

        Self {
            redis_url,
            supervisor_port: 3030,
            runner_id,
            db_path,
        }
    }
}
|
|
|
|
/// Owns the child process that was spawned to run the supervisor.
struct SupervisorProcess {
    /// Handle to the spawned `cargo run` process.
    child: Child,
}
|
|
|
|
impl SupervisorProcess {
|
|
fn start(config: &DemoConfig) -> Result<Self> {
|
|
info!("🚀 Starting supervisor on port {}...", config.supervisor_port);
|
|
|
|
let child = Command::new("cargo")
|
|
.args(&[
|
|
"run",
|
|
"--bin",
|
|
"hero-supervisor",
|
|
"--",
|
|
"--redis-url",
|
|
&config.redis_url,
|
|
"--port",
|
|
&config.supervisor_port.to_string(),
|
|
])
|
|
.stdout(Stdio::piped())
|
|
.stderr(Stdio::piped())
|
|
.spawn()
|
|
.context("Failed to start supervisor")?;
|
|
|
|
Ok(Self { child })
|
|
}
|
|
}
|
|
|
|
impl Drop for SupervisorProcess {
|
|
fn drop(&mut self) {
|
|
info!("🛑 Stopping supervisor...");
|
|
let _ = self.child.kill();
|
|
let _ = self.child.wait();
|
|
}
|
|
}
|
|
|
|
/// Owns the child process that was spawned to run the OSIS runner.
struct RunnerProcess {
    /// Handle to the spawned `cargo run` process.
    child: Child,
}
|
|
|
|
impl RunnerProcess {
|
|
fn start(config: &DemoConfig) -> Result<Self> {
|
|
info!("🤖 Starting OSIS runner '{}'...", config.runner_id);
|
|
|
|
let child = Command::new("cargo")
|
|
.args(&[
|
|
"run",
|
|
"--bin",
|
|
"runner_osis",
|
|
"--",
|
|
&config.runner_id,
|
|
"--db-path",
|
|
&config.db_path,
|
|
"--redis-url",
|
|
&config.redis_url,
|
|
])
|
|
.env("RUST_LOG", "info")
|
|
.stdout(Stdio::piped())
|
|
.stderr(Stdio::piped())
|
|
.spawn()
|
|
.context("Failed to start runner")?;
|
|
|
|
Ok(Self { child })
|
|
}
|
|
}
|
|
|
|
impl Drop for RunnerProcess {
|
|
fn drop(&mut self) {
|
|
info!("🛑 Stopping runner...");
|
|
let _ = self.child.kill();
|
|
let _ = self.child.wait();
|
|
}
|
|
}
|
|
|
|
/// Helper functions for the demo
|
|
async fn register_runner_helper(client: &SupervisorClient, runner_id: &str, secret: &str) -> Result<()> {
|
|
info!("📝 Registering runner '{}'...", runner_id);
|
|
|
|
let queue = format!("hero:q:work:type:osis:group:default:inst:{}", runner_id);
|
|
client.register_runner(secret, runner_id, &queue).await?;
|
|
|
|
info!("✅ Runner registered successfully");
|
|
Ok(())
|
|
}
|
|
|
|
async fn run_job_helper(client: &SupervisorClient, job: runner_rust::job::Job, secret: &str, timeout: u64) -> Result<String> {
|
|
info!("🚀 Running job {} (blocking)...", job.id);
|
|
|
|
let response = client.job_run(secret, job, Some(timeout)).await?;
|
|
|
|
let result = response.result
|
|
.ok_or_else(|| anyhow::anyhow!("No result in response"))?;
|
|
|
|
info!("✅ Job completed with result: {}", result);
|
|
Ok(result)
|
|
}
|
|
|
|
/// Submits `job` through the client's `job_start` call and returns the job id from the
/// response, without waiting for a result.
///
/// # Errors
/// Propagates any error returned by `job_start`.
async fn start_job_helper(
    client: &SupervisorClient,
    job: runner_rust::job::Job,
    secret: &str,
) -> Result<String> {
    info!("🚀 Starting job {} (non-blocking)...", job.id);

    let response = client.job_start(secret, job).await?;
    let queued_id = response.job_id;

    info!("✅ Job queued with ID: {}", queued_id);
    Ok(queued_id)
}
|
|
|
|
#[tokio::main]
|
|
async fn main() -> Result<()> {
|
|
// Initialize logging
|
|
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
|
|
|
|
println!("\n╔════════════════════════════════════════════════════════════╗");
|
|
println!("║ End-to-End Demo: Supervisor + Runner + Client ║");
|
|
println!("╚════════════════════════════════════════════════════════════╝\n");
|
|
|
|
let config = DemoConfig::default();
|
|
|
|
// Step 1: Start supervisor
|
|
println!("📋 Step 1: Starting Supervisor");
|
|
println!("─────────────────────────────────────────────────────────────");
|
|
let _supervisor = SupervisorProcess::start(&config)?;
|
|
sleep(Duration::from_secs(3)).await;
|
|
println!("✅ Supervisor started on port {}\n", config.supervisor_port);
|
|
|
|
// Step 2: Start runner
|
|
println!("📋 Step 2: Starting OSIS Runner");
|
|
println!("─────────────────────────────────────────────────────────────");
|
|
let _runner = RunnerProcess::start(&config)?;
|
|
sleep(Duration::from_secs(3)).await;
|
|
println!("✅ Runner '{}' started\n", config.runner_id);
|
|
|
|
// Step 3: Create client and register runner
|
|
println!("📋 Step 3: Registering Runner with Supervisor");
|
|
println!("─────────────────────────────────────────────────────────────");
|
|
let client = SupervisorClient::new(&format!("http://localhost:{}", config.supervisor_port))?;
|
|
register_runner_helper(&client, &config.runner_id, "admin_secret").await?;
|
|
println!("✅ Runner registered\n");
|
|
|
|
sleep(Duration::from_secs(2)).await;
|
|
|
|
// Step 4: Run blocking jobs (job.run)
|
|
println!("📋 Step 4: Running Blocking Jobs (job.run)");
|
|
println!("─────────────────────────────────────────────────────────────");
|
|
|
|
// Job 1: Simple calculation
|
|
println!("\n🔹 Job 1: Simple Calculation");
|
|
let job1 = JobBuilder::new()
|
|
.caller_id("demo_client")
|
|
.context_id("demo_context")
|
|
.payload("let result = 2 + 2; to_json(result)")
|
|
.runner(&config.runner_id)
|
|
.executor("rhai")
|
|
.timeout(30)
|
|
.build()?;
|
|
|
|
let result1 = run_job_helper(&client, job1, "admin_secret", 30).await?;
|
|
println!(" Result: {}", result1);
|
|
|
|
// Job 2: String manipulation
|
|
println!("\n🔹 Job 2: String Manipulation");
|
|
let job2 = JobBuilder::new()
|
|
.caller_id("demo_client")
|
|
.context_id("demo_context")
|
|
.payload(r#"let msg = "Hello from OSIS Runner!"; to_json(msg)"#)
|
|
.runner(&config.runner_id)
|
|
.executor("rhai")
|
|
.timeout(30)
|
|
.build()?;
|
|
|
|
let result2 = run_job_helper(&client, job2, "admin_secret", 30).await?;
|
|
println!(" Result: {}", result2);
|
|
|
|
// Job 3: Array operations
|
|
println!("\n🔹 Job 3: Array Operations");
|
|
let job3 = JobBuilder::new()
|
|
.caller_id("demo_client")
|
|
.context_id("demo_context")
|
|
.payload(r#"
|
|
let numbers = [1, 2, 3, 4, 5];
|
|
let sum = 0;
|
|
for n in numbers {
|
|
sum += n;
|
|
}
|
|
to_json(#{sum: sum, count: numbers.len()})
|
|
"#)
|
|
.runner(&config.runner_id)
|
|
.executor("rhai")
|
|
.timeout(30)
|
|
.build()?;
|
|
|
|
let result3 = run_job_helper(&client, job3, "admin_secret", 30).await?;
|
|
println!(" Result: {}", result3);
|
|
|
|
println!("\n✅ All blocking jobs completed successfully\n");
|
|
|
|
// Step 5: Start non-blocking jobs (job.start)
|
|
println!("📋 Step 5: Starting Non-Blocking Jobs (job.start)");
|
|
println!("─────────────────────────────────────────────────────────────");
|
|
|
|
println!("\n🔹 Job 4: Background Task");
|
|
let job4 = JobBuilder::new()
|
|
.caller_id("demo_client")
|
|
.context_id("demo_context")
|
|
.payload(r#"
|
|
let result = "Background task completed";
|
|
to_json(result)
|
|
"#)
|
|
.runner(&config.runner_id)
|
|
.executor("rhai")
|
|
.timeout(30)
|
|
.build()?;
|
|
|
|
let job4_id = start_job_helper(&client, job4, "admin_secret").await?;
|
|
println!(" Job ID: {} (running in background)", job4_id);
|
|
|
|
println!("\n✅ Non-blocking job started\n");
|
|
|
|
// Step 6: Summary
|
|
println!("📋 Step 6: Demo Summary");
|
|
println!("─────────────────────────────────────────────────────────────");
|
|
println!("✅ Supervisor: Running on port {}", config.supervisor_port);
|
|
println!("✅ Runner: '{}' registered and processing jobs", config.runner_id);
|
|
println!("✅ Blocking jobs: 3 completed successfully");
|
|
println!("✅ Non-blocking jobs: 1 started");
|
|
println!("\n🎉 Demo completed successfully!");
|
|
|
|
// Keep processes running for a bit to see logs
|
|
println!("\n⏳ Keeping processes running for 5 seconds...");
|
|
sleep(Duration::from_secs(5)).await;
|
|
|
|
println!("\n🛑 Shutting down...");
|
|
|
|
Ok(())
|
|
}
|