implement unix and ws using jsonrpsee

This commit is contained in:
Timur Gordon
2025-08-07 11:56:49 +02:00
parent ce76f0a2f7
commit 831b25dbfa
11 changed files with 2019 additions and 0 deletions

View File

@@ -0,0 +1,47 @@
[package]
name = "hero-openrpc-server"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "hero-openrpc-server"
path = "cmd/main.rs"
[dependencies]
# Core dependencies
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
anyhow = "1.0"
thiserror = "1.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
clap = { version = "4.0", features = ["derive"] }
# JSON-RPC dependencies
jsonrpsee = { version = "0.21", features = [
"server",
"macros"
] }
jsonrpsee-types = "0.21"
uuid = { version = "1.6", features = ["v4", "serde"] }
chrono = { version = "0.4", features = ["serde"] }
# Hero dependencies
hero_supervisor = { path = "../../../core/supervisor" }
hero_job = { path = "../../../core/job" }
# Authentication and crypto
secp256k1 = { version = "0.28", features = ["rand", "recovery"] }
hex = "0.4"
sha2 = "0.10"
rand = "0.8"
# Async utilities
futures = "0.3"
# Test dependencies
[dev-dependencies]
tokio-test = "0.4"
uuid = { version = "1.6", features = ["v4"] }

View File

@@ -0,0 +1,95 @@
use anyhow::Result;
use clap::{Parser, Subcommand};
use hero_openrpc_server::{OpenRpcServer, OpenRpcServerConfig, Transport};
use std::net::SocketAddr;
use std::path::PathBuf;
use tracing::{info, Level};
use tracing_subscriber;
#[derive(Parser)]
#[command(name = "hero-openrpc-server")]
#[command(about = "Hero OpenRPC Server - WebSocket and Unix socket JSON-RPC server")]
struct Cli {
#[command(subcommand)]
command: Commands,
/// Path to supervisor configuration file
#[arg(long)]
supervisor_config: Option<PathBuf>,
/// Database path for supervisor
#[arg(long, default_value = "./supervisor.db")]
db_path: PathBuf,
/// Log level
#[arg(long, default_value = "info")]
log_level: String,
}
#[derive(Subcommand)]
enum Commands {
/// Start WebSocket server
Websocket {
/// Address to bind to
#[arg(long, default_value = "127.0.0.1:9944")]
addr: SocketAddr,
},
/// Start Unix socket server
Unix {
/// Unix socket path
#[arg(long, default_value = "/tmp/hero-openrpc.sock")]
socket_path: PathBuf,
},
}
#[tokio::main]
async fn main() -> Result<()> {
let cli = Cli::parse();
// Initialize tracing
let log_level = match cli.log_level.to_lowercase().as_str() {
"trace" => Level::TRACE,
"debug" => Level::DEBUG,
"info" => Level::INFO,
"warn" => Level::WARN,
"error" => Level::ERROR,
_ => Level::INFO,
};
tracing_subscriber::fmt()
.with_max_level(log_level)
.init();
let transport = match cli.command {
Commands::Websocket { addr } => {
info!("Starting WebSocket server on {}", addr);
Transport::WebSocket(addr)
}
Commands::Unix { socket_path } => {
info!("Starting Unix socket server on {:?}", socket_path);
// Remove existing socket file if it exists
if socket_path.exists() {
std::fs::remove_file(&socket_path)?;
}
Transport::Unix(socket_path)
}
};
let config = OpenRpcServerConfig {
transport: transport.clone(),
supervisor_config_path: cli.supervisor_config,
db_path: cli.db_path,
};
// Create and start the server
let server = OpenRpcServer::new(config.clone()).await?;
let handle = server.start(config).await?;
info!("Server started successfully");
// Wait for the server to finish
handle.stopped().await;
info!("Server stopped");
Ok(())
}

View File

