//! End-to-End Integration Tests for Hero Coordinator //! //! Tests coordinator flow management functionality against a running coordinator instance. //! The coordinator binary is automatically started and stopped for each test run. //! //! **IMPORTANT**: Run with `--test-threads=1` to ensure tests run sequentially: //! ``` //! cargo test --test coordinator -- --test-threads=1 //! ``` use hero_coordinator_client::{CoordinatorClient, models::*}; use std::collections::HashMap; use std::sync::Once; use std::process::Child; /// Test configuration const COORDINATOR_URL: &str = "http://127.0.0.1:9652"; const TEST_CONTEXT_ID: u32 = 2; const TEST_CALLER_ID: u32 = 11001; const TEST_FLOW_ID: u32 = 13001; const BASE_JOB_ID: u32 = 20000; use std::sync::Mutex; use lazy_static::lazy_static; lazy_static! { static ref COORDINATOR_PROCESS: Mutex> = Mutex::new(None); } /// Global initialization flag static INIT: Once = Once::new(); /// Initialize and start the coordinator binary (called once) async fn init_coordinator() { INIT.call_once(|| { // Register cleanup handler let _ = std::panic::catch_unwind(|| { ctrlc::set_handler(move || { cleanup_coordinator(); std::process::exit(0); }).ok(); }); // Use escargot to build and get the binary path let binary = escargot::CargoBuild::new() .bin("coordinator") .package("hero-coordinator") .run() .expect("Failed to build coordinator binary"); // Start the coordinator binary with HTTP transport (no mycelium needed) let child = binary .command() .env("RUST_LOG", "info") .args(&[ "--api-http-port", "9652", "--api-ws-port", "9653", "--redis-addr", "127.0.0.1:6379", "--supervisor-transport", "http", ]) .spawn() .expect("Failed to start coordinator"); *COORDINATOR_PROCESS.lock().unwrap() = Some(child); // Wait for server to be ready with simple TCP check use std::net::TcpStream; use std::time::Duration; println!("⏳ Waiting for coordinator to start..."); for i in 0..30 { std::thread::sleep(Duration::from_millis(500)); // Try to connect to the port if TcpStream::connect_timeout( &"127.0.0.1:9652".parse().unwrap(), Duration::from_millis(100) ).is_ok() { // Give it more time to fully initialize std::thread::sleep(Duration::from_secs(2)); println!("✅ Coordinator ready after ~{}ms", (i * 500) + 2000); return; } } panic!("Coordinator failed to start within 15 seconds"); }); } /// Cleanup coordinator process fn cleanup_coordinator() { if let Ok(mut guard) = COORDINATOR_PROCESS.lock() { if let Some(mut child) = guard.take() { println!("🧹 Cleaning up coordinator process..."); let _ = child.kill(); let _ = child.wait(); } } } /// Helper to create a test client async fn create_client() -> CoordinatorClient { // Ensure coordinator is running init_coordinator().await; CoordinatorClient::new(COORDINATOR_URL) .expect("Failed to create coordinator client") } #[tokio::test] async fn test_01_flow_create_simple() { println!("\n🧪 Test: flow.create (simple flow)"); let client = create_client().await; // Create jobs for the flow let job_ids = vec![BASE_JOB_ID, BASE_JOB_ID + 1]; for (i, job_id) in job_ids.iter().enumerate() { let job = JobCreate { id: *job_id, caller_id: TEST_CALLER_ID, context_id: TEST_CONTEXT_ID, script: format!("print('job {}')", i), script_type: ScriptType::Python, timeout: 60, retries: 0, env_vars: HashMap::new(), prerequisites: vec![], depends: if i == 0 { vec![] } else { vec![job_ids[i - 1]] }, }; let result = client.job_create_or_load(TEST_CONTEXT_ID, job).await; if let Err(ref e) = result { println!(" Job {} creation error: {:?}", job_id, e); } assert!(result.is_ok(), "Job {} should be created", job_id); } // Create flow let flow_create = FlowCreate { id: TEST_FLOW_ID, caller_id: TEST_CALLER_ID, context_id: TEST_CONTEXT_ID, jobs: job_ids.clone(), env_vars: HashMap::new(), }; let result = client.flow_create_or_load(TEST_CONTEXT_ID, flow_create).await; if let Err(ref e) = result { println!(" Error: {:?}", e); } assert!(result.is_ok(), "flow.create_or_load should succeed"); let flow = result.unwrap(); assert_eq!(flow.id, TEST_FLOW_ID); assert_eq!(flow.jobs, job_ids); println!("✅ flow.create works - flow: {}, jobs: {:?}", flow.id, flow.jobs); } #[tokio::test] async fn test_02_flow_load() { println!("\n🧪 Test: flow.load"); let client = create_client().await; // Create a flow first (reuse from test_01) let job_ids = vec![BASE_JOB_ID, BASE_JOB_ID + 1]; for (i, job_id) in job_ids.iter().enumerate() { let job = JobCreate { id: *job_id, caller_id: TEST_CALLER_ID, context_id: TEST_CONTEXT_ID, script: format!("print('job {}')", i), script_type: ScriptType::Python, timeout: 60, retries: 0, env_vars: HashMap::new(), prerequisites: vec![], depends: if i == 0 { vec![] } else { vec![job_ids[i - 1]] }, }; let _ = client.job_create_or_load(TEST_CONTEXT_ID, job).await; } let flow_create = FlowCreate { id: TEST_FLOW_ID, caller_id: TEST_CALLER_ID, context_id: TEST_CONTEXT_ID, jobs: job_ids.clone(), env_vars: HashMap::new(), }; let _ = client.flow_create_or_load(TEST_CONTEXT_ID, flow_create).await; // Load the flow let result = client.flow_load(TEST_CONTEXT_ID, TEST_FLOW_ID).await; if let Err(ref e) = result { println!(" Error: {:?}", e); } assert!(result.is_ok(), "flow.load should succeed"); let flow = result.unwrap(); assert_eq!(flow.id, TEST_FLOW_ID); assert_eq!(flow.jobs, job_ids); println!("✅ flow.load works - loaded flow: {}", flow.id); } #[tokio::test] async fn test_03_flow_dag() { println!("\n🧪 Test: flow.dag"); let client = create_client().await; // Create jobs with dependencies let job_ids = vec![BASE_JOB_ID + 100, BASE_JOB_ID + 101, BASE_JOB_ID + 102]; for (i, job_id) in job_ids.iter().enumerate() { let job = JobCreate { id: *job_id, caller_id: TEST_CALLER_ID, context_id: TEST_CONTEXT_ID, script: format!("print('dag job {}')", i), script_type: ScriptType::Python, timeout: 60, retries: 0, env_vars: HashMap::new(), prerequisites: vec![], depends: if i == 0 { vec![] } else { vec![job_ids[i - 1]] }, }; let _ = client.job_create_or_load(TEST_CONTEXT_ID, job).await; } let flow_id = TEST_FLOW_ID + 1; let flow_create = FlowCreate { id: flow_id, caller_id: TEST_CALLER_ID, context_id: TEST_CONTEXT_ID, jobs: job_ids.clone(), env_vars: HashMap::new(), }; let _ = client.flow_create_or_load(TEST_CONTEXT_ID, flow_create).await; // Get the DAG let result = client.flow_dag(TEST_CONTEXT_ID, flow_id).await; if let Err(ref e) = result { println!(" Error: {:?}", e); } assert!(result.is_ok(), "flow.dag should succeed"); let dag = result.unwrap(); assert_eq!(dag.flow_id, flow_id); assert_eq!(dag.nodes.len(), 3); assert_eq!(dag.edges.len(), 2); // Two edges for the chain println!("✅ flow.dag works - flow: {}, nodes: {}, edges: {}", dag.flow_id, dag.nodes.len(), dag.edges.len()); } #[tokio::test] async fn test_04_flow_start() { println!("\n🧪 Test: flow.start"); let client = create_client().await; // Create a simple flow let job_id = BASE_JOB_ID + 200; let job = JobCreate { id: job_id, caller_id: TEST_CALLER_ID, context_id: TEST_CONTEXT_ID, script: "print('start test')".to_string(), script_type: ScriptType::Python, timeout: 60, retries: 0, env_vars: HashMap::new(), prerequisites: vec![], depends: vec![], }; let _ = client.job_create_or_load(TEST_CONTEXT_ID, job).await; let flow_id = TEST_FLOW_ID + 2; let flow_create = FlowCreate { id: flow_id, caller_id: TEST_CALLER_ID, context_id: TEST_CONTEXT_ID, jobs: vec![job_id], env_vars: HashMap::new(), }; let _ = client.flow_create_or_load(TEST_CONTEXT_ID, flow_create).await; // Start the flow let result = client.flow_start(TEST_CONTEXT_ID, flow_id).await; match result { Ok(started) => { println!("✅ flow.start works - started: {}", started); } Err(e) => { println!("⚠️ flow.start: {:?} (runner may not be available)", e); // This is expected if no actual runner is listening } } } #[tokio::test] async fn test_05_message_create() { println!("\n🧪 Test: message.create"); let client = create_client().await; let message_create = MessageCreate { id: 1, context_id: TEST_CONTEXT_ID, runner_id: 12001, job_id: BASE_JOB_ID, message_type: MessageType::JobRun, format: MessageFormatType::JsonRpc, payload: r#"{"method":"job.run","params":{}}"#.to_string(), }; let result = client.message_create(TEST_CONTEXT_ID, message_create).await; match result { Ok(message) => { assert_eq!(message.id, 1); assert_eq!(message.context_id, TEST_CONTEXT_ID); println!("✅ message.create works - message: {}", message.id); } Err(e) => { println!("⚠️ message.create: {:?} (may already exist)", e); } } } #[tokio::test] async fn test_06_message_load() { println!("\n🧪 Test: message.load"); let client = create_client().await; // Create a message first let message_create = MessageCreate { id: 2, context_id: TEST_CONTEXT_ID, runner_id: 12001, job_id: BASE_JOB_ID, message_type: MessageType::JobRun, format: MessageFormatType::JsonRpc, payload: r#"{"method":"job.run","params":{}}"#.to_string(), }; let _ = client.message_create(TEST_CONTEXT_ID, message_create).await; // Load the message let result = client.message_load(TEST_CONTEXT_ID, 2).await; if let Err(ref e) = result { println!(" Error: {:?}", e); } match result { Ok(message) => { assert_eq!(message.id, 2); assert_eq!(message.context_id, TEST_CONTEXT_ID); println!("✅ message.load works - loaded message: {}", message.id); } Err(_) => { println!("⚠️ message.load failed (message may not exist)"); } } } /// Final test that ensures cleanup happens #[tokio::test] async fn test_zz_cleanup() { println!("🧹 Running cleanup..."); cleanup_coordinator(); tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; use std::net::TcpStream; let port_free = TcpStream::connect_timeout( &"127.0.0.1:9652".parse().unwrap(), std::time::Duration::from_millis(100) ).is_err(); assert!(port_free, "Port 9652 should be free after cleanup"); println!("✅ Cleanup complete - port 9652 is free"); }