//! OpenRPC server implementation. use jsonrpsee::{ core::{RpcResult, async_trait}, proc_macros::rpc, server::{Server, ServerHandle}, types::{ErrorObject, ErrorObjectOwned}, }; use tower_http::cors::{CorsLayer, Any}; use anyhow; use log::{debug, info, error}; use crate::supervisor::Supervisor; use crate::runner::{Runner, RunnerError}; use crate::runner::{ProcessManagerError, ProcessStatus, LogInfo}; use crate::job::Job; use crate::ProcessManagerType; use serde::{Deserialize, Serialize}; use std::net::SocketAddr; use std::sync::Arc; use std::fs; use tokio::sync::Mutex; /// Load OpenRPC specification from docs/openrpc.json fn load_openrpc_spec() -> Result> { // Try to find the openrpc.json file relative to the current working directory let possible_paths = [ "docs/openrpc.json", "../docs/openrpc.json", "../../docs/openrpc.json", "./supervisor/docs/openrpc.json", ]; for path in &possible_paths { if let Ok(content) = fs::read_to_string(path) { match serde_json::from_str(&content) { Ok(spec) => { debug!("Loaded OpenRPC specification from: {}", path); return Ok(spec); } Err(e) => { error!("Failed to parse OpenRPC JSON from {}: {}", path, e); } } } } Err("Could not find or parse docs/openrpc.json".into()) } /// Helper function to convert RunnerError to RPC error fn runner_error_to_rpc_error(err: RunnerError) -> ErrorObject<'static> { ErrorObject::owned( -32603, // Internal error code format!("Supervisor error: {}", err), None::<()>, ) } /// Helper function to create invalid params error fn invalid_params_error(msg: &str) -> ErrorObject<'static> { ErrorObject::owned( -32602, // Invalid params error code format!("Invalid parameters: {}", msg), None::<()>, ) } /// Request parameters for registering a new runner #[derive(Debug, Deserialize, Serialize)] pub struct RegisterRunnerParams { pub secret: String, pub name: String, pub queue: String, } /// Request parameters for running a job #[derive(Debug, Deserialize, Serialize)] pub struct RunJobParams { pub secret: String, pub job: Job, } /// Request parameters for starting a job #[derive(Debug, Deserialize, Serialize)] pub struct StartJobParams { pub secret: String, pub job_id: String, } /// Job result response #[derive(Debug, Serialize, Clone)] #[serde(untagged)] pub enum JobResult { Success { success: String }, Error { error: String }, } /// Job status response #[derive(Debug, Clone, Serialize, Deserialize)] pub struct JobStatusResponse { pub job_id: String, pub status: String, pub created_at: String, pub started_at: Option, pub completed_at: Option, } /// Request parameters for adding a new runner #[derive(Debug, Deserialize, Serialize)] pub struct AddRunnerParams { pub actor_id: String, pub binary_path: String, pub db_path: String, pub redis_url: String, pub process_manager_type: String, // "simple" or "tmux" pub tmux_session_name: Option, // required if process_manager_type is "tmux" } /// Request parameters for queuing a job #[derive(Debug, Deserialize, Serialize)] pub struct QueueJobParams { pub runner: String, pub job: Job, } /// Request parameters for queue and wait operation #[derive(Debug, Deserialize, Serialize)] pub struct QueueAndWaitParams { pub runner: String, pub job: Job, pub timeout_secs: u64, } /// Request parameters for stopping a job #[derive(Debug, Deserialize, Serialize)] pub struct StopJobParams { pub secret: String, pub job_id: String, } /// Request parameters for deleting a job #[derive(Debug, Deserialize, Serialize)] pub struct DeleteJobParams { pub secret: String, pub job_id: String, } /// Request parameters for getting runner logs #[derive(Debug, Deserialize, Serialize)] pub struct GetLogsParams { pub actor_id: String, pub lines: Option, pub follow: bool, } /// Request parameters for adding secrets #[derive(Debug, Deserialize, Serialize)] pub struct AddSecretParams { pub admin_secret: String, pub secret_type: String, // "admin", "user", or "register" pub secret_value: String, } /// Request parameters for removing secrets #[derive(Debug, Deserialize, Serialize)] pub struct RemoveSecretParams { pub admin_secret: String, pub secret_type: String, // "admin", "user", or "register" pub secret_value: String, } /// Request parameters for listing secrets #[derive(Debug, Deserialize, Serialize)] pub struct ListSecretsParams { pub admin_secret: String, } /// Serializable wrapper for ProcessStatus #[derive(Debug, Serialize, Clone)] pub enum ProcessStatusWrapper { Running, Stopped, Starting, Stopping, Error(String), } impl From for ProcessStatusWrapper { fn from(status: ProcessStatus) -> Self { match status { ProcessStatus::NotStarted => ProcessStatusWrapper::Stopped, ProcessStatus::Starting => ProcessStatusWrapper::Starting, ProcessStatus::Running => ProcessStatusWrapper::Running, ProcessStatus::Stopping => ProcessStatusWrapper::Stopping, ProcessStatus::Stopped => ProcessStatusWrapper::Stopped, ProcessStatus::Failed => ProcessStatusWrapper::Error("Process failed".to_string()), ProcessStatus::Error(msg) => ProcessStatusWrapper::Error(msg), } } } // Note: RunnerStatus is just an alias for ProcessStatus, so we don't need a separate impl /// Serializable wrapper for Runner #[derive(Debug, Serialize, Clone)] pub struct RunnerWrapper { pub id: String, pub name: String, pub command: String, pub redis_url: String, } impl From<&Runner> for RunnerWrapper { fn from(runner: &Runner) -> Self { RunnerWrapper { id: runner.id.clone(), name: runner.name.clone(), command: runner.command.to_string_lossy().to_string(), redis_url: runner.redis_url.clone(), } } } /// Serializable wrapper for LogInfo #[derive(Debug, Serialize, Clone)] pub struct LogInfoWrapper { pub timestamp: String, pub level: String, pub message: String, } impl From for LogInfoWrapper { fn from(log: crate::runner::LogInfo) -> Self { LogInfoWrapper { timestamp: log.timestamp, level: log.level, message: log.message, } } } /// Response for runner status queries #[derive(Debug, Serialize, Clone)] pub struct RunnerStatusResponse { pub actor_id: String, pub status: ProcessStatusWrapper, } /// Response for supervisor info #[derive(Debug, Serialize, Clone)] pub struct SupervisorInfoResponse { pub server_url: String, pub admin_secrets_count: usize, pub user_secrets_count: usize, pub register_secrets_count: usize, pub runners_count: usize, } /// OpenRPC trait defining all supervisor methods #[rpc(server)] pub trait SupervisorRpc { /// Register a new runner with secret-based authentication #[method(name = "register_runner")] async fn register_runner(&self, params: RegisterRunnerParams) -> RpcResult; /// Create a job without queuing it to a runner #[method(name = "jobs.create")] async fn jobs_create(&self, params: RunJobParams) -> RpcResult; /// List all jobs #[method(name = "jobs.list")] async fn jobs_list(&self) -> RpcResult>; /// Run a job on the appropriate runner and return the result #[method(name = "job.run")] async fn job_run(&self, params: RunJobParams) -> RpcResult; /// Start a previously created job by queuing it to its assigned runner #[method(name = "job.start")] async fn job_start(&self, params: StartJobParams) -> RpcResult<()>; /// Get the current status of a job #[method(name = "job.status")] async fn job_status(&self, job_id: String) -> RpcResult; /// Get the result of a completed job (blocks until result is available) #[method(name = "job.result")] async fn job_result(&self, job_id: String) -> RpcResult; /// Stop a running job #[method(name = "job.stop")] async fn job_stop(&self, params: StopJobParams) -> RpcResult<()>; /// Delete a job from the system #[method(name = "job.delete")] async fn job_delete(&self, params: DeleteJobParams) -> RpcResult<()>; /// Remove a runner from the supervisor #[method(name = "remove_runner")] async fn remove_runner(&self, actor_id: String) -> RpcResult<()>; /// List all runner IDs #[method(name = "list_runners")] async fn list_runners(&self) -> RpcResult>; /// Start a specific runner #[method(name = "start_runner")] async fn start_runner(&self, actor_id: String) -> RpcResult<()>; /// Stop a specific runner #[method(name = "stop_runner")] async fn stop_runner(&self, actor_id: String, force: bool) -> RpcResult<()>; /// Get a specific runner by ID #[method(name = "get_runner")] async fn get_runner(&self, actor_id: String) -> RpcResult; /// Get the status of a specific runner #[method(name = "get_runner_status")] async fn get_runner_status(&self, actor_id: String) -> RpcResult; /// Get logs for a specific runner #[method(name = "get_runner_logs")] async fn get_runner_logs(&self, params: GetLogsParams) -> RpcResult>; /// Queue a job to a specific runner #[method(name = "queue_job_to_runner")] async fn queue_job_to_runner(&self, params: QueueJobParams) -> RpcResult<()>; /// Get a job by job ID #[method(name = "get_job")] async fn get_job(&self, job_id: String) -> RpcResult; /// Ping a runner (dispatch a ping job) #[method(name = "ping_runner")] async fn ping_runner(&self, runner_id: String) -> RpcResult; /// Stop a job #[method(name = "stop_job")] async fn stop_job(&self, job_id: String) -> RpcResult<()>; /// Delete a job #[method(name = "delete_job")] async fn delete_job(&self, job_id: String) -> RpcResult<()>; /// Queue a job to a specific runner and wait for the result #[method(name = "queue_and_wait")] async fn queue_and_wait(&self, params: QueueAndWaitParams) -> RpcResult>; /// Get status of all runners #[method(name = "get_all_runner_status")] async fn get_all_runner_status(&self) -> RpcResult>; /// Start all runners #[method(name = "start_all")] async fn start_all(&self) -> RpcResult>; /// Stop all runners #[method(name = "stop_all")] async fn stop_all(&self, force: bool) -> RpcResult>; /// Get status of all runners (alternative format) #[method(name = "get_all_status")] async fn get_all_status(&self) -> RpcResult>; /// Add a secret to the supervisor (admin, user, or register) #[method(name = "add_secret")] async fn add_secret(&self, params: AddSecretParams) -> RpcResult<()>; /// Remove a secret from the supervisor #[method(name = "remove_secret")] async fn remove_secret(&self, params: RemoveSecretParams) -> RpcResult<()>; /// List all secrets (returns counts only for security) #[method(name = "list_secrets")] async fn list_secrets(&self, params: ListSecretsParams) -> RpcResult; /// List admin secrets (returns actual secret values) #[method(name = "list_admin_secrets")] async fn list_admin_secrets(&self, admin_secret: String) -> RpcResult>; /// List user secrets (returns actual secret values) #[method(name = "list_user_secrets")] async fn list_user_secrets(&self, admin_secret: String) -> RpcResult>; /// List register secrets (returns actual secret values) #[method(name = "list_register_secrets")] async fn list_register_secrets(&self, admin_secret: String) -> RpcResult>; /// Get supervisor information and statistics #[method(name = "get_supervisor_info")] async fn get_supervisor_info(&self, admin_secret: String) -> RpcResult; /// OpenRPC discovery method - returns the OpenRPC document describing this API #[method(name = "rpc.discover")] async fn rpc_discover(&self) -> RpcResult; } /// Helper function to parse process manager type from string fn parse_process_manager_type(pm_type: &str, session_name: Option) -> Result> { match pm_type.to_lowercase().as_str() { "simple" => Ok(ProcessManagerType::Simple), "tmux" => { let session = session_name.unwrap_or_else(|| "default_session".to_string()); Ok(ProcessManagerType::Tmux(session)) }, _ => Err(invalid_params_error(&format!( "Invalid process manager type: {}. Must be 'simple' or 'tmux'", pm_type ))), } } /// Direct RPC implementation on Arc> /// This eliminates the need for a wrapper struct #[async_trait] impl SupervisorRpcServer for Arc> { async fn register_runner(&self, params: RegisterRunnerParams) -> RpcResult { debug!("OpenRPC request: register_runner with params: {:?}", params); let mut supervisor = self.lock().await; supervisor .register_runner(¶ms.secret, ¶ms.name, ¶ms.queue) .await .map_err(runner_error_to_rpc_error)?; // Return the runner name that was registered Ok(params.name) } async fn jobs_create(&self, params: RunJobParams) -> RpcResult { debug!("OpenRPC request: jobs.create with params: {:?}", params); let mut supervisor = self.lock().await; let job_id = supervisor .create_job(¶ms.secret, params.job) .await .map_err(runner_error_to_rpc_error)?; Ok(job_id) } async fn jobs_list(&self) -> RpcResult> { debug!("OpenRPC request: jobs.list"); let supervisor = self.lock().await; supervisor .list_all_jobs() .await .map_err(runner_error_to_rpc_error) } async fn job_run(&self, params: RunJobParams) -> RpcResult { debug!("OpenRPC request: job.run with params: {:?}", params); let mut supervisor = self.lock().await; match supervisor .run_job(¶ms.secret, params.job) .await .map_err(runner_error_to_rpc_error)? { Some(output) => Ok(JobResult::Success { success: output }), None => Ok(JobResult::Error { error: "Job execution failed".to_string() }) } } async fn job_start(&self, params: StartJobParams) -> RpcResult<()> { debug!("OpenRPC request: job.start with params: {:?}", params); let mut supervisor = self.lock().await; supervisor .start_job(¶ms.secret, ¶ms.job_id) .await .map_err(runner_error_to_rpc_error) } async fn job_status(&self, job_id: String) -> RpcResult { debug!("OpenRPC request: job.status with job_id: {}", job_id); let supervisor = self.lock().await; let status = supervisor .get_job_status(&job_id) .await .map_err(runner_error_to_rpc_error)?; Ok(status) } async fn job_result(&self, job_id: String) -> RpcResult { debug!("OpenRPC request: job.result with job_id: {}", job_id); let supervisor = self.lock().await; match supervisor .get_job_result(&job_id) .await .map_err(runner_error_to_rpc_error)? { Some(result) => { if result.starts_with("Error:") { Ok(JobResult::Error { error: result }) } else { Ok(JobResult::Success { success: result }) } }, None => Ok(JobResult::Error { error: "Job result not available".to_string() }) } } async fn job_stop(&self, params: StopJobParams) -> RpcResult<()> { debug!("OpenRPC request: job.stop with params: {:?}", params); let mut supervisor = self.lock().await; supervisor .stop_job(¶ms.job_id) .await .map_err(runner_error_to_rpc_error) } async fn job_delete(&self, params: DeleteJobParams) -> RpcResult<()> { debug!("OpenRPC request: job.delete with params: {:?}", params); let mut supervisor = self.lock().await; supervisor .delete_job(¶ms.job_id) .await .map_err(runner_error_to_rpc_error) } async fn remove_runner(&self, actor_id: String) -> RpcResult<()> { debug!("OpenRPC request: remove_runner with actor_id: {}", actor_id); let mut supervisor = self.lock().await; supervisor .remove_runner(&actor_id) .await .map_err(runner_error_to_rpc_error) } async fn list_runners(&self) -> RpcResult> { debug!("OpenRPC request: list_runners"); let supervisor = self.lock().await; Ok(supervisor.list_runners().into_iter().map(|s| s.to_string()).collect()) } async fn start_runner(&self, actor_id: String) -> RpcResult<()> { debug!("OpenRPC request: start_runner with actor_id: {}", actor_id); let mut supervisor = self.lock().await; supervisor .start_runner(&actor_id) .await .map_err(runner_error_to_rpc_error) } async fn stop_runner(&self, actor_id: String, force: bool) -> RpcResult<()> { debug!("OpenRPC request: stop_runner with actor_id: {}, force: {}", actor_id, force); let mut supervisor = self.lock().await; supervisor .stop_runner(&actor_id, force) .await .map_err(runner_error_to_rpc_error) } async fn get_runner(&self, actor_id: String) -> RpcResult { debug!("OpenRPC request: get_runner with actor_id: {}", actor_id); let supervisor = self.lock().await; match supervisor.get_runner(&actor_id) { Some(runner) => Ok(RunnerWrapper::from(runner)), None => Err(ErrorObjectOwned::owned(-32000, format!("Runner not found: {}", actor_id), None::<()>)), } } async fn get_runner_status(&self, actor_id: String) -> RpcResult { debug!("OpenRPC request: get_runner_status with actor_id: {}", actor_id); let supervisor = self.lock().await; let status = supervisor .get_runner_status(&actor_id) .await .map_err(runner_error_to_rpc_error)?; Ok(status.into()) } async fn get_runner_logs(&self, params: GetLogsParams) -> RpcResult> { debug!("OpenRPC request: get_runner_logs with params: {:?}", params); let supervisor = self.lock().await; let logs = supervisor .get_runner_logs(¶ms.actor_id, params.lines, params.follow) .await .map_err(runner_error_to_rpc_error)?; Ok(logs.into_iter().map(LogInfoWrapper::from).collect()) } async fn queue_job_to_runner(&self, params: QueueJobParams) -> RpcResult<()> { debug!("OpenRPC request: queue_job_to_runner with params: {:?}", params); let mut supervisor = self.lock().await; supervisor .queue_job_to_runner(¶ms.runner, params.job) .await .map_err(runner_error_to_rpc_error) } async fn get_job(&self, job_id: String) -> RpcResult { debug!("OpenRPC request: get_job with job_id: {}", job_id); let supervisor = self.lock().await; supervisor .get_job(&job_id) .await .map_err(runner_error_to_rpc_error) } async fn ping_runner(&self, runner_id: String) -> RpcResult { debug!("OpenRPC request: ping_runner with runner_id: {}", runner_id); let mut supervisor = self.lock().await; supervisor .ping_runner(&runner_id) .await .map_err(runner_error_to_rpc_error) } async fn stop_job(&self, job_id: String) -> RpcResult<()> { debug!("OpenRPC request: stop_job with job_id: {}", job_id); let mut supervisor = self.lock().await; supervisor .stop_job(&job_id) .await .map_err(runner_error_to_rpc_error) } async fn delete_job(&self, job_id: String) -> RpcResult<()> { debug!("OpenRPC request: delete_job with job_id: {}", job_id); let mut supervisor = self.lock().await; supervisor .delete_job(&job_id) .await .map_err(runner_error_to_rpc_error) } async fn queue_and_wait(&self, params: QueueAndWaitParams) -> RpcResult> { debug!("OpenRPC request: queue_and_wait with params: {:?}", params); let mut supervisor = self.lock().await; supervisor .queue_and_wait(¶ms.runner, params.job, params.timeout_secs) .await .map_err(runner_error_to_rpc_error) } async fn get_all_runner_status(&self) -> RpcResult> { debug!("OpenRPC request: get_all_runner_status"); let supervisor = self.lock().await; let statuses = supervisor.get_all_runner_status().await .map_err(runner_error_to_rpc_error)?; Ok(statuses .into_iter() .map(|(actor_id, status)| RunnerStatusResponse { actor_id, status: ProcessStatusWrapper::from(status), }) .collect()) } async fn start_all(&self) -> RpcResult> { debug!("OpenRPC request: start_all"); let mut supervisor = self.lock().await; let results = supervisor.start_all().await; Ok(results .into_iter() .map(|(actor_id, result)| { let status = match result { Ok(_) => "Success".to_string(), Err(e) => format!("Error: {}", e), }; (actor_id, status) }) .collect()) } async fn stop_all(&self, force: bool) -> RpcResult> { debug!("OpenRPC request: stop_all with force: {}", force); let mut supervisor = self.lock().await; let results = supervisor.stop_all(force).await; Ok(results .into_iter() .map(|(actor_id, result)| { let status = match result { Ok(_) => "Success".to_string(), Err(e) => format!("Error: {}", e), }; (actor_id, status) }) .collect()) } async fn get_all_status(&self) -> RpcResult> { debug!("OpenRPC request: get_all_status"); let supervisor = self.lock().await; let statuses = supervisor.get_all_runner_status().await .map_err(runner_error_to_rpc_error)?; Ok(statuses .into_iter() .map(|(actor_id, status)| { let status_str = format!("{:?}", status); (actor_id, status_str) }) .collect()) } async fn add_secret(&self, params: AddSecretParams) -> RpcResult<()> { debug!("OpenRPC request: add_secret, type: {}", params.secret_type); let mut supervisor = self.lock().await; // Verify admin secret if !supervisor.has_admin_secret(¶ms.admin_secret) { return Err(ErrorObject::owned(-32602, "Invalid admin secret", None::<()>)); } match params.secret_type.as_str() { "admin" => { supervisor.add_admin_secret(params.secret_value); } "user" => { supervisor.add_user_secret(params.secret_value); } "register" => { supervisor.add_register_secret(params.secret_value); } _ => { return Err(ErrorObject::owned(-32602, "Invalid secret type. Must be 'admin', 'user', or 'register'", None::<()>)); } } Ok(()) } async fn remove_secret(&self, params: RemoveSecretParams) -> RpcResult<()> { debug!("OpenRPC request: remove_secret, type: {}", params.secret_type); let mut supervisor = self.lock().await; // Verify admin secret if !supervisor.has_admin_secret(¶ms.admin_secret) { return Err(ErrorObject::owned(-32602, "Invalid admin secret", None::<()>)); } match params.secret_type.as_str() { "admin" => { supervisor.remove_admin_secret(¶ms.secret_value); } "user" => { supervisor.remove_user_secret(¶ms.secret_value); } "register" => { supervisor.remove_register_secret(¶ms.secret_value); } _ => { return Err(ErrorObject::owned(-32602, "Invalid secret type. Must be 'admin', 'user', or 'register'", None::<()>)); } } Ok(()) } async fn list_secrets(&self, params: ListSecretsParams) -> RpcResult { debug!("OpenRPC request: list_secrets"); let supervisor = self.lock().await; // Verify admin secret if !supervisor.has_admin_secret(¶ms.admin_secret) { return Err(ErrorObject::owned(-32602, "Invalid admin secret", None::<()>)); } Ok(SupervisorInfoResponse { server_url: "http://127.0.0.1:3030".to_string(), admin_secrets_count: supervisor.admin_secrets_count(), user_secrets_count: supervisor.user_secrets_count(), register_secrets_count: supervisor.register_secrets_count(), runners_count: supervisor.runners_count(), }) } async fn list_admin_secrets(&self, admin_secret: String) -> RpcResult> { debug!("OpenRPC request: list_admin_secrets"); let supervisor = self.lock().await; // Verify admin secret if !supervisor.has_admin_secret(&admin_secret) { return Err(ErrorObject::owned(-32602, "Invalid admin secret", None::<()>)); } Ok(supervisor.get_admin_secrets()) } async fn list_user_secrets(&self, admin_secret: String) -> RpcResult> { debug!("OpenRPC request: list_user_secrets"); let supervisor = self.lock().await; // Verify admin secret if !supervisor.has_admin_secret(&admin_secret) { return Err(ErrorObject::owned(-32602, "Invalid admin secret", None::<()>)); } Ok(supervisor.get_user_secrets()) } async fn list_register_secrets(&self, admin_secret: String) -> RpcResult> { debug!("OpenRPC request: list_register_secrets"); let supervisor = self.lock().await; // Verify admin secret if !supervisor.has_admin_secret(&admin_secret) { return Err(ErrorObject::owned(-32602, "Invalid admin secret", None::<()>)); } Ok(supervisor.get_register_secrets()) } async fn get_supervisor_info(&self, admin_secret: String) -> RpcResult { debug!("OpenRPC request: get_supervisor_info"); let supervisor = self.lock().await; // Verify admin secret if !supervisor.has_admin_secret(&admin_secret) { return Err(ErrorObject::owned(-32602, "Invalid admin secret", None::<()>)); } Ok(SupervisorInfoResponse { server_url: "http://127.0.0.1:3030".to_string(), admin_secrets_count: supervisor.admin_secrets_count(), user_secrets_count: supervisor.user_secrets_count(), register_secrets_count: supervisor.register_secrets_count(), runners_count: supervisor.runners_count(), }) } async fn rpc_discover(&self) -> RpcResult { debug!("OpenRPC request: rpc.discover"); // Read OpenRPC specification from docs/openrpc.json match load_openrpc_spec() { Ok(spec) => Ok(spec), Err(e) => { error!("Failed to load OpenRPC specification: {}", e); // Fallback to a minimal spec if file loading fails Ok(serde_json::json!({ "openrpc": "1.3.2", "info": { "title": "Hero Supervisor OpenRPC API", "version": "1.0.0", "description": "OpenRPC API for managing Hero Supervisor runners and jobs" }, "methods": [], "error": "Failed to load full specification" })) } } } } /// Start the OpenRPC server with a default supervisor pub async fn start_server(addr: SocketAddr) -> anyhow::Result { let supervisor = Arc::new(Mutex::new(Supervisor::default())); start_server_with_supervisor(addr, supervisor).await } /// Start the OpenRPC server with an existing supervisor instance pub async fn start_server_with_supervisor( addr: SocketAddr, supervisor: Arc>, ) -> anyhow::Result { let server = Server::builder().build(addr).await?; let handle = server.start(supervisor.into_rpc()); Ok(handle) } /// Start HTTP OpenRPC server (Unix socket support would require additional dependencies) pub async fn start_http_openrpc_server( supervisor: Arc>, bind_address: &str, port: u16, ) -> anyhow::Result { let http_addr: SocketAddr = format!("{}:{}", bind_address, port).parse()?; // Configure CORS to allow requests from the admin UI let cors = CorsLayer::new() .allow_origin(Any) .allow_headers(Any) .allow_methods(Any); // Start HTTP server with CORS let http_server = Server::builder() .set_http_middleware(tower::ServiceBuilder::new().layer(cors)) .build(http_addr) .await?; let http_handle = http_server.start(supervisor.into_rpc()); info!("OpenRPC HTTP server running at http://{} with CORS enabled", http_addr); Ok(http_handle) } /// Simplified server startup function for supervisor binary pub async fn start_openrpc_servers( supervisor: Arc>, bind_address: &str, port: u16, ) -> Result<(), Box> { let bind_address = bind_address.to_string(); tokio::spawn(async move { match start_http_openrpc_server(supervisor, &bind_address, port).await { Ok(http_handle) => { info!("OpenRPC server started successfully"); // Keep the server running http_handle.stopped().await; error!("OpenRPC server stopped unexpectedly"); } Err(e) => { error!("Failed to start OpenRPC server: {}", e); } } }); // Give the server a moment to start up tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; Ok(()) } #[cfg(test)] mod tests { use super::*; use crate::supervisor::Supervisor; #[tokio::test] async fn test_supervisor_rpc_creation() { // Test that we can create a supervisor and use it with RPC use crate::supervisor::SupervisorBuilder; let supervisor = SupervisorBuilder::new() .redis_url("redis://localhost:6379") .namespace("test") .build() .await; // Just test that we can build a supervisor assert!(supervisor.is_ok() || supervisor.is_err()); // Either way is fine for this test } #[test] fn test_process_manager_type_parsing() { assert!(parse_process_manager_type("simple", None).is_ok()); assert!(parse_process_manager_type("tmux", Some("session".to_string())).is_ok()); assert!(parse_process_manager_type("Simple", None).is_ok()); assert!(parse_process_manager_type("TMUX", Some("session".to_string())).is_ok()); assert!(parse_process_manager_type("invalid", None).is_err()); } #[tokio::test] async fn test_job_api_methods() { let supervisor = Arc::new(Mutex::new(Supervisor::default())); let mut sup = supervisor.lock().await; sup.add_user_secret("test-secret".to_string()); drop(sup); // Test jobs.create let job = crate::job::JobBuilder::new() .caller_id("test") .context_id("test") .payload("test") .runner("test_runner") .executor("osis") .build() .unwrap(); let params = RunJobParams { secret: "test-secret".to_string(), job: job.clone(), }; let result = supervisor.jobs_create(params).await; // Should work or fail gracefully without Redis assert!(result.is_ok() || result.is_err()); // Test job.start let start_params = StartJobParams { secret: "test-secret".to_string(), job_id: "test-job".to_string(), }; let result = supervisor.job_start(start_params).await; // Should fail gracefully without Redis/job assert!(result.is_err()); // Test invalid secret let invalid_params = StartJobParams { secret: "invalid".to_string(), job_id: "test-job".to_string(), }; let result = supervisor.job_start(invalid_params).await; assert!(result.is_err()); } #[test] fn test_job_result_serialization() { let success = JobResult::Success { success: "test output".to_string() }; let json = serde_json::to_string(&success).unwrap(); assert!(json.contains("success")); assert!(json.contains("test output")); let error = JobResult::Error { error: "test error".to_string() }; let json = serde_json::to_string(&error).unwrap(); assert!(json.contains("error")); assert!(json.contains("test error")); } #[test] fn test_job_status_response_serialization() { let status = JobStatusResponse { job_id: "test-job".to_string(), status: "running".to_string(), created_at: "2023-01-01T00:00:00Z".to_string(), started_at: Some("2023-01-01T00:00:05Z".to_string()), completed_at: None, }; let json = serde_json::to_string(&status).unwrap(); assert!(json.contains("test-job")); assert!(json.contains("running")); assert!(json.contains("2023-01-01T00:00:00Z")); let deserialized: JobStatusResponse = serde_json::from_str(&json).unwrap(); assert_eq!(deserialized.job_id, "test-job"); assert_eq!(deserialized.status, "running"); } }