diff --git a/Cargo.lock b/Cargo.lock index 2e6141a..73a98c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1428,6 +1428,7 @@ version = "0.1.0" name = "hero-websocket-examples" version = "0.1.0" dependencies = [ + "env_logger", "hero_websocket_client", "hero_websocket_server", "hex", @@ -1497,6 +1498,7 @@ dependencies = [ "gloo-console", "gloo-net", "gloo-timers", + "hero_job", "hex", "http 0.2.12", "js-sys", @@ -1530,6 +1532,8 @@ dependencies = [ "dotenv", "env_logger", "futures-util", + "hero_dispatcher", + "hero_job", "heromodels", "hex", "hmac", @@ -1539,7 +1543,6 @@ dependencies = [ "rand 0.8.5", "redis 0.23.3", "redis 0.25.4", - "rhai_dispatcher", "rustls", "rustls-pemfile 2.2.0", "secp256k1", diff --git a/core/dispatcher/src/error.rs b/core/dispatcher/src/error.rs index 05b8243..ccc0465 100644 --- a/core/dispatcher/src/error.rs +++ b/core/dispatcher/src/error.rs @@ -19,6 +19,8 @@ pub enum DispatcherError { ContextIdMissing, /// Invalid input provided InvalidInput(String), + /// Job operation error + JobError(hero_job::JobError), } impl From for DispatcherError { @@ -33,6 +35,12 @@ impl From for DispatcherError { } } +impl From for DispatcherError { + fn from(err: hero_job::JobError) -> Self { + DispatcherError::JobError(err) + } +} + impl std::fmt::Display for DispatcherError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -50,6 +58,9 @@ impl std::fmt::Display for DispatcherError { DispatcherError::InvalidInput(msg) => { write!(f, "Invalid input: {}", msg) } + DispatcherError::JobError(e) => { + write!(f, "Job error: {}", e) + } } } } diff --git a/core/dispatcher/src/lib.rs b/core/dispatcher/src/lib.rs index 027fae8..839f4c6 100644 --- a/core/dispatcher/src/lib.rs +++ b/core/dispatcher/src/lib.rs @@ -11,6 +11,7 @@ pub use crate::job::JobBuilder; // Re-export types from hero_job for public API pub use hero_job::{Job, JobStatus, ScriptType}; +#[derive(Clone)] pub struct Dispatcher { redis_client: redis::Client, caller_id: String, @@ -218,6 +219,23 @@ impl Dispatcher { Ok(()) } + // Method to start a previously created job + pub async fn start_job( + &self, + job_id: &str, + ) -> Result<(), DispatcherError> { + let mut conn = self.redis_client.get_multiplexed_async_connection().await?; + + // Load the job to get its script type + let job = Job::load_from_redis(&mut conn, job_id).await?; + + // Select worker based on script type + let worker_id = self.select_worker_for_script_type(&job.script_type)?; + + self.start_job_using_connection(&mut conn, job_id.to_string(), worker_id).await?; + Ok(()) + } + // New method using dedicated reply queue with automatic worker selection pub async fn run_job_and_await_result( &self, diff --git a/interfaces/openrpc.json b/interfaces/openrpc.json index 0565650..a8d5a56 100644 --- a/interfaces/openrpc.json +++ b/interfaces/openrpc.json @@ -2,13 +2,76 @@ "openrpc": "1.2.6", "info": { "title": "Circle WebSocket Server API", - "version": "0.1.0", - "description": "API for interacting with a Circle's WebSocket server, primarily for Rhai script execution." + "version": "0.2.0", + "description": "API for interacting with a Circle's WebSocket server, supporting script execution and comprehensive job management." }, "methods": [ + { + "name": "fetch_nonce", + "summary": "Fetches a nonce for authentication purposes.", + "params": [ + { + "name": "pubkey", + "description": "The public key to fetch a nonce for.", + "required": true, + "schema": { + "type": "string" + } + } + ], + "result": { + "name": "nonceResult", + "description": "The nonce string for authentication.", + "schema": { + "type": "string" + } + } + }, + { + "name": "authenticate", + "summary": "Authenticates a user with public key and signature.", + "params": [ + { + "name": "pubkey", + "description": "The public key of the user.", + "required": true, + "schema": { + "type": "string" + } + }, + { + "name": "signature", + "description": "The signature for authentication.", + "required": true, + "schema": { + "type": "string" + } + } + ], + "result": { + "name": "authResult", + "description": "Authentication result.", + "schema": { + "type": "boolean" + } + } + }, + { + "name": "whoami", + "summary": "Gets authentication status and user information.", + "params": [], + "result": { + "name": "whoamiResult", + "description": "User authentication information.", + "schema": { + "type": "object", + "additionalProperties": true + } + } + }, { "name": "play", - "summary": "Executes a Rhai script on the server.", + "summary": "Executes a Rhai script on the server and returns the result immediately.", "params": [ { "name": "script", @@ -43,6 +106,227 @@ } } ] + }, + { + "name": "create_job", + "summary": "Creates a new job without starting it.", + "params": [ + { + "name": "job", + "description": "The job to create.", + "required": true, + "schema": { + "$ref": "#/components/schemas/Job" + } + } + ], + "result": { + "name": "createJobResult", + "description": "The ID of the created job.", + "schema": { + "type": "string" + } + } + }, + { + "name": "start_job", + "summary": "Starts a previously created job.", + "params": [ + { + "name": "job_id", + "description": "The ID of the job to start.", + "required": true, + "schema": { + "type": "string" + } + } + ], + "result": { + "name": "startJobResult", + "description": "Confirmation that the job was started.", + "schema": { + "type": "object", + "properties": { + "success": { + "type": "boolean", + "description": "Whether the job was successfully started." + } + }, + "required": ["success"] + } + } + }, + { + "name": "run_job", + "summary": "Creates and runs a job, returning the result when complete.", + "params": [ + { + "name": "script", + "description": "The script content to execute.", + "required": true, + "schema": { + "type": "string" + } + }, + { + "name": "script_type", + "description": "The type of script (HeroScript, RhaiSAL, or RhaiDSL).", + "required": true, + "schema": { + "$ref": "#/components/schemas/ScriptType" + } + }, + { + "name": "prerequisites", + "description": "List of job IDs that must complete before this job can run.", + "required": false, + "schema": { + "type": "array", + "items": { + "type": "string" + } + } + } + ], + "result": { + "name": "runJobResult", + "description": "The job execution result.", + "schema": { + "type": "string" + } + } + }, + { + "name": "get_job_status", + "summary": "Gets the current status of a job.", + "params": [ + { + "name": "job_id", + "description": "The ID of the job to check.", + "required": true, + "schema": { + "type": "string" + } + } + ], + "result": { + "name": "jobStatus", + "description": "The current job status.", + "schema": { + "$ref": "#/components/schemas/JobStatus" + } + } + }, + { + "name": "get_job_output", + "summary": "Gets the output of a completed job.", + "params": [ + { + "name": "job_id", + "description": "The ID of the job to get output for.", + "required": true, + "schema": { + "type": "string" + } + } + ], + "result": { + "name": "jobOutput", + "description": "The job output, if available.", + "schema": { + "type": "string" + } + } + }, + { + "name": "get_job_logs", + "summary": "Gets the logs of a job.", + "params": [ + { + "name": "job_id", + "description": "The ID of the job to get logs for.", + "required": true, + "schema": { + "type": "string" + } + } + ], + "result": { + "name": "jobLogs", + "description": "The job logs, if available.", + "schema": { + "$ref": "#/components/schemas/JobLogsResult" + } + } + }, + { + "name": "list_jobs", + "summary": "Lists all job IDs in the system.", + "params": [], + "result": { + "name": "jobList", + "description": "List of all jobs.", + "schema": { + "type": "array", + "items": { + "$ref": "#/components/schemas/Job" + } + } + } + }, + { + "name": "stop_job", + "summary": "Stops a running job.", + "params": [ + { + "name": "job_id", + "description": "The ID of the job to stop.", + "required": true, + "schema": { + "type": "string" + } + } + ], + "result": { + "name": "stopJobResult", + "description": "Confirmation that the stop request was sent.", + "schema": { + "type": "null" + } + } + }, + { + "name": "delete_job", + "summary": "Deletes a job from the system.", + "params": [ + { + "name": "job_id", + "description": "The ID of the job to delete.", + "required": true, + "schema": { + "type": "string" + } + } + ], + "result": { + "name": "deleteJobResult", + "description": "Confirmation that the job was deleted.", + "schema": { + "type": "null" + } + } + }, + { + "name": "clear_all_jobs", + "summary": "Clears all jobs from the system.", + "params": [], + "result": { + "name": "clearJobsResult", + "description": "Information about the cleared jobs.", + "schema": { + "type": "null" + } + } } ], "components": { @@ -56,6 +340,98 @@ } }, "required": ["output"] + }, + "ScriptType": { + "type": "string", + "enum": ["HeroScript", "RhaiSAL", "RhaiDSL"], + "description": "The type of script to execute." + }, + "JobStatus": { + "type": "string", + "enum": ["Dispatched", "WaitingForPrerequisites", "Started", "Error", "Finished"], + "description": "The current status of a job." + }, + "Job": { + "type": "object", + "properties": { + "id": { + "type": "string", + "description": "Unique identifier for the job." + }, + "caller_id": { + "type": "string", + "description": "ID of the caller who created this job." + }, + "context_id": { + "type": "string", + "description": "Context ID for the job execution." + }, + "script": { + "type": "string", + "description": "The script content to execute." + }, + "script_type": { + "$ref": "#/components/schemas/ScriptType" + }, + "timeout": { + "type": "integer", + "description": "Timeout in seconds for script execution." + }, + "retries": { + "type": "integer", + "description": "Number of retries on script execution failure." + }, + "concurrent": { + "type": "boolean", + "description": "Whether to execute script in separate thread." + }, + "log_path": { + "type": "string", + "description": "Path to write logs of script execution to." + }, + "env_vars": { + "type": "object", + "additionalProperties": { + "type": "string" + }, + "description": "Environment variables for script execution." + }, + "prerequisites": { + "type": "array", + "items": { + "type": "string" + }, + "description": "Job IDs that must complete before this job can run." + }, + "dependents": { + "type": "array", + "items": { + "type": "string" + }, + "description": "Job IDs that depend on this job completing." + }, + "created_at": { + "type": "string", + "format": "date-time", + "description": "ISO 8601 timestamp when the job was created." + }, + "updated_at": { + "type": "string", + "format": "date-time", + "description": "ISO 8601 timestamp when the job was last updated." + } + }, + "required": ["id", "caller_id", "context_id", "script", "script_type", "timeout", "retries", "concurrent", "env_vars", "prerequisites", "dependents", "created_at", "updated_at"] + }, + "JobLogsResult": { + "type": "object", + "properties": { + "logs": { + "type": ["string", "null"], + "description": "The job logs, null if not available." + } + }, + "required": ["logs"] } } } diff --git a/interfaces/websocket/client/Cargo.toml b/interfaces/websocket/client/Cargo.toml index 7959f97..a9d5d5b 100644 --- a/interfaces/websocket/client/Cargo.toml +++ b/interfaces/websocket/client/Cargo.toml @@ -17,6 +17,7 @@ futures-util = { workspace = true, features = ["sink"] } thiserror = { workspace = true } url = { workspace = true } http = "0.2" +hero_job = { path = "../../../core/job" } # Authentication dependencies hex = { workspace = true } diff --git a/interfaces/websocket/client/cmd/main.rs b/interfaces/websocket/client/cmd/main.rs index 8b70e9e..c857ce7 100644 --- a/interfaces/websocket/client/cmd/main.rs +++ b/interfaces/websocket/client/cmd/main.rs @@ -148,7 +148,7 @@ async fn run_interactive_mode(client: hero_websocket_client::CircleWsClient) -> // Execute the script match client.play(input).await { Ok(result) => { - console::log_1(&format!("๐Ÿ“ค Result: {}", result.output).into()); + console::log_1(&format!("๐Ÿ“ค Result: {}", result).into()); } Err(e) => { console::log_1(&format!("โŒ Script execution failed: {}", e).into()); @@ -164,7 +164,7 @@ async fn execute_script(client: hero_websocket_client::CircleWsClient, script: S match client.play(script).await { Ok(result) => { - console::log_1(&result.output.into()); + console::log_1(&result.into()); Ok(()) } Err(e) => { @@ -209,7 +209,7 @@ async fn execute_script(client: hero_websocket_client::CircleWsClient, script: S match client.play(script).await { Ok(result) => { - println!("{}", result.output); + println!("{}", result); Ok(()) } Err(e) => { @@ -244,7 +244,7 @@ async fn run_interactive_mode(client: hero_websocket_client::CircleWsClient) -> match client.play(input).await { Ok(result) => { - println!("\n๐Ÿ“ค Result: {}", result.output); + println!("\n๐Ÿ“ค Result: {}", result); } Err(e) => { error!("โŒ Script execution failed: {}", e); diff --git a/interfaces/websocket/client/src/builder.rs b/interfaces/websocket/client/src/builder.rs new file mode 100644 index 0000000..4c1aac7 --- /dev/null +++ b/interfaces/websocket/client/src/builder.rs @@ -0,0 +1,34 @@ +use std::sync::{Arc, Mutex}; +use crate::CircleWsClient; + +// Platform-specific imports removed - not needed in builder.rs + +pub struct CircleWsClientBuilder { + ws_url: String, + private_key: Option, +} + +impl CircleWsClientBuilder { + pub fn new(ws_url: String) -> Self { + Self { + ws_url, + private_key: None, + } + } + + pub fn with_keypair(mut self, private_key: String) -> Self { + self.private_key = Some(private_key); + self + } + + pub fn build(self) -> CircleWsClient { + CircleWsClient { + ws_url: self.ws_url, + internal_tx: None, + #[cfg(not(target_arch = "wasm32"))] + task_handle: None, + private_key: self.private_key, + is_connected: Arc::new(Mutex::new(false)), + } + } +} diff --git a/interfaces/websocket/client/src/error.rs b/interfaces/websocket/client/src/error.rs new file mode 100644 index 0000000..af206ad --- /dev/null +++ b/interfaces/websocket/client/src/error.rs @@ -0,0 +1,40 @@ +use serde::Deserialize; +use serde_json::Value; +use thiserror::Error; + +#[derive(Deserialize, Debug, Clone)] +pub struct JsonRpcErrorClient { + pub code: i32, + pub message: String, + pub data: Option, +} + +#[derive(Error, Debug)] +pub enum CircleWsClientError { + #[error("WebSocket connection error: {0}")] + ConnectionError(String), + #[error("WebSocket send error: {0}")] + SendError(String), + #[error("WebSocket receive error: {0}")] + ReceiveError(String), + #[error("JSON serialization/deserialization error: {0}")] + JsonError(#[from] serde_json::Error), + #[error("Request timed out for request ID: {0}")] + Timeout(String), + #[error("JSON-RPC error response: {code} - {message}")] + JsonRpcError { + code: i32, + message: String, + data: Option, + }, + #[error("No response received for request ID: {0}")] + NoResponse(String), + #[error("Client is not connected")] + NotConnected, + #[error("Internal channel error: {0}")] + ChannelError(String), + #[error("Authentication error: {0}")] + Auth(#[from] crate::auth::AuthError), + #[error("Authentication requires a keypair, but none was provided.")] + AuthNoKeyPair, +} diff --git a/interfaces/websocket/client/src/lib.rs b/interfaces/websocket/client/src/lib.rs index 8de4412..0a82655 100644 --- a/interfaces/websocket/client/src/lib.rs +++ b/interfaces/websocket/client/src/lib.rs @@ -5,12 +5,15 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::HashMap; use std::sync::{Arc, Mutex}; -use thiserror::Error; use uuid::Uuid; // Authentication module pub mod auth; +pub mod builder; +pub mod methods; +pub mod error; +pub use error::{JsonRpcErrorClient, CircleWsClientError}; pub use auth::{AuthCredentials, AuthError, AuthResult}; // Platform-specific WebSocket imports and spawn function @@ -51,69 +54,6 @@ pub struct JsonRpcResponseClient { pub id: String, } -#[derive(Deserialize, Debug, Clone)] -pub struct JsonRpcErrorClient { - pub code: i32, - pub message: String, - pub data: Option, -} - -#[derive(Serialize, Debug, Clone)] -pub struct PlayParamsClient { - pub script: String, -} - -#[derive(Deserialize, Debug, Clone)] -pub struct PlayResultClient { - pub output: String, -} - -#[derive(Serialize, Debug, Clone)] -pub struct AuthCredentialsParams { - pub pubkey: String, - pub signature: String, -} - -#[derive(Serialize, Debug, Clone)] -pub struct FetchNonceParams { - pub pubkey: String, -} - -#[derive(Deserialize, Debug, Clone)] -pub struct FetchNonceResponse { - pub nonce: String, -} - -#[derive(Error, Debug)] -pub enum CircleWsClientError { - #[error("WebSocket connection error: {0}")] - ConnectionError(String), - #[error("WebSocket send error: {0}")] - SendError(String), - #[error("WebSocket receive error: {0}")] - ReceiveError(String), - #[error("JSON serialization/deserialization error: {0}")] - JsonError(#[from] serde_json::Error), - #[error("Request timed out for request ID: {0}")] - Timeout(String), - #[error("JSON-RPC error response: {code} - {message}")] - JsonRpcError { - code: i32, - message: String, - data: Option, - }, - #[error("No response received for request ID: {0}")] - NoResponse(String), - #[error("Client is not connected")] - NotConnected, - #[error("Internal channel error: {0}")] - ChannelError(String), - #[error("Authentication error: {0}")] - Auth(#[from] auth::AuthError), - #[error("Authentication requires a keypair, but none was provided.")] - AuthNoKeyPair, -} - // Wrapper for messages sent to the WebSocket task enum InternalWsMessage { SendJsonRpc( @@ -183,116 +123,6 @@ impl CircleWsClient { } impl CircleWsClient { - pub async fn authenticate(&mut self) -> Result { - info!("๐Ÿ” [{}] Starting authentication process...", self.ws_url); - - let private_key = self - .private_key - .as_ref() - .ok_or(CircleWsClientError::AuthNoKeyPair)?; - - info!("๐Ÿ”‘ [{}] Deriving public key from private key...", self.ws_url); - let public_key = auth::derive_public_key(private_key)?; - info!("โœ… [{}] Public key derived: {}...", self.ws_url, &public_key[..8]); - - info!("๐ŸŽซ [{}] Fetching authentication nonce...", self.ws_url); - let nonce = self.fetch_nonce(&public_key).await?; - info!("โœ… [{}] Nonce received: {}...", self.ws_url, &nonce[..8]); - - info!("โœ๏ธ [{}] Signing nonce with private key...", self.ws_url); - let signature = auth::sign_message(private_key, &nonce)?; - info!("โœ… [{}] Signature created: {}...", self.ws_url, &signature[..8]); - - info!("๐Ÿ”’ [{}] Submitting authentication credentials...", self.ws_url); - let result = self.authenticate_with_signature(&public_key, &signature).await?; - - if result { - info!("๐ŸŽ‰ [{}] Authentication successful!", self.ws_url); - } else { - error!("โŒ [{}] Authentication failed - server rejected credentials", self.ws_url); - } - - Ok(result) - } - - async fn fetch_nonce(&self, pubkey: &str) -> Result { - info!("๐Ÿ“ก [{}] Sending fetch_nonce request for pubkey: {}...", self.ws_url, &pubkey[..8]); - - let params = FetchNonceParams { - pubkey: pubkey.to_string(), - }; - let req = self.create_request("fetch_nonce", params)?; - let res = self.send_request(req).await?; - - if let Some(err) = res.error { - error!("โŒ [{}] fetch_nonce failed: {} (code: {})", self.ws_url, err.message, err.code); - return Err(CircleWsClientError::JsonRpcError { - code: err.code, - message: err.message, - data: err.data, - }); - } - - let nonce_res: FetchNonceResponse = serde_json::from_value(res.result.unwrap_or_default())?; - info!("โœ… [{}] fetch_nonce successful, nonce length: {}", self.ws_url, nonce_res.nonce.len()); - Ok(nonce_res.nonce) - } - - async fn authenticate_with_signature( - &self, - pubkey: &str, - signature: &str, - ) -> Result { - info!("๐Ÿ“ก [{}] Sending authenticate request with signature...", self.ws_url); - - let params = AuthCredentialsParams { - pubkey: pubkey.to_string(), - signature: signature.to_string(), - }; - let req = self.create_request("authenticate", params)?; - let res = self.send_request(req).await?; - - if let Some(err) = res.error { - error!("โŒ [{}] authenticate failed: {} (code: {})", self.ws_url, err.message, err.code); - return Err(CircleWsClientError::JsonRpcError { - code: err.code, - message: err.message, - data: err.data, - }); - } - - let authenticated = res - .result - .and_then(|v| v.get("authenticated").and_then(|v| v.as_bool())) - .unwrap_or(false); - - if authenticated { - info!("โœ… [{}] authenticate request successful - server confirmed authentication", self.ws_url); - } else { - error!("โŒ [{}] authenticate request failed - server returned false", self.ws_url); - } - - Ok(authenticated) - } - - /// Call the whoami method to get authentication status and user information - pub async fn whoami(&self) -> Result { - let req = self.create_request("whoami", serde_json::json!({}))?; - let response = self.send_request(req).await?; - - if let Some(result) = response.result { - Ok(result) - } else if let Some(error) = response.error { - Err(CircleWsClientError::JsonRpcError { - code: error.code, - message: error.message, - data: error.data, - }) - } else { - Err(CircleWsClientError::NoResponse("whoami".to_string())) - } - } - fn create_request( &self, method: &str, @@ -676,7 +506,7 @@ impl CircleWsClient { ws_conn: tokio_tungstenite::WebSocketStream>, mut internal_rx: mpsc::Receiver, pending_requests: &Arc>>>>, - log_url: &str, + _log_url: &str, _is_connected: &Arc>, ) -> String { let (mut ws_tx, mut ws_rx) = ws_conn.split(); @@ -811,141 +641,6 @@ impl CircleWsClient { } } - pub fn play( - &self, - script: String, - ) -> impl std::future::Future> + Send + 'static - { - let req_id_outer = Uuid::new_v4().to_string(); - - // Clone the sender option. The sender itself (mpsc::Sender) is also Clone. - let internal_tx_clone_opt = self.internal_tx.clone(); - - async move { - let req_id = req_id_outer; // Move req_id into the async block - let params = PlayParamsClient { script }; // script is moved in - - let request = match serde_json::to_value(params) { - Ok(p_val) => JsonRpcRequestClient { - jsonrpc: "2.0".to_string(), - method: "play".to_string(), - params: p_val, - id: req_id.clone(), - }, - Err(e) => return Err(CircleWsClientError::JsonError(e)), - }; - - let (response_tx, response_rx) = oneshot::channel(); - - if let Some(mut internal_tx) = internal_tx_clone_opt { - internal_tx - .send(InternalWsMessage::SendJsonRpc(request, response_tx)) - .await - .map_err(|e| { - CircleWsClientError::ChannelError(format!( - "Failed to send request to internal task: {}", - e - )) - })?; - } else { - return Err(CircleWsClientError::NotConnected); - } - - // Add a timeout for waiting for the response - // For simplicity, using a fixed timeout here. Could be configurable. - #[cfg(target_arch = "wasm32")] - { - match response_rx.await { - Ok(Ok(rpc_response)) => { - if let Some(json_rpc_error) = rpc_response.error { - Err(CircleWsClientError::JsonRpcError { - code: json_rpc_error.code, - message: json_rpc_error.message, - data: json_rpc_error.data, - }) - } else if let Some(result_value) = rpc_response.result { - serde_json::from_value(result_value) - .map_err(CircleWsClientError::JsonError) - } else { - Err(CircleWsClientError::NoResponse(req_id.clone())) - } - } - Ok(Err(e)) => Err(e), // Error propagated from the ws task - Err(_) => Err(CircleWsClientError::Timeout(req_id.clone())), // oneshot channel cancelled - } - } - #[cfg(not(target_arch = "wasm32"))] - { - use tokio::time::timeout as tokio_timeout; - match tokio_timeout(std::time::Duration::from_secs(10), response_rx).await { - Ok(Ok(Ok(rpc_response))) => { - // Timeout -> Result - if let Some(json_rpc_error) = rpc_response.error { - Err(CircleWsClientError::JsonRpcError { - code: json_rpc_error.code, - message: json_rpc_error.message, - data: json_rpc_error.data, - }) - } else if let Some(result_value) = rpc_response.result { - serde_json::from_value(result_value) - .map_err(CircleWsClientError::JsonError) - } else { - Err(CircleWsClientError::NoResponse(req_id.clone())) - } - } - Ok(Ok(Err(e))) => Err(e), // Error propagated from the ws task - Ok(Err(_)) => Err(CircleWsClientError::ChannelError( - "Response channel cancelled".to_string(), - )), // oneshot cancelled - Err(_) => Err(CircleWsClientError::Timeout(req_id.clone())), // tokio_timeout expired - } - } - } - } - - /// Send a plaintext ping message and wait for pong response - pub async fn ping(&mut self) -> Result { - if let Some(mut tx) = self.internal_tx.clone() { - let (response_tx, response_rx) = oneshot::channel(); - - // Send plaintext ping message - tx.send(InternalWsMessage::SendPlaintext("ping".to_string(), response_tx)) - .await - .map_err(|e| { - CircleWsClientError::ChannelError(format!( - "Failed to send ping request to internal task: {}", - e - )) - })?; - - // Wait for pong response with timeout - #[cfg(target_arch = "wasm32")] - { - match response_rx.await { - Ok(Ok(response)) => Ok(response), - Ok(Err(e)) => Err(e), - Err(_) => Err(CircleWsClientError::ChannelError( - "Ping response channel cancelled".to_string(), - )), - } - } - #[cfg(not(target_arch = "wasm32"))] - { - use tokio::time::timeout as tokio_timeout; - match tokio_timeout(std::time::Duration::from_secs(10), response_rx).await { - Ok(Ok(Ok(response))) => Ok(response), - Ok(Ok(Err(e))) => Err(e), - Ok(Err(_)) => Err(CircleWsClientError::ChannelError( - "Ping response channel cancelled".to_string(), - )), - Err(_) => Err(CircleWsClientError::Timeout("ping".to_string())), - } - } - } else { - Err(CircleWsClientError::NotConnected) - } - } - pub async fn disconnect(&mut self) { if let Some(mut tx) = self.internal_tx.take() { info!("Sending close signal to internal WebSocket task."); diff --git a/interfaces/websocket/client/src/methods.rs b/interfaces/websocket/client/src/methods.rs new file mode 100644 index 0000000..ff67a5d --- /dev/null +++ b/interfaces/websocket/client/src/methods.rs @@ -0,0 +1,500 @@ +use futures_channel::oneshot; +use futures_util::SinkExt; +use log::{error, info}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use uuid::Uuid; +use hero_job::{Job, JobStatus}; + +use crate::{CircleWsClient, CircleWsClientError, InternalWsMessage, JsonRpcRequestClient, auth}; + +// Platform-specific WebSocket imports removed - not needed in methods.rs + +// JSON-RPC structures are imported from crate root + +#[derive(Serialize, Debug, Clone)] +pub struct PlayParamsClient { + pub script: String, +} + +#[derive(Deserialize, Debug, Clone)] +pub struct PlayResultClient { + pub output: String, +} + +#[derive(Serialize, Debug, Clone)] +pub struct AuthCredentialsParams { + pub pubkey: String, + pub signature: String, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct JobLogsResult { + pub logs: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct ClearJobsResult { + pub deleted_count: usize, +} + +impl CircleWsClient { + pub async fn authenticate(&mut self) -> Result { + info!("๐Ÿ” [{}] Starting authentication process...", self.ws_url); + + let private_key = self + .private_key + .as_ref() + .ok_or(CircleWsClientError::AuthNoKeyPair)?; + + info!("๐Ÿ”‘ [{}] Deriving public key from private key...", self.ws_url); + let public_key = auth::derive_public_key(private_key)?; + info!("โœ… [{}] Public key derived: {}...", self.ws_url, &public_key[..8]); + + info!("๐ŸŽซ [{}] Fetching authentication nonce...", self.ws_url); + let nonce = self.fetch_nonce(&public_key).await?; + info!("โœ… [{}] Nonce received: {}...", self.ws_url, &nonce[..8]); + + info!("โœ๏ธ [{}] Signing nonce with private key...", self.ws_url); + let signature = auth::sign_message(private_key, &nonce)?; + info!("โœ… [{}] Signature created: {}...", self.ws_url, &signature[..8]); + + info!("๐Ÿ”’ [{}] Submitting authentication credentials...", self.ws_url); + let result = self.authenticate_with_signature(&public_key, &signature).await?; + + if result { + info!("๐ŸŽ‰ [{}] Authentication successful!", self.ws_url); + } else { + error!("โŒ [{}] Authentication failed - server rejected credentials", self.ws_url); + } + + Ok(result) + } + + async fn fetch_nonce(&self, pubkey: &str) -> Result { + info!("๐Ÿ“ก [{}] Sending fetch_nonce request for pubkey: {}...", self.ws_url, &pubkey[..8]); + let req = self.create_request("fetch_nonce", pubkey.to_string())?; + let res = self.send_request(req).await?; + + if let Some(err) = res.error { + error!("โŒ [{}] fetch_nonce failed: {} (code: {})", self.ws_url, err.message, err.code); + return Err(CircleWsClientError::JsonRpcError { + code: err.code, + message: err.message, + data: err.data, + }); + } + + let nonce = res.result + .and_then(|v| v.as_str().map(|s| s.to_string())) + .ok_or_else(|| CircleWsClientError::JsonRpcError { + code: -32603, + message: "Invalid nonce response format".to_string(), + data: None, + })?; + + info!("โœ… [{}] fetch_nonce successful, nonce length: {}", self.ws_url, nonce.len()); + Ok(nonce) + } + + async fn authenticate_with_signature( + &self, + pubkey: &str, + signature: &str, + ) -> Result { + info!("๐Ÿ“ก [{}] Sending authenticate request with signature...", self.ws_url); + + let params = AuthCredentialsParams { + pubkey: pubkey.to_string(), + signature: signature.to_string(), + }; + let req = self.create_request("authenticate", params)?; + let res = self.send_request(req).await?; + + if let Some(err) = res.error { + error!("โŒ [{}] authenticate failed: {} (code: {})", self.ws_url, err.message, err.code); + return Err(CircleWsClientError::JsonRpcError { + code: err.code, + message: err.message, + data: err.data, + }); + } + + let authenticated = res + .result + .and_then(|v| v.get("authenticated").and_then(|v| v.as_bool())) + .unwrap_or(false); + + if authenticated { + info!("โœ… [{}] authenticate request successful - server confirmed authentication", self.ws_url); + } else { + error!("โŒ [{}] authenticate request failed - server returned false", self.ws_url); + } + + Ok(authenticated) + } + + /// Call the whoami method to get authentication status and user information + pub async fn whoami(&self) -> Result { + let req = self.create_request("whoami", serde_json::json!({}))?; + let response = self.send_request(req).await?; + + if let Some(result) = response.result { + Ok(result) + } else if let Some(error) = response.error { + Err(CircleWsClientError::JsonRpcError { + code: error.code, + message: error.message, + data: error.data, + }) + } else { + Err(CircleWsClientError::NoResponse("whoami".to_string())) + } + } + + pub fn play( + &self, + script: String, + ) -> impl std::future::Future> + Send + 'static + { + let req_id_outer = Uuid::new_v4().to_string(); + + // Clone the sender option. The sender itself (mpsc::Sender) is also Clone. + let internal_tx_clone_opt = self.internal_tx.clone(); + + async move { + let req_id = req_id_outer; // Move req_id into the async block + let params = PlayParamsClient { script }; // script is moved in + + let request = match serde_json::to_value(params) { + Ok(p_val) => JsonRpcRequestClient { + jsonrpc: "2.0".to_string(), + method: "play".to_string(), + params: p_val, + id: req_id.clone(), + }, + Err(e) => return Err(CircleWsClientError::JsonError(e)), + }; + + let (response_tx, response_rx) = oneshot::channel(); + + if let Some(mut internal_tx) = internal_tx_clone_opt { + internal_tx + .send(InternalWsMessage::SendJsonRpc(request, response_tx)) + .await + .map_err(|e| { + CircleWsClientError::ChannelError(format!( + "Failed to send request to internal task: {}", + e + )) + })?; + } else { + return Err(CircleWsClientError::NotConnected); + } + + // Add a timeout for waiting for the response + // For simplicity, using a fixed timeout here. Could be configurable. + #[cfg(target_arch = "wasm32")] + { + match response_rx.await { + Ok(Ok(rpc_response)) => { + if let Some(json_rpc_error) = rpc_response.error { + Err(CircleWsClientError::JsonRpcError { + code: json_rpc_error.code, + message: json_rpc_error.message, + data: json_rpc_error.data, + }) + } else if let Some(result_value) = rpc_response.result { + // Extract output string directly from the result + if let Some(output) = result_value.get("output").and_then(|v| v.as_str()) { + Ok(output.to_string()) + } else { + Err(CircleWsClientError::JsonRpcError { + code: -32603, + message: "Invalid play response format - missing output field".to_string(), + data: None, + }) + } + } else { + Err(CircleWsClientError::NoResponse(req_id.clone())) + } + } + Ok(Err(e)) => Err(e), // Error propagated from the ws task + Err(_) => Err(CircleWsClientError::Timeout(req_id.clone())), // oneshot channel cancelled + } + } + #[cfg(not(target_arch = "wasm32"))] + { + use tokio::time::timeout as tokio_timeout; + match tokio_timeout(std::time::Duration::from_secs(10), response_rx).await { + Ok(Ok(Ok(rpc_response))) => { + // Timeout -> Result + if let Some(json_rpc_error) = rpc_response.error { + Err(CircleWsClientError::JsonRpcError { + code: json_rpc_error.code, + message: json_rpc_error.message, + data: json_rpc_error.data, + }) + } else if let Some(result_value) = rpc_response.result { + // Extract output string directly from the result + if let Some(output) = result_value.get("output").and_then(|v| v.as_str()) { + Ok(output.to_string()) + } else { + Err(CircleWsClientError::JsonRpcError { + code: -32603, + message: "Invalid play response format - missing output field".to_string(), + data: None, + }) + } + } else { + Err(CircleWsClientError::NoResponse(req_id.clone())) + } + } + Ok(Ok(Err(e))) => Err(e), // Error propagated from the ws task + Ok(Err(_)) => Err(CircleWsClientError::ChannelError( + "Response channel cancelled".to_string(), + )), // oneshot cancelled + Err(_) => Err(CircleWsClientError::Timeout(req_id.clone())), // tokio_timeout expired + } + } + } + } + + /// Send a plaintext ping message and wait for pong response + pub async fn ping(&mut self) -> Result { + if let Some(mut tx) = self.internal_tx.clone() { + let (response_tx, response_rx) = oneshot::channel(); + + // Send plaintext ping message + tx.send(InternalWsMessage::SendPlaintext("ping".to_string(), response_tx)) + .await + .map_err(|e| { + CircleWsClientError::ChannelError(format!( + "Failed to send ping request to internal task: {}", + e + )) + })?; + + // Wait for pong response with timeout + #[cfg(target_arch = "wasm32")] + { + match response_rx.await { + Ok(Ok(response)) => Ok(response), + Ok(Err(e)) => Err(e), + Err(_) => Err(CircleWsClientError::ChannelError( + "Ping response channel cancelled".to_string(), + )), + } + } + #[cfg(not(target_arch = "wasm32"))] + { + use tokio::time::timeout as tokio_timeout; + match tokio_timeout(std::time::Duration::from_secs(10), response_rx).await { + Ok(Ok(Ok(response))) => Ok(response), + Ok(Ok(Err(e))) => Err(e), + Ok(Err(_)) => Err(CircleWsClientError::ChannelError( + "Ping response channel cancelled".to_string(), + )), + Err(_) => Err(CircleWsClientError::Timeout("ping".to_string())), + } + } + } else { + Err(CircleWsClientError::NotConnected) + } + } + + /// Create a new job without starting it + pub async fn create_job( + &self, + job: Job, + ) -> Result { + + let req = self.create_request("create_job", job)?; + let response = self.send_request(req).await?; + + if let Some(result) = response.result { + serde_json::from_value(result).map_err(CircleWsClientError::JsonError) + } else if let Some(error) = response.error { + Err(CircleWsClientError::JsonRpcError { + code: error.code, + message: error.message, + data: error.data, + }) + } else { + Err(CircleWsClientError::ReceiveError( + "No result or error in response".to_string(), + )) + } + } + + /// Create and run a job, returning the result when complete + pub async fn run_job( + &self, + job: &Job, + ) -> Result { + let req = self.create_request("run_job", job)?; + let response = self.send_request(req).await?; + + if let Some(result) = response.result { + serde_json::from_value(result).map_err(CircleWsClientError::JsonError) + } else if let Some(error) = response.error { + Err(CircleWsClientError::JsonRpcError { + code: error.code, + message: error.message, + data: error.data, + }) + } else { + Err(CircleWsClientError::ReceiveError( + "No result or error in response".to_string(), + )) + } + } + + /// Get the status of a job + pub async fn get_job_status(&self, job_id: String) -> Result { + let params = job_id; + + let req = self.create_request("get_job_status", params)?; + let response = self.send_request(req).await?; + + if let Some(result) = response.result { + serde_json::from_value(result).map_err(CircleWsClientError::JsonError) + } else if let Some(error) = response.error { + Err(CircleWsClientError::JsonRpcError { + code: error.code, + message: error.message, + data: error.data, + }) + } else { + Err(CircleWsClientError::ReceiveError( + "No result or error in response".to_string(), + )) + } + } + + /// Get the output of a job + pub async fn get_job_output(&self, job_id: String) -> Result { + let params = job_id; + + let req = self.create_request("get_job_output", params)?; + let response = self.send_request(req).await?; + + if let Some(result) = response.result { + serde_json::from_value(result).map_err(CircleWsClientError::JsonError) + } else if let Some(error) = response.error { + Err(CircleWsClientError::JsonRpcError { + code: error.code, + message: error.message, + data: error.data, + }) + } else { + Err(CircleWsClientError::ReceiveError( + "No result or error in response".to_string(), + )) + } + } + + /// Get the logs of a job + pub async fn get_job_logs(&self, job_id: String) -> Result { + let params = job_id; + + let req = self.create_request("get_job_logs", params)?; + let response = self.send_request(req).await?; + + if let Some(result) = response.result { + serde_json::from_value(result).map_err(CircleWsClientError::JsonError) + } else if let Some(error) = response.error { + Err(CircleWsClientError::JsonRpcError { + code: error.code, + message: error.message, + data: error.data, + }) + } else { + Err(CircleWsClientError::ReceiveError( + "No result or error in response".to_string(), + )) + } + } + + /// List all job IDs in the system + pub async fn list_jobs(&self) -> Result, CircleWsClientError> { + let req = self.create_request("list_jobs", serde_json::Value::Null)?; + let response = self.send_request(req).await?; + + if let Some(result) = response.result { + serde_json::from_value(result).map_err(CircleWsClientError::JsonError) + } else if let Some(error) = response.error { + Err(CircleWsClientError::JsonRpcError { + code: error.code, + message: error.message, + data: error.data, + }) + } else { + Err(CircleWsClientError::ReceiveError( + "No result or error in response".to_string(), + )) + } + } + + /// Stop a running job + pub async fn stop_job(&self, job_id: String) -> Result<(), CircleWsClientError> { + let req = self.create_request("stop_job", job_id)?; + let response = self.send_request(req).await?; + + if let Some(result) = response.result { + serde_json::from_value(result).map_err(CircleWsClientError::JsonError) + } else if let Some(error) = response.error { + Err(CircleWsClientError::JsonRpcError { + code: error.code, + message: error.message, + data: error.data, + }) + } else { + Err(CircleWsClientError::ReceiveError( + "No result or error in response".to_string(), + )) + } + } + + /// Delete a job from the system + pub async fn delete_job(&self, job_id: String) -> Result<(), CircleWsClientError> { + let req = self.create_request("delete_job", job_id)?; + let response = self.send_request(req).await?; + + if let Some(error) = response.error { + Err(CircleWsClientError::JsonRpcError { + code: error.code, + message: error.message, + data: error.data, + }) + } else if response.result.is_some() { + // Success - return void + Ok(()) + } else { + Err(CircleWsClientError::ReceiveError( + "No result or error in response".to_string(), + )) + } + } + + /// Clear all jobs from the system + pub async fn clear_all_jobs(&self) -> Result<(), CircleWsClientError> { + let req = self.create_request("clear_all_jobs", serde_json::Value::Null)?; + let response = self.send_request(req).await?; + + if let Some(error) = response.error { + Err(CircleWsClientError::JsonRpcError { + code: error.code, + message: error.message, + data: error.data, + }) + } else if response.result.is_some() { + // Success - return void + Ok(()) + } else { + Err(CircleWsClientError::ReceiveError( + "No result or error in response".to_string(), + )) + } + } +} \ No newline at end of file diff --git a/interfaces/websocket/examples/Cargo.toml b/interfaces/websocket/examples/Cargo.toml index 9875280..31754df 100644 --- a/interfaces/websocket/examples/Cargo.toml +++ b/interfaces/websocket/examples/Cargo.toml @@ -10,6 +10,7 @@ tokio = { version = "1.0", features = ["full"] } k256 = { version = "0.13", features = ["ecdsa", "sha256"] } rand = "0.8" hex = "0.4" +env_logger = "0.10" [[bin]] name = "ping" @@ -19,6 +20,10 @@ path = "src/ping.rs" name = "auth" path = "src/auth.rs" +[[bin]] +name = "circle_auth" +path = "src/circle_auth.rs" + [[bin]] name = "play" path = "src/play.rs" diff --git a/interfaces/websocket/examples/src/circle_auth.rs b/interfaces/websocket/examples/src/circle_auth.rs new file mode 100644 index 0000000..9c6e685 --- /dev/null +++ b/interfaces/websocket/examples/src/circle_auth.rs @@ -0,0 +1,234 @@ +use hero_websocket_client::CircleWsClientBuilder; +use hero_websocket_server::ServerBuilder; +use tokio::signal; +use tokio::time::{sleep, Duration}; +use k256::ecdsa::SigningKey; +use k256::elliptic_curve::sec1::ToEncodedPoint; +use rand::rngs::OsRng; +use std::collections::HashMap; + +#[tokio::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + + println!("๐Ÿ”— Circle-based Authentication Example"); + println!("====================================="); + + // Generate keypairs for different users + let (alice_pubkey, alice_privkey) = generate_keypair(); + let (bob_pubkey, bob_privkey) = generate_keypair(); + let (charlie_pubkey, charlie_privkey) = generate_keypair(); + let (eve_pubkey, eve_privkey) = generate_keypair(); // Not in any circle + + println!("๐Ÿ‘ฅ Generated test users:"); + println!(" Alice: {}...{}", &alice_pubkey[..10], &alice_pubkey[alice_pubkey.len()-10..]); + println!(" Bob: {}...{}", &bob_pubkey[..10], &bob_pubkey[bob_pubkey.len()-10..]); + println!(" Charlie: {}...{}", &charlie_pubkey[..10], &charlie_pubkey[charlie_pubkey.len()-10..]); + println!(" Eve: {}...{} (unauthorized)", &eve_pubkey[..10], &eve_pubkey[eve_pubkey.len()-10..]); + + // Define circles and their members + let mut circles = HashMap::new(); + + // Circle "alpha" - Alice and Bob are members + circles.insert("alpha".to_string(), vec![ + alice_pubkey.clone(), + bob_pubkey.clone(), + ]); + + // Circle "beta" - Only Charlie is a member + circles.insert("beta".to_string(), vec![ + charlie_pubkey.clone(), + ]); + + // Circle "gamma" - Alice and Charlie are members + circles.insert("gamma".to_string(), vec![ + alice_pubkey.clone(), + charlie_pubkey.clone(), + ]); + + println!("\n๐Ÿ” Circle memberships:"); + for (circle_id, members) in &circles { + println!(" Circle '{}': {} members", circle_id, members.len()); + for member in members { + let name = if member == &alice_pubkey { "Alice" } + else if member == &bob_pubkey { "Bob" } + else if member == &charlie_pubkey { "Charlie" } + else { "Unknown" }; + println!(" - {} ({}...{})", name, &member[..10], &member[member.len()-10..]); + } + } + + // Build server with circle-based authentication + let server = ServerBuilder::new() + .host("127.0.0.1") + .port(8443) + .redis_url("redis://localhost:6379") + .worker_id("circle_test") + .with_auth() + .circles(circles) + .build()?; + + // Start server + println!("\n๐Ÿš€ Starting server with circle-based authentication..."); + let (server_task, server_handle) = server.spawn_circle_server().map_err(|e| { + eprintln!("Failed to start server: {}", e); + e + })?; + + // Setup signal handling for clean shutdown + let server_handle_clone = server_handle.clone(); + tokio::spawn(async move { + signal::ctrl_c().await.expect("Failed to listen for Ctrl+C"); + println!("\n๐Ÿ”Œ Shutting down..."); + server_handle_clone.stop(true).await; + std::process::exit(0); + }); + + // Brief pause for server startup + sleep(Duration::from_millis(500)).await; + + println!("\n๐Ÿงช Testing authentication scenarios:"); + println!("==================================="); + + // Test 1: Alice connecting to circle "alpha" (should succeed) + println!("\n๐Ÿ“‹ Test 1: Alice โ†’ Circle 'alpha' (authorized)"); + test_authentication("Alice", "alpha", &alice_privkey).await?; + + // Test 2: Bob connecting to circle "alpha" (should succeed) + println!("\n๐Ÿ“‹ Test 2: Bob โ†’ Circle 'alpha' (authorized)"); + test_authentication("Bob", "alpha", &bob_privkey).await?; + + // Test 3: Charlie connecting to circle "beta" (should succeed) + println!("\n๐Ÿ“‹ Test 3: Charlie โ†’ Circle 'beta' (authorized)"); + test_authentication("Charlie", "beta", &charlie_privkey).await?; + + // Test 4: Alice connecting to circle "beta" (should fail - not a member) + println!("\n๐Ÿ“‹ Test 4: Alice โ†’ Circle 'beta' (unauthorized - not a member)"); + test_authentication_failure("Alice", "beta", &alice_privkey).await; + + // Test 5: Eve connecting to circle "alpha" (should fail - not a member) + println!("\n๐Ÿ“‹ Test 5: Eve โ†’ Circle 'alpha' (unauthorized - not a member)"); + test_authentication_failure("Eve", "alpha", &eve_privkey).await; + + // Test 6: Charlie connecting to circle "gamma" (should succeed) + println!("\n๐Ÿ“‹ Test 6: Charlie โ†’ Circle 'gamma' (authorized)"); + test_authentication("Charlie", "gamma", &charlie_privkey).await?; + + // Test 7: Bob connecting to non-existent circle (should fail) + println!("\n๐Ÿ“‹ Test 7: Bob โ†’ Circle 'nonexistent' (unauthorized - circle doesn't exist)"); + test_authentication_failure("Bob", "nonexistent", &bob_privkey).await; + + println!("\nโœ… All tests completed!"); + println!("\n๐Ÿ’ก Summary:"); + println!(" - Users can only authenticate to circles they are members of"); + println!(" - Circle membership is checked after signature verification"); + println!(" - Unauthorized access attempts are properly rejected"); + println!(" - Each circle operates independently with its own member list"); + + // Clean shutdown + server_handle.stop(true).await; + println!("\n๐Ÿ”Œ Server stopped. Example complete!"); + + Ok(()) +} + +fn generate_keypair() -> (String, String) { + let signing_key = SigningKey::random(&mut OsRng); + let verifying_key = signing_key.verifying_key(); + let public_key_bytes = verifying_key.to_encoded_point(false).as_bytes().to_vec(); + let private_key_bytes = signing_key.to_bytes().to_vec(); + (hex::encode(public_key_bytes), hex::encode(private_key_bytes)) +} + +async fn test_authentication( + user_name: &str, + circle_name: &str, + private_key: &str, +) -> Result<(), Box> { + println!(" ๐Ÿ“ค {} connecting to circle '{}'...", user_name, circle_name); + + let mut client = CircleWsClientBuilder::new(format!("ws://localhost:8443/{}", circle_name)) + .with_keypair(private_key.to_string()) + .build(); + + // Connect + match client.connect().await { + Ok(_) => println!(" โœ… Connection established"), + Err(e) => { + println!(" โŒ Connection failed: {}", e); + return Err(e.into()); + } + } + + // Authenticate + print!(" ๐Ÿ“ค Authenticating... "); + match client.authenticate().await { + Ok(response) => { + println!("โœ… Success!"); + println!(" Response: {}", response); + } + Err(e) => { + println!("โŒ Failed: {}", e); + client.disconnect().await; + return Err(e.into()); + } + } + + // Test whoami to confirm authentication + print!(" ๐Ÿ“ค Testing whoami... "); + match client.whoami().await { + Ok(response) => { + println!("โœ… Success!"); + println!(" Response: {}", response); + } + Err(e) => { + println!("โŒ Failed: {}", e); + client.disconnect().await; + return Err(e.into()); + } + } + + // Disconnect + client.disconnect().await; + println!(" ๐Ÿ”Œ Disconnected"); + + Ok(()) +} + +async fn test_authentication_failure( + user_name: &str, + circle_name: &str, + private_key: &str, +) { + println!(" ๐Ÿ“ค {} connecting to circle '{}'...", user_name, circle_name); + + let mut client = CircleWsClientBuilder::new(format!("ws://localhost:8443/{}", circle_name)) + .with_keypair(private_key.to_string()) + .build(); + + // Connect + match client.connect().await { + Ok(_) => println!(" โœ… Connection established"), + Err(e) => { + println!(" โŒ Connection failed: {}", e); + return; + } + } + + // Authenticate (expecting failure) + print!(" ๐Ÿ“ค Authenticating... "); + match client.authenticate().await { + Ok(response) => { + println!("โŒ Unexpected success: {}", response); + println!(" This should have failed!"); + } + Err(e) => { + println!("โœ… Expected failure!"); + println!(" Error: {}", e); + } + } + + // Disconnect + client.disconnect().await; + println!(" ๐Ÿ”Œ Disconnected"); +} diff --git a/interfaces/websocket/server/Cargo.toml b/interfaces/websocket/server/Cargo.toml index ea4215c..5a1c204 100644 --- a/interfaces/websocket/server/Cargo.toml +++ b/interfaces/websocket/server/Cargo.toml @@ -44,7 +44,8 @@ redis = { workspace = true } uuid = { workspace = true } tokio = { workspace = true } chrono = { workspace = true } -rhai_dispatcher = { path = "../../../../rhailib/src/dispatcher" } # Corrected relative path +hero_dispatcher = { path = "../../../core/dispatcher" } +hero_job = { path = "../../../core/job" } thiserror = { workspace = true } heromodels = { path = "../../../../db/heromodels" } diff --git a/interfaces/websocket/server/README.md b/interfaces/websocket/server/README.md index f34aba7..76f6df6 100644 --- a/interfaces/websocket/server/README.md +++ b/interfaces/websocket/server/README.md @@ -1,29 +1,11 @@ -# `server`: The Circles WebSocket Server +# `server`: The Hero WebSocket Server -The `server` crate provides a secure, high-performance WebSocket server built with `Actix`. It is the core backend component of the `circles` ecosystem, responsible for handling client connections, processing JSON-RPC requests, and executing Rhai scripts in a secure manner. +An OpenRPC WebSocket Server to interface with the [cores](../../core) of authorized circles. -## Features - -- **`Actix` Framework**: Built on `Actix`, a powerful and efficient actor-based web framework. -- **WebSocket Management**: Uses `actix-web-actors` to manage each client connection in its own isolated actor (`CircleWs`), ensuring robust and concurrent session handling. -- **JSON-RPC 2.0 API**: Implements a JSON-RPC 2.0 API for all client-server communication. The API is formally defined in the root [openrpc.json](../../openrpc.json) file. -- **Secure Authentication**: Features a built-in `secp256k1` signature-based authentication system to protect sensitive endpoints. -- **Stateful Session Management**: The `CircleWs` actor maintains the authentication state for each client, granting or denying access to protected methods like `play`. -- **Webhook Integration**: Supports HTTP webhook endpoints for external services (Stripe, iDenfy) with signature verification and script execution capabilities. - -## Core Components - -### `spawn_circle_server` - -This is the main entry point function for the server. It configures and starts the `Actix` HTTP server and sets up the WebSocket route with path-based routing (`/{circle_pk}`). - -### `CircleWs` Actor - -This `Actix` actor is the heart of the server's session management. A new instance of `CircleWs` is created for each client that connects. Its responsibilities include: -- Handling the WebSocket connection lifecycle. -- Parsing incoming JSON-RPC messages. -- Managing the authentication state of the session (i.e., whether the client is authenticated or not). -- Dispatching requests to the appropriate handlers (`fetch_nonce`, `authenticate`, and `play`). +- [OpenRPC Specification](openrpc.json) defines the API. +- There are RPC Operations specified to authorize a websocket connection. +- Authorized clients can execute Rhai scripts on the server. +- The server uses the [supervisor] to dispatch [jobs] to the [workers]. ## Authentication @@ -34,43 +16,6 @@ The server provides a robust authentication mechanism to ensure that only author For a more detailed breakdown of the authentication architecture, please see the [ARCHITECTURE.md](docs/ARCHITECTURE.md) file. -## Webhook Integration - -The server also provides HTTP webhook endpoints for external services alongside the WebSocket functionality: - -- **Stripe Webhooks**: `POST /webhooks/stripe/{circle_pk}` - Handles Stripe payment events -- **iDenfy Webhooks**: `POST /webhooks/idenfy/{circle_pk}` - Handles iDenfy KYC verification events - -### Webhook Features - -- **Signature Verification**: All webhooks use HMAC signature verification for security -- **Script Execution**: Webhook events trigger Rhai script execution via the same Redis-based system -- **Type Safety**: Webhook payload types are defined in the `heromodels` library for reusability -- **Modular Architecture**: Separate handlers for each webhook provider with common utilities - -For detailed webhook architecture and configuration, see [WEBHOOK_ARCHITECTURE.md](WEBHOOK_ARCHITECTURE.md). - ## How to Run -### As a Library - -The `server` is designed to be used as a library by the `launcher`, which is responsible for spawning a single multi-circle server instance that can handle multiple circles via path-based routing. - -To run the server via the launcher with circle public keys: -```bash -cargo run --package launcher -- -k -k [options] -``` - -The launcher will start a single `server` instance that can handle multiple circles through path-based WebSocket connections at `/{circle_pk}`. - -### Standalone Binary - -A standalone binary is also available for development and testing purposes. See [`cmd/README.md`](cmd/README.md) for detailed usage instructions. - -```bash -# Basic standalone server -cargo run - -# With authentication and TLS -cargo run -- --auth --tls --cert cert.pem --key key.pem -``` +cargo run diff --git a/interfaces/websocket/server/examples/circle_auth_demo.rs b/interfaces/websocket/server/examples/circle_auth_demo.rs new file mode 100644 index 0000000..cc88a6a --- /dev/null +++ b/interfaces/websocket/server/examples/circle_auth_demo.rs @@ -0,0 +1,73 @@ +use std::collections::HashMap; +use hero_websocket_server::ServerBuilder; + +#[tokio::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + + // Define circles and their members + let mut circles = HashMap::new(); + + // Circle "alpha" with two members + circles.insert("alpha".to_string(), vec![ + "04a1b2c3d4e5f6789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789ab".to_string(), + "04b2c3d4e5f6789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcd".to_string(), + ]); + + // Circle "beta" with one member + circles.insert("beta".to_string(), vec![ + "04c3d4e5f6789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef".to_string(), + ]); + + // Circle "gamma" with three members + circles.insert("gamma".to_string(), vec![ + "04d4e5f6789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef01".to_string(), + "04e5f6789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123".to_string(), + "04f6789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef012345".to_string(), + ]); + + // Build server with circle-based authentication + let server = ServerBuilder::new() + .host("127.0.0.1") + .port(8080) + .redis_url("redis://localhost:6379") + .with_auth() + .circles(circles) + .build()?; + + println!("Starting WebSocket server with circle-based authentication..."); + println!("Available circles:"); + for (circle_id, members) in &server.circles { + println!(" Circle '{}' has {} members:", circle_id, members.len()); + for (i, member) in members.iter().enumerate() { + println!(" Member {}: {}...{}", i + 1, &member[..10], &member[member.len()-10..]); + } + } + + println!("\nTo connect to a specific circle, use URLs like:"); + println!(" ws://127.0.0.1:8080/ws/alpha (for circle 'alpha')"); + println!(" ws://127.0.0.1:8080/ws/beta (for circle 'beta')"); + println!(" ws://127.0.0.1:8080/ws/gamma (for circle 'gamma')"); + + println!("\nAuthentication flow:"); + println!("1. Connect to WebSocket URL for specific circle"); + println!("2. Call 'fetch_nonce' method to get a nonce"); + println!("3. Sign the nonce with your private key"); + println!("4. Call 'authenticate' with your public key and signature"); + println!("5. Server will verify signature AND check circle membership"); + println!("6. Only members of the target circle will be authenticated"); + + // Start the server + let (task_handle, server_handle) = server.spawn_circle_server()?; + + println!("\nServer started! Press Ctrl+C to stop."); + + // Wait for the server to complete + match task_handle.await { + Ok(Ok(())) => println!("Server stopped successfully"), + Ok(Err(e)) => eprintln!("Server error: {}", e), + Err(e) => eprintln!("Task join error: {}", e), + } + + Ok(()) +} diff --git a/interfaces/websocket/server/src/builder.rs b/interfaces/websocket/server/src/builder.rs index 082b5ab..419465c 100644 --- a/interfaces/websocket/server/src/builder.rs +++ b/interfaces/websocket/server/src/builder.rs @@ -13,6 +13,7 @@ pub struct ServerBuilder { enable_auth: bool, enable_webhooks: bool, circle_worker_id: String, + circles: HashMap>, } impl ServerBuilder { @@ -28,6 +29,7 @@ impl ServerBuilder { enable_auth: false, enable_webhooks: false, circle_worker_id: "default".to_string(), + circles: HashMap::new(), } } @@ -72,6 +74,11 @@ impl ServerBuilder { self.enable_webhooks = true; self } + + pub fn circles(mut self, circles: HashMap>) -> Self { + self.circles = circles; + self + } pub fn build(self) -> Result { Ok(Server { @@ -87,8 +94,10 @@ impl ServerBuilder { circle_worker_id: self.circle_worker_id, circle_name: "default".to_string(), circle_public_key: "default".to_string(), + circles: self.circles, nonce_store: HashMap::new(), authenticated_pubkey: None, + dispatcher: None, }) } } diff --git a/interfaces/websocket/server/src/config.rs b/interfaces/websocket/server/src/config.rs new file mode 100644 index 0000000..3636915 --- /dev/null +++ b/interfaces/websocket/server/src/config.rs @@ -0,0 +1,220 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::fs; +use std::path::Path; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ServerConfig { + /// Server host address + #[serde(default = "default_host")] + pub host: String, + + /// Server port + #[serde(default = "default_port")] + pub port: u16, + + /// Redis connection URL + #[serde(default = "default_redis_url")] + pub redis_url: String, + + /// Enable authentication + #[serde(default)] + pub auth: bool, + + /// Enable TLS/WSS + #[serde(default)] + pub tls: bool, + + /// Path to TLS certificate file + pub cert: Option, + + /// Path to TLS private key file + pub key: Option, + + /// Separate port for TLS connections + pub tls_port: Option, + + /// Enable webhook handling + #[serde(default)] + pub webhooks: bool, + + /// Circles configuration - maps circle names to lists of member public keys + #[serde(default)] + pub circles: HashMap>, +} + +impl Default for ServerConfig { + fn default() -> Self { + Self { + host: default_host(), + port: default_port(), + redis_url: default_redis_url(), + auth: false, + tls: false, + cert: None, + key: None, + tls_port: None, + webhooks: false, + circles: HashMap::new(), + } + } +} + +impl ServerConfig { + /// Load configuration from a JSON file + pub fn from_file>(path: P) -> Result { + let content = fs::read_to_string(path.as_ref()) + .map_err(|e| ConfigError::FileRead(path.as_ref().to_path_buf(), e))?; + + let config: ServerConfig = serde_json::from_str(&content) + .map_err(|e| ConfigError::JsonParse(e))?; + + config.validate()?; + Ok(config) + } + + /// Save configuration to a JSON file + pub fn to_file>(&self, path: P) -> Result<(), ConfigError> { + let content = serde_json::to_string_pretty(self) + .map_err(|e| ConfigError::JsonSerialize(e))?; + + fs::write(path.as_ref(), content) + .map_err(|e| ConfigError::FileWrite(path.as_ref().to_path_buf(), e))?; + + Ok(()) + } + + /// Validate the configuration + pub fn validate(&self) -> Result<(), ConfigError> { + // Validate TLS configuration + if self.tls && (self.cert.is_none() || self.key.is_none()) { + return Err(ConfigError::InvalidTlsConfig( + "TLS is enabled but certificate or key path is missing".to_string() + )); + } + + // Validate that circles are not empty if auth is enabled + if self.auth && self.circles.is_empty() { + return Err(ConfigError::InvalidAuthConfig( + "Authentication is enabled but no circles are configured".to_string() + )); + } + + Ok(()) + } + + /// Create a sample configuration file + pub fn create_sample() -> Self { + let mut circles = HashMap::new(); + circles.insert( + "example_circle".to_string(), + vec![ + "0x1234567890abcdef1234567890abcdef12345678".to_string(), + "0xabcdef1234567890abcdef1234567890abcdef12".to_string(), + ] + ); + + Self { + host: "127.0.0.1".to_string(), + port: 8443, + redis_url: "redis://127.0.0.1/".to_string(), + auth: true, + tls: false, + cert: Some("cert.pem".to_string()), + key: Some("key.pem".to_string()), + tls_port: Some(8444), + webhooks: false, + circles, + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum ConfigError { + #[error("Failed to read config file {0}: {1}")] + FileRead(std::path::PathBuf, std::io::Error), + + #[error("Failed to write config file {0}: {1}")] + FileWrite(std::path::PathBuf, std::io::Error), + + #[error("Failed to parse JSON config: {0}")] + JsonParse(serde_json::Error), + + #[error("Failed to serialize JSON config: {0}")] + JsonSerialize(serde_json::Error), + + #[error("Invalid TLS configuration: {0}")] + InvalidTlsConfig(String), + + #[error("Invalid authentication configuration: {0}")] + InvalidAuthConfig(String), +} + +// Default value functions +fn default_host() -> String { + "127.0.0.1".to_string() +} + +fn default_port() -> u16 { + 8443 +} + +fn default_redis_url() -> String { + "redis://127.0.0.1/".to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::NamedTempFile; + + #[test] + fn test_config_serialization() { + let config = ServerConfig::create_sample(); + let json = serde_json::to_string_pretty(&config).unwrap(); + let deserialized: ServerConfig = serde_json::from_str(&json).unwrap(); + + assert_eq!(config.host, deserialized.host); + assert_eq!(config.port, deserialized.port); + assert_eq!(config.circles.len(), deserialized.circles.len()); + } + + #[test] + fn test_config_file_operations() { + let config = ServerConfig::create_sample(); + let temp_file = NamedTempFile::new().unwrap(); + + // Test writing + config.to_file(temp_file.path()).unwrap(); + + // Test reading + let loaded_config = ServerConfig::from_file(temp_file.path()).unwrap(); + assert_eq!(config.host, loaded_config.host); + assert_eq!(config.circles.len(), loaded_config.circles.len()); + } + + #[test] + fn test_config_validation() { + let mut config = ServerConfig::default(); + + // Valid config should pass + assert!(config.validate().is_ok()); + + // TLS enabled without cert/key should fail + config.tls = true; + assert!(config.validate().is_err()); + + // Fix TLS config + config.cert = Some("cert.pem".to_string()); + config.key = Some("key.pem".to_string()); + assert!(config.validate().is_ok()); + + // Auth enabled without circles should fail + config.auth = true; + assert!(config.validate().is_err()); + + // Add circles + config.circles.insert("test".to_string(), vec!["pubkey".to_string()]); + assert!(config.validate().is_ok()); + } +} diff --git a/interfaces/websocket/server/src/handler.rs b/interfaces/websocket/server/src/handler.rs index cfa3cb4..f8f89cc 100644 --- a/interfaces/websocket/server/src/handler.rs +++ b/interfaces/websocket/server/src/handler.rs @@ -31,6 +31,16 @@ impl actix::StreamHandler> for Server { self.handle_whoami(req.params, client_rpc_id, ctx) } "play" => self.handle_play(req.params, client_rpc_id, ctx), + "create_job" => self.handle_create_job(req.params, client_rpc_id, ctx), + "start_job" => self.handle_start_job(req.params, client_rpc_id, ctx), + "run_job" => self.handle_run_job(req.params, client_rpc_id, ctx), + "get_job_status" => self.handle_get_job_status(req.params, client_rpc_id, ctx), + "get_job_output" => self.handle_get_job_output(req.params, client_rpc_id, ctx), + "get_job_logs" => self.handle_get_job_logs(req.params, client_rpc_id, ctx), + "list_jobs" => self.handle_list_jobs(req.params, client_rpc_id, ctx), + "stop_job" => self.handle_stop_job(req.params, client_rpc_id, ctx), + "delete_job" => self.handle_delete_job(req.params, client_rpc_id, ctx), + "clear_all_jobs" => self.handle_clear_all_jobs(req.params, client_rpc_id, ctx), _ => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), diff --git a/interfaces/websocket/server/src/job_handlers.rs b/interfaces/websocket/server/src/job_handlers.rs new file mode 100644 index 0000000..ea89386 --- /dev/null +++ b/interfaces/websocket/server/src/job_handlers.rs @@ -0,0 +1,999 @@ +use crate::Server; +use actix::prelude::*; +use actix_web_actors::ws; +use hero_dispatcher::{Dispatcher, ScriptType}; +use serde_json::{json, Value}; +use std::time::Duration; + +const TASK_TIMEOUT_DURATION: Duration = Duration::from_secs(30); + +#[derive(serde::Serialize)] +struct SuccessResult { + success: bool, +} + +#[derive(serde::Serialize)] +struct JobResult { + job_id: String, +} + +#[derive(serde::Serialize)] +struct JsonRpcResponse { + jsonrpc: String, + result: Option, + error: Option, + id: Value, +} + +#[derive(serde::Serialize)] +struct JsonRpcError { + code: i32, + message: String, + data: Option, +} + +impl Server { + pub fn handle_create_job( + &mut self, + params: Value, + client_rpc_id: Value, + ctx: &mut ws::WebsocketContext, + ) { + // For now, create_job is the same as run_job + self.handle_run_job(params, client_rpc_id, ctx); + } + + pub fn handle_start_job( + &mut self, + params: Value, + client_rpc_id: Value, + ctx: &mut ws::WebsocketContext, + ) { + if self.enable_auth && !self.is_connection_authenticated() { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: "Authentication required".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + + let job_id = match params.get("job_id").and_then(|v| v.as_str()) { + Some(id) => id.to_string(), + None => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32602, + message: "Missing required parameter: job_id".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + }; + + let dispatcher = match self.dispatcher.clone() { + Some(d) => d, + None => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32603, + message: "Internal error: dispatcher not available".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + }; + + let client_rpc_id_clone = client_rpc_id.clone(); + let fut = async move { + dispatcher.start_job(&job_id).await + }; + + ctx.spawn( + fut.into_actor(self) + .map(move |res, _act, ctx_inner| match res { + Ok(_) => { + let result = SuccessResult { success: true }; + let resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: Some(serde_json::to_value(result).unwrap()), + error: None, + id: client_rpc_id_clone.clone(), + }; + ctx_inner.text(serde_json::to_string(&resp).unwrap()); + } + Err(e) => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: format!("Failed to start job: {}", e), + data: None, + }), + id: client_rpc_id_clone, + }; + ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); + } + }) + .timeout(TASK_TIMEOUT_DURATION) + .map(move |res, _act, ctx_inner| { + if res.is_err() { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: "Request timed out".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); + } + }), + ); + } + + pub fn handle_get_job_status( + &mut self, + params: Value, + client_rpc_id: Value, + ctx: &mut ws::WebsocketContext, + ) { + if self.enable_auth && !self.is_connection_authenticated() { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: "Authentication required".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + + let job_id = match params.get("job_id").and_then(|v| v.as_str()) { + Some(id) => id.to_string(), + None => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32602, + message: "Missing required parameter: job_id".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + }; + + let dispatcher = match self.dispatcher.clone() { + Some(d) => d, + None => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32603, + message: "Internal error: dispatcher not available".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + }; + + let client_rpc_id_clone = client_rpc_id.clone(); + let fut = async move { + dispatcher.get_job_status(&job_id).await + }; + + ctx.spawn( + fut.into_actor(self) + .map(move |res, _act, ctx_inner| match res { + Ok(status) => { + let resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: Some(json!(status)), + error: None, + id: client_rpc_id_clone.clone(), + }; + ctx_inner.text(serde_json::to_string(&resp).unwrap()); + } + Err(e) => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: format!("Failed to get job status: {}", e), + data: None, + }), + id: client_rpc_id_clone, + }; + ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); + } + }) + .timeout(TASK_TIMEOUT_DURATION) + .map(move |res, _act, ctx_inner| { + if res.is_err() { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: "Request timed out".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); + } + }), + ); + } + + pub fn handle_list_jobs( + &mut self, + _params: Value, + client_rpc_id: Value, + ctx: &mut ws::WebsocketContext, + ) { + if self.enable_auth && !self.is_connection_authenticated() { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: "Authentication required".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + + let dispatcher = match self.dispatcher.clone() { + Some(d) => d, + None => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32603, + message: "Internal error: dispatcher not available".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + }; + + let client_rpc_id_clone = client_rpc_id.clone(); + let fut = async move { + dispatcher.list_jobs().await + }; + + ctx.spawn( + fut.into_actor(self) + .map(move |res, _act, ctx_inner| match res { + Ok(jobs) => { + let resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: Some(json!(jobs)), + error: None, + id: client_rpc_id_clone.clone(), + }; + ctx_inner.text(serde_json::to_string(&resp).unwrap()); + } + Err(e) => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: format!("Failed to list jobs: {}", e), + data: None, + }), + id: client_rpc_id_clone, + }; + ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); + } + }) + .timeout(TASK_TIMEOUT_DURATION) + .map(move |res, _act, ctx_inner| { + if res.is_err() { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: "Request timed out".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); + } + }), + ); + } + pub fn handle_run_job( + &mut self, + params: Value, + client_rpc_id: Value, + ctx: &mut ws::WebsocketContext, + ) { + if self.enable_auth && !self.is_connection_authenticated() { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: "Authentication required".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + + let circle_pk = match params.get("circle_pk").and_then(|v| v.as_str()) { + Some(pk) => pk.to_string(), + None => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32602, + message: "Missing required parameter: circle_pk".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + }; + + let script_content = match params.get("script_content").and_then(|v| v.as_str()) { + Some(script) => script.to_string(), + None => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32602, + message: "Missing required parameter: script_content".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + }; + + let dispatcher = match self.dispatcher.clone() { + Some(d) => d, + None => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32603, + message: "Internal error: dispatcher not available".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + }; + + let client_rpc_id_clone = client_rpc_id.clone(); + let fut = async move { + dispatcher + .new_job() + .context_id(&circle_pk) + .script_type(ScriptType::RhaiSAL) + .script(&script_content) + .timeout(TASK_TIMEOUT_DURATION) + .await_response() + .await + }; + + ctx.spawn( + fut.into_actor(self) + .map(move |res, _act, ctx_inner| match res { + Ok(job_id) => { + let result = JobResult { job_id }; + let resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: Some(serde_json::to_value(result).unwrap()), + error: None, + id: client_rpc_id_clone.clone(), + }; + ctx_inner.text(serde_json::to_string(&resp).unwrap()); + } + Err(e) => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: format!("Failed to run job: {}", e), + data: None, + }), + id: client_rpc_id_clone, + }; + ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); + } + }) + .timeout(TASK_TIMEOUT_DURATION) + .map(move |res, _act, ctx_inner| { + if res.is_err() { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: "Request timed out".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); + } + }), + ); + } + + pub fn handle_get_job_output( + &mut self, + params: Value, + client_rpc_id: Value, + ctx: &mut ws::WebsocketContext, + ) { + if self.enable_auth && !self.is_connection_authenticated() { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: "Authentication required".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + + let job_id = match params.get("job_id").and_then(|v| v.as_str()) { + Some(id) => id.to_string(), + None => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32602, + message: "Missing required parameter: job_id".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + }; + + let dispatcher = match self.dispatcher.clone() { + Some(d) => d, + None => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32603, + message: "Internal error: dispatcher not available".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + }; + + let client_rpc_id_clone = client_rpc_id.clone(); + let fut = async move { + dispatcher.get_job_output(&job_id).await + }; + + ctx.spawn( + fut.into_actor(self) + .map(move |res, _act, ctx_inner| match res { + Ok(output) => { + let resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: Some(json!(output)), + error: None, + id: client_rpc_id_clone.clone(), + }; + ctx_inner.text(serde_json::to_string(&resp).unwrap()); + } + Err(e) => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: format!("Failed to get job output: {}", e), + data: None, + }), + id: client_rpc_id_clone, + }; + ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); + } + }) + .timeout(TASK_TIMEOUT_DURATION) + .map(move |res, _act, ctx_inner| { + if res.is_err() { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: "Request timed out".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); + } + }), + ); + } + + pub fn handle_get_job_logs( + &mut self, + params: Value, + client_rpc_id: Value, + ctx: &mut ws::WebsocketContext, + ) { + if self.enable_auth && !self.is_connection_authenticated() { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: "Authentication required".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + + let job_id = match params.get("job_id").and_then(|v| v.as_str()) { + Some(id) => id.to_string(), + None => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32602, + message: "Missing required parameter: job_id".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + }; + + let dispatcher = match self.dispatcher.clone() { + Some(d) => d, + None => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32603, + message: "Internal error: dispatcher not available".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + }; + + let client_rpc_id_clone = client_rpc_id.clone(); + let fut = async move { + dispatcher.get_job_logs(&job_id).await + }; + + ctx.spawn( + fut.into_actor(self) + .map(move |res, _act, ctx_inner| match res { + Ok(logs) => { + let result = json!({ "logs": logs }); + let resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: Some(result), + error: None, + id: client_rpc_id_clone.clone(), + }; + ctx_inner.text(serde_json::to_string(&resp).unwrap()); + } + Err(e) => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: format!("Failed to get job logs: {}", e), + data: None, + }), + id: client_rpc_id_clone, + }; + ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); + } + }) + .timeout(TASK_TIMEOUT_DURATION) + .map(move |res, _act, ctx_inner| { + if res.is_err() { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: "Request timed out".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); + } + }), + ); + } + + pub fn handle_stop_job( + &mut self, + params: Value, + client_rpc_id: Value, + ctx: &mut ws::WebsocketContext, + ) { + if self.enable_auth && !self.is_connection_authenticated() { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: "Authentication required".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + + let job_id = match params.get("job_id").and_then(|v| v.as_str()) { + Some(id) => id.to_string(), + None => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32602, + message: "Missing required parameter: job_id".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + }; + + let dispatcher = match self.dispatcher.clone() { + Some(d) => d, + None => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32603, + message: "Internal error: dispatcher not available".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + }; + + let client_rpc_id_clone = client_rpc_id.clone(); + let fut = async move { + dispatcher.stop_job(&job_id).await + }; + + ctx.spawn( + fut.into_actor(self) + .map(move |res, _act, ctx_inner| match res { + Ok(_) => { + let resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: Some(json!(null)), + error: None, + id: client_rpc_id_clone.clone(), + }; + ctx_inner.text(serde_json::to_string(&resp).unwrap()); + } + Err(e) => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: format!("Failed to stop job: {}", e), + data: None, + }), + id: client_rpc_id_clone, + }; + ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); + } + }) + .timeout(TASK_TIMEOUT_DURATION) + .map(move |res, _act, ctx_inner| { + if res.is_err() { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: "Request timed out".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); + } + }), + ); + } + + pub fn handle_delete_job( + &mut self, + params: Value, + client_rpc_id: Value, + ctx: &mut ws::WebsocketContext, + ) { + if self.enable_auth && !self.is_connection_authenticated() { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: "Authentication required".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + + let job_id = match params.get("job_id").and_then(|v| v.as_str()) { + Some(id) => id.to_string(), + None => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32602, + message: "Missing required parameter: job_id".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + }; + + let dispatcher = match self.dispatcher.clone() { + Some(d) => d, + None => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32603, + message: "Internal error: dispatcher not available".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + }; + + let client_rpc_id_clone = client_rpc_id.clone(); + let fut = async move { + dispatcher.delete_job(&job_id).await + }; + + ctx.spawn( + fut.into_actor(self) + .map(move |res, _act, ctx_inner| match res { + Ok(_) => { + let resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: Some(json!(null)), + error: None, + id: client_rpc_id_clone.clone(), + }; + ctx_inner.text(serde_json::to_string(&resp).unwrap()); + } + Err(e) => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: format!("Failed to delete job: {}", e), + data: None, + }), + id: client_rpc_id_clone, + }; + ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); + } + }) + .timeout(TASK_TIMEOUT_DURATION) + .map(move |res, _act, ctx_inner| { + if res.is_err() { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: "Request timed out".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); + } + }), + ); + } + + pub fn handle_clear_all_jobs( + &mut self, + _params: Value, + client_rpc_id: Value, + ctx: &mut ws::WebsocketContext, + ) { + if self.enable_auth && !self.is_connection_authenticated() { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: "Authentication required".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + + let dispatcher = match self.dispatcher.clone() { + Some(d) => d, + None => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32603, + message: "Internal error: dispatcher not available".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; + } + }; + + let client_rpc_id_clone = client_rpc_id.clone(); + let fut = async move { + dispatcher.clear_all_jobs().await + }; + + ctx.spawn( + fut.into_actor(self) + .map(move |res, _act, ctx_inner| match res { + Ok(_) => { + let resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: Some(json!(null)), + error: None, + id: client_rpc_id_clone.clone(), + }; + ctx_inner.text(serde_json::to_string(&resp).unwrap()); + } + Err(e) => { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: format!("Failed to clear jobs: {}", e), + data: None, + }), + id: client_rpc_id_clone, + }; + ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); + } + }) + .timeout(TASK_TIMEOUT_DURATION) + .map(move |res, _act, ctx_inner| { + if res.is_err() { + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32000, + message: "Request timed out".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); + } + }), + ); + } +} diff --git a/interfaces/websocket/server/src/lib.rs b/interfaces/websocket/server/src/lib.rs index aff0395..ef20259 100644 --- a/interfaces/websocket/server/src/lib.rs +++ b/interfaces/websocket/server/src/lib.rs @@ -3,7 +3,8 @@ use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer}; use actix_web_actors::ws; use log::{info, error}; // Added error for better logging use once_cell::sync::Lazy; -use hero_dispatcher::{DispatcherBuilder, DispatcherError}; +use hero_dispatcher::{Dispatcher, DispatcherBuilder, DispatcherError}; +use hero_job::{Job, JobStatus}; use rustls::pki_types::PrivateKeyDer; use rustls::ServerConfig as RustlsServerConfig; use rustls_pemfile::{certs, pkcs8_private_keys}; @@ -29,10 +30,13 @@ static AUTHENTICATED_CONNECTIONS: Lazy, String>>> = mod auth; mod builder; +mod config; mod handler; +mod job_handlers; use crate::auth::{generate_nonce, NonceResponse}; pub use crate::builder::ServerBuilder; +pub use crate::config::{ServerConfig, ConfigError}; // Re-export server handle type for external use pub type ServerHandle = actix_web::dev::ServerHandle; @@ -100,6 +104,64 @@ struct FetchNonceParams { pubkey: String, } +// Job management parameter structures +#[derive(Debug, Serialize, Deserialize)] +struct CreateJobParams { + job: Job, +} + +#[derive(Debug, Serialize, Deserialize)] +struct RunJobParams { + job: Job, +} + +#[derive(Debug, Serialize, Deserialize)] +struct JobIdParams { + job_id: String, +} + +// Job management result structures +#[derive(Debug, Serialize, Deserialize)] +struct CreateJobResult { + job_id: String, +} + +#[derive(Debug, Serialize, Deserialize)] +struct RunJobResult { + job_id: String, + output: String, +} + +#[derive(Debug, Serialize, Deserialize)] +struct JobStatusResult { + status: JobStatus, +} + +#[derive(Debug, Serialize, Deserialize)] +struct JobOutputResult { + output: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +struct JobLogsResult { + logs: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +struct ListJobsResult { + jobs: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +struct SuccessResult { + success: bool, +} + +#[derive(Debug, Serialize, Deserialize)] +struct ClearJobsResult { + deleted_count: usize, +} + impl Actor for Server { type Context = ws::WebsocketContext; @@ -142,11 +204,14 @@ pub struct Server { pub tls_port: Option, pub enable_auth: bool, pub enable_webhooks: bool, - pub circle_worker_id: String, + pub circle_name: String, pub circle_public_key: String, + /// Map of circle IDs to vectors of public keys that are members of that circle + pub circles: HashMap>, nonce_store: HashMap, authenticated_pubkey: Option, + pub dispatcher: Option, } impl Server { @@ -250,33 +315,34 @@ impl Server { client_rpc_id: Value, ctx: &mut ws::WebsocketContext, ) { - match serde_json::from_value::(params) { - Ok(params) => { - let nonce_response = generate_nonce(); - self.nonce_store - .insert(params.pubkey, nonce_response.clone()); - let resp = JsonRpcResponse { - jsonrpc: "2.0".to_string(), - result: Some(serde_json::to_value(nonce_response).unwrap()), - error: None, - id: client_rpc_id, - }; - ctx.text(serde_json::to_string(&resp).unwrap()); - } - Err(e) => { + // Extract pubkey string directly from params + let pubkey = match params.as_str() { + Some(pk) => pk.to_string(), + None => { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), result: None, error: Some(JsonRpcError { code: -32602, - message: format!("Invalid parameters for fetch_nonce: {}", e), + message: "Invalid parameters for fetch_nonce: expected string pubkey".to_string(), data: None, }), id: client_rpc_id, }; ctx.text(serde_json::to_string(&err_resp).unwrap()); + return; } - } + }; + + let nonce_response = generate_nonce(); + self.nonce_store.insert(pubkey, nonce_response.clone()); + let resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: Some(serde_json::to_value(&nonce_response.nonce).unwrap()), + error: None, + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&resp).unwrap()); } fn handle_authenticate( @@ -327,18 +393,41 @@ impl Server { }; if is_valid { - self.authenticated_pubkey = Some(auth_params.pubkey.clone()); - AUTHENTICATED_CONNECTIONS - .lock() - .unwrap() - .insert(ctx.address(), auth_params.pubkey); - let resp = JsonRpcResponse { - jsonrpc: "2.0".to_string(), - result: Some(serde_json::json!({ "authenticated": true })), - error: None, - id: client_rpc_id, - }; - ctx.text(serde_json::to_string(&resp).unwrap()); + // Check if the authenticated public key belongs to the circle + let is_circle_member = self.circles + .get(&self.circle_name) + .map(|members| members.contains(&auth_params.pubkey)) + .unwrap_or(false); + + if is_circle_member { + self.authenticated_pubkey = Some(auth_params.pubkey.clone()); + AUTHENTICATED_CONNECTIONS + .lock() + .unwrap() + .insert(ctx.address(), auth_params.pubkey); + let resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: Some(serde_json::json!({ "authenticated": true })), + error: None, + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&resp).unwrap()); + } else { + log::warn!("Auth failed for {}: Public key {} not a member of circle {}", + self.circle_name, auth_params.pubkey, self.circle_name); + let err_resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: None, + error: Some(JsonRpcError { + code: -32003, + message: "Public key not authorized for this circle".to_string(), + data: None, + }), + id: client_rpc_id, + }; + ctx.text(serde_json::to_string(&err_resp).unwrap()); + ctx.stop(); + } } else { let err_resp = JsonRpcResponse { jsonrpc: "2.0".to_string(), @@ -459,7 +548,7 @@ impl Server { let redis_url_clone = self.redis_url.clone(); let _rpc_id_clone = client_rpc_id.clone(); let public_key = self.authenticated_pubkey.clone(); - let worker_id_clone = self.circle_worker_id.clone(); + let fut = async move { let caller_id = public_key.unwrap_or_else(|| "anonymous".to_string()); @@ -471,7 +560,7 @@ impl Server { hero_dispatcher .new_job() .context_id(&circle_pk_clone) - .worker_id(&worker_id_clone) + .script_type(hero_dispatcher::ScriptType::RhaiSAL) .script(&script_content) .timeout(TASK_TIMEOUT_DURATION) .await_response() @@ -484,35 +573,16 @@ impl Server { ctx.spawn( fut.into_actor(self) .map(move |res, _act, ctx_inner| match res { - Ok(task_details) => { - if task_details.status == "completed" { - let output = task_details - .output - .unwrap_or_else(|| "No output".to_string()); - let result_value = PlayResult { output }; - let resp = JsonRpcResponse { - jsonrpc: "2.0".to_string(), - result: Some(serde_json::to_value(result_value).unwrap()), - error: None, - id: client_rpc_id, - }; - ctx_inner.text(serde_json::to_string(&resp).unwrap()); - } else { - let error_message = task_details.error.unwrap_or_else(|| { - "Rhai script execution failed".to_string() - }); - let err_resp = JsonRpcResponse { - jsonrpc: "2.0".to_string(), - result: None, - error: Some(JsonRpcError { - code: -32000, - message: error_message, - data: None, - }), - id: client_rpc_id, - }; - ctx_inner.text(serde_json::to_string(&err_resp).unwrap()); - } + Ok(output) => { + // The dispatcher returns the actual string output from job execution + let result_value = PlayResult { output }; + let resp = JsonRpcResponse { + jsonrpc: "2.0".to_string(), + result: Some(serde_json::to_value(result_value).unwrap()), + error: None, + id: client_rpc_id, + }; + ctx_inner.text(serde_json::to_string(&resp).unwrap()); } Err(e) => { let (code, message) = match e {