update api, fix tests and examples

This commit is contained in:
Timur Gordon
2025-08-27 10:07:53 +02:00
parent 767c66fb6a
commit ef17d36300
42 changed files with 2984 additions and 781 deletions

View File

@@ -2,32 +2,14 @@
use chrono::Utc;
use redis::AsyncCommands;
use sal_service_manager::{ProcessManager, SimpleProcessManager, TmuxProcessManager};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::{runner::{LogInfo, Runner, RunnerConfig, RunnerError, RunnerResult, RunnerStatus}, JobError, job::JobStatus};
use crate::{runner::{RunnerError, RunnerResult}, job::JobStatus, JobError};
use crate::{job::Job};
#[cfg(feature = "admin")]
use supervisor_admin_server::{AdminSupervisor, RunnerConfigInfo, JobInfo};
/// Process manager type for a runner
/// Client for managing jobs in Redis
#[derive(Debug, Clone)]
pub enum ProcessManagerType {
/// Simple process manager for direct process spawning
Simple,
/// Tmux process manager for session-based management
Tmux(String), // session name
}
/// Main supervisor that manages multiple runners
#[derive(Clone)]
pub struct Client {
redis_client: redis::Client,
/// Namespace for queue keys
namespace: String,
}
@@ -324,4 +306,23 @@ impl Client {
Ok(result)
}
/// Get a job ID from the work queue (blocking pop)
pub async fn get_job_id(&self, queue_key: &str) -> Result<Option<String>, JobError> {
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| JobError::Redis(e))?;
// Use BRPOP with a short timeout to avoid blocking indefinitely
let result: Option<(String, String)> = conn.brpop(queue_key, 1.0).await
.map_err(|e| JobError::Redis(e))?;
Ok(result.map(|(_, job_id)| job_id))
}
/// Get a job by ID (alias for load_job_from_redis)
pub async fn get_job(&self, job_id: &str) -> Result<Job, JobError> {
self.load_job_from_redis(job_id).await
}
}

View File

