// supervisor/src/openrpc.rs (Rust source; original listing: 1300 lines, 45 KiB)

//! OpenRPC server implementation.
use jsonrpsee::{
core::{RpcResult, async_trait},
proc_macros::rpc,
server::{Server, ServerHandle},
types::{ErrorObject, ErrorObjectOwned},
};
use tower_http::cors::{CorsLayer, Any};
use anyhow;
use log::{debug, info, error};
use crate::supervisor::Supervisor;
use crate::runner::{Runner, RunnerError};
use crate::runner::{ProcessStatus, LogInfo};
use crate::job::Job;
use crate::ProcessManagerType;
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::sync::Arc;
use std::fs;
use tokio::sync::Mutex;
/// Load OpenRPC specification from docs/openrpc.json
fn load_openrpc_spec() -> Result<serde_json::Value, Box<dyn std::error::Error>> {
// Try to find the openrpc.json file relative to the current working directory
let possible_paths = [
"docs/openrpc.json",
"../docs/openrpc.json",
"../../docs/openrpc.json",
"./supervisor/docs/openrpc.json",
];
for path in &possible_paths {
if let Ok(content) = fs::read_to_string(path) {
match serde_json::from_str(&content) {
Ok(spec) => {
debug!("Loaded OpenRPC specification from: {}", path);
return Ok(spec);
}
Err(e) => {
error!("Failed to parse OpenRPC JSON from {}: {}", path, e);
}
}
}
}
Err("Could not find or parse docs/openrpc.json".into())
}
/// Convert a [`RunnerError`] into a JSON-RPC error object.
///
/// Every runner failure is reported with the standard JSON-RPC "Internal error"
/// code (`-32603`) and the error's `Display` text prefixed by `Supervisor error: `.
fn runner_error_to_rpc_error(err: RunnerError) -> ErrorObject<'static> {
    const JSONRPC_INTERNAL_ERROR: i32 = -32603;
    let message = format!("Supervisor error: {}", err);
    ErrorObject::owned(JSONRPC_INTERNAL_ERROR, message, None::<()>)
}
/// Build a JSON-RPC "Invalid params" error (`-32602`).
///
/// The given `msg` is prefixed with `Invalid parameters: `.
fn invalid_params_error(msg: &str) -> ErrorObject<'static> {
    const JSONRPC_INVALID_PARAMS: i32 = -32602;
    let message = format!("Invalid parameters: {}", msg);
    ErrorObject::owned(JSONRPC_INVALID_PARAMS, message, None::<()>)
}
/// Parameters for registering a new runner.
///
/// Only the runner name travels in the request body; the credential is supplied
/// through the HTTP `Authorization` header instead.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RegisterRunnerParams {
    /// Runner name.
    pub name: String,
}
/// Parameters for runner management operations (e.g. `start_runner`,
/// `get_runner_status`).
///
/// `Debug` is implemented by hand so that `secret` is never printed by `{:?}`
/// formatting (such as debug logging of request params).
///
/// TODO: Move secret to HTTP Authorization header for better security
#[derive(Deserialize, Serialize)]
pub struct RunnerManagementParams {
    /// Caller-supplied secret. NOTE(review): the handlers carry a TODO to verify it.
    pub secret: String,
    /// Identifier of the runner to operate on.
    pub actor_id: String,
}

impl std::fmt::Debug for RunnerManagementParams {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RunnerManagementParams")
            .field("secret", &"<redacted>")
            .field("actor_id", &self.actor_id)
            .finish()
    }
}
/// Parameters for stopping a runner.
///
/// `Debug` is implemented by hand so that `secret` is never printed by `{:?}`
/// formatting (such as debug logging of request params).
///
/// TODO: Move secret to HTTP Authorization header for better security
#[derive(Deserialize, Serialize)]
pub struct StopRunnerParams {
    /// Caller-supplied secret. NOTE(review): the handler carries a TODO to verify it.
    pub secret: String,
    /// Identifier of the runner to stop.
    pub actor_id: String,
    /// Forwarded to `Supervisor::stop_runner` as its `force` flag.
    pub force: bool,
}

impl std::fmt::Debug for StopRunnerParams {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StopRunnerParams")
            .field("secret", &"<redacted>")
            .field("actor_id", &self.actor_id)
            .field("force", &self.force)
            .finish()
    }
}
/// Parameters for adding a runner with configuration.
///
/// `Debug` is implemented by hand so that `secret` is never printed by `{:?}`
/// formatting (such as debug logging of request params).
///
/// TODO: Move secret to HTTP Authorization header for better security
#[derive(Deserialize, Serialize)]
pub struct AddRunnerParams {
    /// Caller-supplied secret/key used for the registration.
    pub secret: String,
    /// Configuration of the runner to add.
    pub config: RunnerConfig,
}

impl std::fmt::Debug for AddRunnerParams {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AddRunnerParams")
            .field("secret", &"<redacted>")
            .field("config", &self.config)
            .finish()
    }
}
/// Configuration describing a runner to add.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RunnerConfig {
    /// Runner name.
    pub name: String,
    /// Command string for the runner.
    pub command: String,
    /// Optional environment variables; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub env: Option<std::collections::HashMap<String, String>>,
}
/// Parameters carrying a job to create or run (`jobs.create`, `job.run`).
///
/// The caller's secret is not part of the body; it comes from the HTTP
/// `Authorization` header.
#[derive(Debug, Serialize, Deserialize)]
pub struct RunJobParams {
    /// The job definition.
    pub job: Job,
}

/// Parameters for `job.start`: identifies a previously created job.
#[derive(Debug, Serialize, Deserialize)]
pub struct StartJobParams {
    /// Identifier of the job to start.
    pub job_id: String,
}
/// Outcome of a job as returned over RPC.
///
/// Serialized untagged, i.e. as `{"success": "..."}` or `{"error": "..."}`.
#[derive(Clone, Debug, Serialize)]
#[serde(untagged)]
pub enum JobResult {
    /// Job output on success.
    Success { success: String },
    /// Error description on failure.
    Error { error: String },
}

/// Status of a single job.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct JobStatusResponse {
    /// Identifier of the job.
    pub job_id: String,
    /// Status of the job, as a string.
    pub status: String,
}
/// Parameters for `queue_job_to_runner`.
#[derive(Debug, Serialize, Deserialize)]
pub struct QueueJobParams {
    /// Name of the runner the job is queued to.
    pub runner: String,
    /// The job to queue.
    pub job: Job,
}

/// Parameters for `queue_and_wait`: queue a job and wait for its result.
#[derive(Debug, Serialize, Deserialize)]
pub struct QueueAndWaitParams {
    /// Name of the runner the job is queued to.
    pub runner: String,
    /// The job to queue.
    pub job: Job,
    /// How long to wait for the result, in seconds.
    pub timeout_secs: u64,
}
/// Parameters for `job.stop`.
///
/// `Debug` is implemented by hand so that `secret` is never printed by `{:?}`
/// formatting (such as debug logging of request params).
#[derive(Deserialize, Serialize)]
pub struct StopJobParams {
    /// Caller-supplied secret. NOTE(review): not checked by the `job.stop` handler.
    pub secret: String,
    /// Identifier of the job to stop.
    pub job_id: String,
}