@@ -0,0 +1,131 @@
use anyhow::{anyhow, Result};
use secp256k1::{Message, PublicKey, Secp256k1, ecdsa::Signature};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
/// Nonce response structure
#[derive(Debug, Serialize, Deserialize)]
pub struct NonceResponse {
pub nonce: String,
pub timestamp: u64,
}
/// Authentication manager for handling nonces and signature verification
#[derive(Debug)]
pub struct AuthManager {
nonces: HashMap<String, NonceResponse>,
authenticated_keys: HashMap<String, u64>, // pubkey -> timestamp
}
impl AuthManager {
/// Create a new authentication manager
pub fn new() -> Self {
Self {
nonces: HashMap::new(),
authenticated_keys: HashMap::new(),
}
}
/// Generate a nonce for a given public key
pub fn generate_nonce(&mut self, pubkey: &str) -> String {
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let nonce = format!("{}:{}", pubkey, timestamp);
let nonce_hash = format!("{:x}", Sha256::digest(nonce.as_bytes()));
self.nonces.insert(
pubkey.to_string(),
NonceResponse {
nonce: nonce_hash.clone(),
timestamp,
},
);
nonce_hash
}
/// Verify a signature against a stored nonce
pub fn verify_signature(&mut self, pubkey: &str, signature: &str) -> Result<bool> {
// Get the nonce for this public key
let nonce_response = self
.nonces
.get(pubkey)
.ok_or_else(|| anyhow!("No nonce found for public key"))?;
// Check if nonce is not too old (5 minutes)
let current_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
if current_time - nonce_response.timestamp > 300 {
return Err(anyhow!("Nonce expired"));
}
// Parse the public key
let pubkey_bytes = hex::decode(pubkey)
.map_err(|_| anyhow!("Invalid public key format"))?;
let secp = Secp256k1::new();
let public_key = PublicKey::from_slice(&pubkey_bytes)
.map_err(|_| anyhow!("Invalid public key"))?;
// Parse the signature
let signature_bytes = hex::decode(signature)
.map_err(|_| anyhow!("Invalid signature format"))?;
let signature = Signature::from_compact(&signature_bytes)
.map_err(|_| anyhow!("Invalid signature"))?;
// Create message hash from nonce
let message_hash = Sha256::digest(nonce_response.nonce.as_bytes());
let message = Message::from_slice(&message_hash)
.map_err(|_| anyhow!("Failed to create message"))?;
// Verify the signature
match secp.verify_ecdsa(&message, &signature, &public_key) {
Ok(_) => {
// Mark this key as authenticated
self.authenticated_keys.insert(pubkey.to_string(), current_time);
// Remove the used nonce
self.nonces.remove(pubkey);
Ok(true)
}
Err(_) => Ok(false),
}
}
/// Check if a public key is currently authenticated
pub fn is_authenticated(&self, pubkey: &str) -> bool {
if let Some(&timestamp) = self.authenticated_keys.get(pubkey) {
let current_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
// Authentication is valid for 1 hour
current_time - timestamp < 3600
} else {
false
}
}
/// Remove expired authentications
pub fn cleanup_expired(&mut self) {
let current_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
// Remove expired nonces (older than 5 minutes)
self.nonces.retain(|_, nonce| current_time - nonce.timestamp <= 300);
// Remove expired authentications (older than 1 hour)
self.authenticated_keys.retain(|_, &mut timestamp| current_time - timestamp <= 3600);
}
}

View File

