Files
home/examples/supervisor_client_demo.rs
2025-09-02 09:14:05 +02:00

308 lines
11 KiB
Rust
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! # Hero Supervisor Client Demo (Limited)
//!
//! ⚠️ **Note**: This example has limitations due to Mycelium local loopback routing issues.
//! For a working example, use `mycelium_two_node_test.rs` instead.
//!
//! This example demonstrates SupervisorClient usage but is affected by Mycelium's
//! inability to route messages properly when both client and supervisor run on the same node.
//!
//! For production use, deploy client and supervisor on separate Mycelium nodes.
//!
//! To run this example:
//! ```bash
//! cd /Users/timurgordon/code/git.ourworld.tf/herocode/home/examples
//! cargo run --example supervisor_client_demo
//! ```
use herocoordinator::clients::{Destination, SupervisorClient};
use std::net::IpAddr;
use std::time::Duration;
use escargot::CargoBuild;
use std::process::{Stdio, Child};
use tokio::time::sleep;
use serde_json::json;
use log::{info, warn, error};
struct SupervisorProcess {
process: Child,
mycelium_url: String,
topic: String,
}
impl SupervisorProcess {
async fn start() -> Result<Self, Box<dyn std::error::Error>> {
info!("Building supervisor binary with escargot...");
// Build the supervisor binary from the supervisor crate
let supervisor_binary = CargoBuild::new()
.bin("supervisor")
.manifest_path("../../supervisor/Cargo.toml")
.current_release()
.run()?;
info!("Supervisor binary built successfully");
let mycelium_url = "http://127.0.0.1:8990".to_string();
let topic = "supervisor.rpc".to_string();
info!("Starting supervisor process with Mycelium integration...");
// Start supervisor with admin secrets for full access
let process = supervisor_binary
.command()
.args(&[
"--admin-secret", "admin123",
"--user-secret", "user456",
"--register-secret", "register789",
"--mycelium-url", &mycelium_url,
"--topic", &topic,
])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
info!("Supervisor process started (PID: {})", process.id());
// Wait for server to start up
sleep(Duration::from_secs(3)).await;
Ok(SupervisorProcess { process, mycelium_url, topic })
}
fn shutdown(mut self) -> Result<(), Box<dyn std::error::Error>> {
info!("Shutting down supervisor process...");
self.process.kill()?;
let status = self.process.wait()?;
info!("Supervisor exited with status: {}", status);
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
println!("🚀 Hero Supervisor Client Demo");
println!("==============================");
// Start the supervisor process
let supervisor = SupervisorProcess::start().await?;
// Create supervisor client using IP destination from the running Mycelium node
let client = SupervisorClient::new(
"http://127.0.0.1:8990", // Mycelium base URL
Destination::Ip("56d:524:53e6:1e4b::1".parse::<IpAddr>()?), // Current node IP
"supervisor.rpc",
Some("admin123".to_string()), // Admin secret for full access
)?;
println!("✅ Supervisor client created");
// Test connectivity with retries
println!("\n🔍 Testing supervisor connectivity...");
let mut attempts = 0;
let max_attempts = 10;
loop {
attempts += 1;
match client.call_sync("list_runners", json!([]), 5).await {
Ok(result) => {
println!("✅ Supervisor is responsive");
println!("📋 Current runners: {}", result);
break;
}
Err(e) if attempts < max_attempts => {
println!("⏳ Attempt {}/{}: Supervisor not ready, retrying...", attempts, max_attempts);
sleep(Duration::from_secs(1)).await;
continue;
}
Err(e) => {
error!("❌ Failed to connect after {} attempts: {}", max_attempts, e);
supervisor.shutdown()?;
return Err(e.into());
}
}
}
// Register a runner
println!("\n Registering a test runner...");
match client.register_runner("demo_runner", "demo_queue").await {
Ok(message_id) => println!("✅ Runner registration sent (message ID: {})", message_id),
Err(e) => {
warn!("⚠️ Runner registration failed: {}", e);
println!(" This might be expected if runner management is not fully implemented");
}
}
// Create and submit multiple jobs
println!("\n📤 Submitting multiple jobs...");
let jobs = vec![
("Hello World Job", json!({
"id": "job_1",
"runner_name": "demo_runner",
"command": "echo 'Hello from Hero Supervisor!'",
"timeout": 30,
"env": {}
})),
("Math Calculation", json!({
"id": "job_2",
"runner_name": "demo_runner",
"command": "python3 -c 'print(f\"The answer is: {42 * 2}\")'",
"timeout": 30,
"env": {}
})),
("System Info", json!({
"id": "job_3",
"runner_name": "demo_runner",
"command": "uname -a && date",
"timeout": 30,
"env": {}
})),
("File Listing", json!({
"id": "job_4",
"runner_name": "demo_runner",
"command": "ls -la /tmp",
"timeout": 30,
"env": {}
})),
("JSON Processing", json!({
"id": "job_5",
"runner_name": "demo_runner",
"command": "echo '{\"status\": \"success\", \"data\": [1,2,3]}' | python3 -m json.tool",
"timeout": 30,
"env": {}
})),
];
let mut job_message_ids = Vec::new();
for (description, job_data) in jobs {
println!("🚀 Submitting job: {}", description);
match client.job_run(job_data.clone()).await {
Ok(message_id) => {
println!("✅ Job '{}' submitted (message ID: {})", description, message_id);
job_message_ids.push((description.to_string(), message_id));
}
Err(e) => {
println!("❌ Failed to submit job '{}': {}", description, e);
}
}
// Small delay between job submissions
sleep(Duration::from_millis(500)).await;
}
// Demonstrate job status monitoring
println!("\n📊 Monitoring job status...");
for (description, _message_id) in &job_message_ids {
let job_id = match description.as_str() {
"Hello World Job" => "job_1",
"Math Calculation" => "job_2",
"System Info" => "job_3",
"File Listing" => "job_4",
"JSON Processing" => "job_5",
_ => continue,
};
println!("🔍 Checking status for job: {}", description);
match client.job_status_sync(job_id, 10).await {
Ok(status) => {
println!("✅ Job '{}' status: {}", description, status);
}
Err(e) => {
println!("⚠️ Could not get status for job '{}': {}", description, e);
}
}
// Try to get job result
match client.job_result_sync(job_id, 10).await {
Ok(result) => {
if let Some(success) = result.get("success") {
println!("📋 Job '{}' result: {}", description, success);
} else if let Some(error) = result.get("error") {
println!("❌ Job '{}' error: {}", description, error);
}
}
Err(e) => {
println!("⚠️ Could not get result for job '{}': {}", description, e);
}
}
sleep(Duration::from_millis(300)).await;
}
// Demonstrate bulk operations
println!("\n📊 Demonstrating bulk operations...");
// Get all runner status
println!("📋 Getting all runner statuses...");
match client.get_all_runner_status().await {
Ok(message_id) => {
println!("✅ All runner status request sent (message ID: {})", message_id);
// Try to get the sync result
match client.call_sync("get_all_runner_status", json!([]), 5).await {
Ok(result) => println!("📊 All runner statuses: {}", result),
Err(e) => println!("⚠️ Could not get sync result: {}", e),
}
}
Err(e) => println!("❌ Failed to request all runner status: {}", e),
}
// List all jobs
println!("\n📋 Listing all jobs...");
match client.jobs_list().await {
Ok(message_id) => {
println!("✅ Jobs list request sent (message ID: {})", message_id);
// Try to get the sync result
match client.call_sync("jobs.list", json!([]), 5).await {
Ok(result) => println!("📋 All jobs: {}", result),
Err(e) => println!("⚠️ Could not get sync jobs list: {}", e),
}
}
Err(e) => println!("❌ Failed to list jobs: {}", e),
}
// Test RPC discovery
println!("\n🔍 Testing RPC discovery...");
match client.rpc_discover().await {
Ok(message_id) => {
println!("✅ RPC discovery request sent (message ID: {})", message_id);
// Try to get the sync result
match client.call_sync("rpc.discover", json!([]), 5).await {
Ok(result) => println!("🔍 Available RPC methods: {}", result),
Err(e) => println!("⚠️ Could not get sync discovery result: {}", e),
}
}
Err(e) => println!("❌ Failed to discover RPC methods: {}", e),
}
// Wait a bit for any remaining operations
println!("\n⏳ Waiting for operations to complete...");
sleep(Duration::from_secs(2)).await;
// Shutdown
println!("\n🛑 Shutting down supervisor...");
supervisor.shutdown()?;
println!("\n🎉 Hero Supervisor Client Demo Complete!");
println!("========================================");
println!("✅ Successfully demonstrated:");
println!(" - Building supervisor binary with escargot");
println!(" - Starting supervisor with admin secrets");
println!(" - Creating SupervisorClient with Mycelium transport");
println!(" - Submitting multiple jobs with different commands");
println!(" - Monitoring job status and results");
println!(" - Bulk operations (runner status, job listing)");
println!(" - RPC method discovery");
println!(" - Graceful supervisor shutdown");
println!("\n🚀 The Hero Supervisor client integration is working!");
Ok(())
}