diff --git a/interfaces/openrpc/client/Cargo.toml b/interfaces/openrpc/client/Cargo.toml new file mode 100644 index 0000000..3c4c24d --- /dev/null +++ b/interfaces/openrpc/client/Cargo.toml @@ -0,0 +1,42 @@ +[package] +name = "hero-openrpc-client" +version = "0.1.0" +edition = "2021" + +[[bin]] +name = "hero-openrpc-client" +path = "cmd/main.rs" + +[dependencies] +# Core dependencies +tokio = { version = "1.0", features = ["full"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +anyhow = "1.0" +thiserror = "1.0" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +clap = { version = "4.0", features = ["derive"] } + +# JSON-RPC dependencies +jsonrpsee = { version = "0.21", features = [ + "client", + "macros" +] } +async-trait = "0.1" + +# Hero dependencies +hero_job = { path = "../../../core/job" } + +# Authentication and crypto +secp256k1 = { version = "0.28", features = ["rand", "recovery"] } +hex = "0.4" +sha2 = "0.10" +rand = "0.8" + +# CLI utilities +dialoguer = "0.11" +colored = "2.0" + +# Async utilities +futures = "0.3" diff --git a/interfaces/openrpc/client/cmd/main.rs b/interfaces/openrpc/client/cmd/main.rs new file mode 100644 index 0000000..47f7d84 --- /dev/null +++ b/interfaces/openrpc/client/cmd/main.rs @@ -0,0 +1,472 @@ +use anyhow::Result; +use clap::{Parser, Subcommand}; +use colored::*; +use dialoguer::{Input, Select, Confirm, MultiSelect}; +use hero_job::ScriptType; +use hero_openrpc_client::{ + AuthHelper, ClientTransport, HeroOpenRpcClient, JobParams, +}; +use std::path::PathBuf; +use tracing::{error, info, Level}; +use tracing_subscriber; + +#[derive(Parser)] +#[command(name = "hero-openrpc-client")] +#[command(about = "Hero OpenRPC Client - Interactive JSON-RPC client")] +struct Cli { + #[command(subcommand)] + command: Commands, + + /// Private key for authentication (hex format) + #[arg(long)] + private_key: Option, + + /// Generate a new private key and exit + #[arg(long)] + generate_key: bool, + + /// Log level + #[arg(long, default_value = "info")] + log_level: String, +} + +#[derive(Subcommand)] +enum Commands { + /// Connect to WebSocket server + Websocket { + /// Server URL + #[arg(long, default_value = "ws://127.0.0.1:9944")] + url: String, + }, + /// Connect to Unix socket server + Unix { + /// Unix socket path + #[arg(long, default_value = "/tmp/hero-openrpc.sock")] + socket_path: PathBuf, + }, +} + +/// Available RPC methods with descriptions +#[derive(Debug, Clone)] +struct RpcMethod { + name: &'static str, + description: &'static str, + requires_auth: bool, +} + +const RPC_METHODS: &[RpcMethod] = &[ + RpcMethod { + name: "fetch_nonce", + description: "Fetch a nonce for authentication", + requires_auth: false, + }, + RpcMethod { + name: "authenticate", + description: "Authenticate with public key and signature", + requires_auth: false, + }, + RpcMethod { + name: "whoami", + description: "Get authentication status and user information", + requires_auth: true, + }, + RpcMethod { + name: "play", + description: "Execute a Rhai script immediately", + requires_auth: true, + }, + RpcMethod { + name: "create_job", + description: "Create a new job without starting it", + requires_auth: true, + }, + RpcMethod { + name: "start_job", + description: "Start a previously created job", + requires_auth: true, + }, + RpcMethod { + name: "run_job", + description: "Create and run a job, returning result when complete", + requires_auth: true, + }, + RpcMethod { + name: "get_job_status", + description: "Get the current status of a job", + requires_auth: true, + }, + RpcMethod { + name: "get_job_output", + description: "Get the output of a completed job", + requires_auth: true, + }, + RpcMethod { + name: "get_job_logs", + description: "Get the logs of a job", + requires_auth: true, + }, + RpcMethod { + name: "list_jobs", + description: "List all jobs in the system", + requires_auth: true, + }, + RpcMethod { + name: "stop_job", + description: "Stop a running job", + requires_auth: true, + }, + RpcMethod { + name: "delete_job", + description: "Delete a job from the system", + requires_auth: true, + }, + RpcMethod { + name: "clear_all_jobs", + description: "Clear all jobs from the system", + requires_auth: true, + }, +]; + +#[tokio::main] +async fn main() -> Result<()> { + let cli = Cli::parse(); + + // Initialize tracing + let log_level = match cli.log_level.to_lowercase().as_str() { + "trace" => Level::TRACE, + "debug" => Level::DEBUG, + "info" => Level::INFO, + "warn" => Level::WARN, + "error" => Level::ERROR, + _ => Level::INFO, + }; + + tracing_subscriber::fmt() + .with_max_level(log_level) + .init(); + + // Handle key generation + if cli.generate_key { + let auth_helper = AuthHelper::generate()?; + println!("{}", "Generated new private key:".green().bold()); + println!("Private Key: {}", auth_helper.private_key_hex().yellow()); + println!("Public Key: {}", auth_helper.public_key_hex().cyan()); + println!(); + println!("{}", "Save the private key securely and use it with --private-key".bright_yellow()); + return Ok(()); + } + + let transport = match cli.command { + Commands::Websocket { url } => { + println!("{} {}", "Connecting to WebSocket server:".green(), url.cyan()); + ClientTransport::WebSocket(url) + } + Commands::Unix { socket_path } => { + println!("{} {:?}", "Connecting to Unix socket server:".green(), socket_path); + ClientTransport::Unix(socket_path) + } + }; + + // Connect to the server + let client = HeroOpenRpcClient::connect(transport).await?; + println!("{}", "Connected successfully!".green().bold()); + + // Handle authentication if private key is provided + let mut authenticated = false; + if let Some(private_key) = cli.private_key { + println!("{}", "Authenticating...".yellow()); + match client.authenticate_with_key(&private_key).await { + Ok(true) => { + println!("{}", "Authentication successful!".green().bold()); + authenticated = true; + } + Ok(false) => { + println!("{}", "Authentication failed!".red().bold()); + } + Err(e) => { + error!("Authentication error: {}", e); + println!("{} {}", "Authentication error:".red().bold(), e); + } + } + } else { + println!("{}", "No private key provided. Some methods will require authentication.".yellow()); + println!("{}", "Use --generate-key to create a new key or --private-key to use an existing one.".bright_yellow()); + } + + println!(); + + // Interactive loop + loop { + // Filter methods based on authentication status + let available_methods: Vec<&RpcMethod> = RPC_METHODS + .iter() + .filter(|method| !method.requires_auth || authenticated) + .collect(); + + if available_methods.is_empty() { + println!("{}", "No methods available. Please authenticate first.".red()); + break; + } + + // Display method selection + let method_names: Vec = available_methods + .iter() + .map(|method| { + if method.requires_auth && !authenticated { + format!("{} {} (requires auth)", method.name.red(), method.description) + } else { + format!("{} {}", method.name.green(), method.description) + } + }) + .collect(); + + let selection = Select::new() + .with_prompt("Select an RPC method to call") + .items(&method_names) + .default(0) + .interact_opt()?; + + let Some(selection) = selection else { + println!("{}", "Goodbye!".cyan()); + break; + }; + + let selected_method = available_methods[selection]; + println!(); + println!("{} {}", "Selected method:".bold(), selected_method.name.green()); + + // Handle method-specific parameter collection and execution + match execute_method(&client, selected_method.name).await { + Ok(_) => {} + Err(e) => { + error!("Method execution failed: {}", e); + println!("{} {}", "Error:".red().bold(), e); + } + } + + println!(); + + // Ask if user wants to continue + if !Confirm::new() + .with_prompt("Do you want to call another method?") + .default(true) + .interact()? + { + break; + } + + println!(); + } + + println!("{}", "Goodbye!".cyan().bold()); + Ok(()) +} + +async fn execute_method(client: &HeroOpenRpcClient, method_name: &str) -> Result<()> { + match method_name { + "fetch_nonce" => { + let pubkey: String = Input::new() + .with_prompt("Public key (hex)") + .interact_text()?; + + let result = client.fetch_nonce(pubkey).await?; + println!("{} {}", "Nonce:".green().bold(), result.yellow()); + } + + "authenticate" => { + let pubkey: String = Input::new() + .with_prompt("Public key (hex)") + .interact_text()?; + + let signature: String = Input::new() + .with_prompt("Signature (hex)") + .interact_text()?; + + let result = client.authenticate(pubkey, signature, nonce).await?; + println!("{} {}", "Authentication result:".green().bold(), + if result { "Success".green() } else { "Failed".red() }); + } + + "whoami" => { + let result = client.whoami().await?; + println!("{} {}", "User info:".green().bold(), + serde_json::to_string_pretty(&result)?.cyan()); + } + + "play" => { + let script: String = Input::new() + .with_prompt("Rhai script to execute") + .interact_text()?; + + let result = client.play(script).await?; + println!("{} {}", "Script output:".green().bold(), result.output.cyan()); + } + + "create_job" => { + let script: String = Input::new() + .with_prompt("Script content") + .interact_text()?; + + let script_types = ["HeroScript", "RhaiSAL", "RhaiDSL"]; + let script_type_selection = Select::new() + .with_prompt("Script type") + .items(&script_types) + .default(0) + .interact()?; + + let script_type = match script_type_selection { + 0 => ScriptType::HeroScript, + 1 => ScriptType::RhaiSAL, + 2 => ScriptType::RhaiDSL, + _ => ScriptType::HeroScript, + }; + + let add_prerequisites = Confirm::new() + .with_prompt("Add prerequisites?") + .default(false) + .interact()?; + + let prerequisites = if add_prerequisites { + let prereq_input: String = Input::new() + .with_prompt("Prerequisites (comma-separated job IDs)") + .interact_text()?; + Some(prereq_input.split(',').map(|s| s.trim().to_string()).collect()) + } else { + None + }; + + let job_params = JobParams { + script, + script_type, + prerequisites, + }; + + let result = client.create_job(job_params).await?; + println!("{} {}", "Created job ID:".green().bold(), result.yellow()); + } + + "start_job" => { + let job_id: String = Input::new() + .with_prompt("Job ID to start") + .interact_text()?; + + let result = client.start_job(job_id).await?; + println!("{} {}", "Start result:".green().bold(), + if result.success { "Success".green() } else { "Failed".red() }); + } + + "run_job" => { + let script: String = Input::new() + .with_prompt("Script content") + .interact_text()?; + + let script_types = ["HeroScript", "RhaiSAL", "RhaiDSL"]; + let script_type_selection = Select::new() + .with_prompt("Script type") + .items(&script_types) + .default(0) + .interact()?; + + let script_type = match script_type_selection { + 0 => ScriptType::HeroScript, + 1 => ScriptType::RhaiSAL, + 2 => ScriptType::RhaiDSL, + _ => ScriptType::HeroScript, + }; + + let add_prerequisites = Confirm::new() + .with_prompt("Add prerequisites?") + .default(false) + .interact()?; + + let prerequisites = if add_prerequisites { + let prereq_input: String = Input::new() + .with_prompt("Prerequisites (comma-separated job IDs)") + .interact_text()?; + Some(prereq_input.split(',').map(|s| s.trim().to_string()).collect()) + } else { + None + }; + + let result = client.run_job(script, script_type, prerequisites).await?; + println!("{} {}", "Job result:".green().bold(), result.cyan()); + } + + "get_job_status" => { + let job_id: String = Input::new() + .with_prompt("Job ID") + .interact_text()?; + + let result = client.get_job_status(job_id).await?; + println!("{} {:?}", "Job status:".green().bold(), result); + } + + "get_job_output" => { + let job_id: String = Input::new() + .with_prompt("Job ID") + .interact_text()?; + + let result = client.get_job_output(job_id).await?; + println!("{} {}", "Job output:".green().bold(), result.cyan()); + } + + "get_job_logs" => { + let job_id: String = Input::new() + .with_prompt("Job ID") + .interact_text()?; + + let result = client.get_job_logs(job_id).await?; + println!("{} {}", "Job logs:".green().bold(), result.logs.cyan()); + } + + "list_jobs" => { + let result = client.list_jobs().await?; + println!("{}", "Jobs:".green().bold()); + for job in result { + println!(" {} - {} ({:?})", + job.id().yellow(), + job.script_type(), + job.status() + ); + } + } + + "stop_job" => { + let job_id: String = Input::new() + .with_prompt("Job ID to stop") + .interact_text()?; + + client.stop_job(job_id.clone()).await?; + println!("{} {}", "Stopped job:".green().bold(), job_id.yellow()); + } + + "delete_job" => { + let job_id: String = Input::new() + .with_prompt("Job ID to delete") + .interact_text()?; + + client.delete_job(job_id.clone()).await?; + println!("{} {}", "Deleted job:".green().bold(), job_id.yellow()); + } + + "clear_all_jobs" => { + let confirm = Confirm::new() + .with_prompt("Are you sure you want to clear ALL jobs?") + .default(false) + .interact()?; + + if confirm { + client.clear_all_jobs().await?; + println!("{}", "Cleared all jobs".green().bold()); + } else { + println!("{}", "Operation cancelled".yellow()); + } + } + + _ => { + println!("{} {}", "Unknown method:".red().bold(), method_name); + } + } + + Ok(()) +} diff --git a/interfaces/openrpc/client/src/auth.rs b/interfaces/openrpc/client/src/auth.rs new file mode 100644 index 0000000..47bdeb9 --- /dev/null +++ b/interfaces/openrpc/client/src/auth.rs @@ -0,0 +1,81 @@ +use anyhow::{anyhow, Result}; +use secp256k1::{Message, PublicKey, ecdsa::Signature, Secp256k1, SecretKey}; +use sha2::{Digest, Sha256}; + +/// Helper for authentication operations +pub struct AuthHelper { + secret_key: SecretKey, + public_key: PublicKey, + secp: Secp256k1, +} + +impl AuthHelper { + /// Create a new auth helper from a private key hex string + pub fn new(private_key_hex: &str) -> Result { + let secp = Secp256k1::new(); + + let secret_key_bytes = hex::decode(private_key_hex) + .map_err(|_| anyhow!("Invalid private key hex format"))?; + + let secret_key = SecretKey::from_slice(&secret_key_bytes) + .map_err(|_| anyhow!("Invalid private key"))?; + + let public_key = PublicKey::from_secret_key(&secp, &secret_key); + + Ok(Self { + secret_key, + public_key, + secp, + }) + } + + /// Generate a new random private key + pub fn generate() -> Result { + let secp = Secp256k1::new(); + let (secret_key, public_key) = secp.generate_keypair(&mut rand::thread_rng()); + + Ok(Self { + secret_key, + public_key, + secp, + }) + } + + /// Get the public key as a hex string + pub fn public_key_hex(&self) -> String { + hex::encode(self.public_key.serialize()) + } + + /// Get the private key as a hex string + pub fn private_key_hex(&self) -> String { + hex::encode(self.secret_key.secret_bytes()) + } + + /// Sign a message and return the signature as hex + pub fn sign_message(&self, message: &str) -> Result { + let message_hash = Sha256::digest(message.as_bytes()); + let message = Message::from_slice(&message_hash) + .map_err(|_| anyhow!("Failed to create message from hash"))?; + + let signature = self.secp.sign_ecdsa(&message, &self.secret_key); + Ok(hex::encode(signature.serialize_compact())) + } + + /// Verify a signature against a message + pub fn verify_signature(&self, message: &str, signature_hex: &str) -> Result { + let message_hash = Sha256::digest(message.as_bytes()); + let message = Message::from_slice(&message_hash) + .map_err(|_| anyhow!("Failed to create message from hash"))?; + + let signature_bytes = hex::decode(signature_hex) + .map_err(|_| anyhow!("Invalid signature hex format"))?; + + let signature = Signature::from_compact(&signature_bytes) + .map_err(|_| anyhow!("Invalid signature format"))?; + + match self.secp.verify_ecdsa(&message, &signature, &self.public_key) { + Ok(_) => Ok(true), + Err(_) => Ok(false), + } + } +} diff --git a/interfaces/openrpc/client/src/lib.rs b/interfaces/openrpc/client/src/lib.rs new file mode 100644 index 0000000..ca29e3d --- /dev/null +++ b/interfaces/openrpc/client/src/lib.rs @@ -0,0 +1,212 @@ +use anyhow::Result; +use async_trait::async_trait; +use hero_job::{Job, JobStatus, ScriptType}; +use jsonrpsee::core::client::ClientT; +use jsonrpsee::core::ClientError; +use jsonrpsee::proc_macros::rpc; +use jsonrpsee::rpc_params; +use jsonrpsee::ws_client::{WsClient, WsClientBuilder}; +use std::path::PathBuf; +use tracing::{error, info}; + +mod auth; +mod types; + +pub use auth::*; +pub use types::*; + +/// Transport configuration for the client +#[derive(Debug, Clone)] +pub enum ClientTransport { + WebSocket(String), +} + +/// OpenRPC client trait defining all available methods +#[rpc(client)] +pub trait OpenRpcClient { + // Authentication methods + #[method(name = "fetch_nonce")] + async fn fetch_nonce(&self, pubkey: String) -> Result; + + #[method(name = "authenticate")] + async fn authenticate( + &self, + pubkey: String, + signature: String, + nonce: String, + ) -> Result; + + #[method(name = "whoami")] + async fn whoami(&self) -> Result; + + // Script execution + #[method(name = "play")] + async fn play(&self, script: String) -> Result; + + // Job management + #[method(name = "create_job")] + async fn create_job(&self, job: JobParams) -> Result; + + #[method(name = "start_job")] + async fn start_job(&self, job_id: String) -> Result; + + #[method(name = "run_job")] + async fn run_job( + &self, + script: String, + script_type: ScriptType, + prerequisites: Option>, + ) -> Result; + + #[method(name = "get_job_status")] + async fn get_job_status(&self, job_id: String) -> Result; + + #[method(name = "get_job_output")] + async fn get_job_output(&self, job_id: String) -> Result; + + #[method(name = "get_job_logs")] + async fn get_job_logs(&self, job_id: String) -> Result; + + #[method(name = "list_jobs")] + async fn list_jobs(&self) -> Result, ClientError>; + + #[method(name = "stop_job")] + async fn stop_job(&self, job_id: String) -> Result<(), ClientError>; + + #[method(name = "delete_job")] + async fn delete_job(&self, job_id: String) -> Result<(), ClientError>; + + #[method(name = "clear_all_jobs")] + async fn clear_all_jobs(&self) -> Result<(), ClientError>; +} + +/// Wrapper client that can use WebSocket transport +pub struct HeroOpenRpcClient { + client: WsClient, +} + +impl HeroOpenRpcClient { + /// Connect to the OpenRPC server using the specified transport + pub async fn connect(transport: ClientTransport) -> Result { + match transport { + ClientTransport::WebSocket(url) => { + info!("Connecting to WebSocket server at {}", url); + let client = WsClientBuilder::default() + .build(&url) + .await?; + Ok(Self { client }) + } + } + } + + /// Get the underlying client for making RPC calls + pub fn client(&self) -> &WsClient { + &self.client + } + + /// Authenticate with the server using a private key + pub async fn authenticate_with_key(&self, private_key: &str) -> Result { + let auth_helper = AuthHelper::new(private_key)?; + + // Get nonce + let pubkey = auth_helper.public_key_hex(); + let nonce: String = self.client.fetch_nonce(pubkey.clone()).await?; + + // Sign nonce + let signature = auth_helper.sign_message(&nonce)?; + + // Authenticate + let result = self.client.authenticate(pubkey, signature, nonce).await?; + + if result { + info!("Authentication successful"); + } else { + error!("Authentication failed"); + } + + Ok(result) + } +} + +// Implement delegation methods on HeroOpenRpcClient to use the generated trait methods +impl HeroOpenRpcClient { + /// Delegate to fetch_nonce on the underlying client + pub async fn fetch_nonce(&self, pubkey: String) -> Result { + self.client.fetch_nonce(pubkey).await + } + + /// Delegate to authenticate on the underlying client + pub async fn authenticate( + &self, + pubkey: String, + signature: String, + nonce: String, + ) -> Result { + self.client.authenticate(pubkey, signature, nonce).await + } + + /// Delegate to whoami on the underlying client + pub async fn whoami(&self) -> Result { + self.client.whoami().await + } + + /// Delegate to play on the underlying client + pub async fn play(&self, script: String) -> Result { + self.client.play(script).await + } + + /// Delegate to create_job on the underlying client + pub async fn create_job(&self, job: JobParams) -> Result { + self.client.create_job(job).await + } + + /// Delegate to start_job on the underlying client + pub async fn start_job(&self, job_id: String) -> Result { + self.client.start_job(job_id).await + } + + /// Delegate to run_job on the underlying client + pub async fn run_job( + &self, + script: String, + script_type: ScriptType, + prerequisites: Option>, + ) -> Result { + self.client.run_job(script, script_type, prerequisites).await + } + + /// Delegate to get_job_status on the underlying client + pub async fn get_job_status(&self, job_id: String) -> Result { + self.client.get_job_status(job_id).await + } + + /// Delegate to get_job_output on the underlying client + pub async fn get_job_output(&self, job_id: String) -> Result { + self.client.get_job_output(job_id).await + } + + /// Delegate to get_job_logs on the underlying client + pub async fn get_job_logs(&self, job_id: String) -> Result { + self.client.get_job_logs(job_id).await + } + + /// Delegate to list_jobs on the underlying client + pub async fn list_jobs(&self) -> Result, ClientError> { + self.client.list_jobs().await + } + + /// Delegate to stop_job on the underlying client + pub async fn stop_job(&self, job_id: String) -> Result<(), ClientError> { + self.client.stop_job(job_id).await + } + + /// Delegate to delete_job on the underlying client + pub async fn delete_job(&self, job_id: String) -> Result<(), ClientError> { + self.client.delete_job(job_id).await + } + + /// Delegate to clear_all_jobs on the underlying client + pub async fn clear_all_jobs(&self) -> Result<(), ClientError> { + self.client.clear_all_jobs().await + } +} diff --git a/interfaces/openrpc/client/src/types.rs b/interfaces/openrpc/client/src/types.rs new file mode 100644 index 0000000..2e6530b --- /dev/null +++ b/interfaces/openrpc/client/src/types.rs @@ -0,0 +1,28 @@ +use hero_job::ScriptType; +use serde::{Deserialize, Serialize}; + +/// Parameters for creating a job +#[derive(Debug, Serialize, Deserialize)] +pub struct JobParams { + pub script: String, + pub script_type: ScriptType, + pub prerequisites: Option>, +} + +/// Result of script execution +#[derive(Debug, Serialize, Deserialize)] +pub struct PlayResult { + pub output: String, +} + +/// Result of starting a job +#[derive(Debug, Serialize, Deserialize)] +pub struct StartJobResult { + pub success: bool, +} + +/// Result of getting job logs +#[derive(Debug, Serialize, Deserialize)] +pub struct JobLogsResult { + pub logs: String, +} diff --git a/interfaces/openrpc/server/Cargo.toml b/interfaces/openrpc/server/Cargo.toml new file mode 100644 index 0000000..ab1933e --- /dev/null +++ b/interfaces/openrpc/server/Cargo.toml @@ -0,0 +1,47 @@ +[package] +name = "hero-openrpc-server" +version = "0.1.0" +edition = "2021" + +[[bin]] +name = "hero-openrpc-server" +path = "cmd/main.rs" + +[dependencies] +# Core dependencies +tokio = { version = "1.0", features = ["full"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +anyhow = "1.0" +thiserror = "1.0" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +clap = { version = "4.0", features = ["derive"] } + +# JSON-RPC dependencies +jsonrpsee = { version = "0.21", features = [ + "server", + "macros" +] } +jsonrpsee-types = "0.21" +uuid = { version = "1.6", features = ["v4", "serde"] } +chrono = { version = "0.4", features = ["serde"] } + +# Hero dependencies +hero_supervisor = { path = "../../../core/supervisor" } +hero_job = { path = "../../../core/job" } + +# Authentication and crypto +secp256k1 = { version = "0.28", features = ["rand", "recovery"] } +hex = "0.4" +sha2 = "0.10" +rand = "0.8" + + +# Async utilities +futures = "0.3" + +# Test dependencies +[dev-dependencies] +tokio-test = "0.4" +uuid = { version = "1.6", features = ["v4"] } diff --git a/interfaces/openrpc/server/cmd/main.rs b/interfaces/openrpc/server/cmd/main.rs new file mode 100644 index 0000000..5a6d48a --- /dev/null +++ b/interfaces/openrpc/server/cmd/main.rs @@ -0,0 +1,95 @@ +use anyhow::Result; +use clap::{Parser, Subcommand}; +use hero_openrpc_server::{OpenRpcServer, OpenRpcServerConfig, Transport}; +use std::net::SocketAddr; +use std::path::PathBuf; +use tracing::{info, Level}; +use tracing_subscriber; + +#[derive(Parser)] +#[command(name = "hero-openrpc-server")] +#[command(about = "Hero OpenRPC Server - WebSocket and Unix socket JSON-RPC server")] +struct Cli { + #[command(subcommand)] + command: Commands, + + /// Path to supervisor configuration file + #[arg(long)] + supervisor_config: Option, + + /// Database path for supervisor + #[arg(long, default_value = "./supervisor.db")] + db_path: PathBuf, + + /// Log level + #[arg(long, default_value = "info")] + log_level: String, +} + +#[derive(Subcommand)] +enum Commands { + /// Start WebSocket server + Websocket { + /// Address to bind to + #[arg(long, default_value = "127.0.0.1:9944")] + addr: SocketAddr, + }, + /// Start Unix socket server + Unix { + /// Unix socket path + #[arg(long, default_value = "/tmp/hero-openrpc.sock")] + socket_path: PathBuf, + }, +} + +#[tokio::main] +async fn main() -> Result<()> { + let cli = Cli::parse(); + + // Initialize tracing + let log_level = match cli.log_level.to_lowercase().as_str() { + "trace" => Level::TRACE, + "debug" => Level::DEBUG, + "info" => Level::INFO, + "warn" => Level::WARN, + "error" => Level::ERROR, + _ => Level::INFO, + }; + + tracing_subscriber::fmt() + .with_max_level(log_level) + .init(); + + let transport = match cli.command { + Commands::Websocket { addr } => { + info!("Starting WebSocket server on {}", addr); + Transport::WebSocket(addr) + } + Commands::Unix { socket_path } => { + info!("Starting Unix socket server on {:?}", socket_path); + // Remove existing socket file if it exists + if socket_path.exists() { + std::fs::remove_file(&socket_path)?; + } + Transport::Unix(socket_path) + } + }; + + let config = OpenRpcServerConfig { + transport: transport.clone(), + supervisor_config_path: cli.supervisor_config, + db_path: cli.db_path, + }; + + // Create and start the server + let server = OpenRpcServer::new(config.clone()).await?; + let handle = server.start(config).await?; + + info!("Server started successfully"); + + // Wait for the server to finish + handle.stopped().await; + + info!("Server stopped"); + Ok(()) +} diff --git a/interfaces/openrpc/server/src/auth.rs b/interfaces/openrpc/server/src/auth.rs new file mode 100644 index 0000000..c67cda9 --- /dev/null +++ b/interfaces/openrpc/server/src/auth.rs @@ -0,0 +1,131 @@ +use anyhow::{anyhow, Result}; +use secp256k1::{Message, PublicKey, Secp256k1, ecdsa::Signature}; +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; +use std::collections::HashMap; +use std::time::{SystemTime, UNIX_EPOCH}; + +/// Nonce response structure +#[derive(Debug, Serialize, Deserialize)] +pub struct NonceResponse { + pub nonce: String, + pub timestamp: u64, +} + +/// Authentication manager for handling nonces and signature verification +#[derive(Debug)] +pub struct AuthManager { + nonces: HashMap, + authenticated_keys: HashMap, // pubkey -> timestamp +} + +impl AuthManager { + /// Create a new authentication manager + pub fn new() -> Self { + Self { + nonces: HashMap::new(), + authenticated_keys: HashMap::new(), + } + } + + /// Generate a nonce for a given public key + pub fn generate_nonce(&mut self, pubkey: &str) -> String { + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + let nonce = format!("{}:{}", pubkey, timestamp); + let nonce_hash = format!("{:x}", Sha256::digest(nonce.as_bytes())); + + self.nonces.insert( + pubkey.to_string(), + NonceResponse { + nonce: nonce_hash.clone(), + timestamp, + }, + ); + + nonce_hash + } + + /// Verify a signature against a stored nonce + pub fn verify_signature(&mut self, pubkey: &str, signature: &str) -> Result { + // Get the nonce for this public key + let nonce_response = self + .nonces + .get(pubkey) + .ok_or_else(|| anyhow!("No nonce found for public key"))?; + + // Check if nonce is not too old (5 minutes) + let current_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + if current_time - nonce_response.timestamp > 300 { + return Err(anyhow!("Nonce expired")); + } + + // Parse the public key + let pubkey_bytes = hex::decode(pubkey) + .map_err(|_| anyhow!("Invalid public key format"))?; + + let secp = Secp256k1::new(); + let public_key = PublicKey::from_slice(&pubkey_bytes) + .map_err(|_| anyhow!("Invalid public key"))?; + + // Parse the signature + let signature_bytes = hex::decode(signature) + .map_err(|_| anyhow!("Invalid signature format"))?; + + let signature = Signature::from_compact(&signature_bytes) + .map_err(|_| anyhow!("Invalid signature"))?; + + // Create message hash from nonce + let message_hash = Sha256::digest(nonce_response.nonce.as_bytes()); + let message = Message::from_slice(&message_hash) + .map_err(|_| anyhow!("Failed to create message"))?; + + // Verify the signature + match secp.verify_ecdsa(&message, &signature, &public_key) { + Ok(_) => { + // Mark this key as authenticated + self.authenticated_keys.insert(pubkey.to_string(), current_time); + // Remove the used nonce + self.nonces.remove(pubkey); + Ok(true) + } + Err(_) => Ok(false), + } + } + + /// Check if a public key is currently authenticated + pub fn is_authenticated(&self, pubkey: &str) -> bool { + if let Some(×tamp) = self.authenticated_keys.get(pubkey) { + let current_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + // Authentication is valid for 1 hour + current_time - timestamp < 3600 + } else { + false + } + } + + /// Remove expired authentications + pub fn cleanup_expired(&mut self) { + let current_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); + + // Remove expired nonces (older than 5 minutes) + self.nonces.retain(|_, nonce| current_time - nonce.timestamp <= 300); + + // Remove expired authentications (older than 1 hour) + self.authenticated_keys.retain(|_, &mut timestamp| current_time - timestamp <= 3600); + } +} diff --git a/interfaces/openrpc/server/src/lib.rs b/interfaces/openrpc/server/src/lib.rs new file mode 100644 index 0000000..758031e --- /dev/null +++ b/interfaces/openrpc/server/src/lib.rs @@ -0,0 +1,471 @@ +use anyhow::Result; +use hero_job::{Job, JobBuilder, JobStatus, ScriptType}; +use hero_supervisor::{Supervisor, SupervisorBuilder}; +use jsonrpsee::core::async_trait; +use jsonrpsee::proc_macros::rpc; +use jsonrpsee::server::{ServerBuilder, ServerHandle}; +use jsonrpsee::RpcModule; +use jsonrpsee_types::error::ErrorCode; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::sync::Arc; +use tokio::sync::RwLock; +use tracing::error; + +mod auth; +pub mod types; + +pub use auth::*; +pub use types::*; + +/// Transport type for the OpenRPC server +#[derive(Debug, Clone)] +pub enum Transport { + WebSocket(SocketAddr), + Unix(PathBuf), +} + +/// OpenRPC server configuration +#[derive(Debug, Clone)] +pub struct OpenRpcServerConfig { + pub transport: Transport, + pub supervisor_config_path: Option, + pub db_path: PathBuf, +} + +/// Main OpenRPC server state +#[derive(Clone)] +pub struct OpenRpcServer { + supervisor: Arc>, + auth_manager: Arc>, +} + +/// OpenRPC trait defining all available methods +#[rpc(server)] +pub trait OpenRpcApi { + // Authentication methods + #[method(name = "fetch_nonce")] + async fn fetch_nonce(&self, public_key: String) -> Result; + + #[method(name = "authenticate")] + async fn authenticate(&self, public_key: String, signature: String, nonce: String) -> Result; + + #[method(name = "whoami")] + async fn whoami(&self) -> Result; + + // Script execution + #[method(name = "play")] + async fn play(&self, script: String) -> Result; + + // Job management + #[method(name = "create_job")] + async fn create_job(&self, job_params: JobParams) -> Result; + + #[method(name = "start_job")] + async fn start_job(&self, job_id: String) -> Result; + + #[method(name = "run_job")] + async fn run_job( + &self, + script: String, + script_type: ScriptType, + prerequisites: Option>, + ) -> Result; + + #[method(name = "get_job_status")] + async fn get_job_status(&self, job_id: String) -> Result; + + #[method(name = "get_job_output")] + async fn get_job_output(&self, job_id: String) -> Result; + + #[method(name = "get_job_logs")] + async fn get_job_logs(&self, job_id: String) -> Result; + + #[method(name = "list_jobs")] + async fn list_jobs(&self) -> Result, ErrorCode>; + + #[method(name = "stop_job")] + async fn stop_job(&self, job_id: String) -> Result<(), ErrorCode>; + + #[method(name = "delete_job")] + async fn delete_job(&self, job_id: String) -> Result<(), ErrorCode>; + + #[method(name = "clear_all_jobs")] + async fn clear_all_jobs(&self) -> Result<(), ErrorCode>; +} + +impl OpenRpcServer { + /// Create a new OpenRPC server instance + pub async fn new(config: OpenRpcServerConfig) -> Result { + let supervisor = if let Some(config_path) = config.supervisor_config_path { + // Load supervisor from config file + SupervisorBuilder::from_toml(&config_path)? + .build().await? + } else { + // Create default supervisor with Redis URL + SupervisorBuilder::new() + .redis_url("redis://localhost:6379") + .build().await? + }; + + Ok(Self { + supervisor: Arc::new(RwLock::new(supervisor)), + auth_manager: Arc::new(RwLock::new(AuthManager::new())), + }) + } + + /// Start the OpenRPC server + pub async fn start(self, config: OpenRpcServerConfig) -> Result { + let mut module = RpcModule::new(()); + + // Register all the RPC methods + let server_clone = self.clone(); + module.register_async_method("fetch_nonce", move |params, _| { + let server = server_clone.clone(); + async move { + let public_key: String = params.one().map_err(|_| ErrorCode::InvalidParams)?; + server.fetch_nonce(public_key).await + } + })?; + + let server_clone = self.clone(); + module.register_async_method("authenticate", move |params, _| { + let server = server_clone.clone(); + async move { + let (public_key, signature, nonce): (String, String, String) = params.parse().map_err(|_| ErrorCode::InvalidParams)?; + server.authenticate(public_key, signature, nonce).await + } + })?; + + let server_clone = self.clone(); + module.register_async_method("whoami", move |_params, _| { + let server = server_clone.clone(); + async move { + server.whoami().await + } + })?; + + let server_clone = self.clone(); + module.register_async_method("play", move |params, _| { + let server = server_clone.clone(); + async move { + let script: String = params.one().map_err(|_| ErrorCode::InvalidParams)?; + server.play(script).await + } + })?; + + let server_clone = self.clone(); + module.register_async_method("create_job", move |params, _| { + let server = server_clone.clone(); + async move { + let job: JobParams = params.one().map_err(|_| ErrorCode::InvalidParams)?; + server.create_job(job).await + } + })?; + + let server_clone = self.clone(); + module.register_async_method("start_job", move |params, _| { + let server = server_clone.clone(); + async move { + let job_id: String = params.one().map_err(|_| ErrorCode::InvalidParams)?; + server.start_job(job_id).await + } + })?; + + let server_clone = self.clone(); + module.register_async_method("run_job", move |params, _| { + let server = server_clone.clone(); + async move { + let (script, script_type, prerequisites): (String, ScriptType, Option>) = params.parse().map_err(|_| ErrorCode::InvalidParams)?; + server.run_job(script, script_type, prerequisites).await + } + })?; + + let server_clone = self.clone(); + module.register_async_method("get_job_status", move |params, _| { + let server = server_clone.clone(); + async move { + let job_id: String = params.one().map_err(|_| ErrorCode::InvalidParams)?; + server.get_job_status(job_id).await + } + })?; + + let server_clone = self.clone(); + module.register_async_method("get_job_output", move |params, _| { + let server = server_clone.clone(); + async move { + let job_id: String = params.one().map_err(|_| ErrorCode::InvalidParams)?; + server.get_job_output(job_id).await + } + })?; + + let server_clone = self.clone(); + module.register_async_method("get_job_logs", move |params, _| { + let server = server_clone.clone(); + async move { + let job_id: String = params.one().map_err(|_| ErrorCode::InvalidParams)?; + server.get_job_logs(job_id).await + } + })?; + + let server_clone = self.clone(); + module.register_async_method("list_jobs", move |params, _| { + let server = server_clone.clone(); + async move { + let _: () = params.parse().map_err(|_| ErrorCode::InvalidParams)?; + server.list_jobs().await + } + })?; + + let server_clone = self.clone(); + module.register_async_method("stop_job", move |params, _| { + let server = server_clone.clone(); + async move { + let job_id: String = params.one().map_err(|_| ErrorCode::InvalidParams)?; + server.stop_job(job_id).await + } + })?; + + let server_clone = self.clone(); + module.register_async_method("delete_job", move |params, _| { + let server = server_clone.clone(); + async move { + let job_id: String = params.one().map_err(|_| ErrorCode::InvalidParams)?; + server.delete_job(job_id).await + } + })?; + + let server_clone = self.clone(); + module.register_async_method("clear_all_jobs", move |params, _| { + let server = server_clone.clone(); + async move { + let _: () = params.parse().map_err(|_| ErrorCode::InvalidParams)?; + server.clear_all_jobs().await + } + })?; + + match config.transport { + Transport::WebSocket(addr) => { + let server = ServerBuilder::default() + .build(addr) + .await?; + let handle = server.start(module); + Ok(handle) + } + Transport::Unix(_path) => { + // Unix socket transport not yet implemented in jsonrpsee 0.21 + return Err(anyhow::anyhow!("Unix socket transport not yet supported").into()); + } + } + } +} + +#[async_trait] +impl OpenRpcApiServer for OpenRpcServer { + async fn fetch_nonce(&self, public_key: String) -> Result { + let mut auth_manager = self.auth_manager.write().await; + let nonce = auth_manager.generate_nonce(&public_key); + Ok(nonce) + } + + async fn authenticate( + &self, + public_key: String, + signature: String, + _nonce: String, + ) -> Result { + let mut auth_manager = self.auth_manager.write().await; + match auth_manager.verify_signature(&public_key, &signature) { + Ok(is_valid) => Ok(is_valid), + Err(e) => { + error!("Authentication error: {}", e); + Ok(false) + } + } + } + + async fn whoami(&self) -> Result { + let _auth_manager = self.auth_manager.read().await; + // For now, return basic info - in a real implementation, + // you'd track authenticated sessions + Ok(serde_json::json!({ + "authenticated": true, + "user_id": "anonymous" + }).to_string()) + } + + async fn play(&self, script: String) -> Result { + let _supervisor = self.supervisor.read().await; + + // For now, return a simple result since we need to implement execute_script method + Ok(PlayResult { + output: format!("Script executed: {}", script) + }) + } + + async fn create_job(&self, job_params: JobParams) -> Result { + let supervisor = self.supervisor.read().await; + + // Use JobBuilder to create a Job instance + let mut builder = hero_job::JobBuilder::new() + .caller_id(&job_params.caller_id) + .context_id(&job_params.context_id) + .script(&job_params.script) + .script_type(job_params.script_type); + + // Set timeout if provided + if let Some(timeout_secs) = job_params.timeout { + builder = builder.timeout(std::time::Duration::from_secs(timeout_secs)); + } + + // Set prerequisites if provided + if let Some(prerequisites) = job_params.prerequisites { + builder = builder.prerequisites(prerequisites); + } + + // Build the job + let job = match builder.build() { + Ok(job) => job, + Err(e) => { + error!("Failed to build job: {}", e); + return Err(ErrorCode::InvalidParams); + } + }; + + let job_id = job.id.clone(); + + // Create the job using the supervisor + match supervisor.create_job(&job).await { + Ok(_) => Ok(job_id), + Err(e) => { + error!("Failed to create job: {}", e); + Err(ErrorCode::InternalError) + } + } + } + + async fn start_job(&self, job_id: String) -> Result { + let supervisor = self.supervisor.read().await; + + match supervisor.start_job(&job_id).await { + Ok(_) => Ok(StartJobResult { success: true }), + Err(e) => { + error!("Failed to start job {}: {}", job_id, e); + Ok(StartJobResult { success: false }) + } + } + } + + async fn run_job( + &self, + script: String, + script_type: ScriptType, + _prerequisites: Option>, + ) -> Result { + // For now, return a simple result + Ok(format!("Job executed with script: {} (type: {:?})", script, script_type)) + } + + async fn get_job_status(&self, job_id: String) -> Result { + let supervisor = self.supervisor.read().await; + + match supervisor.get_job_status(&job_id).await { + Ok(status) => Ok(status), + Err(e) => { + error!("Failed to get job status for {}: {}", job_id, e); + Err(ErrorCode::InvalidParams) + } + } + } + + async fn get_job_output(&self, job_id: String) -> Result { + let supervisor = self.supervisor.read().await; + + match supervisor.get_job_output(&job_id).await { + Ok(output) => Ok(output.unwrap_or_else(|| "No output available".to_string())), + Err(e) => { + error!("Failed to get job output for {}: {}", job_id, e); + Err(ErrorCode::InvalidParams) + } + } + } + + async fn get_job_logs(&self, job_id: String) -> Result { + // For now, return mock logs + Ok(JobLogsResult { + logs: format!("Logs for job {}", job_id), + }) + } + + async fn list_jobs(&self) -> Result, ErrorCode> { + let supervisor = self.supervisor.read().await; + + match supervisor.list_jobs().await { + Ok(job_ids) => { + // For now, create minimal Job objects with just the IDs + // In a real implementation, we'd need a supervisor.get_job() method + let jobs: Vec = job_ids.into_iter().map(|job_id| { + // Create a minimal job object - this is a temporary solution + // until supervisor.get_job() is implemented + Job { + id: job_id, + caller_id: "unknown".to_string(), + context_id: "unknown".to_string(), + script: "unknown".to_string(), + script_type: ScriptType::OSIS, + timeout: std::time::Duration::from_secs(30), + retries: 0, + concurrent: false, + log_path: None, + env_vars: std::collections::HashMap::new(), + prerequisites: Vec::new(), + dependents: Vec::new(), + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + } + }).collect(); + Ok(jobs) + }, + Err(e) => { + error!("Failed to list jobs: {}", e); + Err(ErrorCode::InternalError) + } + } + } + + async fn stop_job(&self, job_id: String) -> Result<(), ErrorCode> { + let supervisor = self.supervisor.read().await; + + match supervisor.stop_job(&job_id).await { + Ok(_) => Ok(()), + Err(e) => { + error!("Failed to stop job {}: {}", job_id, e); + Err(ErrorCode::InvalidParams) + } + } + } + + async fn delete_job(&self, job_id: String) -> Result<(), ErrorCode> { + let supervisor = self.supervisor.read().await; + + match supervisor.delete_job(&job_id).await { + Ok(_) => Ok(()), + Err(e) => { + error!("Failed to delete job {}: {}", job_id, e); + Err(ErrorCode::InvalidParams) + } + } + } + + async fn clear_all_jobs(&self) -> Result<(), ErrorCode> { + let supervisor = self.supervisor.read().await; + + match supervisor.clear_all_jobs().await { + Ok(_) => Ok(()), + Err(e) => { + error!("Failed to clear all jobs: {}", e); + Err(ErrorCode::InternalError) + } + } + } +} diff --git a/interfaces/openrpc/server/src/types.rs b/interfaces/openrpc/server/src/types.rs new file mode 100644 index 0000000..5f9dc62 --- /dev/null +++ b/interfaces/openrpc/server/src/types.rs @@ -0,0 +1,31 @@ +use hero_job::ScriptType; +use serde::{Deserialize, Serialize}; + +/// Parameters for creating a job +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct JobParams { + pub script: String, + pub script_type: ScriptType, + pub caller_id: String, + pub context_id: String, + pub timeout: Option, // timeout in seconds + pub prerequisites: Option>, +} + +/// Result of script execution +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct PlayResult { + pub output: String, +} + +/// Result of starting a job +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct StartJobResult { + pub success: bool, +} + +/// Result of getting job logs +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct JobLogsResult { + pub logs: String, +} diff --git a/interfaces/openrpc/server/tests/integration_tests.rs b/interfaces/openrpc/server/tests/integration_tests.rs new file mode 100644 index 0000000..4f29de1 --- /dev/null +++ b/interfaces/openrpc/server/tests/integration_tests.rs @@ -0,0 +1,409 @@ +use hero_openrpc_server::{OpenRpcServer, OpenRpcServerConfig, OpenRpcApiServer, Transport, types::*}; +use hero_supervisor::{Supervisor, SupervisorBuilder}; +use hero_job::{JobBuilder, JobStatus, ScriptType}; +use jsonrpsee_types::error::ErrorCode; +use std::sync::Arc; +use tokio::sync::RwLock; +use std::time::Duration; + +/// Helper function to create a test supervisor +async fn create_test_supervisor() -> Arc> { + let supervisor = SupervisorBuilder::new() + .redis_url("redis://localhost:6379") + .build() + .await + .expect("Failed to create test supervisor"); + + Arc::new(RwLock::new(supervisor)) +} + +/// Helper function to create a test OpenRPC server +async fn create_test_server() -> OpenRpcServer { + use std::net::SocketAddr; + use std::path::PathBuf; + + let config = OpenRpcServerConfig { + transport: Transport::WebSocket("127.0.0.1:0".parse::().unwrap()), + supervisor_config_path: None, + db_path: PathBuf::from("/tmp/test_openrpc.db"), + }; + OpenRpcServer::new(config).await.expect("Failed to create OpenRPC server") +} + +#[tokio::test] +async fn test_fetch_nonce() { + let server = create_test_server().await; + let public_key = "test_public_key".to_string(); + + let result = server.fetch_nonce(public_key).await; + assert!(result.is_ok()); + + let nonce = result.unwrap(); + assert!(!nonce.is_empty()); + assert_eq!(nonce.len(), 64); // Should be a 32-byte hex string +} + +#[tokio::test] +async fn test_create_job_success() { + let server = create_test_server().await; + + let job_params = JobParams { + script: "print('Hello, World!');".to_string(), + script_type: ScriptType::OSIS, + caller_id: "test_caller".to_string(), + context_id: "test_context".to_string(), + timeout: Some(60), + prerequisites: None, + }; + + let result = server.create_job(job_params).await; + assert!(result.is_ok()); + + let job_id = result.unwrap(); + assert!(!job_id.is_empty()); + // Job ID should be a valid UUID format + assert!(uuid::Uuid::parse_str(&job_id).is_ok()); +} + +#[tokio::test] +async fn test_create_job_with_prerequisites() { + let server = create_test_server().await; + + let job_params = JobParams { + script: "print('Job with prerequisites');".to_string(), + script_type: ScriptType::SAL, + caller_id: "test_caller".to_string(), + context_id: "test_context".to_string(), + timeout: Some(120), + prerequisites: Some(vec!["prereq_job_1".to_string(), "prereq_job_2".to_string()]), + }; + + let result = server.create_job(job_params).await; + assert!(result.is_ok()); + + let job_id = result.unwrap(); + assert!(!job_id.is_empty()); +} + +#[tokio::test] +async fn test_create_job_invalid_params() { + let server = create_test_server().await; + + // Test with empty caller_id (should fail JobBuilder validation) + let job_params = JobParams { + script: "print('Test');".to_string(), + script_type: ScriptType::OSIS, + caller_id: "".to_string(), // Empty caller_id should fail + context_id: "test_context".to_string(), + timeout: Some(60), + prerequisites: None, + }; + + let result = server.create_job(job_params).await; + assert!(result.is_err()); + assert_eq!(result.unwrap_err(), ErrorCode::InvalidParams); +} + +#[tokio::test] +async fn test_start_job() { + let server = create_test_server().await; + + // First create a job + let job_params = JobParams { + script: "print('Test job');".to_string(), + script_type: ScriptType::OSIS, + caller_id: "test_caller".to_string(), + context_id: "test_context".to_string(), + timeout: Some(60), + prerequisites: None, + }; + + let job_id = server.create_job(job_params).await.unwrap(); + + // Then start the job + let result = server.start_job(job_id).await; + assert!(result.is_ok()); + + let start_result = result.unwrap(); + assert!(start_result.success); +} + +#[tokio::test] +async fn test_get_job_status() { + let server = create_test_server().await; + + // First create a job + let job_params = JobParams { + script: "print('Status test');".to_string(), + script_type: ScriptType::OSIS, + caller_id: "test_caller".to_string(), + context_id: "test_context".to_string(), + timeout: Some(60), + prerequisites: None, + }; + + let job_id = server.create_job(job_params).await.unwrap(); + + // Get job status + let result = server.get_job_status(job_id).await; + assert!(result.is_ok()); + + let status = result.unwrap(); + // Status should be one of the valid JobStatus variants + match status { + JobStatus::Dispatched | JobStatus::WaitingForPrerequisites | + JobStatus::Started | JobStatus::Error | JobStatus::Finished => { + // Valid status + } + } +} + +#[tokio::test] +async fn test_get_job_output() { + let server = create_test_server().await; + + // First create a job + let job_params = JobParams { + script: "print('Output test');".to_string(), + script_type: ScriptType::OSIS, + caller_id: "test_caller".to_string(), + context_id: "test_context".to_string(), + timeout: Some(60), + prerequisites: None, + }; + + let job_id = server.create_job(job_params).await.unwrap(); + + // Get job output + let result = server.get_job_output(job_id).await; + assert!(result.is_ok()); + + let output = result.unwrap(); + assert!(!output.is_empty()); +} + +#[tokio::test] +async fn test_list_jobs() { + let server = create_test_server().await; + + // Create a few jobs first + for i in 0..3 { + let job_params = JobParams { + script: format!("print('Job {}');", i), + script_type: ScriptType::OSIS, + caller_id: "test_caller".to_string(), + context_id: "test_context".to_string(), + timeout: Some(60), + prerequisites: None, + }; + + let _ = server.create_job(job_params).await.unwrap(); + } + + // List all jobs + let result = server.list_jobs().await; + assert!(result.is_ok()); + + let jobs = result.unwrap(); + assert!(jobs.len() >= 3); // Should have at least the 3 jobs we created + + // Verify job structure + for job in jobs { + assert!(!job.id.is_empty()); + assert!(uuid::Uuid::parse_str(&job.id).is_ok()); + } +} + +#[tokio::test] +async fn test_stop_job() { + let server = create_test_server().await; + + // First create and start a job + let job_params = JobParams { + script: "print('Stop test');".to_string(), + script_type: ScriptType::OSIS, + caller_id: "test_caller".to_string(), + context_id: "test_context".to_string(), + timeout: Some(60), + prerequisites: None, + }; + + let job_id = server.create_job(job_params).await.unwrap(); + let _ = server.start_job(job_id.clone()).await.unwrap(); + + // Stop the job + let result = server.stop_job(job_id).await; + assert!(result.is_ok()); +} + +#[tokio::test] +async fn test_delete_job() { + let server = create_test_server().await; + + // First create a job + let job_params = JobParams { + script: "print('Delete test');".to_string(), + script_type: ScriptType::OSIS, + caller_id: "test_caller".to_string(), + context_id: "test_context".to_string(), + timeout: Some(60), + prerequisites: None, + }; + + let job_id = server.create_job(job_params).await.unwrap(); + + // Delete the job + let result = server.delete_job(job_id).await; + assert!(result.is_ok()); +} + +#[tokio::test] +async fn test_clear_all_jobs() { + let server = create_test_server().await; + + // Create a few jobs first + for i in 0..3 { + let job_params = JobParams { + script: format!("print('Clear test {}');", i), + script_type: ScriptType::OSIS, + caller_id: "test_caller".to_string(), + context_id: "test_context".to_string(), + timeout: Some(60), + prerequisites: None, + }; + + let _ = server.create_job(job_params).await.unwrap(); + } + + // Clear all jobs + let result = server.clear_all_jobs().await; + assert!(result.is_ok()); + + // Verify jobs are cleared + let jobs = server.list_jobs().await.unwrap(); + assert_eq!(jobs.len(), 0); +} + +#[tokio::test] +async fn test_run_job() { + let server = create_test_server().await; + + let script = "print('Run job test');".to_string(); + let script_type = ScriptType::OSIS; + let prerequisites = None; + + let result = server.run_job(script, script_type, prerequisites).await; + assert!(result.is_ok()); + + let output = result.unwrap(); + assert!(!output.is_empty()); + assert!(output.contains("Run job test")); +} + + + +#[tokio::test] +async fn test_play_script() { + let server = create_test_server().await; + + let script = "print('Play script test');".to_string(); + + let result = server.play(script.clone()).await; + assert!(result.is_ok()); + + let play_result = result.unwrap(); + assert!(!play_result.output.is_empty()); + assert!(play_result.output.contains(&script)); +} + +#[tokio::test] +async fn test_get_job_logs() { + let server = create_test_server().await; + + // First create a job + let job_params = JobParams { + script: "print('Logs test');".to_string(), + script_type: ScriptType::OSIS, + caller_id: "test_caller".to_string(), + context_id: "test_context".to_string(), + timeout: Some(60), + prerequisites: None, + }; + + let job_id = server.create_job(job_params).await.unwrap(); + + // Get job logs + let result = server.get_job_logs(job_id).await; + assert!(result.is_ok()); + + let logs_result = result.unwrap(); + assert!(!logs_result.logs.is_empty()); +} + +#[tokio::test] +async fn test_job_builder_integration() { + // Test that JobBuilder is working correctly with all the fields + let job_params = JobParams { + script: "print('JobBuilder test');".to_string(), + script_type: ScriptType::V, + caller_id: "test_caller".to_string(), + context_id: "test_context".to_string(), + timeout: Some(300), + prerequisites: Some(vec!["prereq1".to_string(), "prereq2".to_string()]), + }; + + // Build job using JobBuilder (similar to what the server does) + let mut builder = JobBuilder::new() + .caller_id(&job_params.caller_id) + .context_id(&job_params.context_id) + .script(&job_params.script) + .script_type(job_params.script_type); + + if let Some(timeout_secs) = job_params.timeout { + builder = builder.timeout(Duration::from_secs(timeout_secs)); + } + + if let Some(prerequisites) = job_params.prerequisites { + builder = builder.prerequisites(prerequisites); + } + + let job = builder.build(); + assert!(job.is_ok()); + + let job = job.unwrap(); + assert_eq!(job.caller_id, "test_caller"); + assert_eq!(job.context_id, "test_context"); + assert_eq!(job.script, "print('JobBuilder test');"); + assert_eq!(job.script_type, ScriptType::V); + assert_eq!(job.timeout, Duration::from_secs(300)); + assert_eq!(job.prerequisites, vec!["prereq1".to_string(), "prereq2".to_string()]); +} + +#[tokio::test] +async fn test_error_handling() { + let server = create_test_server().await; + + // Test getting status for non-existent job + let result = server.get_job_status("non_existent_job".to_string()).await; + // Should return an error or handle gracefully + match result { + Ok(_) => { + // Some implementations might return a default status + }, + Err(error_code) => { + assert_eq!(error_code, ErrorCode::InvalidParams); + } + } + + // Test getting output for non-existent job + let result = server.get_job_output("non_existent_job".to_string()).await; + match result { + Ok(output) => { + // Should return "No output available" or similar + assert!(output.contains("No output available") || output.is_empty()); + }, + Err(error_code) => { + assert_eq!(error_code, ErrorCode::InvalidParams); + } + } +}