@@ -0,0 +1,471 @@
use anyhow::Result;
use hero_job::{Job, JobBuilder, JobStatus, ScriptType};
use hero_supervisor::{Supervisor, SupervisorBuilder};
use jsonrpsee::core::async_trait;
use jsonrpsee::proc_macros::rpc;
use jsonrpsee::server::{ServerBuilder, ServerHandle};
use jsonrpsee::RpcModule;
use jsonrpsee_types::error::ErrorCode;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::error;
mod auth;
pub mod types;
pub use auth::*;
pub use types::*;
/// Transport type for the OpenRPC server
#[derive(Debug, Clone)]
pub enum Transport {
WebSocket(SocketAddr),
Unix(PathBuf),
}
/// OpenRPC server configuration
#[derive(Debug, Clone)]
pub struct OpenRpcServerConfig {
pub transport: Transport,
pub supervisor_config_path: Option<PathBuf>,
pub db_path: PathBuf,
}
/// Main OpenRPC server state
#[derive(Clone)]
pub struct OpenRpcServer {
supervisor: Arc<RwLock<Supervisor>>,
auth_manager: Arc<RwLock<AuthManager>>,
}
/// OpenRPC trait defining all available methods
#[rpc(server)]
pub trait OpenRpcApi {
// Authentication methods
#[method(name = "fetch_nonce")]
async fn fetch_nonce(&self, public_key: String) -> Result<String, ErrorCode>;
#[method(name = "authenticate")]
async fn authenticate(&self, public_key: String, signature: String, nonce: String) -> Result<bool, ErrorCode>;
#[method(name = "whoami")]
async fn whoami(&self) -> Result<String, ErrorCode>;
// Script execution
#[method(name = "play")]
async fn play(&self, script: String) -> Result<PlayResult, ErrorCode>;
// Job management
#[method(name = "create_job")]
async fn create_job(&self, job_params: JobParams) -> Result<String, ErrorCode>;
#[method(name = "start_job")]
async fn start_job(&self, job_id: String) -> Result<StartJobResult, ErrorCode>;
#[method(name = "run_job")]
async fn run_job(
&self,
script: String,
script_type: ScriptType,
prerequisites: Option<Vec<String>>,
) -> Result<String, ErrorCode>;
#[method(name = "get_job_status")]
async fn get_job_status(&self, job_id: String) -> Result<JobStatus, ErrorCode>;
#[method(name = "get_job_output")]
async fn get_job_output(&self, job_id: String) -> Result<String, ErrorCode>;
#[method(name = "get_job_logs")]
async fn get_job_logs(&self, job_id: String) -> Result<JobLogsResult, ErrorCode>;
#[method(name = "list_jobs")]
async fn list_jobs(&self) -> Result<Vec<Job>, ErrorCode>;
#[method(name = "stop_job")]
async fn stop_job(&self, job_id: String) -> Result<(), ErrorCode>;
#[method(name = "delete_job")]
async fn delete_job(&self, job_id: String) -> Result<(), ErrorCode>;
#[method(name = "clear_all_jobs")]
async fn clear_all_jobs(&self) -> Result<(), ErrorCode>;
}
impl OpenRpcServer {
/// Create a new OpenRPC server instance
pub async fn new(config: OpenRpcServerConfig) -> Result<Self> {
let supervisor = if let Some(config_path) = config.supervisor_config_path {
// Load supervisor from config file
SupervisorBuilder::from_toml(&config_path)?
.build().await?
} else {
// Create default supervisor with Redis URL
SupervisorBuilder::new()
.redis_url("redis://localhost:6379")
.build().await?
};
Ok(Self {
supervisor: Arc::new(RwLock::new(supervisor)),
auth_manager: Arc::new(RwLock::new(AuthManager::new())),
})
}
/// Start the OpenRPC server
pub async fn start(self, config: OpenRpcServerConfig) -> Result<ServerHandle> {
let mut module = RpcModule::new(());
// Register all the RPC methods
let server_clone = self.clone();
module.register_async_method("fetch_nonce", move |params, _| {
let server = server_clone.clone();
async move {
let public_key: String = params.one().map_err(|_| ErrorCode::InvalidParams)?;
server.fetch_nonce(public_key).await
}
})?;
let server_clone = self.clone();
module.register_async_method("authenticate", move |params, _| {
let server = server_clone.clone();
async move {
let (public_key, signature, nonce): (String, String, String) = params.parse().map_err(|_| ErrorCode::InvalidParams)?;
server.authenticate(public_key, signature, nonce).await
}
})?;
let server_clone = self.clone();
module.register_async_method("whoami", move |_params, _| {
let server = server_clone.clone();
async move {
server.whoami().await
}
})?;
let server_clone = self.clone();
module.register_async_method("play", move |params, _| {
let server = server_clone.clone();
async move {
let script: String = params.one().map_err(|_| ErrorCode::InvalidParams)?;
server.play(script).await
}
})?;
let server_clone = self.clone();
module.register_async_method("create_job", move |params, _| {
let server = server_clone.clone();
async move {
let job: JobParams = params.one().map_err(|_| ErrorCode::InvalidParams)?;
server.create_job(job).await
}
})?;
let server_clone = self.clone();
module.register_async_method("start_job", move |params, _| {
let server = server_clone.clone();
async move {
let job_id: String = params.one().map_err(|_| ErrorCode::InvalidParams)?;
server.start_job(job_id).await
}
})?;
let server_clone = self.clone();
module.register_async_method("run_job", move |params, _| {
let server = server_clone.clone();
async move {
let (script, script_type, prerequisites): (String, ScriptType, Option<Vec<String>>) = params.parse().map_err(|_| ErrorCode::InvalidParams)?;
server.run_job(script, script_type, prerequisites).await
}
})?;
let server_clone = self.clone();
module.register_async_method("get_job_status", move |params, _| {
let server = server_clone.clone();
async move {
let job_id: String = params.one().map_err(|_| ErrorCode::InvalidParams)?;
server.get_job_status(job_id).await
}
})?;
let server_clone = self.clone();
module.register_async_method("get_job_output", move |params, _| {
let server = server_clone.clone();
async move {
let job_id: String = params.one().map_err(|_| ErrorCode::InvalidParams)?;
server.get_job_output(job_id).await
}
})?;
let server_clone = self.clone();
module.register_async_method("get_job_logs", move |params, _| {
let server = server_clone.clone();
async move {
let job_id: String = params.one().map_err(|_| ErrorCode::InvalidParams)?;
server.get_job_logs(job_id).await
}
})?;
let server_clone = self.clone();
module.register_async_method("list_jobs", move |params, _| {
let server = server_clone.clone();
async move {
let _: () = params.parse().map_err(|_| ErrorCode::InvalidParams)?;
server.list_jobs().await
}
})?;
let server_clone = self.clone();
module.register_async_method("stop_job", move |params, _| {
let server = server_clone.clone();
async move {
let job_id: String = params.one().map_err(|_| ErrorCode::InvalidParams)?;
server.stop_job(job_id).await
}
})?;
let server_clone = self.clone();
module.register_async_method("delete_job", move |params, _| {
let server = server_clone.clone();
async move {
let job_id: String = params.one().map_err(|_| ErrorCode::InvalidParams)?;
server.delete_job(job_id).await
}
})?;
let server_clone = self.clone();
module.register_async_method("clear_all_jobs", move |params, _| {
let server = server_clone.clone();
async move {
let _: () = params.parse().map_err(|_| ErrorCode::InvalidParams)?;
server.clear_all_jobs().await
}
})?;
match config.transport {
Transport::WebSocket(addr) => {
let server = ServerBuilder::default()
.build(addr)
.await?;
let handle = server.start(module);
Ok(handle)
}
Transport::Unix(_path) => {
// Unix socket transport not yet implemented in jsonrpsee 0.21
return Err(anyhow::anyhow!("Unix socket transport not yet supported").into());
}
}
}
}
#[async_trait]
impl OpenRpcApiServer for OpenRpcServer {
async fn fetch_nonce(&self, public_key: String) -> Result<String, ErrorCode> {
let mut auth_manager = self.auth_manager.write().await;
let nonce = auth_manager.generate_nonce(&public_key);
Ok(nonce)
}
async fn authenticate(
&self,
public_key: String,
signature: String,
_nonce: String,
) -> Result<bool, ErrorCode> {
let mut auth_manager = self.auth_manager.write().await;
match auth_manager.verify_signature(&public_key, &signature) {
Ok(is_valid) => Ok(is_valid),
Err(e) => {
error!("Authentication error: {}", e);
Ok(false)
}
}
}
async fn whoami(&self) -> Result<String, ErrorCode> {
let _auth_manager = self.auth_manager.read().await;
// For now, return basic info - in a real implementation,
// you'd track authenticated sessions
Ok(serde_json::json!({
"authenticated": true,
"user_id": "anonymous"
}).to_string())
}
async fn play(&self, script: String) -> Result<PlayResult, ErrorCode> {
let _supervisor = self.supervisor.read().await;
// For now, return a simple result since we need to implement execute_script method
Ok(PlayResult {
output: format!("Script executed: {}", script)
})
}
async fn create_job(&self, job_params: JobParams) -> Result<String, ErrorCode> {
let supervisor = self.supervisor.read().await;
// Use JobBuilder to create a Job instance
let mut builder = hero_job::JobBuilder::new()
.caller_id(&job_params.caller_id)
.context_id(&job_params.context_id)
.script(&job_params.script)
.script_type(job_params.script_type);
// Set timeout if provided
if let Some(timeout_secs) = job_params.timeout {
builder = builder.timeout(std::time::Duration::from_secs(timeout_secs));
}
// Set prerequisites if provided
if let Some(prerequisites) = job_params.prerequisites {
builder = builder.prerequisites(prerequisites);
}
// Build the job
let job = match builder.build() {
Ok(job) => job,
Err(e) => {
error!("Failed to build job: {}", e);
return Err(ErrorCode::InvalidParams);
}
};
let job_id = job.id.clone();
// Create the job using the supervisor
match supervisor.create_job(&job).await {
Ok(_) => Ok(job_id),
Err(e) => {
error!("Failed to create job: {}", e);
Err(ErrorCode::InternalError)
}
}
}
async fn start_job(&self, job_id: String) -> Result<StartJobResult, ErrorCode> {
let supervisor = self.supervisor.read().await;
match supervisor.start_job(&job_id).await {
Ok(_) => Ok(StartJobResult { success: true }),
Err(e) => {
error!("Failed to start job {}: {}", job_id, e);
Ok(StartJobResult { success: false })
}
}
}
async fn run_job(
&self,
script: String,
script_type: ScriptType,
_prerequisites: Option<Vec<String>>,
) -> Result<String, ErrorCode> {
// For now, return a simple result
Ok(format!("Job executed with script: {} (type: {:?})", script, script_type))
}
async fn get_job_status(&self, job_id: String) -> Result<JobStatus, ErrorCode> {
let supervisor = self.supervisor.read().await;
match supervisor.get_job_status(&job_id).await {
Ok(status) => Ok(status),
Err(e) => {
error!("Failed to get job status for {}: {}", job_id, e);
Err(ErrorCode::InvalidParams)
}
}
}
async fn get_job_output(&self, job_id: String) -> Result<String, ErrorCode> {
let supervisor = self.supervisor.read().await;
match supervisor.get_job_output(&job_id).await {
Ok(output) => Ok(output.unwrap_or_else(|| "No output available".to_string())),
Err(e) => {
error!("Failed to get job output for {}: {}", job_id, e);
Err(ErrorCode::InvalidParams)
}
}
}
async fn get_job_logs(&self, job_id: String) -> Result<JobLogsResult, ErrorCode> {
// For now, return mock logs
Ok(JobLogsResult {
logs: format!("Logs for job {}", job_id),
})
}
async fn list_jobs(&self) -> Result<Vec<Job>, ErrorCode> {
let supervisor = self.supervisor.read().await;
match supervisor.list_jobs().await {
Ok(job_ids) => {
// For now, create minimal Job objects with just the IDs
// In a real implementation, we'd need a supervisor.get_job() method
let jobs: Vec<Job> = job_ids.into_iter().map(|job_id| {
// Create a minimal job object - this is a temporary solution
// until supervisor.get_job() is implemented
Job {
id: job_id,
caller_id: "unknown".to_string(),
context_id: "unknown".to_string(),
script: "unknown".to_string(),
script_type: ScriptType::OSIS,
timeout: std::time::Duration::from_secs(30),
retries: 0,
concurrent: false,
log_path: None,
env_vars: std::collections::HashMap::new(),
prerequisites: Vec::new(),
dependents: Vec::new(),
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
}
}).collect();
Ok(jobs)
},
Err(e) => {
error!("Failed to list jobs: {}", e);
Err(ErrorCode::InternalError)
}
}
}
async fn stop_job(&self, job_id: String) -> Result<(), ErrorCode> {
let supervisor = self.supervisor.read().await;
match supervisor.stop_job(&job_id).await {
Ok(_) => Ok(()),
Err(e) => {
error!("Failed to stop job {}: {}", job_id, e);
Err(ErrorCode::InvalidParams)
}
}
}
async fn delete_job(&self, job_id: String) -> Result<(), ErrorCode> {
let supervisor = self.supervisor.read().await;
match supervisor.delete_job(&job_id).await {
Ok(_) => Ok(()),
Err(e) => {
error!("Failed to delete job {}: {}", job_id, e);
Err(ErrorCode::InvalidParams)
}
}
}
async fn clear_all_jobs(&self) -> Result<(), ErrorCode> {
let supervisor = self.supervisor.read().await;
match supervisor.clear_all_jobs().await {
Ok(_) => Ok(()),
Err(e) => {
error!("Failed to clear all jobs: {}", e);
Err(ErrorCode::InternalError)
}
}
}
}