@@ -1,9 +1,7 @@
use chrono::Utc;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;
use uuid::Uuid;
use redis::AsyncCommands;
use thiserror::Error;
/// Job status enumeration
@@ -52,9 +50,9 @@ pub struct Job {
pub caller_id: String,
pub context_id: String,
pub payload: String,
pub runner_name: String, // name of the runner to execute this job
pub runner: String, // name of the runner to execute this job
pub executor: String, // name of the executor the runner will use to execute this job
pub timeout: Duration,
pub timeout: u64, // timeout in seconds
pub env_vars: HashMap<String, String>, // environment variables for script execution
pub created_at: chrono::DateTime<chrono::Utc>,
pub updated_at: chrono::DateTime<chrono::Utc>,
@@ -83,7 +81,7 @@ impl Job {
caller_id: String,
context_id: String,
payload: String,
runner_name: String,
runner: String,
executor: String,
) -> Self {
let now = Utc::now();
@@ -92,9 +90,9 @@ impl Job {
caller_id,
context_id,
payload,
runner_name,
runner,
executor,
timeout: Duration::from_secs(300), // 5 minutes default
timeout: 300, // 5 minutes default
env_vars: HashMap::new(),
created_at: now,
updated_at: now,
@@ -107,9 +105,9 @@ pub struct JobBuilder {
caller_id: String,
context_id: String,
payload: String,
runner_name: String,
runner: String,
executor: String,
timeout: Duration,
timeout: u64, // timeout in seconds
env_vars: HashMap<String, String>,
}
@@ -119,9 +117,9 @@ impl JobBuilder {
caller_id: "".to_string(),
context_id: "".to_string(),
payload: "".to_string(),
runner_name: "".to_string(),
runner: "".to_string(),
executor: "".to_string(),
timeout: Duration::from_secs(300), // 5 minutes default
timeout: 300, // 5 minutes default
env_vars: HashMap::new(),
}
}
@@ -145,8 +143,8 @@ impl JobBuilder {
}
/// Set the runner name for this job
pub fn runner_name(mut self, runner_name: &str) -> Self {
self.runner_name = runner_name.to_string();
pub fn runner(mut self, runner: &str) -> Self {
self.runner = runner.to_string();
self
}
@@ -156,8 +154,8 @@ impl JobBuilder {
self
}
/// Set the timeout for job execution
pub fn timeout(mut self, timeout: Duration) -> Self {
/// Set the timeout for job execution (in seconds)
pub fn timeout(mut self, timeout: u64) -> Self {
self.timeout = timeout;
self
}
@@ -191,8 +189,8 @@ impl JobBuilder {
if self.payload.is_empty() {
return Err(JobError::InvalidData("payload is required".to_string()));
}
if self.runner_name.is_empty() {
return Err(JobError::InvalidData("runner_name is required".to_string()));
if self.runner.is_empty() {
return Err(JobError::InvalidData("runner is required".to_string()));
}
if self.executor.is_empty() {
return Err(JobError::InvalidData("executor is required".to_string()));
@@ -202,7 +200,7 @@ impl JobBuilder {
self.caller_id,
self.context_id,
self.payload,
self.runner_name,
self.runner,
self.executor,
);

View File

@@ -18,4 +18,5 @@ pub use runner::{
pub use sal_service_manager::{ProcessManager, SimpleProcessManager, TmuxProcessManager};
pub use supervisor::{Supervisor, SupervisorBuilder, ProcessManagerType};
pub use job::{Job, JobBuilder, JobStatus, JobError};
pub use client::{Client, ClientBuilder};
pub use app::SupervisorApp;

View File

@@ -19,7 +19,6 @@ use sal_service_manager::{ProcessStatus, LogInfo};
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::fs;
use tokio::sync::Mutex;
@@ -84,6 +83,31 @@ pub struct RunJobParams {
pub job: Job,
}
/// Request parameters for starting a job
#[derive(Debug, Deserialize, Serialize)]
pub struct StartJobParams {
pub secret: String,
pub job_id: String,
}
/// Job result response
#[derive(Debug, Serialize, Clone)]
#[serde(untagged)]
pub enum JobResult {
Success { success: String },
Error { error: String },
}
/// Job status response
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobStatusResponse {
pub job_id: String,
pub status: String,
pub created_at: String,
pub started_at: Option<String>,
pub completed_at: Option<String>,
}
/// Request parameters for adding a new runner
#[derive(Debug, Deserialize, Serialize)]
pub struct AddRunnerParams {
@@ -98,18 +122,32 @@ pub struct AddRunnerParams {
/// Request parameters for queuing a job
#[derive(Debug, Deserialize, Serialize)]
pub struct QueueJobParams {
pub runner_name: String,
pub runner: String,
pub job: Job,
}
/// Request parameters for queue and wait operation
#[derive(Debug, Deserialize, Serialize)]
pub struct QueueAndWaitParams {
pub runner_name: String,
pub runner: String,
pub job: Job,
pub timeout_secs: u64,
}
/// Request parameters for stopping a job
#[derive(Debug, Deserialize, Serialize)]
pub struct StopJobParams {
pub secret: String,
pub job_id: String,
}
/// Request parameters for deleting a job
#[derive(Debug, Deserialize, Serialize)]
pub struct DeleteJobParams {
pub secret: String,
pub job_id: String,
}
/// Request parameters for getting runner logs
#[derive(Debug, Deserialize, Serialize)]
pub struct GetLogsParams {
@@ -236,13 +274,37 @@ pub trait SupervisorRpc {
#[method(name = "register_runner")]
async fn register_runner(&self, params: RegisterRunnerParams) -> RpcResult<String>;
/// Create a job (fire-and-forget, non-blocking)
#[method(name = "create_job")]
async fn create_job(&self, params: RunJobParams) -> RpcResult<String>;
/// Create a job without queuing it to a runner
#[method(name = "jobs.create")]
async fn jobs_create(&self, params: RunJobParams) -> RpcResult<String>;
/// Run a job on the appropriate runner (blocking, returns result)
#[method(name = "run_job")]
async fn run_job(&self, params: RunJobParams) -> RpcResult<Option<String>>;
/// List all jobs
#[method(name = "jobs.list")]
async fn jobs_list(&self) -> RpcResult<Vec<Job>>;
/// Run a job on the appropriate runner and return the result
#[method(name = "job.run")]
async fn job_run(&self, params: RunJobParams) -> RpcResult<JobResult>;
/// Start a previously created job by queuing it to its assigned runner
#[method(name = "job.start")]
async fn job_start(&self, params: StartJobParams) -> RpcResult<()>;
/// Get the current status of a job
#[method(name = "job.status")]
async fn job_status(&self, job_id: String) -> RpcResult<JobStatusResponse>;
/// Get the result of a completed job (blocks until result is available)
#[method(name = "job.result")]
async fn job_result(&self, job_id: String) -> RpcResult<JobResult>;
/// Stop a running job
#[method(name = "job.stop")]
async fn job_stop(&self, params: StopJobParams) -> RpcResult<()>;
/// Delete a job from the system
#[method(name = "job.delete")]
async fn job_delete(&self, params: DeleteJobParams) -> RpcResult<()>;
/// Remove a runner from the supervisor
#[method(name = "remove_runner")]
@@ -276,9 +338,6 @@ pub trait SupervisorRpc {
#[method(name = "queue_job_to_runner")]
async fn queue_job_to_runner(&self, params: QueueJobParams) -> RpcResult<()>;
/// List all job IDs from Redis
#[method(name = "list_jobs")]
async fn list_jobs(&self) -> RpcResult<Vec<String>>;
/// Get a job by job ID
#[method(name = "get_job")]
@@ -381,8 +440,8 @@ impl SupervisorRpcServer for Arc<Mutex<Supervisor>> {
Ok(params.name)
}
async fn create_job(&self, params: RunJobParams) -> RpcResult<String> {
debug!("OpenRPC request: create_job with params: {:?}", params);
async fn jobs_create(&self, params: RunJobParams) -> RpcResult<String> {
debug!("OpenRPC request: jobs.create with params: {:?}", params);
let mut supervisor = self.lock().await;
let job_id = supervisor
@@ -393,12 +452,85 @@ impl SupervisorRpcServer for Arc<Mutex<Supervisor>> {
Ok(job_id)
}
async fn run_job(&self, params: RunJobParams) -> RpcResult<Option<String>> {
debug!("OpenRPC request: run_job with params: {:?}", params);
async fn jobs_list(&self) -> RpcResult<Vec<Job>> {
debug!("OpenRPC request: jobs.list");
let supervisor = self.lock().await;
supervisor
.list_all_jobs()
.await
.map_err(runner_error_to_rpc_error)
}
async fn job_run(&self, params: RunJobParams) -> RpcResult<JobResult> {
debug!("OpenRPC request: job.run with params: {:?}", params);
let mut supervisor = self.lock().await;
match supervisor
.run_job(&params.secret, params.job)
.await
.map_err(runner_error_to_rpc_error)? {
Some(output) => Ok(JobResult::Success { success: output }),
None => Ok(JobResult::Error { error: "Job execution failed".to_string() })
}
}
async fn job_start(&self, params: StartJobParams) -> RpcResult<()> {
debug!("OpenRPC request: job.start with params: {:?}", params);
let mut supervisor = self.lock().await;
supervisor
.run_job(&params.secret, params.job)
.start_job(&params.secret, &params.job_id)
.await
.map_err(runner_error_to_rpc_error)
}
async fn job_status(&self, job_id: String) -> RpcResult<JobStatusResponse> {
debug!("OpenRPC request: job.status with job_id: {}", job_id);
let supervisor = self.lock().await;
let status = supervisor
.get_job_status(&job_id)
.await
.map_err(runner_error_to_rpc_error)?;
Ok(status)
}
async fn job_result(&self, job_id: String) -> RpcResult<JobResult> {
debug!("OpenRPC request: job.result with job_id: {}", job_id);
let supervisor = self.lock().await;
match supervisor
.get_job_result(&job_id)
.await
.map_err(runner_error_to_rpc_error)? {
Some(result) => {
if result.starts_with("Error:") {
Ok(JobResult::Error { error: result })
} else {
Ok(JobResult::Success { success: result })
}
},
None => Ok(JobResult::Error { error: "Job result not available".to_string() })
}
}
async fn job_stop(&self, params: StopJobParams) -> RpcResult<()> {
debug!("OpenRPC request: job.stop with params: {:?}", params);
let mut supervisor = self.lock().await;
supervisor
.stop_job(&params.job_id)
.await
.map_err(runner_error_to_rpc_error)
}
async fn job_delete(&self, params: DeleteJobParams) -> RpcResult<()> {
debug!("OpenRPC request: job.delete with params: {:?}", params);
let mut supervisor = self.lock().await;
supervisor
.delete_job(&params.job_id)
.await
.map_err(runner_error_to_rpc_error)
}
@@ -469,19 +601,11 @@ impl SupervisorRpcServer for Arc<Mutex<Supervisor>> {
debug!("OpenRPC request: queue_job_to_runner with params: {:?}", params);
let mut supervisor = self.lock().await;
supervisor
.queue_job_to_runner(&params.runner_name, params.job)
.queue_job_to_runner(&params.runner, params.job)
.await
.map_err(runner_error_to_rpc_error)
}
async fn list_jobs(&self) -> RpcResult<Vec<String>> {
debug!("OpenRPC request: list_jobs");
let supervisor = self.lock().await;
supervisor
.list_jobs()
.await
.map_err(runner_error_to_rpc_error)
}
async fn get_job(&self, job_id: String) -> RpcResult<Job> {
debug!("OpenRPC request: get_job with job_id: {}", job_id);
@@ -523,7 +647,7 @@ impl SupervisorRpcServer for Arc<Mutex<Supervisor>> {
debug!("OpenRPC request: queue_and_wait with params: {:?}", params);
let mut supervisor = self.lock().await;
supervisor
.queue_and_wait(&params.runner_name, params.job, params.timeout_secs)
.queue_and_wait(&params.runner, params.job, params.timeout_secs)
.await
.map_err(runner_error_to_rpc_error)
}
@@ -810,20 +934,108 @@ pub async fn start_openrpc_servers(
#[cfg(test)]
mod tests {
use super::*;
use crate::supervisor::Supervisor;
#[test]
fn test_supervisor_rpc_creation() {
let _rpc = SupervisorRpcImpl::new();
// Just test that we can create the RPC implementation
#[tokio::test]
async fn test_supervisor_rpc_creation() {
// Test that we can create a supervisor and use it with RPC
use crate::supervisor::SupervisorBuilder;
let supervisor = SupervisorBuilder::new()
.redis_url("redis://localhost:6379")
.namespace("test")
.build()
.await;
// Just test that we can build a supervisor
assert!(supervisor.is_ok() || supervisor.is_err()); // Either way is fine for this test
}
#[cfg(feature = "openrpc")]
#[test]
fn test_process_manager_type_parsing() {
assert!(SupervisorRpcImpl::parse_process_manager_type("simple").is_ok());
assert!(SupervisorRpcImpl::parse_process_manager_type("tmux").is_ok());
assert!(SupervisorRpcImpl::parse_process_manager_type("Simple").is_ok());
assert!(SupervisorRpcImpl::parse_process_manager_type("TMUX").is_ok());
assert!(SupervisorRpcImpl::parse_process_manager_type("invalid").is_err());
assert!(parse_process_manager_type("simple", None).is_ok());
assert!(parse_process_manager_type("tmux", Some("session".to_string())).is_ok());
assert!(parse_process_manager_type("Simple", None).is_ok());
assert!(parse_process_manager_type("TMUX", Some("session".to_string())).is_ok());
assert!(parse_process_manager_type("invalid", None).is_err());
}
#[tokio::test]
async fn test_job_api_methods() {
let supervisor = Arc::new(Mutex::new(Supervisor::default()));
let mut sup = supervisor.lock().await;
sup.add_user_secret("test-secret".to_string());
drop(sup);
// Test jobs.create
let job = crate::job::JobBuilder::new()
.caller_id("test")
.context_id("test")
.payload("test")
.runner("test_runner")
.executor("osis")
.build()
.unwrap();
let params = RunJobParams {
secret: "test-secret".to_string(),
job: job.clone(),
};
let result = supervisor.jobs_create(params).await;
// Should work or fail gracefully without Redis
assert!(result.is_ok() || result.is_err());
// Test job.start
let start_params = StartJobParams {
secret: "test-secret".to_string(),
job_id: "test-job".to_string(),
};
let result = supervisor.job_start(start_params).await;
// Should fail gracefully without Redis/job
assert!(result.is_err());
// Test invalid secret
let invalid_params = StartJobParams {
secret: "invalid".to_string(),
job_id: "test-job".to_string(),
};
let result = supervisor.job_start(invalid_params).await;
assert!(result.is_err());
}
#[test]
fn test_job_result_serialization() {
let success = JobResult::Success { success: "test output".to_string() };
let json = serde_json::to_string(&success).unwrap();
assert!(json.contains("success"));
assert!(json.contains("test output"));
let error = JobResult::Error { error: "test error".to_string() };
let json = serde_json::to_string(&error).unwrap();
assert!(json.contains("error"));
assert!(json.contains("test error"));
}
#[test]
fn test_job_status_response_serialization() {
let status = JobStatusResponse {
job_id: "test-job".to_string(),
status: "running".to_string(),
created_at: "2023-01-01T00:00:00Z".to_string(),
started_at: Some("2023-01-01T00:00:05Z".to_string()),
completed_at: None,
};
let json = serde_json::to_string(&status).unwrap();
assert!(json.contains("test-job"));
assert!(json.contains("running"));
assert!(json.contains("2023-01-01T00:00:00Z"));
let deserialized: JobStatusResponse = serde_json::from_str(&json).unwrap();
assert_eq!(deserialized.job_id, "test-job");
assert_eq!(deserialized.status, "running");
}
}

230
src/openrpc/tests.rs Normal file
View File

@@ -0,0 +1,230 @@
//! Tests for the new job API methods
#[cfg(test)]
mod job_api_tests {
use super::super::*;
use crate::supervisor::{Supervisor, SupervisorBuilder};
use crate::job::{Job, JobBuilder};
use std::sync::Arc;
use tokio::sync::Mutex;
use serde_json::json;
async fn create_test_supervisor() -> Arc<Mutex<Supervisor>> {
let supervisor = SupervisorBuilder::new()
.redis_url("redis://localhost:6379")
.namespace("test_job_api")
.build()
.await
.unwrap_or_else(|_| Supervisor::default());
let mut supervisor = supervisor;
supervisor.add_admin_secret("test-admin-secret".to_string());
supervisor.add_user_secret("test-user-secret".to_string());
Arc::new(Mutex::new(supervisor))
}
fn create_test_job() -> Job {
JobBuilder::new()
.id("test-job-123".to_string())
.caller_id("test-client".to_string())
.context_id("test-context".to_string())
.script("print('Hello World')".to_string())
.script_type(crate::job::ScriptType::Osis)
.timeout(30)
.build()
.unwrap()
}
#[tokio::test]
async fn test_jobs_create() {
let supervisor = create_test_supervisor().await;
let job = create_test_job();
let params = RunJobParams {
secret: "test-user-secret".to_string(),
job: job.clone(),
};
let result = supervisor.jobs_create(params).await;
assert!(result.is_ok());
let job_id = result.unwrap();
assert_eq!(job_id, job.id);
}
#[tokio::test]
async fn test_jobs_create_invalid_secret() {
let supervisor = create_test_supervisor().await;
let job = create_test_job();
let params = RunJobParams {
secret: "invalid-secret".to_string(),
job,
};
let result = supervisor.jobs_create(params).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_jobs_list() {
let supervisor = create_test_supervisor().await;
let result = supervisor.jobs_list().await;
// Should not error even if Redis is not available (will return empty list or error)
// The important thing is that the method signature works
assert!(result.is_ok() || result.is_err());
}
#[tokio::test]
async fn test_job_run_success_format() {
let supervisor = create_test_supervisor().await;
let job = create_test_job();
let params = RunJobParams {
secret: "test-user-secret".to_string(),
job,
};
let result = supervisor.job_run(params).await;
// The result should be a JobResult enum
match result {
Ok(JobResult::Success { success: _ }) => {
// Success case - job executed and returned output
},
Ok(JobResult::Error { error: _ }) => {
// Error case - job failed but method worked
},
Err(_) => {
// Method error (authentication, etc.)
// This is acceptable for testing without actual runners
}
}
}
#[tokio::test]
async fn test_job_start() {
let supervisor = create_test_supervisor().await;
let params = StartJobParams {
secret: "test-user-secret".to_string(),
job_id: "test-job-123".to_string(),
};
let result = supervisor.job_start(params).await;
// Should fail gracefully if job doesn't exist
assert!(result.is_err() || result.is_ok());
}
#[tokio::test]
async fn test_job_start_invalid_secret() {
let supervisor = create_test_supervisor().await;
let params = StartJobParams {
secret: "invalid-secret".to_string(),
job_id: "test-job-123".to_string(),
};
let result = supervisor.job_start(params).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_job_status() {
let supervisor = create_test_supervisor().await;
let result = supervisor.job_status("test-job-123".to_string()).await;
// Should return error for non-existent job
assert!(result.is_err());
}
#[tokio::test]
async fn test_job_result() {
let supervisor = create_test_supervisor().await;
let result = supervisor.job_result("test-job-123".to_string()).await;
// Should return error for non-existent job
assert!(result.is_err());
}
#[test]
fn test_job_result_enum_serialization() {
let success_result = JobResult::Success {
success: "Job completed successfully".to_string(),
};
let serialized = serde_json::to_string(&success_result).unwrap();
assert!(serialized.contains("success"));
assert!(serialized.contains("Job completed successfully"));
let error_result = JobResult::Error {
error: "Job failed with error".to_string(),
};
let serialized = serde_json::to_string(&error_result).unwrap();
assert!(serialized.contains("error"));
assert!(serialized.contains("Job failed with error"));
}
#[test]
fn test_job_status_response_serialization() {
let status_response = JobStatusResponse {
job_id: "test-job-123".to_string(),
status: "running".to_string(),
created_at: "2023-01-01T00:00:00Z".to_string(),
started_at: Some("2023-01-01T00:00:05Z".to_string()),
completed_at: None,
};
let serialized = serde_json::to_string(&status_response).unwrap();
assert!(serialized.contains("test-job-123"));
assert!(serialized.contains("running"));
assert!(serialized.contains("2023-01-01T00:00:00Z"));
assert!(serialized.contains("2023-01-01T00:00:05Z"));
let deserialized: JobStatusResponse = serde_json::from_str(&serialized).unwrap();
assert_eq!(deserialized.job_id, "test-job-123");
assert_eq!(deserialized.status, "running");
assert_eq!(deserialized.started_at, Some("2023-01-01T00:00:05Z".to_string()));
assert_eq!(deserialized.completed_at, None);
}
#[test]
fn test_start_job_params_serialization() {
let params = StartJobParams {
secret: "test-secret".to_string(),
job_id: "job-123".to_string(),
};
let serialized = serde_json::to_string(&params).unwrap();
assert!(serialized.contains("test-secret"));
assert!(serialized.contains("job-123"));
let deserialized: StartJobParams = serde_json::from_str(&serialized).unwrap();
assert_eq!(deserialized.secret, "test-secret");
assert_eq!(deserialized.job_id, "job-123");
}
#[test]
fn test_method_naming_convention() {
// Test that method names follow the jobs./job. convention
// These should be the actual method names in the trait
let jobs_methods = vec!["jobs.create", "jobs.list"];
let job_methods = vec!["job.run", "job.start", "job.status", "job.result"];
// Verify naming convention
for method in jobs_methods {
assert!(method.starts_with("jobs."));
}
for method in job_methods {
assert!(method.starts_with("job."));
}
}
}

View File

@@ -1,12 +1,7 @@
//! Runner implementation for actor process management.
use crate::job::{Job};
use log::{debug, info};
use redis::AsyncCommands;
use sal_service_manager::{ProcessManager, ProcessManagerError as ServiceProcessManagerError, ProcessStatus, ProcessConfig};
use sal_service_manager::{ProcessManagerError as ServiceProcessManagerError, ProcessStatus, ProcessConfig};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;
/// Represents the current status of an actor/runner (alias for ProcessStatus)
pub type RunnerStatus = ProcessStatus;
@@ -127,8 +122,17 @@ pub enum RunnerError {
#[from]
source: crate::JobError,
},
#[error("Job '{job_id}' not found")]
JobNotFound { job_id: String },
#[error("Authentication error: {message}")]
AuthenticationError { message: String },
}
// Type alias for backward compatibility
pub type RunnerConfig = Runner;
/// Convert Runner to ProcessConfig
pub fn runner_to_process_config(config: &Runner) -> ProcessConfig {
ProcessConfig::new(config.id.clone(), config.command.clone())
@@ -137,98 +141,3 @@ pub fn runner_to_process_config(config: &Runner) -> ProcessConfig {
.with_arg("--redis-url".to_string())
.with_arg(config.redis_url.clone())
}
// Type alias for backward compatibility
pub type RunnerConfig = Runner;
#[cfg(test)]
mod tests {
use super::*;
use sal_service_manager::{ProcessManagerError, SimpleProcessManager};
use std::collections::HashMap;
#[derive(Debug)]
struct MockProcessManager {
processes: HashMap<String, ProcessStatus>,
}
impl MockProcessManager {
fn new() -> Self {
Self {
processes: HashMap::new(),
}
}
}
#[async_trait::async_trait]
impl ProcessManager for MockProcessManager {
async fn start_process(&mut self, config: &ProcessConfig) -> Result<(), ProcessManagerError> {
self.processes.insert(config.id.clone(), ProcessStatus::Running);
Ok(())
}
async fn stop_process(&mut self, process_id: &str, _force: bool) -> Result<(), ProcessManagerError> {
self.processes.insert(process_id.to_string(), ProcessStatus::Stopped);
Ok(())
}
async fn process_status(&self, process_id: &str) -> Result<ProcessStatus, ProcessManagerError> {
Ok(self.processes.get(process_id).cloned().unwrap_or(ProcessStatus::Stopped))
}
async fn process_logs(&self, _process_id: &str, _lines: Option<usize>, _follow: bool) -> Result<Vec<LogInfo>, ProcessManagerError> {
Ok(vec![])
}
async fn health_check(&self) -> Result<(), ProcessManagerError> {
Ok(())
}
async fn list_processes(&self) -> Result<Vec<String>, ProcessManagerError> {
Ok(self.processes.keys().cloned().collect())
}
}
#[test]
fn test_runner_creation() {
let runner = Runner::new(
"test_actor".to_string(),
"test_runner".to_string(),
"".to_string(),
PathBuf::from("/path/to/binary"),
"redis://localhost:6379".to_string(),
);
assert_eq!(runner.id, "test_actor");
assert_eq!(runner.name, "test_runner");
assert_eq!(runner.command, PathBuf::from("/path/to/binary"));
assert_eq!(runner.redis_url, "redis://localhost:6379");
}
#[test]
fn test_runner_get_queue() {
let runner = Runner::new(
"test_actor".to_string(),
"test_runner".to_string(),
"".to_string(),
PathBuf::from("/path/to/binary"),
"redis://localhost:6379".to_string(),
);
let queue_key = runner.get_queue();
assert_eq!(queue_key, "runner:test_runner");
}
#[test]
fn test_runner_error_types() {
let error = RunnerError::ActorNotFound {
actor_id: "test".to_string(),
};
assert!(error.to_string().contains("test"));
let error = RunnerError::ActorAlreadyRunning {
actor_id: "test".to_string(),
};
assert!(error.to_string().contains("already running"));
}
}

View File

@@ -1,17 +1,13 @@
//! Main supervisor implementation for managing multiple actor runners.
use chrono::Utc;
use sal_service_manager::{ProcessManager, SimpleProcessManager, TmuxProcessManager};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::{client::{Client, ClientBuilder}, job::JobStatus, runner::{LogInfo, Runner, RunnerConfig, RunnerError, RunnerResult, RunnerStatus}, JobError};
use crate::{job::Job};
use crate::{client::{Client, ClientBuilder}, job::JobStatus, runner::{LogInfo, Runner, RunnerConfig, RunnerError, RunnerResult, RunnerStatus}};
#[cfg(feature = "admin")]
use supervisor_admin_server::{AdminSupervisor, RunnerConfigInfo, JobInfo};
/// Process manager type for a runner
#[derive(Debug, Clone)]
@@ -201,7 +197,7 @@ impl Supervisor {
}
/// Register a new runner with secret-based authentication
pub async fn register_runner(&mut self, secret: &str, name: &str, queue: &str) -> RunnerResult<()> {
pub async fn register_runner(&mut self, secret: &str, name: &str, _queue: &str) -> RunnerResult<()> {
// Check if the secret is valid (admin or register secret)
if !self.admin_secrets.contains(&secret.to_string()) &&
!self.register_secrets.contains(&secret.to_string()) {
@@ -230,15 +226,15 @@ impl Supervisor {
}
// Find the runner by name
let runner_name = job.runner_name.clone();
let runner = job.runner.clone();
let job_id = job.id.clone(); // Store job ID before moving job
if let Some(_runner) = self.runners.get(&runner_name) {
if let Some(_runner) = self.runners.get(&runner) {
// Use the supervisor's queue_job_to_runner method (fire-and-forget)
self.queue_job_to_runner(&runner_name, job).await?;
self.queue_job_to_runner(&runner, job).await?;
Ok(job_id) // Return the job ID immediately
} else {
Err(RunnerError::ActorNotFound {
actor_id: job.runner_name.clone(),
actor_id: job.runner.clone(),
})
}
}
@@ -253,13 +249,13 @@ impl Supervisor {
}
// Find the runner by name
let runner_name = job.runner_name.clone();
if let Some(_runner) = self.runners.get(&runner_name) {
let runner = job.runner.clone();
if let Some(_runner) = self.runners.get(&runner) {
// Use the synchronous queue_and_wait method with a reasonable timeout (30 seconds)
self.queue_and_wait(&runner_name, job, 30).await
self.queue_and_wait(&runner, job, 30).await
} else {
Err(RunnerError::ActorNotFound {
actor_id: job.runner_name.clone(),
actor_id: job.runner.clone(),
})
}
}
@@ -280,9 +276,7 @@ impl Supervisor {
/// Get a job by job ID from Redis
pub async fn get_job(&self, job_id: &str) -> RunnerResult<crate::job::Job> {
use redis::AsyncCommands;
let mut conn = self.redis_client.get_multiplexed_async_connection().await
let _conn = self.redis_client.get_multiplexed_async_connection().await
.map_err(|e| RunnerError::RedisError {
source: e
})?;
@@ -296,9 +290,8 @@ impl Supervisor {
/// Ping a runner by dispatching a ping job to its queue
pub async fn ping_runner(&mut self, runner_id: &str) -> RunnerResult<String> {
use crate::job::{Job, JobBuilder};
use std::time::Duration;
use crate::job::JobBuilder;
// Check if runner exists
if !self.runners.contains_key(runner_id) {
return Err(RunnerError::ActorNotFound {
@@ -311,9 +304,9 @@ impl Supervisor {
.caller_id("supervisor_ping")
.context_id("ping_context")
.payload("ping")
.runner_name(runner_id)
.runner(runner_id)
.executor("ping")
.timeout(Duration::from_secs(10))
.timeout(10)
.build()
.map_err(|e| RunnerError::QueueError {
actor_id: runner_id.to_string(),
@@ -329,17 +322,15 @@ impl Supervisor {
/// Stop a job by ID
pub async fn stop_job(&mut self, job_id: &str) -> RunnerResult<()> {
use redis::AsyncCommands;
// For now, we'll implement a basic stop by removing the job from Redis
// In a more sophisticated implementation, you might send a stop signal to the runner
let mut conn = self.redis_client.get_multiplexed_async_connection().await
let _conn = self.redis_client.get_multiplexed_async_connection().await
.map_err(|e| RunnerError::QueueError {
actor_id: job_id.to_string(),
reason: format!("Failed to connect to Redis: {}", e),
})?;
let job_key = self.client.set_job_status(job_id, JobStatus::Stopping).await;
let _job_key = self.client.set_job_status(job_id, JobStatus::Stopping).await;
Ok(())
}
@@ -434,11 +425,11 @@ impl Supervisor {
}
/// Queue a job to a specific runner by name
pub async fn queue_job_to_runner(&mut self, runner_name: &str, job: crate::job::Job) -> RunnerResult<()> {
pub async fn queue_job_to_runner(&mut self, runner: &str, job: crate::job::Job) -> RunnerResult<()> {
use redis::AsyncCommands;
use log::{debug, info};
if let Some(runner) = self.runners.get(runner_name) {
if let Some(runner) = self.runners.get(runner) {
debug!("Queuing job {} for actor {}", job.id, runner.id);
let mut conn = self.redis_client.get_multiplexed_async_connection().await
@@ -467,7 +458,7 @@ impl Supervisor {
Ok(())
} else {
Err(RunnerError::ActorNotFound {
actor_id: runner_name.to_string(),
actor_id: runner.to_string(),
})
}
}
@@ -478,18 +469,18 @@ impl Supervisor {
/// 2. BLPOP on the reply queue for this job
/// 3. Get the job result from the job hash
/// 4. Return the complete result
pub async fn queue_and_wait(&mut self, runner_name: &str, job: crate::job::Job, timeout_secs: u64) -> RunnerResult<Option<String>> {
pub async fn queue_and_wait(&mut self, runner: &str, job: crate::job::Job, timeout_secs: u64) -> RunnerResult<Option<String>> {
use redis::AsyncCommands;
let job_id = job.id.clone();
// First queue the job
self.queue_job_to_runner(runner_name, job).await?;
self.queue_job_to_runner(runner, job).await?;
// Get Redis connection from the supervisor (shared Redis client)
let _runner = self.runners.get(runner_name)
let _runner = self.runners.get(runner)
.ok_or_else(|| RunnerError::ActorNotFound {
actor_id: runner_name.to_string(),
actor_id: runner.to_string(),
})?;
let mut conn = self.redis_client.get_multiplexed_async_connection().await
@@ -505,7 +496,7 @@ impl Supervisor {
})?;
match result {
Some(reply_data) => {
Some(_reply_data) => {
// Reply received, now get the job result from the job hash
let job_key = self.client.job_key(&job_id);
let job_result: Option<String> = conn.hget(&job_key, "result").await
@@ -526,7 +517,7 @@ impl Supervisor {
pub async fn get_all_runner_status(&self) -> RunnerResult<Vec<(String, RunnerStatus)>> {
let mut results = Vec::new();
for (actor_id, instance) in &self.runners {
for (actor_id, _instance) in &self.runners {
match self.get_runner_status(actor_id).await {
Ok(status) => results.push((actor_id.clone(), status)),
Err(_) => {
@@ -663,6 +654,107 @@ impl Supervisor {
self.client.list_jobs().await
}
/// List all jobs with full details from Redis
pub async fn list_all_jobs(&self) -> RunnerResult<Vec<crate::job::Job>> {
let job_ids = self.client.list_jobs().await?;
let mut jobs = Vec::new();
for job_id in job_ids {
if let Ok(job) = self.client.get_job(&job_id).await {
jobs.push(job);
}
}
Ok(jobs)
}
/// Start a previously created job by queuing it to its assigned runner
pub async fn start_job(&mut self, secret: &str, job_id: &str) -> RunnerResult<()> {
// Check if the secret is valid (admin or user secret)
if !self.admin_secrets.contains(&secret.to_string()) &&
!self.user_secrets.contains(&secret.to_string()) {
return Err(RunnerError::AuthenticationError {
message: "Invalid secret for job operations".to_string()
});
}
// Get the job from Redis
let job = self.get_job(job_id).await?;
let runner = job.runner.clone();
// Queue the job to its assigned runner
self.queue_job_to_runner(&runner, job).await
}
/// Get the status of a job
pub async fn get_job_status(&self, job_id: &str) -> RunnerResult<crate::openrpc::JobStatusResponse> {
use redis::AsyncCommands;
let mut conn = self.redis_client.get_multiplexed_async_connection().await
.map_err(|e| RunnerError::RedisError { source: e })?;
// Get job data from Redis hash
let job_data: std::collections::HashMap<String, String> = conn.hgetall(format!("{}:job:{}", self.namespace, job_id)).await
.map_err(|e| RunnerError::RedisError { source: e })?;
if job_data.is_empty() {
return Err(RunnerError::JobNotFound { job_id: job_id.to_string() });
}
let status = job_data.get("status").unwrap_or(&"unknown".to_string()).clone();
let created_at = job_data.get("created_at").unwrap_or(&"".to_string()).clone();
let started_at = job_data.get("started_at").cloned();
let completed_at = job_data.get("completed_at").cloned();
Ok(crate::openrpc::JobStatusResponse {
job_id: job_id.to_string(),
status,
created_at,
started_at,
completed_at,
})
}
/// Get the result of a job (blocks until result is available)
pub async fn get_job_result(&self, job_id: &str) -> RunnerResult<Option<String>> {
use redis::AsyncCommands;
use tokio::time::{sleep, Duration};
let mut conn = self.redis_client.get_multiplexed_async_connection().await
.map_err(|e| RunnerError::RedisError { source: e })?;
// Poll for job completion with timeout
for _ in 0..300 { // 5 minutes timeout (300 * 1 second)
let job_data: std::collections::HashMap<String, String> = conn.hgetall(format!("{}:job:{}", self.namespace, job_id)).await
.map_err(|e| RunnerError::RedisError { source: e })?;
if job_data.is_empty() {
return Err(RunnerError::JobNotFound { job_id: job_id.to_string() });
}
let status_str = "unknown".to_string();
let status = job_data.get("status").unwrap_or(&status_str);
match status.as_str() {
"completed" => {
return Ok(job_data.get("result").cloned());
},
"failed" | "timeout" => {
let default_error = "Job failed".to_string();
let error_msg = job_data.get("error").unwrap_or(&default_error).clone();
return Ok(Some(format!("Error: {}", error_msg)));
},
_ => {
// Job still running, wait and check again
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
}
// Timeout reached
Ok(Some("Error: Timeout waiting for job result".to_string()))
}
/// Get runners count
pub fn runners_count(&self) -> usize {
self.runners.len()
@@ -702,9 +794,8 @@ impl Default for Supervisor {
}
mod tests {
#[allow(unused_imports)]
use super::*;
use std::path::PathBuf;
use sal_service_manager::SimpleProcessManager;
#[tokio::test]
async fn test_supervisor_creation() {