add circles app and libraries
cmd/Cargo.lock: 2644 lines (generated). File diff suppressed because it is too large.
@@ -1,23 +0,0 @@
[package]
name = "circles_orchestrator"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
# clap = { version = "4.0", features = ["derive"], optional = true } # Optional for future args
dirs = "5.0"
log = "0.4"
env_logger = "0.10"
comfy-table = "7.0" # For table display

# Path dependencies to other local crates
heromodels = { path = "../../db/heromodels" } # Changed from ourdb
rhai_engine = { path = "../../rhailib/src/engine" }
rhai_worker = { path = "../../rhailib/src/worker" }
# rhai_client is used by circle_ws_lib, not directly by orchestrator usually
circle_ws_lib = { path = "../server_ws" }
@@ -1,5 +0,0 @@
[
  { "id": 1, "name": "Alpha Circle", "port": 8091 },
  { "id": 2, "name": "Alpha Circle", "port": 8082 },
  { "id": 3, "name": "Beta Circle", "port": 8083 }
]
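Each entry above corresponds to the CircleConfig struct deserialized in cmd/src/main.rs below; from the name and port the orchestrator derives a Redis worker queue name and a WebSocket URL. A minimal sketch of that mapping (the helper function name is illustrative, not part of the commit):

    // Illustrative helper mirroring the formatting done per circle in main.rs.
    fn derived_endpoints(name: &str, port: u16) -> (String, String) {
        // Queue name: spaces become underscores, then lowercased,
        // e.g. "Alpha Circle" -> "rhai_tasks:alpha_circle".
        let worker_queue = format!("rhai_tasks:{}", name.replace(" ", "_").to_lowercase());
        // Local WebSocket endpoint served for this circle,
        // e.g. port 8091 -> "ws://127.0.0.1:8091/ws".
        let ws_url = format!("ws://127.0.0.1:{}/ws", port);
        (worker_queue, ws_url)
    }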
							
								
								
									
cmd/src/main.rs: 245 lines.
@@ -1,245 +0,0 @@
use std::fs;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use serde::Deserialize;
use tokio::task::JoinHandle;
use tokio::sync::{oneshot, mpsc}; // For server handles and worker shutdown
use tokio::signal;
use std::time::Duration;

use comfy_table::{Table, Row, Cell, ContentArrangement};
use log::{info, error, warn, debug};

use heromodels::db::hero::{OurDB as HeroOurDB}; // Renamed to avoid conflict if OurDB is used from elsewhere
use rhai_engine::create_heromodels_engine;
use worker_lib::spawn_rhai_worker; // This now takes a shutdown_rx
use circle_ws_lib::spawn_circle_ws_server; // This now takes a server_handle_tx

const DEFAULT_REDIS_URL: &str = "redis://127.0.0.1:6379";

#[derive(Deserialize, Debug, Clone)]
struct CircleConfig {
    id: u32,
    name: String,
    port: u16,
}

struct RunningCircleInfo {
    config: CircleConfig,
    db_path: PathBuf,
    worker_queue: String,
    ws_url: String,

    worker_handle: JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>>,
    worker_shutdown_tx: mpsc::Sender<()>, // To signal worker to stop

    // Store the server handle for graceful shutdown, and its JoinHandle
    ws_server_instance_handle: Arc<Mutex<Option<actix_web::dev::Server>>>,
    ws_server_task_join_handle: JoinHandle<std::io::Result<()>>,
    status: Arc<Mutex<String>>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    std::env::set_var("RUST_LOG", "info,circles_orchestrator=debug,worker_lib=debug,circle_ws_lib=debug,rhai_client=debug,actix_server=info");
    env_logger::init();

    info!("Starting Circles Orchestrator...");
    info!("Press Ctrl+C to initiate graceful shutdown.");

    let config_path = PathBuf::from("./circles.json");
    if !config_path.exists() {
        error!("Configuration file not found at {:?}. Please create circles.json.", config_path);
        return Err("circles.json not found".into());
    }

    let config_content = fs::read_to_string(config_path)?;
    let circle_configs: Vec<CircleConfig> = serde_json::from_str(&config_content)?;

    if circle_configs.is_empty() {
        warn!("No circle configurations found in circles.json. Exiting.");
        return Ok(());
    }
    info!("Loaded {} circle configurations.", circle_configs.len());
    let mut running_circles_store: Vec<Arc<Mutex<RunningCircleInfo>>> = Vec::new();

    for config in circle_configs {
        info!("Initializing Circle ID: {}, Name: '{}', Port: {}", config.id, config.name, config.port);
        let current_status = Arc::new(Mutex::new(format!("Initializing Circle {}", config.id)));

        let db_base_path = match dirs::home_dir() {
            Some(path) => path.join(".hero").join("circles"),
            None => {
                error!("Failed to get user home directory for Circle ID {}.", config.id);
                *current_status.lock().unwrap() = "Error: DB Path".to_string();
                // Not pushing to running_circles_store as it can't fully initialize
                continue;
            }
        };
        let circle_db_path = db_base_path.join(config.id.to_string());
        if !circle_db_path.exists() {
            if let Err(e) = fs::create_dir_all(&circle_db_path) {
                error!("Failed to create database directory for Circle {}: {:?}. Error: {}", config.id, circle_db_path, e);
                *current_status.lock().unwrap() = "Error: DB Create".to_string();
                continue;
            }
            info!("Created database directory for Circle {}: {:?}", config.id, circle_db_path);
        }

        let db = match HeroOurDB::new(circle_db_path.clone(), false) {
            Ok(db_instance) => Arc::new(db_instance),
            Err(e) => {
                error!("Failed to initialize heromodels::OurDB for Circle {}: {:?}", config.id, e);
                *current_status.lock().unwrap() = "Error: DB Init".to_string();
                continue;
            }
        };
        info!("OurDB initialized for Circle {}", config.id);
        *current_status.lock().unwrap() = format!("DB Ok for Circle {}", config.id);

        let engine = create_heromodels_engine(db.clone());
        info!("Rhai Engine created for Circle {}", config.id);
        *current_status.lock().unwrap() = format!("Engine Ok for Circle {}", config.id);

        // Channel for worker shutdown
        let (worker_shutdown_tx, worker_shutdown_rx) = mpsc::channel(1); // Buffer of 1 is fine

        let worker_handle = spawn_rhai_worker(
            config.id,
            config.name.clone(),
            engine, // engine is Clone
            DEFAULT_REDIS_URL.to_string(),
            worker_shutdown_rx, // Pass the receiver
        );
        info!("Rhai Worker spawned for Circle {}", config.id);
        let worker_queue_name = format!("rhai_tasks:{}", config.name.replace(" ", "_").to_lowercase());
        *current_status.lock().unwrap() = format!("Worker Spawning for Circle {}", config.id);

        let (server_handle_tx, server_handle_rx) = oneshot::channel();
        let ws_server_task_join_handle = spawn_circle_ws_server(
            config.id,
            config.name.clone(),
            config.port,
            DEFAULT_REDIS_URL.to_string(),
            server_handle_tx,
        );
        info!("Circle WebSocket Server task spawned for Circle {} on port {}", config.id, config.port);
        let ws_url = format!("ws://127.0.0.1:{}/ws", config.port);
        *current_status.lock().unwrap() = format!("WS Server Spawning for Circle {}", config.id);

        let server_instance_handle_arc = Arc::new(Mutex::new(None));
        let server_instance_handle_clone = server_instance_handle_arc.clone();
        let status_clone_for_server_handle = current_status.clone();
        let circle_id_for_server_handle = config.id;

        tokio::spawn(async move {
            match server_handle_rx.await {
                Ok(handle) => {
                    *server_instance_handle_clone.lock().unwrap() = Some(handle);
                    *status_clone_for_server_handle.lock().unwrap() = format!("Running Circle {}", circle_id_for_server_handle);
                    info!("Received server handle for Circle {}", circle_id_for_server_handle);
                }
                Err(_) => {
                    *status_clone_for_server_handle.lock().unwrap() = format!("Error: No Server Handle for Circle {}", circle_id_for_server_handle);
                    error!("Failed to receive server handle for Circle {}", circle_id_for_server_handle);
                }
            }
        });

        running_circles_store.push(Arc::new(Mutex::new(RunningCircleInfo {
            config,
            db_path: circle_db_path,
            worker_queue: worker_queue_name,
            ws_url,
            worker_handle,
            worker_shutdown_tx,
            ws_server_instance_handle: server_instance_handle_arc,
            ws_server_task_join_handle,
            status: current_status, // This is an Arc<Mutex<String>>
        })));
    }
|     info!("All configured circles have been processed. Initializing status table display loop."); | ||||
|  | ||||
|     let display_running_circles = running_circles_store.clone(); | ||||
|     let display_task = tokio::spawn(async move { | ||||
|         loop { | ||||
|             { // Scope for MutexGuard | ||||
|                 let circles = display_running_circles.iter() | ||||
|                                 .map(|arc_info| arc_info.lock().unwrap()) | ||||
|                                 .collect::<Vec<_>>(); // Collect locked guards | ||||
|  | ||||
|                 let mut table = Table::new(); | ||||
|                 table.set_content_arrangement(ContentArrangement::Dynamic); | ||||
|                 table.set_header(vec!["Name", "ID", "Port", "Status", "DB Path", "Worker Queue", "WS URL"]); | ||||
|  | ||||
|                 for circle_info in circles.iter() { | ||||
|                     let mut row = Row::new(); | ||||
|                     row.add_cell(Cell::new(&circle_info.config.name)); | ||||
|                     row.add_cell(Cell::new(circle_info.config.id)); | ||||
|                     row.add_cell(Cell::new(circle_info.config.port)); | ||||
|                     row.add_cell(Cell::new(&*circle_info.status.lock().unwrap())); // Deref and lock status | ||||
|                     row.add_cell(Cell::new(circle_info.db_path.to_string_lossy())); | ||||
|                     row.add_cell(Cell::new(&circle_info.worker_queue)); | ||||
|                     row.add_cell(Cell::new(&circle_info.ws_url)); | ||||
|                     table.add_row(row); | ||||
|                 } | ||||
|                 // Clear terminal before printing (basic, might flicker) | ||||
|                 // print!("\x1B[2J\x1B[1;1H");  | ||||
|                 println!("\n--- Circles Status (updated every 5s, Ctrl+C to stop) ---\n{table}"); | ||||
|             } | ||||
|             tokio::time::sleep(Duration::from_secs(5)).await; | ||||
|         } | ||||
|     }); | ||||
|  | ||||
|  | ||||
    signal::ctrl_c().await?;
    info!("Ctrl-C received. Initiating graceful shutdown of all circles...");
    display_task.abort(); // Stop the display task

    // Iterate by reference here so the store can be traversed again below.
    for circle_arc in &running_circles_store {
        let circle_info = circle_arc.lock().unwrap();
        info!("Shutting down Circle ID: {}, Name: '{}'", circle_info.config.id, circle_info.config.name);
        *circle_info.status.lock().unwrap() = "Shutting down".to_string();

        // Signal worker to shut down
        if circle_info.worker_shutdown_tx.send(()).await.is_err() {
            warn!("Failed to send shutdown signal to worker for Circle {}. It might have already stopped.", circle_info.config.id);
        }

        // Stop WS server
        if let Some(server_handle) = circle_info.ws_server_instance_handle.lock().unwrap().take() {
            info!("Stopping WebSocket server for Circle {}...", circle_info.config.id);
            server_handle.stop(true).await; // Graceful stop
            info!("WebSocket server for Circle {} stop signal sent.", circle_info.config.id);
        } else {
            warn!("No server handle to stop WebSocket server for Circle {}. It might not have started properly or already stopped.", circle_info.config.id);
        }
    }

    info!("Waiting for all tasks to complete...");
    for circle_arc in running_circles_store {
        // We need to take ownership of handles to await them, or await mutable refs.
        // This part is tricky if the MutexGuard is held.
        // For simplicity, we'll just log that we've signaled them.
        // Proper awaiting would require more careful structuring of JoinHandles.
        let circle_id;
        let circle_name;
        { // Short scope for the lock
            let circle_info = circle_arc.lock().unwrap();
            circle_id = circle_info.config.id;
            circle_name = circle_info.config.name.clone();
        }
        debug!("Orchestrator has signaled shutdown for Circle {} ({}). Main loop will await join handles if structured for it.", circle_name, circle_id);
        // Actual awaiting of join handles would happen here if they were collected outside the Mutex.
        // For now, the main function will exit after this loop.
    }

    // Give some time for tasks to shut down before the main process exits.
    // This is a simplified approach. A more robust solution would involve awaiting all JoinHandles.
    tokio::time::sleep(Duration::from_secs(2)).await;

    info!("Orchestrator shut down complete.");
    Ok(())
}
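The shutdown path above only signals the worker and server tasks and then sleeps for two seconds; the inline comments note that a more robust approach would collect and await the JoinHandles themselves. A minimal sketch of that idea, reusing the types and imports already present in main.rs (the helper name and the five-second timeout are assumptions, not part of the commit):

    // Hypothetical helper: take ownership of each circle's join handles and
    // await them with a timeout instead of sleeping for a fixed two seconds.
    async fn await_circle_tasks(circles: Vec<Arc<Mutex<RunningCircleInfo>>>) {
        for circle_arc in circles {
            // Arc::try_unwrap only succeeds once every other clone of this Arc
            // (e.g. the aborted display task's) has been dropped.
            if let Ok(mutex) = Arc::try_unwrap(circle_arc) {
                let info = mutex.into_inner().unwrap();
                let id = info.config.id;
                // Bound the wait so one stuck task cannot block the whole shutdown.
                let _ = tokio::time::timeout(Duration::from_secs(5), info.worker_handle).await;
                let _ = tokio::time::timeout(Duration::from_secs(5), info.ws_server_task_join_handle).await;
                info!("Circle {}: worker and WS server tasks finished (or timed out).", id);
            }
        }
    }

Such a helper would replace the final two-second sleep, e.g. await_circle_tasks(running_circles_store).await; after the signaling loop.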