impl std::fmt::Debug for StopJobParams {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StopJobParams")
            .field("secret", &"<redacted>")
            .field("job_id", &self.job_id)
            .finish()
    }
}
/// Parameters for `job.delete`.
#[derive(Debug, Serialize, Deserialize)]
pub struct DeleteJobParams {
    /// Identifier of the job to delete.
    pub job_id: String,
}

/// Parameters for `get_runner_logs`.
#[derive(Debug, Serialize, Deserialize)]
pub struct GetLogsParams {
    /// Identifier of the runner whose logs are requested.
    pub actor_id: String,
    /// Optional line limit, forwarded to `Supervisor::get_runner_logs`.
    pub lines: Option<usize>,
    /// `follow` flag, forwarded to `Supervisor::get_runner_logs`.
    pub follow: bool,
}
/// Parameters for `add_secret`.
///
/// `Debug` is implemented by hand so that neither secret is ever printed by
/// `{:?}` formatting (such as debug logging of request params).
#[derive(Deserialize, Serialize)]
pub struct AddSecretParams {
    /// Admin secret authorizing the call.
    pub admin_secret: String,
    /// Kind of secret to add: `"admin"`, `"user"`, or `"register"`.
    pub secret_type: String,
    /// The secret value to add.
    pub secret_value: String,
}

impl std::fmt::Debug for AddSecretParams {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AddSecretParams")
            .field("admin_secret", &"<redacted>")
            .field("secret_type", &self.secret_type)
            .field("secret_value", &"<redacted>")
            .finish()
    }
}

/// Parameters for `remove_secret`.
///
/// `Debug` is implemented by hand so that neither secret is ever printed by
/// `{:?}` formatting.
#[derive(Deserialize, Serialize)]
pub struct RemoveSecretParams {
    /// Admin secret authorizing the call.
    pub admin_secret: String,
    /// Kind of secret to remove: `"admin"`, `"user"`, or `"register"`.
    pub secret_type: String,
    /// The secret value to remove.
    pub secret_value: String,
}

impl std::fmt::Debug for RemoveSecretParams {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RemoveSecretParams")
            .field("admin_secret", &"<redacted>")
            .field("secret_type", &self.secret_type)
            .field("secret_value", &"<redacted>")
            .finish()
    }
}

/// Parameters for `list_secrets`.
///
/// `Debug` is implemented by hand so that the admin secret is never printed by
/// `{:?}` formatting.
#[derive(Deserialize, Serialize)]
pub struct ListSecretsParams {
    /// Admin secret authorizing the call.
    pub admin_secret: String,
}

impl std::fmt::Debug for ListSecretsParams {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ListSecretsParams")
            .field("admin_secret", &"<redacted>")
            .finish()
    }
}
/// Serializable wrapper for ProcessStatus
#[derive(Debug, Serialize, Clone)]
pub enum ProcessStatusWrapper {
Running,
Stopped,
Starting,
Stopping,
Error(String),
}
impl From<ProcessStatus> for ProcessStatusWrapper {
fn from(status: ProcessStatus) -> Self {
match status {
ProcessStatus::NotStarted => ProcessStatusWrapper::Stopped,
ProcessStatus::Starting => ProcessStatusWrapper::Starting,
ProcessStatus::Running => ProcessStatusWrapper::Running,
ProcessStatus::Stopping => ProcessStatusWrapper::Stopping,
ProcessStatus::Stopped => ProcessStatusWrapper::Stopped,
ProcessStatus::Failed => ProcessStatusWrapper::Error("Process failed".to_string()),
ProcessStatus::Error(msg) => ProcessStatusWrapper::Error(msg),
}
}
}
// `RunnerStatus` is only an alias of `ProcessStatus`, so the existing
// `From<ProcessStatus>` conversion covers it; no dedicated impl is required.

/// Serializable view of a [`Runner`], as returned by `get_runner`.
#[derive(Clone, Debug, Serialize)]
pub struct RunnerWrapper {
    pub id: String,
    pub name: String,
    /// The runner's command, converted lossily to UTF-8.
    pub command: String,
    pub redis_url: String,
}

impl From<&Runner> for RunnerWrapper {
    fn from(runner: &Runner) -> Self {
        let command = runner.command.to_string_lossy().to_string();
        Self {
            id: runner.id.clone(),
            name: runner.name.clone(),
            command,
            redis_url: runner.redis_url.clone(),
        }
    }
}
/// Serializable copy of a runner log entry ([`LogInfo`]).
#[derive(Clone, Debug, Serialize)]
pub struct LogInfoWrapper {
    /// Timestamp of the entry, as a string.
    pub timestamp: String,
    /// Log level, as a string.
    pub level: String,
    /// Log message text.
    pub message: String,
}
thread_local! {
    /// API key of the request currently being handled on this thread.
    ///
    /// NOTE: the storage is per *thread*, not per request. Under a multi-threaded
    /// async runtime a request's future may be polled on different threads, and
    /// several requests may interleave on one thread. A stored value is therefore
    /// only trustworthy while it is installed around the code that reads it.
    static CURRENT_API_KEY: std::cell::RefCell<Option<String>> = std::cell::RefCell::new(None);
}

/// Record `key` as the API key for the request handled on this thread.
///
/// Any previous value is replaced; passing `None` clears it.
pub fn set_current_api_key(key: Option<String>) {
    CURRENT_API_KEY.with(|slot| {
        drop(slot.replace(key));
    });
}

/// Return a copy of the API key currently recorded on this thread, if any.
pub fn get_current_api_key() -> Option<String> {
    CURRENT_API_KEY.with(|slot| slot.borrow().as_ref().cloned())
}
impl From<LogInfo> for LogInfoWrapper {
    /// Move the timestamp, level and message out of a runner log entry.
    fn from(log: crate::runner::LogInfo) -> Self {
        let crate::runner::LogInfo { timestamp, level, message, .. } = log;
        Self { timestamp, level, message }
    }
}
/// One entry of `get_all_runner_status`: a runner id together with its status.
#[derive(Clone, Debug, Serialize)]
pub struct RunnerStatusResponse {
    /// Identifier of the runner.
    pub actor_id: String,
    /// Current status of the runner.
    pub status: ProcessStatusWrapper,
}

/// Supervisor statistics returned by `list_secrets` and `get_supervisor_info`.
#[derive(Clone, Debug, Serialize)]
pub struct SupervisorInfoResponse {
    /// Server URL reported to the caller.
    pub server_url: String,
    /// Number of configured admin secrets.
    pub admin_secrets_count: usize,
    /// Number of configured user secrets.
    pub user_secrets_count: usize,
    /// Number of configured register secrets.
    pub register_secrets_count: usize,
    /// Number of registered runners.
    pub runners_count: usize,
}
/// Parameters for verifying an API key.
///
/// Intentionally empty: the key itself is taken from the `Authorization` header.
#[derive(Default, Debug, Serialize, Deserialize)]
pub struct AuthVerifyParams {}