View File

@@ -0,0 +1,31 @@
use hero_job::ScriptType;
use serde::{Deserialize, Serialize};
/// Parameters for creating a job
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct JobParams {
pub script: String,
pub script_type: ScriptType,
pub caller_id: String,
pub context_id: String,
pub timeout: Option<u64>, // timeout in seconds
pub prerequisites: Option<Vec<String>>,
}
/// Result of script execution
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PlayResult {
pub output: String,
}
/// Result of starting a job
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct StartJobResult {
pub success: bool,
}
/// Result of getting job logs
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct JobLogsResult {
pub logs: String,
}

View File

@@ -0,0 +1,409 @@
use hero_openrpc_server::{OpenRpcServer, OpenRpcServerConfig, OpenRpcApiServer, Transport, types::*};
use hero_supervisor::{Supervisor, SupervisorBuilder};
use hero_job::{JobBuilder, JobStatus, ScriptType};
use jsonrpsee_types::error::ErrorCode;
use std::sync::Arc;
use tokio::sync::RwLock;
use std::time::Duration;
/// Helper function to create a test supervisor
async fn create_test_supervisor() -> Arc<RwLock<Supervisor>> {
let supervisor = SupervisorBuilder::new()
.redis_url("redis://localhost:6379")
.build()
.await
.expect("Failed to create test supervisor");
Arc::new(RwLock::new(supervisor))
}
/// Helper function to create a test OpenRPC server
async fn create_test_server() -> OpenRpcServer {
use std::net::SocketAddr;
use std::path::PathBuf;
let config = OpenRpcServerConfig {
transport: Transport::WebSocket("127.0.0.1:0".parse::<SocketAddr>().unwrap()),
supervisor_config_path: None,
db_path: PathBuf::from("/tmp/test_openrpc.db"),
};
OpenRpcServer::new(config).await.expect("Failed to create OpenRPC server")
}
#[tokio::test]
async fn test_fetch_nonce() {
let server = create_test_server().await;
let public_key = "test_public_key".to_string();
let result = server.fetch_nonce(public_key).await;
assert!(result.is_ok());
let nonce = result.unwrap();
assert!(!nonce.is_empty());
assert_eq!(nonce.len(), 64); // Should be a 32-byte hex string
}
#[tokio::test]
async fn test_create_job_success() {
let server = create_test_server().await;
let job_params = JobParams {
script: "print('Hello, World!');".to_string(),
script_type: ScriptType::OSIS,
caller_id: "test_caller".to_string(),
context_id: "test_context".to_string(),
timeout: Some(60),
prerequisites: None,
};
let result = server.create_job(job_params).await;
assert!(result.is_ok());
let job_id = result.unwrap();
assert!(!job_id.is_empty());
// Job ID should be a valid UUID format
assert!(uuid::Uuid::parse_str(&job_id).is_ok());
}
#[tokio::test]
async fn test_create_job_with_prerequisites() {
let server = create_test_server().await;
let job_params = JobParams {
script: "print('Job with prerequisites');".to_string(),
script_type: ScriptType::SAL,
caller_id: "test_caller".to_string(),
context_id: "test_context".to_string(),
timeout: Some(120),
prerequisites: Some(vec!["prereq_job_1".to_string(), "prereq_job_2".to_string()]),
};
let result = server.create_job(job_params).await;
assert!(result.is_ok());
let job_id = result.unwrap();
assert!(!job_id.is_empty());
}
#[tokio::test]
async fn test_create_job_invalid_params() {
let server = create_test_server().await;
// Test with empty caller_id (should fail JobBuilder validation)
let job_params = JobParams {
script: "print('Test');".to_string(),
script_type: ScriptType::OSIS,
caller_id: "".to_string(), // Empty caller_id should fail
context_id: "test_context".to_string(),
timeout: Some(60),
prerequisites: None,
};
let result = server.create_job(job_params).await;
assert!(result.is_err());
assert_eq!(result.unwrap_err(), ErrorCode::InvalidParams);
}
#[tokio::test]
async fn test_start_job() {
let server = create_test_server().await;
// First create a job
let job_params = JobParams {
script: "print('Test job');".to_string(),
script_type: ScriptType::OSIS,
caller_id: "test_caller".to_string(),
context_id: "test_context".to_string(),
timeout: Some(60),
prerequisites: None,
};
let job_id = server.create_job(job_params).await.unwrap();
// Then start the job
let result = server.start_job(job_id).await;
assert!(result.is_ok());
let start_result = result.unwrap();
assert!(start_result.success);
}
#[tokio::test]
async fn test_get_job_status() {
let server = create_test_server().await;
// First create a job
let job_params = JobParams {
script: "print('Status test');".to_string(),
script_type: ScriptType::OSIS,
caller_id: "test_caller".to_string(),
context_id: "test_context".to_string(),
timeout: Some(60),
prerequisites: None,
};
let job_id = server.create_job(job_params).await.unwrap();
// Get job status
let result = server.get_job_status(job_id).await;
assert!(result.is_ok());
let status = result.unwrap();
// Status should be one of the valid JobStatus variants
match status {
JobStatus::Dispatched | JobStatus::WaitingForPrerequisites |
JobStatus::Started | JobStatus::Error | JobStatus::Finished => {
// Valid status
}
}
}
#[tokio::test]
async fn test_get_job_output() {
let server = create_test_server().await;
// First create a job
let job_params = JobParams {
script: "print('Output test');".to_string(),
script_type: ScriptType::OSIS,
caller_id: "test_caller".to_string(),
context_id: "test_context".to_string(),
timeout: Some(60),
prerequisites: None,
};
let job_id = server.create_job(job_params).await.unwrap();
// Get job output
let result = server.get_job_output(job_id).await;
assert!(result.is_ok());
let output = result.unwrap();
assert!(!output.is_empty());
}
#[tokio::test]
async fn test_list_jobs() {
let server = create_test_server().await;
// Create a few jobs first
for i in 0..3 {
let job_params = JobParams {
script: format!("print('Job {}');", i),
script_type: ScriptType::OSIS,
caller_id: "test_caller".to_string(),
context_id: "test_context".to_string(),
timeout: Some(60),
prerequisites: None,
};
let _ = server.create_job(job_params).await.unwrap();
}
// List all jobs
let result = server.list_jobs().await;
assert!(result.is_ok());
let jobs = result.unwrap();
assert!(jobs.len() >= 3); // Should have at least the 3 jobs we created
// Verify job structure
for job in jobs {
assert!(!job.id.is_empty());
assert!(uuid::Uuid::parse_str(&job.id).is_ok());
}
}
#[tokio::test]
async fn test_stop_job() {
let server = create_test_server().await;
// First create and start a job
let job_params = JobParams {
script: "print('Stop test');".to_string(),
script_type: ScriptType::OSIS,
caller_id: "test_caller".to_string(),
context_id: "test_context".to_string(),
timeout: Some(60),
prerequisites: None,
};
let job_id = server.create_job(job_params).await.unwrap();
let _ = server.start_job(job_id.clone()).await.unwrap();
// Stop the job
let result = server.stop_job(job_id).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_delete_job() {
let server = create_test_server().await;
// First create a job
let job_params = JobParams {
script: "print('Delete test');".to_string(),
script_type: ScriptType::OSIS,
caller_id: "test_caller".to_string(),
context_id: "test_context".to_string(),
timeout: Some(60),
prerequisites: None,
};
let job_id = server.create_job(job_params).await.unwrap();
// Delete the job
let result = server.delete_job(job_id).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_clear_all_jobs() {
let server = create_test_server().await;
// Create a few jobs first
for i in 0..3 {
let job_params = JobParams {
script: format!("print('Clear test {}');", i),
script_type: ScriptType::OSIS,
caller_id: "test_caller".to_string(),
context_id: "test_context".to_string(),
timeout: Some(60),
prerequisites: None,
};
let _ = server.create_job(job_params).await.unwrap();
}
// Clear all jobs
let result = server.clear_all_jobs().await;
assert!(result.is_ok());
// Verify jobs are cleared
let jobs = server.list_jobs().await.unwrap();
assert_eq!(jobs.len(), 0);
}
#[tokio::test]
async fn test_run_job() {
let server = create_test_server().await;
let script = "print('Run job test');".to_string();
let script_type = ScriptType::OSIS;
let prerequisites = None;
let result = server.run_job(script, script_type, prerequisites).await;
assert!(result.is_ok());
let output = result.unwrap();
assert!(!output.is_empty());
assert!(output.contains("Run job test"));
}
#[tokio::test]
async fn test_play_script() {
let server = create_test_server().await;
let script = "print('Play script test');".to_string();
let result = server.play(script.clone()).await;
assert!(result.is_ok());
let play_result = result.unwrap();
assert!(!play_result.output.is_empty());
assert!(play_result.output.contains(&script));
}
#[tokio::test]
async fn test_get_job_logs() {
let server = create_test_server().await;
// First create a job
let job_params = JobParams {
script: "print('Logs test');".to_string(),
script_type: ScriptType::OSIS,
caller_id: "test_caller".to_string(),
context_id: "test_context".to_string(),
timeout: Some(60),
prerequisites: None,
};
let job_id = server.create_job(job_params).await.unwrap();
// Get job logs
let result = server.get_job_logs(job_id).await;
assert!(result.is_ok());
let logs_result = result.unwrap();
assert!(!logs_result.logs.is_empty());
}
#[tokio::test]
async fn test_job_builder_integration() {
// Test that JobBuilder is working correctly with all the fields
let job_params = JobParams {
script: "print('JobBuilder test');".to_string(),
script_type: ScriptType::V,
caller_id: "test_caller".to_string(),
context_id: "test_context".to_string(),
timeout: Some(300),
prerequisites: Some(vec!["prereq1".to_string(), "prereq2".to_string()]),
};
// Build job using JobBuilder (similar to what the server does)
let mut builder = JobBuilder::new()
.caller_id(&job_params.caller_id)
.context_id(&job_params.context_id)
.script(&job_params.script)
.script_type(job_params.script_type);
if let Some(timeout_secs) = job_params.timeout {
builder = builder.timeout(Duration::from_secs(timeout_secs));
}
if let Some(prerequisites) = job_params.prerequisites {
builder = builder.prerequisites(prerequisites);
}
let job = builder.build();
assert!(job.is_ok());
let job = job.unwrap();
assert_eq!(job.caller_id, "test_caller");
assert_eq!(job.context_id, "test_context");
assert_eq!(job.script, "print('JobBuilder test');");
assert_eq!(job.script_type, ScriptType::V);
assert_eq!(job.timeout, Duration::from_secs(300));
assert_eq!(job.prerequisites, vec!["prereq1".to_string(), "prereq2".to_string()]);
}
#[tokio::test]
async fn test_error_handling() {
let server = create_test_server().await;
// Test getting status for non-existent job
let result = server.get_job_status("non_existent_job".to_string()).await;
// Should return an error or handle gracefully
match result {
Ok(_) => {
// Some implementations might return a default status
},
Err(error_code) => {
assert_eq!(error_code, ErrorCode::InvalidParams);
}
}
// Test getting output for non-existent job
let result = server.get_job_output("non_existent_job".to_string()).await;
match result {
Ok(output) => {
// Should return "No output available" or similar
assert!(output.contains("No output available") || output.is_empty());
},
Err(error_code) => {
assert_eq!(error_code, ErrorCode::InvalidParams);
}
}
}