/// Parameters for creating an API key.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateApiKeyParams {
    /// Human-readable name of the new key.
    pub name: String,
    /// Scope of the new key: `"admin"`, `"registrar"`, or `"user"`.
    pub scope: String,
}

/// Parameters for removing an API key.
#[derive(Debug, Serialize, Deserialize)]
pub struct RemoveApiKeyParams {
    /// The key to remove.
    pub key: String,
}

/// Parameters for listing API keys.
///
/// Intentionally empty: authorization comes from the `Authorization` header.
#[derive(Default, Debug, Serialize, Deserialize)]
pub struct ListApiKeysParams {}
/// OpenRPC trait defining all supervisor methods.
///
/// `#[rpc(server)]` generates the `SupervisorRpcServer` trait (implemented for
/// `Arc<Mutex<Supervisor>>` in this file) together with its `into_rpc()`
/// constructor. The `#[method(name = ...)]` strings and the parameter names form
/// the JSON-RPC wire contract, so they must not be renamed casually.
///
/// Methods noted as "authenticated via the `Authorization` header" take the
/// caller's key from `Authorization: Bearer <key>` rather than from their params.
#[rpc(server)]
pub trait SupervisorRpc {
    /// Register a new runner named `name`; returns that name on success.
    /// Authenticated via the `Authorization` header.
    #[method(name = "register_runner")]
    async fn register_runner(&self, name: String) -> RpcResult<String>;
    /// Create a job without queuing it to a runner; returns the new job id.
    /// Authenticated via the `Authorization` header.
    #[method(name = "jobs.create")]
    async fn jobs_create(&self, params: RunJobParams) -> RpcResult<String>;
    /// List all jobs.
    #[method(name = "jobs.list")]
    async fn jobs_list(&self) -> RpcResult<Vec<Job>>;
    /// Run a job on the appropriate runner and return the result.
    /// Authenticated via the `Authorization` header.
    #[method(name = "job.run")]
    async fn job_run(&self, params: RunJobParams) -> RpcResult<JobResult>;
    /// Start a previously created job by queuing it to its assigned runner.
    /// Authenticated via the `Authorization` header.
    #[method(name = "job.start")]
    async fn job_start(&self, params: StartJobParams) -> RpcResult<()>;
    /// Get the current status of a job.
    #[method(name = "job.status")]
    async fn job_status(&self, job_id: String) -> RpcResult<JobStatusResponse>;
    /// Get the result of a job.
    /// NOTE(review): originally documented as blocking until the result is
    /// available, yet the handler maps a missing result to
    /// `{"error": "Job result not available"}` — confirm the real semantics.
    #[method(name = "job.result")]
    async fn job_result(&self, job_id: String) -> RpcResult<JobResult>;
    /// Stop a running job.
    #[method(name = "job.stop")]
    async fn job_stop(&self, params: StopJobParams) -> RpcResult<()>;
    /// Delete a job from the system.
    /// Authenticated via the `Authorization` header.
    #[method(name = "job.delete")]
    async fn job_delete(&self, params: DeleteJobParams) -> RpcResult<()>;
    /// Remove a runner from the supervisor.
    #[method(name = "remove_runner")]
    async fn remove_runner(&self, actor_id: String) -> RpcResult<()>;
    /// List all runner IDs.
    #[method(name = "list_runners")]
    async fn list_runners(&self) -> RpcResult<Vec<String>>;
    /// Start a specific runner.
    #[method(name = "start_runner")]
    async fn start_runner(&self, params: RunnerManagementParams) -> RpcResult<()>;
    /// Stop a specific runner (optionally forcefully).
    #[method(name = "stop_runner")]
    async fn stop_runner(&self, params: StopRunnerParams) -> RpcResult<()>;
    /// Add a runner with configuration (currently only the name is used).
    #[method(name = "add_runner")]
    async fn add_runner(&self, params: AddRunnerParams) -> RpcResult<()>;
    /// Get a specific runner by ID; errors with code `-32000` if unknown.
    #[method(name = "get_runner")]
    async fn get_runner(&self, actor_id: String) -> RpcResult<RunnerWrapper>;
    /// Get the status of a specific runner.
    #[method(name = "get_runner_status")]
    async fn get_runner_status(&self, params: RunnerManagementParams) -> RpcResult<ProcessStatusWrapper>;
    /// Get logs for a specific runner.
    #[method(name = "get_runner_logs")]
    async fn get_runner_logs(&self, params: GetLogsParams) -> RpcResult<Vec<LogInfoWrapper>>;
    /// Queue a job to a specific runner.
    #[method(name = "queue_job_to_runner")]
    async fn queue_job_to_runner(&self, params: QueueJobParams) -> RpcResult<()>;
    /// Get a job by job ID.
    #[method(name = "get_job")]
    async fn get_job(&self, job_id: String) -> RpcResult<Job>;
    /// Ping a runner (dispatch a ping job).
    #[method(name = "ping_runner")]
    async fn ping_runner(&self, runner_id: String) -> RpcResult<String>;
    /// Stop a job (same operation as `job.stop`, but takes the id directly).
    #[method(name = "stop_job")]
    async fn stop_job(&self, job_id: String) -> RpcResult<()>;
    /// Delete a job. Unlike `job.delete`, the handler does not check the
    /// `Authorization` header.
    #[method(name = "delete_job")]
    async fn delete_job(&self, job_id: String) -> RpcResult<()>;
    /// Get logs for a specific job, optionally limited to `lines`.
    #[method(name = "get_job_logs")]
    async fn get_job_logs(&self, job_id: String, lines: Option<usize>) -> RpcResult<Vec<String>>;
    /// Queue a job to a specific runner and wait (up to `timeout_secs`) for the result.
    #[method(name = "queue_and_wait")]
    async fn queue_and_wait(&self, params: QueueAndWaitParams) -> RpcResult<Option<String>>;
    /// Get status of all runners.
    #[method(name = "get_all_runner_status")]
    async fn get_all_runner_status(&self) -> RpcResult<Vec<RunnerStatusResponse>>;
    /// Start all runners; returns `(actor_id, "Success" | "Error: ...")` pairs.
    #[method(name = "start_all")]
    async fn start_all(&self) -> RpcResult<Vec<(String, String)>>;
    /// Stop all runners; returns `(actor_id, "Success" | "Error: ...")` pairs.
    #[method(name = "stop_all")]
    async fn stop_all(&self, force: bool) -> RpcResult<Vec<(String, String)>>;
    /// Get status of all runners as `(actor_id, debug-formatted status)` pairs.
    #[method(name = "get_all_status")]
    async fn get_all_status(&self) -> RpcResult<Vec<(String, String)>>;
    /// Add a secret to the supervisor (admin, user, or register); requires a valid admin secret.
    #[method(name = "add_secret")]
    async fn add_secret(&self, params: AddSecretParams) -> RpcResult<()>;
    /// Remove a secret from the supervisor; requires a valid admin secret.
    #[method(name = "remove_secret")]
    async fn remove_secret(&self, params: RemoveSecretParams) -> RpcResult<()>;
    /// List secrets (returns counts only, for security); requires a valid admin secret.
    #[method(name = "list_secrets")]
    async fn list_secrets(&self, params: ListSecretsParams) -> RpcResult<SupervisorInfoResponse>;
    /// List admin secrets (returns actual secret values); requires a valid admin secret.
    #[method(name = "list_admin_secrets")]
    async fn list_admin_secrets(&self, admin_secret: String) -> RpcResult<Vec<String>>;
    /// List user secrets (returns actual secret values); requires a valid admin secret.
    #[method(name = "list_user_secrets")]
    async fn list_user_secrets(&self, admin_secret: String) -> RpcResult<Vec<String>>;
    /// List register secrets (returns actual secret values); requires a valid admin secret.
    #[method(name = "list_register_secrets")]
    async fn list_register_secrets(&self, admin_secret: String) -> RpcResult<Vec<String>>;
    /// Get supervisor information and statistics; requires a valid admin secret.
    #[method(name = "get_supervisor_info")]
    async fn get_supervisor_info(&self, admin_secret: String) -> RpcResult<SupervisorInfoResponse>;
    /// Verify the API key from the `Authorization` header and return its metadata.
    /// An unknown key yields `valid: false` rather than an error.
    #[method(name = "auth.verify")]
    async fn auth_verify(&self) -> RpcResult<crate::auth::AuthVerifyResponse>;
    /// Create a new API key; `scope` is `admin`, `registrar`, or `user`
    /// (case-insensitive). Requires an admin key in the `Authorization` header.
    #[method(name = "auth.create_key")]
    async fn auth_create_key(&self, name: String, scope: String) -> RpcResult<crate::auth::ApiKey>;
    /// Remove an API key; returns whether it existed. Requires an admin key in
    /// the `Authorization` header.
    #[method(name = "auth.remove_key")]
    async fn auth_remove_key(&self, key: String) -> RpcResult<bool>;
    /// List all API keys. Requires an admin key in the `Authorization` header.
    #[method(name = "auth.list_keys")]
    async fn auth_list_keys(&self) -> RpcResult<Vec<crate::auth::ApiKey>>;
    /// OpenRPC discovery method: returns `docs/openrpc.json`, or a minimal
    /// placeholder document if that file cannot be loaded.
    #[method(name = "rpc.discover")]
    async fn rpc_discover(&self) -> RpcResult<serde_json::Value>;
}
/// Parse a process-manager name (case-insensitive) into a [`ProcessManagerType`].
///
/// * `"simple"` → [`ProcessManagerType::Simple`]; `session_name` is ignored.
/// * `"tmux"` → [`ProcessManagerType::Tmux`] with `session_name`, falling back
///   to `"default_session"` when none is given.
///
/// # Errors
///
/// Any other value produces an invalid-params error naming the bad input.
fn parse_process_manager_type(pm_type: &str, session_name: Option<String>) -> Result<ProcessManagerType, ErrorObject<'static>> {
    let normalized = pm_type.to_lowercase();
    if normalized == "simple" {
        return Ok(ProcessManagerType::Simple);
    }
    if normalized == "tmux" {
        let session = session_name.unwrap_or_else(|| String::from("default_session"));
        return Ok(ProcessManagerType::Tmux(session));
    }
    Err(invalid_params_error(&format!(
        "Invalid process manager type: {}. Must be 'simple' or 'tmux'",
        pm_type
    )))
}
/// Read the API key that the HTTP middleware extracted from the request's
/// `Authorization` header (see [`get_current_api_key`]).
///
/// # Errors
///
/// Returns a JSON-RPC invalid-params error (`-32602`) with the message
/// `"Missing Authorization header"` when no key is available.
fn require_api_key() -> RpcResult<String> {
    get_current_api_key()
        .ok_or_else(|| ErrorObject::owned(-32602, "Missing Authorization header", None::<()>))
}

/// Turn per-runner outcomes of a bulk operation into `(actor_id, status)` pairs,
/// where `status` is `"Success"` or `"Error: <message>"`.
fn summarize_bulk_results<T, E: std::fmt::Display>(
    results: impl IntoIterator<Item = (String, Result<T, E>)>,
) -> Vec<(String, String)> {
    results
        .into_iter()
        .map(|(actor_id, result)| {
            let status = match result {
                Ok(_) => "Success".to_string(),
                Err(e) => format!("Error: {}", e),
            };
            (actor_id, status)
        })
        .collect()
}

/// Direct RPC implementation on `Arc<Mutex<Supervisor>>`, so no wrapper struct
/// is needed.
///
/// Every handler except `rpc.discover` locks the supervisor mutex for the rest
/// of its call, so those handlers run one at a time.
///
/// Request params that carry secrets are never logged with `{:?}`; only their
/// non-secret fields appear in debug logs.
#[async_trait]
impl SupervisorRpcServer for Arc<Mutex<Supervisor>> {
    async fn register_runner(&self, name: String) -> RpcResult<String> {
        debug!("OpenRPC request: register_runner with name: {}", name);
        let key = require_api_key()?;
        let mut supervisor = self.lock().await;
        // `Supervisor::register_runner` verifies the API key itself.
        supervisor
            .register_runner(&key, &name, &name)
            .await
            .map_err(runner_error_to_rpc_error)?;
        // Return the runner name that was registered.
        Ok(name)
    }

    async fn jobs_create(&self, params: RunJobParams) -> RpcResult<String> {
        debug!("OpenRPC request: jobs.create with params: {:?}", params);
        let secret = require_api_key()?;
        let mut supervisor = self.lock().await;
        let job_id = supervisor
            .create_job(&secret, params.job)
            .await
            .map_err(runner_error_to_rpc_error)?;
        Ok(job_id)
    }

    async fn jobs_list(&self) -> RpcResult<Vec<Job>> {
        debug!("OpenRPC request: jobs.list");
        let supervisor = self.lock().await;
        supervisor
            .list_all_jobs()
            .await
            .map_err(runner_error_to_rpc_error)
    }

    async fn job_run(&self, params: RunJobParams) -> RpcResult<JobResult> {
        debug!("OpenRPC request: job.run with params: {:?}", params);
        let secret = require_api_key()?;
        // NOTE(review): if `run_job` waits for the job to finish, the supervisor
        // mutex stays locked (blocking all other RPCs) for that whole time.
        let mut supervisor = self.lock().await;
        match supervisor
            .run_job(&secret, params.job)
            .await
            .map_err(runner_error_to_rpc_error)?
        {
            Some(output) => Ok(JobResult::Success { success: output }),
            None => Ok(JobResult::Error { error: "Job execution failed".to_string() }),
        }
    }

    async fn job_start(&self, params: StartJobParams) -> RpcResult<()> {
        debug!("OpenRPC request: job.start with params: {:?}", params);
        let secret = require_api_key()?;
        let mut supervisor = self.lock().await;
        supervisor
            .start_job(&secret, &params.job_id)
            .await
            .map_err(runner_error_to_rpc_error)
    }

    async fn job_status(&self, job_id: String) -> RpcResult<JobStatusResponse> {
        debug!("OpenRPC request: job.status with job_id: {}", job_id);
        let supervisor = self.lock().await;
        let status = supervisor
            .get_job_status(&job_id)
            .await
            .map_err(runner_error_to_rpc_error)?;
        Ok(status)
    }

    async fn job_result(&self, job_id: String) -> RpcResult<JobResult> {
        debug!("OpenRPC request: job.result with job_id: {}", job_id);
        let supervisor = self.lock().await;
        match supervisor
            .get_job_result(&job_id)
            .await
            .map_err(runner_error_to_rpc_error)?
        {
            // Results are classified as errors by their textual "Error:" prefix.
            Some(result) => {
                if result.starts_with("Error:") {
                    Ok(JobResult::Error { error: result })
                } else {
                    Ok(JobResult::Success { success: result })
                }
            }
            None => Ok(JobResult::Error { error: "Job result not available".to_string() }),
        }
    }

    async fn job_stop(&self, params: StopJobParams) -> RpcResult<()> {
        // Log only the job id: `params` also carries the caller's secret.
        debug!("OpenRPC request: job.stop with job_id: {}", params.job_id);
        let mut supervisor = self.lock().await;
        supervisor
            .stop_job(&params.job_id)
            .await
            .map_err(runner_error_to_rpc_error)
    }

    async fn job_delete(&self, params: DeleteJobParams) -> RpcResult<()> {
        debug!("OpenRPC request: job.delete with params: {:?}", params);
        let secret = require_api_key()?;
        let mut supervisor = self.lock().await;
        supervisor
            .delete_job_with_auth(&secret, &params.job_id)
            .await
            .map_err(runner_error_to_rpc_error)
    }

    async fn remove_runner(&self, actor_id: String) -> RpcResult<()> {
        debug!("OpenRPC request: remove_runner with actor_id: {}", actor_id);
        let mut supervisor = self.lock().await;
        supervisor
            .remove_runner(&actor_id)
            .await
            .map_err(runner_error_to_rpc_error)
    }

    async fn list_runners(&self) -> RpcResult<Vec<String>> {
        debug!("OpenRPC request: list_runners");
        let supervisor = self.lock().await;
        Ok(supervisor.list_runners().into_iter().map(|s| s.to_string()).collect())
    }

    async fn start_runner(&self, params: RunnerManagementParams) -> RpcResult<()> {
        // Log only the actor id: `params` also carries the caller's secret.
        debug!("OpenRPC request: start_runner with actor_id: {}", params.actor_id);
        // TODO: Verify secret authorization
        let mut supervisor = self.lock().await;
        supervisor
            .start_runner(&params.actor_id)
            .await
            .map_err(runner_error_to_rpc_error)
    }

    async fn stop_runner(&self, params: StopRunnerParams) -> RpcResult<()> {
        // Log only non-secret fields: `params` also carries the caller's secret.
        debug!(
            "OpenRPC request: stop_runner with actor_id: {}, force: {}",
            params.actor_id, params.force
        );
        // TODO: Verify secret authorization
        let mut supervisor = self.lock().await;
        supervisor
            .stop_runner(&params.actor_id, params.force)
            .await
            .map_err(runner_error_to_rpc_error)
    }

    async fn add_runner(&self, params: AddRunnerParams) -> RpcResult<()> {
        // Log only the runner config: `params` also carries the caller's secret.
        debug!(
            "OpenRPC request: add_runner with name: {}, command: {}",
            params.config.name, params.config.command
        );
        // TODO: Verify secret authorization
        // TODO: Implement actual runner addition logic with config
        // For now, just register the runner by name
        let mut supervisor = self.lock().await;
        supervisor
            .register_runner(&params.secret, &params.config.name, &params.config.name)
            .await
            .map_err(runner_error_to_rpc_error)?;
        Ok(())
    }

    async fn get_runner(&self, actor_id: String) -> RpcResult<RunnerWrapper> {
        debug!("OpenRPC request: get_runner with actor_id: {}", actor_id);
        let supervisor = self.lock().await;
        match supervisor.get_runner(&actor_id) {
            Some(runner) => Ok(RunnerWrapper::from(runner)),
            None => Err(ErrorObjectOwned::owned(-32000, format!("Runner not found: {}", actor_id), None::<()>)),
        }
    }

    async fn get_runner_status(&self, params: RunnerManagementParams) -> RpcResult<ProcessStatusWrapper> {
        // Log only the actor id: `params` also carries the caller's secret.
        debug!("OpenRPC request: get_runner_status with actor_id: {}", params.actor_id);
        // TODO: Verify secret authorization
        let supervisor = self.lock().await;
        let status = supervisor
            .get_runner_status(&params.actor_id)
            .await
            .map_err(runner_error_to_rpc_error)?;
        Ok(ProcessStatusWrapper::from(status))
    }

    async fn get_runner_logs(&self, params: GetLogsParams) -> RpcResult<Vec<LogInfoWrapper>> {
        debug!("OpenRPC request: get_runner_logs with params: {:?}", params);
        let supervisor = self.lock().await;
        let logs = supervisor
            .get_runner_logs(&params.actor_id, params.lines, params.follow)
            .await
            .map_err(runner_error_to_rpc_error)?;
        Ok(logs.into_iter().map(LogInfoWrapper::from).collect())
    }

    async fn queue_job_to_runner(&self, params: QueueJobParams) -> RpcResult<()> {
        debug!("OpenRPC request: queue_job_to_runner with params: {:?}", params);
        let mut supervisor = self.lock().await;
        supervisor
            .queue_job_to_runner(&params.runner, params.job)
            .await
            .map_err(runner_error_to_rpc_error)
    }

    async fn get_job(&self, job_id: String) -> RpcResult<Job> {
        debug!("OpenRPC request: get_job with job_id: {}", job_id);
        let supervisor = self.lock().await;
        supervisor
            .get_job(&job_id)
            .await
            .map_err(runner_error_to_rpc_error)
    }

    async fn ping_runner(&self, runner_id: String) -> RpcResult<String> {
        debug!("OpenRPC request: ping_runner with runner_id: {}", runner_id);
        let mut supervisor = self.lock().await;
        supervisor
            .ping_runner(&runner_id)
            .await
            .map_err(runner_error_to_rpc_error)
    }

    async fn stop_job(&self, job_id: String) -> RpcResult<()> {
        debug!("OpenRPC request: stop_job with job_id: {}", job_id);
        let mut supervisor = self.lock().await;
        supervisor
            .stop_job(&job_id)
            .await
            .map_err(runner_error_to_rpc_error)
    }

    async fn delete_job(&self, job_id: String) -> RpcResult<()> {
        debug!("OpenRPC request: delete_job with job_id: {}", job_id);
        let mut supervisor = self.lock().await;
        supervisor
            .delete_job(&job_id)
            .await
            .map_err(runner_error_to_rpc_error)
    }

    async fn get_job_logs(&self, job_id: String, lines: Option<usize>) -> RpcResult<Vec<String>> {
        debug!("OpenRPC request: get_job_logs with job_id: {}, lines: {:?}", job_id, lines);
        let supervisor = self.lock().await;
        supervisor
            .get_job_logs(&job_id, lines)
            .await
            .map_err(runner_error_to_rpc_error)
    }

    async fn queue_and_wait(&self, params: QueueAndWaitParams) -> RpcResult<Option<String>> {
        debug!("OpenRPC request: queue_and_wait with params: {:?}", params);
        // NOTE(review): the supervisor mutex stays locked for the whole call; if
        // `queue_and_wait` waits up to `timeout_secs`, every other RPC is blocked
        // for that long.
        let mut supervisor = self.lock().await;
        supervisor
            .queue_and_wait(&params.runner, params.job, params.timeout_secs)
            .await
            .map_err(runner_error_to_rpc_error)
    }

    async fn get_all_runner_status(&self) -> RpcResult<Vec<RunnerStatusResponse>> {
        debug!("OpenRPC request: get_all_runner_status");
        let supervisor = self.lock().await;
        let statuses = supervisor
            .get_all_runner_status()
            .await
            .map_err(runner_error_to_rpc_error)?;
        Ok(statuses
            .into_iter()
            .map(|(actor_id, status)| RunnerStatusResponse {
                actor_id,
                status: ProcessStatusWrapper::from(status),
            })
            .collect())
    }

    async fn start_all(&self) -> RpcResult<Vec<(String, String)>> {
        debug!("OpenRPC request: start_all");
        let mut supervisor = self.lock().await;
        let results = supervisor.start_all().await;
        Ok(summarize_bulk_results(results))
    }

    async fn stop_all(&self, force: bool) -> RpcResult<Vec<(String, String)>> {
        debug!("OpenRPC request: stop_all with force: {}", force);
        let mut supervisor = self.lock().await;
        let results = supervisor.stop_all(force).await;
        Ok(summarize_bulk_results(results))
    }

    async fn get_all_status(&self) -> RpcResult<Vec<(String, String)>> {
        debug!("OpenRPC request: get_all_status");
        let supervisor = self.lock().await;
        let statuses = supervisor
            .get_all_runner_status()
            .await
            .map_err(runner_error_to_rpc_error)?;
        Ok(statuses
            .into_iter()
            .map(|(actor_id, status)| (actor_id, format!("{:?}", status)))
            .collect())
    }

    async fn add_secret(&self, params: AddSecretParams) -> RpcResult<()> {
        debug!("OpenRPC request: add_secret, type: {}", params.secret_type);
        let mut supervisor = self.lock().await;
        if !supervisor.has_admin_secret(&params.admin_secret) {
            return Err(ErrorObject::owned(-32602, "Invalid admin secret", None::<()>));
        }
        match params.secret_type.as_str() {
            "admin" => supervisor.add_admin_secret(params.secret_value),
            "user" => supervisor.add_user_secret(params.secret_value),
            "register" => supervisor.add_register_secret(params.secret_value),
            _ => {
                return Err(ErrorObject::owned(-32602, "Invalid secret type. Must be 'admin', 'user', or 'register'", None::<()>));
            }
        }
        Ok(())
    }

    async fn remove_secret(&self, params: RemoveSecretParams) -> RpcResult<()> {
        debug!("OpenRPC request: remove_secret, type: {}", params.secret_type);
        let mut supervisor = self.lock().await;
        if !supervisor.has_admin_secret(&params.admin_secret) {
            return Err(ErrorObject::owned(-32602, "Invalid admin secret", None::<()>));
        }
        match params.secret_type.as_str() {
            "admin" => supervisor.remove_admin_secret(&params.secret_value),
            "user" => supervisor.remove_user_secret(&params.secret_value),
            "register" => supervisor.remove_register_secret(&params.secret_value),
            _ => {
                return Err(ErrorObject::owned(-32602, "Invalid secret type. Must be 'admin', 'user', or 'register'", None::<()>));
            }
        }
        Ok(())
    }

    async fn list_secrets(&self, params: ListSecretsParams) -> RpcResult<SupervisorInfoResponse> {
        debug!("OpenRPC request: list_secrets");
        let supervisor = self.lock().await;
        if !supervisor.has_admin_secret(&params.admin_secret) {
            return Err(ErrorObject::owned(-32602, "Invalid admin secret", None::<()>));
        }
        Ok(SupervisorInfoResponse {
            // NOTE(review): hard-coded; does not reflect the address the server
            // is actually bound to.
            server_url: "http://127.0.0.1:3030".to_string(),
            admin_secrets_count: supervisor.admin_secrets_count(),
            user_secrets_count: supervisor.user_secrets_count(),
            register_secrets_count: supervisor.register_secrets_count(),
            runners_count: supervisor.runners_count(),
        })
    }

    async fn list_admin_secrets(&self, admin_secret: String) -> RpcResult<Vec<String>> {
        debug!("OpenRPC request: list_admin_secrets");
        let supervisor = self.lock().await;
        if !supervisor.has_admin_secret(&admin_secret) {
            return Err(ErrorObject::owned(-32602, "Invalid admin secret", None::<()>));
        }
        Ok(supervisor.get_admin_secrets())
    }

    async fn list_user_secrets(&self, admin_secret: String) -> RpcResult<Vec<String>> {
        debug!("OpenRPC request: list_user_secrets");
        let supervisor = self.lock().await;
        if !supervisor.has_admin_secret(&admin_secret) {
            return Err(ErrorObject::owned(-32602, "Invalid admin secret", None::<()>));
        }
        Ok(supervisor.get_user_secrets())
    }

    async fn list_register_secrets(&self, admin_secret: String) -> RpcResult<Vec<String>> {
        debug!("OpenRPC request: list_register_secrets");
        let supervisor = self.lock().await;
        if !supervisor.has_admin_secret(&admin_secret) {
            return Err(ErrorObject::owned(-32602, "Invalid admin secret", None::<()>));
        }
        Ok(supervisor.get_register_secrets())
    }

    async fn get_supervisor_info(&self, admin_secret: String) -> RpcResult<SupervisorInfoResponse> {
        debug!("OpenRPC request: get_supervisor_info");
        let supervisor = self.lock().await;
        if !supervisor.has_admin_secret(&admin_secret) {
            return Err(ErrorObject::owned(-32602, "Invalid admin secret", None::<()>));
        }
        Ok(SupervisorInfoResponse {
            // NOTE(review): hard-coded; does not reflect the address the server
            // is actually bound to.
            server_url: "http://127.0.0.1:3030".to_string(),
            admin_secrets_count: supervisor.admin_secrets_count(),
            user_secrets_count: supervisor.user_secrets_count(),
            register_secrets_count: supervisor.register_secrets_count(),
            runners_count: supervisor.runners_count(),
        })
    }

    async fn auth_verify(&self) -> RpcResult<crate::auth::AuthVerifyResponse> {
        debug!("OpenRPC request: auth.verify");
        // Read the key before locking: a missing header needs no supervisor access.
        let key = require_api_key()?;
        let supervisor = self.lock().await;
        let response = match supervisor.verify_api_key(&key).await {
            Some(api_key) => crate::auth::AuthVerifyResponse {
                valid: true,
                name: api_key.name,
                scope: api_key.scope.as_str().to_string(),
            },
            // Unknown keys are reported as invalid rather than as an RPC error.
            None => crate::auth::AuthVerifyResponse {
                valid: false,
                name: String::new(),
                scope: String::new(),
            },
        };
        Ok(response)
    }

    async fn auth_create_key(&self, name: String, scope: String) -> RpcResult<crate::auth::ApiKey> {
        debug!("OpenRPC request: auth.create_key");
        let key = require_api_key()?;
        let supervisor = self.lock().await;
        if !supervisor.is_admin_key(&key).await {
            return Err(ErrorObject::owned(-32603, "Admin permissions required", None::<()>));
        }
        let api_scope = match scope.to_lowercase().as_str() {
            "admin" => crate::auth::ApiKeyScope::Admin,
            "registrar" => crate::auth::ApiKeyScope::Registrar,
            "user" => crate::auth::ApiKeyScope::User,
            _ => return Err(ErrorObject::owned(-32602, "Invalid scope. Must be 'admin', 'registrar', or 'user'", None::<()>)),
        };
        let api_key = supervisor.create_api_key(name, api_scope).await;
        Ok(api_key)
    }

    async fn auth_remove_key(&self, key_to_remove: String) -> RpcResult<bool> {
        debug!("OpenRPC request: auth.remove_key");
        let key = require_api_key()?;
        let supervisor = self.lock().await;
        if !supervisor.is_admin_key(&key).await {
            return Err(ErrorObject::owned(-32603, "Admin permissions required", None::<()>));
        }
        Ok(supervisor.remove_api_key(&key_to_remove).await.is_some())
    }

    async fn auth_list_keys(&self) -> RpcResult<Vec<crate::auth::ApiKey>> {
        debug!("OpenRPC request: auth.list_keys");
        let key = require_api_key()?;
        let supervisor = self.lock().await;
        if !supervisor.is_admin_key(&key).await {
            return Err(ErrorObject::owned(-32603, "Admin permissions required", None::<()>));
        }
        Ok(supervisor.list_api_keys().await)
    }

    async fn rpc_discover(&self) -> RpcResult<serde_json::Value> {
        debug!("OpenRPC request: rpc.discover");
        match load_openrpc_spec() {
            Ok(spec) => Ok(spec),
            Err(e) => {
                error!("Failed to load OpenRPC specification: {}", e);
                // Fall back to a minimal spec so discovery still answers.
                Ok(serde_json::json!({
                    "openrpc": "1.3.2",
                    "info": {
                        "title": "Hero Supervisor OpenRPC API",
                        "version": "1.0.0",
                        "description": "OpenRPC API for managing Hero Supervisor runners and jobs"
                    },
                    "methods": [],
                    "error": "Failed to load full specification"
                }))
            }
        }
    }
}
/// Start the OpenRPC server on `addr`, backed by a fresh `Supervisor::default()`.
///
/// Delegates to [`start_server_with_supervisor`].
pub async fn start_server(addr: SocketAddr) -> anyhow::Result<ServerHandle> {
    start_server_with_supervisor(addr, Arc::new(Mutex::new(Supervisor::default()))).await
}
/// Start the OpenRPC server with an existing supervisor instance
pub async fn start_server_with_supervisor(
addr: SocketAddr,
supervisor: Arc<Mutex<Supervisor>>,
) -> anyhow::Result<ServerHandle> {
let server = Server::builder().build(addr).await?;
let handle = server.start(supervisor.into_rpc());
Ok(handle)
}
/// Tower layer that wraps an HTTP service in an `AuthExtractService`, which copies the
/// request's `Authorization: Bearer` token into the API-key thread-local before the
/// request reaches the RPC handlers.
#[derive(Clone)]
struct AuthExtractLayer;

impl<S> tower::Layer<S> for AuthExtractLayer {
    type Service = AuthExtractService<S>;

    fn layer(&self, service: S) -> AuthExtractService<S> {
        AuthExtractService { inner: service }
    }
}
/// Service produced by `AuthExtractLayer`.
///
/// On each request it takes the token from the `Authorization: Bearer` header,
/// publishes it through `set_current_api_key`, and then delegates to `inner`.
#[derive(Clone)]
struct AuthExtractService<S> {
    /// The wrapped service that actually handles the request.
    inner: S,
}
/// Publishes the request's bearer token to RPC handlers via `set_current_api_key`.
///
/// The key lives in a thread-local, but the response future (and the RPC handler
/// running inside it) may be polled on any tokio worker thread, interleaved with
/// other requests on the same thread. To keep handlers from observing another
/// client's key or a stale one, this request's key is installed on the current
/// thread immediately before every poll of the inner future and cleared right after.
impl<S, B> tower::Service<hyper::Request<B>> for AuthExtractService<S>
where
    S: tower::Service<hyper::Request<B>> + Clone + Send + 'static,
    S::Future: Send + 'static,
    B: Send + 'static,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
    >;

    fn poll_ready(
        &mut self,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, req: hyper::Request<B>) -> Self::Future {
        // `Authorization: <scheme> <token>`; the scheme name is case-insensitive
        // (RFC 9110 §11.1), so accept `bearer`/`BEARER` as well as `Bearer`.
        let api_key: Option<String> = req
            .headers()
            .get("authorization")
            .and_then(|value| value.to_str().ok())
            .and_then(|value| value.split_once(' '))
            .filter(|&(scheme, _)| scheme.eq_ignore_ascii_case("Bearer"))
            .map(|(_, token)| token.trim().to_string());

        // Tower contract: only the instance that reported Ready from `poll_ready` may
        // be called. Take it and leave a fresh clone behind for the next request.
        let fresh = self.inner.clone();
        let mut ready = std::mem::replace(&mut self.inner, fresh);

        // The inner service may do synchronous work in `call`, so expose the key
        // while the response future is being created, too.
        set_current_api_key(api_key.clone());
        let mut response = Box::pin(ready.call(req));
        set_current_api_key(None);

        Box::pin(std::future::poll_fn(move |cx| {
            set_current_api_key(api_key.clone());
            let polled = std::future::Future::poll(response.as_mut(), cx);
            set_current_api_key(None);
            polled
        }))
    }
}
/// Start HTTP OpenRPC server (Unix socket support would require additional dependencies)
///
/// Binds to `bind_address:port`. `bind_address` may be an IPv4 literal
/// (`0.0.0.0`), an unbracketed IPv6 literal (`::`, `::1`) or a bracketed one
/// (`[::1]`). Requests pass through the auth-extraction middleware, which makes the
/// `Authorization: Bearer` token available to RPC methods, and a permissive CORS
/// layer so the admin UI can call the API from another origin.
///
/// # Errors
/// Returns an error if the address cannot be parsed or the server cannot bind.
pub async fn start_http_openrpc_server(
    supervisor: Arc<Mutex<Supervisor>>,
    bind_address: &str,
    port: u16,
) -> anyhow::Result<ServerHandle> {
    // Parse a bare IP first: "::1:3030" is not a valid `SocketAddr`, so formatting
    // an unbracketed IPv6 literal with the port would always fail. Fall back to the
    // `host:port` form for everything else (e.g. "[::1]").
    let http_addr: SocketAddr = match bind_address.parse::<std::net::IpAddr>() {
        Ok(ip) => SocketAddr::new(ip, port),
        Err(_) => format!("{}:{}", bind_address, port).parse().map_err(|e| {
            anyhow::anyhow!("invalid bind address {}:{}: {}", bind_address, port, e)
        })?,
    };

    // Configure CORS to allow requests from the admin UI
    let cors = CorsLayer::new()
        .allow_origin(Any)
        .allow_headers(Any)
        .allow_methods(Any);

    // Auth extraction is the outermost layer; CORS sits between it and jsonrpsee.
    let http_middleware = tower::ServiceBuilder::new()
        .layer(AuthExtractLayer)
        .layer(cors);

    let http_server = Server::builder()
        .set_http_middleware(http_middleware)
        .build(http_addr)
        .await?;
    let http_handle = http_server.start(supervisor.into_rpc());
    info!("OpenRPC HTTP server running at http://{} with CORS enabled", http_addr);
    Ok(http_handle)
}
/// Simplified server startup function for supervisor binary
///
/// Binds the HTTP OpenRPC server before returning, so a bad address or a port that
/// is already in use is reported to the caller instead of only being logged. The
/// server then keeps running in a detached task. That task holds the `ServerHandle`,
/// because dropping every handle shuts the server down. If the server ever stops,
/// the task logs an error.
///
/// # Errors
/// Returns an error if the address cannot be parsed or the server cannot bind.
pub async fn start_openrpc_servers(
    supervisor: Arc<Mutex<Supervisor>>,
    bind_address: &str,
    port: u16,
) -> Result<(), Box<dyn std::error::Error>> {
    let http_handle = match start_http_openrpc_server(supervisor, bind_address, port).await {
        Ok(handle) => handle,
        Err(e) => {
            error!("Failed to start OpenRPC server: {}", e);
            return Err(e.into());
        }
    };
    info!("OpenRPC server started successfully");

    // The listener is already bound, so no start-up delay is needed.
    tokio::spawn(async move {
        http_handle.stopped().await;
        error!("OpenRPC server stopped unexpectedly");
    });
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::supervisor::Supervisor;

    /// Smoke test: building a supervisor against a local Redis URL must not panic.
    /// The outcome may depend on whether Redis is reachable, so it is not asserted.
    #[tokio::test]
    async fn test_supervisor_rpc_creation() {
        use crate::supervisor::SupervisorBuilder;
        let _supervisor = SupervisorBuilder::new()
            .redis_url("redis://localhost:6379")
            .namespace("test")
            .build()
            .await;
    }

    #[test]
    fn test_process_manager_type_parsing() {
        assert!(parse_process_manager_type("simple", None).is_ok());
        assert!(parse_process_manager_type("tmux", Some("session".to_string())).is_ok());
        assert!(parse_process_manager_type("Simple", None).is_ok());
        assert!(parse_process_manager_type("TMUX", Some("session".to_string())).is_ok());
        assert!(parse_process_manager_type("invalid", None).is_err());
    }

    /// Exercises the job RPC methods with a known, an unknown and a missing API key.
    #[tokio::test]
    async fn test_job_api_methods() {
        let supervisor = Arc::new(Mutex::new(Supervisor::default()));
        let mut sup = supervisor.lock().await;
        sup.add_user_secret("test-secret".to_string());
        drop(sup);

        let job = crate::job::JobBuilder::new()
            .caller_id("test")
            .context_id("test")
            .payload("test")
            .runner("test_runner")
            .executor("osis")
            .build()
            .unwrap();
        let params = RunJobParams { job };

        // The RPC methods read the caller's key from the thread-local that the HTTP
        // middleware normally fills. `#[tokio::test]` uses a current-thread runtime
        // by default, so the value set here is visible to the calls below.
        set_current_api_key(Some("test-secret".to_string()));

        // jobs.create: may succeed or fail depending on whether Redis is available;
        // this only checks that the call completes without panicking.
        let _ = supervisor.jobs_create(params).await;

        // job.start on a job that does not exist must fail.
        let start_params = StartJobParams {
            job_id: "test-job".to_string(),
        };
        assert!(supervisor.job_start(start_params).await.is_err());

        // job.start with a key the supervisor does not know must be rejected.
        set_current_api_key(Some("invalid-secret".to_string()));
        let invalid_params = StartJobParams {
            job_id: "test-job".to_string(),
        };
        assert!(supervisor.job_start(invalid_params).await.is_err());

        // job.start without any key must be rejected as well. This also leaves the
        // thread-local cleared for whatever runs on this thread next.
        set_current_api_key(None);
        let missing_key_params = StartJobParams {
            job_id: "test-job".to_string(),
        };
        assert!(supervisor.job_start(missing_key_params).await.is_err());
    }

    #[test]
    fn test_job_result_serialization() {
        let success = JobResult::Success { success: "test output".to_string() };
        let json = serde_json::to_string(&success).unwrap();
        assert!(json.contains("success"));
        assert!(json.contains("test output"));
        let error = JobResult::Error { error: "test error".to_string() };
        let json = serde_json::to_string(&error).unwrap();
        assert!(json.contains("error"));
        assert!(json.contains("test error"));
    }

    #[test]
    fn test_job_status_response_serialization() {
        let status = JobStatusResponse {
            job_id: "test-job".to_string(),
            status: "running".to_string(),
            created_at: "2023-01-01T00:00:00Z".to_string(),
            started_at: Some("2023-01-01T00:00:05Z".to_string()),
            completed_at: None,
        };
        let json = serde_json::to_string(&status).unwrap();
        assert!(json.contains("test-job"));
        assert!(json.contains("running"));
        assert!(json.contains("2023-01-01T00:00:00Z"));
        let deserialized: JobStatusResponse = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.job_id, "test-job");
        assert_eq!(deserialized.status, "running");
    }
}