From 70b5b336f1a7594bae6df508b167163c0956a3d5 Mon Sep 17 00:00:00 2001 From: Timur Gordon <31495328+timurgordon@users.noreply.github.com> Date: Mon, 21 Jul 2025 20:57:58 +0200 Subject: [PATCH] ws client - server improvements --- src/client_ws/ARCHITECTURE.md | 62 ++-- src/client_ws/Cargo.toml | 2 + src/client_ws/README.md | 124 +++++-- src/client_ws/cmd/README.md | 55 ++- src/client_ws/cmd/main.rs | 155 +++++++-- src/client_ws/src/lib.rs | 624 ++++++++++++++++++++++++---------- src/server/src/lib.rs | 7 + 7 files changed, 763 insertions(+), 266 deletions(-) diff --git a/src/client_ws/ARCHITECTURE.md b/src/client_ws/ARCHITECTURE.md index 0a4332c..9ffc257 100644 --- a/src/client_ws/ARCHITECTURE.md +++ b/src/client_ws/ARCHITECTURE.md @@ -10,6 +10,7 @@ The `client_ws` is built on the following principles: - **Modularity**: The crate is divided into logical modules, with a clear separation of concerns between the main client logic, authentication procedures, and cryptographic utilities. - **Asynchronous Operations**: All network I/O is asynchronous, using `async/await` to ensure the client is non-blocking and efficient. - **Fluent Configuration**: A builder pattern (`CircleWsClientBuilder`) is used for clear and flexible client construction. +- **Self-Managing Clients**: Each `CircleWsClient` handles its own lifecycle including connection, authentication, keep-alive, and reconnection logic internally. ## 2. Cross-Platform Implementation @@ -29,9 +30,19 @@ The `client_ws` crate is organized into the following key modules: - **`types.rs`**: Defines the core data structures used in authentication, such as `AuthError` and `AuthCredentials`. - **`crypto_utils.rs`**: A self-contained utility module for handling all `secp256k1` cryptographic operations, including key generation, public key derivation, and message signing. -## 4. Authentication Flow Deep Dive +## 4. Self-Managing Client Architecture -The `authenticate` method in `CircleWsClient` orchestrates the entire authentication process over the WebSocket connection. The sequence diagram below illustrates the internal interactions within the client during this flow. +Each `CircleWsClient` is designed to be completely self-managing, handling its entire lifecycle internally. This includes: + +- **Connection Management**: Establishing and maintaining WebSocket connections +- **Authentication**: Automatic secp256k1 authentication flow when private keys are provided +- **Keep-Alive**: Periodic health checks to ensure connection stability +- **Reconnection**: Automatic reconnection with exponential backoff on connection failures +- **Connection Status Tracking**: Internal state management for connection health + +### Connection Flow + +The `connect()` method orchestrates the complete connection and authentication process: ```mermaid sequenceDiagram @@ -46,27 +57,38 @@ sequenceDiagram User->>+Builder: build() Builder-->>-User: client - User->>+Client: authenticate() - Client->>Client: Check for private_key - Client->>+CryptoUtils: derive_public_key(private_key) - CryptoUtils-->>-Client: public_key + User->>+Client: connect() + + Note over Client: Self-managing connection process + Client->>Client: Establish WebSocket connection + Client->>Client: Start keep-alive loop + Client->>Client: Start reconnection handler + + alt Has Private Key + Client->>Client: Check for private_key + Client->>+CryptoUtils: derive_public_key(private_key) + CryptoUtils-->>-Client: public_key - Note over Client: Request nonce via WebSocket - Client->>+WsActor: JSON-RPC "fetch_nonce" (pubkey) - WsActor-->>-Client: JSON-RPC Response (nonce) + Note over Client: Request nonce via WebSocket + Client->>+WsActor: JSON-RPC "fetch_nonce" (pubkey) + WsActor-->>-Client: JSON-RPC Response (nonce) - Client->>+CryptoUtils: sign_message(private_key, nonce) - CryptoUtils-->>-Client: signature + Client->>+CryptoUtils: sign_message(private_key, nonce) + CryptoUtils-->>-Client: signature - Note over Client: Send credentials via WebSocket - Client->>+WsActor: JSON-RPC "authenticate" (pubkey, signature) - WsActor-->>-Client: JSON-RPC Response (authenticated: true/false) - - alt Authentication Successful - Client-->>-User: Ok(true) - else Authentication Fails - Client-->>-User: Ok(false) or Err(...) + Note over Client: Send credentials via WebSocket + Client->>+WsActor: JSON-RPC "authenticate" (pubkey, signature) + WsActor-->>-Client: JSON-RPC Response (authenticated: true/false) end + + Client-->>-User: Connection established and authenticated ``` -This architecture ensures that the cryptographic operations are isolated, the platform-specific code is cleanly separated, and the main client struct provides a simple and consistent API to the end-user. \ No newline at end of file +### Self-Management Features + +- **Automatic Keep-Alive**: Each client runs its own keep-alive loop to detect connection issues +- **Transparent Reconnection**: Failed connections are automatically retried with exponential backoff +- **Status Monitoring**: Connection status is tracked internally and can be queried via `is_connected()` +- **Resource Cleanup**: Proper cleanup of resources when clients are dropped + +This architecture ensures that the cryptographic operations are isolated, the platform-specific code is cleanly separated, and each client is completely autonomous in managing its connection lifecycle. \ No newline at end of file diff --git a/src/client_ws/Cargo.toml b/src/client_ws/Cargo.toml index 46d0c66..1dccb4c 100644 --- a/src/client_ws/Cargo.toml +++ b/src/client_ws/Cargo.toml @@ -21,6 +21,7 @@ http = "0.2" # Authentication dependencies hex = { workspace = true } rand = { workspace = true } +getrandom = { version = "0.2", features = ["js"] } # Optional crypto dependencies (enabled by default) secp256k1 = { workspace = true, optional = true } @@ -32,6 +33,7 @@ circle_ws_lib = { path = "../server", optional = true } # WASM-specific dependencies [target.'cfg(target_arch = "wasm32")'.dependencies] gloo-net = { version = "0.4.0", features = ["websocket"] } +gloo-timers = { version = "0.3.0", features = ["futures"] } wasm-bindgen-futures = "0.4" gloo-console = "0.3.0" wasm-bindgen = "0.2" diff --git a/src/client_ws/README.md b/src/client_ws/README.md index d97cf54..dc029d3 100644 --- a/src/client_ws/README.md +++ b/src/client_ws/README.md @@ -1,14 +1,26 @@ # Circle WebSocket Client -A Rust library for connecting to Circle WebSocket servers with authentication support. +A Rust library for connecting to Circle WebSocket servers with authentication support and self-managing connection lifecycle. ## Features -- Cross-platform WebSocket client (native and WASM) -- secp256k1 cryptographic authentication -- JSON-RPC 2.0 protocol support -- Async/await interface with Tokio -- Built on tokio-tungstenite for reliable WebSocket connections +- **Cross-platform WebSocket client** (native and WASM) +- **secp256k1 cryptographic authentication** with automatic challenge-response flow +- **JSON-RPC 2.0 protocol support** for server communication +- **Self-managing connections** with automatic keep-alive and reconnection +- **Async/await interface** with modern Rust async patterns +- **Built on tokio-tungstenite** for reliable WebSocket connections (native) +- **Built on gloo-net** for WASM browser compatibility + +## Architecture + +Each `CircleWsClient` is completely self-managing: + +- **Automatic Connection Management**: Handles WebSocket connection establishment +- **Built-in Authentication**: Seamless secp256k1 authentication when private keys are provided +- **Keep-Alive Monitoring**: Periodic health checks to detect connection issues +- **Transparent Reconnection**: Automatic reconnection with exponential backoff on failures +- **Connection Status Tracking**: Real-time connection state monitoring ## Usage @@ -19,7 +31,7 @@ Add this to your `Cargo.toml`: circle_client_ws = { path = "../client_ws" } ``` -### Basic Example +### Basic Example (Self-Managing Connection) ```rust use circle_client_ws::CircleWsClientBuilder; @@ -28,27 +40,73 @@ use circle_client_ws::CircleWsClientBuilder; async fn main() -> Result<(), Box> { // Create client with private key let private_key = "your_private_key_hex"; - let client = CircleWsClientBuilder::new() - .with_private_key(private_key)? + let mut client = CircleWsClientBuilder::new("ws://localhost:8080".to_string()) + .with_keypair(private_key.to_string()) .build(); - // Connect and authenticate - client.connect("ws://localhost:8080").await?; + // Connect - this handles authentication, keep-alive, and reconnection automatically + client.connect().await?; - // Use the authenticated client... + // Check connection status + println!("Connected: {}", client.is_connected()); + + // Execute scripts on the server + let result = client.play("\"Hello from client!\"".to_string()).await?; + println!("Script result: {:?}", result); + + // Client automatically maintains connection in the background + // No manual keep-alive or reconnection logic needed Ok(()) } ``` -### Authentication Flow +### Self-Managing Features -The client automatically handles the secp256k1 authentication flow: -1. Connects to WebSocket server -2. Receives authentication challenge -3. Signs challenge with private key -4. Sends signed response -5. Receives authentication confirmation +The client automatically handles: + +1. **Connection Establishment**: WebSocket connection to the server +2. **Authentication Flow**: secp256k1 challenge-response authentication +3. **Keep-Alive Monitoring**: Periodic health checks to ensure connection stability +4. **Automatic Reconnection**: Transparent reconnection on connection failures +5. **Resource Management**: Proper cleanup when the client is dropped + +### Connection Status Monitoring + +```rust +// Check if the client is currently connected +if client.is_connected() { + println!("Client is connected and healthy"); +} else { + println!("Client is disconnected or reconnecting"); +} + +// Get detailed connection status +let status = client.get_connection_status(); +println!("Connection status: {}", status); +``` + +### WASM Usage + +For WASM applications, the client works seamlessly in browsers: + +```rust +use circle_client_ws::CircleWsClientBuilder; +use wasm_bindgen_futures::spawn_local; + +// In a WASM context +spawn_local(async move { + let mut client = CircleWsClientBuilder::new("ws://localhost:8080".to_string()) + .build(); + + // Self-managing connection works the same in WASM + if let Ok(_) = client.connect().await { + // Client automatically handles keep-alive and reconnection + let result = client.play("\"WASM client connected!\"".to_string()).await; + // Handle result... + } +}); +``` ## Binary Tool @@ -56,12 +114,28 @@ A command-line binary is also available for interactive use and script execution ## Platform Support -- **Native**: Full support on all Rust-supported platforms -- **WASM**: Browser support with web-sys bindings +- **Native**: Full support on all Rust-supported platforms with tokio-tungstenite +- **WASM**: Browser support with gloo-net WebSocket bindings ## Dependencies -- `tokio-tungstenite`: WebSocket implementation -- `secp256k1`: Cryptographic operations -- `serde`: JSON serialization -- `uuid`: Request ID generation +### Core Dependencies +- `serde`: JSON serialization and deserialization +- `uuid`: Request ID generation for JSON-RPC +- `futures-util`: Async utilities for WebSocket handling +- `thiserror`: Error handling and propagation + +### Platform-Specific Dependencies + +#### Native (tokio-based) +- `tokio-tungstenite`: Robust WebSocket implementation +- `tokio`: Async runtime for connection management + +#### WASM (browser-based) +- `gloo-net`: WebSocket bindings for browsers +- `gloo-timers`: Timer utilities for keep-alive functionality +- `wasm-bindgen-futures`: Async support in WASM + +### Cryptographic Dependencies (optional) +- `secp256k1`: Elliptic curve cryptography for authentication +- `sha3`: Hashing for cryptographic operations diff --git a/src/client_ws/cmd/README.md b/src/client_ws/cmd/README.md index c91d92b..71174c9 100644 --- a/src/client_ws/cmd/README.md +++ b/src/client_ws/cmd/README.md @@ -1,12 +1,12 @@ -# Circles WebSocket Client Binary +# Circles WebSocket Client -A command-line WebSocket client for connecting to Circles servers with authentication support. +A WebSocket client for connecting to Circles servers with authentication support. Available in both CLI and WebAssembly (WASM) versions. -## Binary: `circles_client` +## CLI Usage ### Installation -Build the binary: +Build the CLI binary: ```bash cargo build --bin circles_client --release ``` @@ -49,6 +49,53 @@ circles_client -vv ws://localhost:8080 - **Verbosity Control**: Use `-v` flags to increase logging detail - **Cross-platform**: Works on all platforms supported by Rust and tokio-tungstenite +## WebAssembly (WASM) Usage + +### Build and Serve + +1. Install Trunk: +```bash +cargo install trunk +``` + +2. Build the WASM version: +```bash +trunk build --release +``` + +3. Serve the application: +```bash +trunk serve +``` + +The application will be available at `http://localhost:8080` + +### Usage in Browser + +1. Open the served page in your browser +2. Enter the WebSocket server URL +3. Choose either: + - Execute a Rhai script directly + - Enter interactive mode (type 'exit' or 'quit' to leave) + +### Features + +- **Browser Integration**: Uses browser's WebSocket implementation +- **Interactive Mode**: Browser-based input/output using prompts +- **Error Handling**: Browser console logging +- **Cross-browser**: Works in all modern browsers supporting WebAssembly + +## Common Features + +Both versions share the same core functionality: + +- **WebSocket Connection**: Connects to Circles WebSocket server +- **Authentication**: Handles secp256k1 authentication +- **Script Execution**: Executes Rhai scripts +- **Interactive Mode**: Provides REPL-like interface +- **Error Handling**: Comprehensive error reporting +- **Logging**: Detailed logging at different verbosity levels + ### Interactive Mode When run without `-s` or `-f` flags, the client enters interactive mode where you can: diff --git a/src/client_ws/cmd/main.rs b/src/client_ws/cmd/main.rs index 233768b..6dc72eb 100644 --- a/src/client_ws/cmd/main.rs +++ b/src/client_ws/cmd/main.rs @@ -1,12 +1,30 @@ +#![cfg_attr(target_arch = "wasm32", no_main)] + use circle_client_ws::CircleWsClientBuilder; -use clap::{Arg, ArgAction, Command}; -use dotenv::dotenv; -use env_logger; -use log::{error, info}; +#[cfg(not(target_arch = "wasm32"))] use std::env; -use std::io::{self, Write}; +#[cfg(not(target_arch = "wasm32"))] use std::path::Path; +#[cfg(not(target_arch = "wasm32"))] +use std::io::{self, Write}; + +#[cfg(target_arch = "wasm32")] +use wasm_bindgen::prelude::*; +#[cfg(target_arch = "wasm32")] +use web_sys::{console, window}; +#[cfg(target_arch = "wasm32")] +use wasm_bindgen_futures::spawn_local; + +#[cfg(not(target_arch = "wasm32"))] +use clap::{Arg, ArgAction, Command}; +#[cfg(not(target_arch = "wasm32"))] +use dotenv::dotenv; +#[cfg(not(target_arch = "wasm32"))] +use env_logger; +#[cfg(not(target_arch = "wasm32"))] use tokio; +#[cfg(not(target_arch = "wasm32"))] +use log::{error, info}; #[derive(Debug)] struct Args { @@ -17,6 +35,7 @@ struct Args { no_timestamp: bool, } +#[cfg(not(target_arch = "wasm32"))] fn parse_args() -> Args { let matches = Command::new("circles_client") .version("0.1.0") @@ -67,6 +86,7 @@ fn parse_args() -> Args { } } +#[cfg(not(target_arch = "wasm32"))] fn setup_logging(verbose: u8, no_timestamp: bool) { let log_level = match verbose { 0 => "warn,circle_client_ws=info", @@ -87,6 +107,7 @@ fn setup_logging(verbose: u8, no_timestamp: bool) { } } +#[cfg(not(target_arch = "wasm32"))] fn load_private_key() -> Result> { // Try to load from .env file first if let Ok(_) = dotenv() { @@ -107,42 +128,82 @@ fn load_private_key() -> Result> { Err("PRIVATE_KEY not found in environment or .env files".into()) } +#[cfg(target_arch = "wasm32")] async fn run_interactive_mode(client: circle_client_ws::CircleWsClient) -> Result<(), Box> { - info!("Entering interactive mode. Type 'exit' or 'quit' to leave."); - println!("šŸ”„ Interactive mode - Enter Rhai scripts (type 'exit' or 'quit' to leave):"); + console::log_1(&"Entering interactive mode. Type 'exit' or 'quit' to leave.".into()); + console::log_1(&"šŸ”„ Interactive mode - Enter Rhai scripts (type 'exit' or 'quit' to leave):\n".into()); - loop { - print!("rhai> "); - io::stdout().flush()?; + // In wasm32, we need to use browser's console for input/output + let window = window().expect("Window not available"); + let input = window.prompt_with_message("Enter Rhai script (or 'exit' to quit):") + .map_err(|e| format!("Failed to get input: {:#?}", e))? // Use debug formatting + .unwrap_or_default(); - let mut input = String::new(); - io::stdin().read_line(&mut input)?; - - let script = input.trim(); - - if script.is_empty() { - continue; + // Handle empty or exit cases + if input == "exit" || input == "quit" { + console::log_1(&"šŸ‘‹ Goodbye!".into()); + return Ok(()); + } + + // Execute the script + match client.play(input).await { + Ok(result) => { + console::log_1(&format!("šŸ“¤ Result: {}", result.output).into()); } - - if script == "exit" || script == "quit" { - println!("šŸ‘‹ Goodbye!"); - break; - } - - match client.play(script.to_string()).await { - Ok(result) => { - println!("šŸ“¤ Result: {}", result.output); - } - Err(e) => { - error!("āŒ Script execution failed: {}", e); - println!("āŒ Error: {}", e); - } + Err(e) => { + console::log_1(&format!("āŒ Script execution failed: {}", e).into()); } } Ok(()) } +#[cfg(target_arch = "wasm32")] +async fn execute_script(client: circle_client_ws::CircleWsClient, script: String) -> Result<(), Box> { + console::log_1(&format!("Executing script: {}", script).into()); + + match client.play(script).await { + Ok(result) => { + console::log_1(&result.output.into()); + Ok(()) + } + Err(e) => { + console::log_1(&format!("Script execution failed: {}", e).into()); + Err(e.into()) + } + } +} + +#[cfg(target_arch = "wasm32")] +pub async fn start_client(url: &str, script: Option) -> Result<(), Box> { + // Build client + let mut client = CircleWsClientBuilder::new(url.to_string()) + .build(); + + // Connect to WebSocket server + console::log_1(&"šŸ”Œ Connecting to WebSocket server...".into()); + if let Err(e) = client.connect().await { + console::log_1(&format!("āŒ Failed to connect: {}", e).into()); + return Err(e.into()); + } + console::log_1(&"āœ… Connected successfully".into()); + + // Authenticate with server + if let Err(e) = client.authenticate().await { + console::log_1(&format!("āŒ Authentication failed: {}", e).into()); + return Err(e.into()); + } + console::log_1(&"āœ… Authentication successful".into()); + + // Handle script execution + if let Some(script) = script { + execute_script(client, script).await + } else { + run_interactive_mode(client).await + } +} + +#[cfg(not(target_arch = "wasm32"))] async fn execute_script(client: circle_client_ws::CircleWsClient, script: String) -> Result<(), Box> { info!("Executing script: {}", script); @@ -158,11 +219,43 @@ async fn execute_script(client: circle_client_ws::CircleWsClient, script: String } } +#[cfg(not(target_arch = "wasm32"))] async fn load_script_from_file(path: &str) -> Result> { let script = tokio::fs::read_to_string(path).await?; Ok(script) } +#[cfg(not(target_arch = "wasm32"))] +async fn run_interactive_mode(client: circle_client_ws::CircleWsClient) -> Result<(), Box> { + println!("\nšŸ”„ Interactive mode - Enter Rhai scripts (type 'exit' or 'quit' to leave):\n"); + + loop { + print!("Enter Rhai script (or 'exit' to quit): "); + io::stdout().flush()?; + + let mut input = String::new(); + io::stdin().read_line(&mut input)?; + let input = input.trim().to_string(); + + if input == "exit" || input == "quit" { + println!("\nšŸ‘‹ Goodbye!"); + return Ok(()); + } + + match client.play(input).await { + Ok(result) => { + println!("\nšŸ“¤ Result: {}", result.output); + } + Err(e) => { + error!("āŒ Script execution failed: {}", e); + println!("\nāŒ Script execution failed: {}", e); + } + } + println!(); + } +} + +#[cfg(not(target_arch = "wasm32"))] #[tokio::main] async fn main() -> Result<(), Box> { let args = parse_args(); diff --git a/src/client_ws/src/lib.rs b/src/client_ws/src/lib.rs index 4121990..e3079c4 100644 --- a/src/client_ws/src/lib.rs +++ b/src/client_ws/src/lib.rs @@ -148,6 +148,7 @@ impl CircleWsClientBuilder { #[cfg(not(target_arch = "wasm32"))] task_handle: None, private_key: self.private_key, + is_connected: Arc::new(Mutex::new(false)), } } } @@ -158,24 +159,61 @@ pub struct CircleWsClient { #[cfg(not(target_arch = "wasm32"))] task_handle: Option>, private_key: Option, + is_connected: Arc>, +} + +impl CircleWsClient { + /// Get the connection status + pub fn get_connection_status(&self) -> String { + if *self.is_connected.lock().unwrap() { + "Connected".to_string() + } else { + "Disconnected".to_string() + } + } + + /// Check if the client is connected + pub fn is_connected(&self) -> bool { + *self.is_connected.lock().unwrap() + } } impl CircleWsClient { pub async fn authenticate(&mut self) -> Result { + info!("šŸ” [{}] Starting authentication process...", self.ws_url); + let private_key = self .private_key .as_ref() .ok_or(CircleWsClientError::AuthNoKeyPair)?; + + info!("šŸ”‘ [{}] Deriving public key from private key...", self.ws_url); let public_key = auth::derive_public_key(private_key)?; + info!("āœ… [{}] Public key derived: {}...", self.ws_url, &public_key[..8]); + info!("šŸŽ« [{}] Fetching authentication nonce...", self.ws_url); let nonce = self.fetch_nonce(&public_key).await?; - let signature = auth::sign_message(private_key, &nonce)?; + info!("āœ… [{}] Nonce received: {}...", self.ws_url, &nonce[..8]); - self.authenticate_with_signature(&public_key, &signature) - .await + info!("āœļø [{}] Signing nonce with private key...", self.ws_url); + let signature = auth::sign_message(private_key, &nonce)?; + info!("āœ… [{}] Signature created: {}...", self.ws_url, &signature[..8]); + + info!("šŸ”’ [{}] Submitting authentication credentials...", self.ws_url); + let result = self.authenticate_with_signature(&public_key, &signature).await?; + + if result { + info!("šŸŽ‰ [{}] Authentication successful!", self.ws_url); + } else { + error!("āŒ [{}] Authentication failed - server rejected credentials", self.ws_url); + } + + Ok(result) } async fn fetch_nonce(&self, pubkey: &str) -> Result { + info!("šŸ“” [{}] Sending fetch_nonce request for pubkey: {}...", self.ws_url, &pubkey[..8]); + let params = FetchNonceParams { pubkey: pubkey.to_string(), }; @@ -183,6 +221,7 @@ impl CircleWsClient { let res = self.send_request(req).await?; if let Some(err) = res.error { + error!("āŒ [{}] fetch_nonce failed: {} (code: {})", self.ws_url, err.message, err.code); return Err(CircleWsClientError::JsonRpcError { code: err.code, message: err.message, @@ -191,6 +230,7 @@ impl CircleWsClient { } let nonce_res: FetchNonceResponse = serde_json::from_value(res.result.unwrap_or_default())?; + info!("āœ… [{}] fetch_nonce successful, nonce length: {}", self.ws_url, nonce_res.nonce.len()); Ok(nonce_res.nonce) } @@ -199,6 +239,8 @@ impl CircleWsClient { pubkey: &str, signature: &str, ) -> Result { + info!("šŸ“” [{}] Sending authenticate request with signature...", self.ws_url); + let params = AuthCredentialsParams { pubkey: pubkey.to_string(), signature: signature.to_string(), @@ -207,6 +249,7 @@ impl CircleWsClient { let res = self.send_request(req).await?; if let Some(err) = res.error { + error!("āŒ [{}] authenticate failed: {} (code: {})", self.ws_url, err.message, err.code); return Err(CircleWsClientError::JsonRpcError { code: err.code, message: err.message, @@ -214,10 +257,18 @@ impl CircleWsClient { }); } - Ok(res + let authenticated = res .result .and_then(|v| v.get("authenticated").and_then(|v| v.as_bool())) - .unwrap_or(false)) + .unwrap_or(false); + + if authenticated { + info!("āœ… [{}] authenticate request successful - server confirmed authentication", self.ws_url); + } else { + error!("āŒ [{}] authenticate request failed - server returned false", self.ws_url); + } + + Ok(authenticated) } fn create_request( @@ -275,17 +326,19 @@ impl CircleWsClient { pub async fn connect(&mut self) -> Result<(), CircleWsClientError> { if self.internal_tx.is_some() { - info!("Client already connected or connecting."); + info!("šŸ”„ [{}] Client already connected or connecting", self.ws_url); return Ok(()); } + info!("šŸš€ [{}] Starting self-managed WebSocket connection with keep-alive and reconnection...", self.ws_url); let (internal_tx, internal_rx) = mpsc::channel::(32); self.internal_tx = Some(internal_tx); - // Use the URL as provided - support both ws:// and wss:// + // Clone necessary data for the task let connection_url = self.ws_url.clone(); - let is_secure = connection_url.starts_with("wss://"); - info!("šŸ”— Connecting to WebSocket: {} ({})", connection_url, if is_secure { "WSS/TLS" } else { "WS/Plain" }); + let private_key = self.private_key.clone(); + let is_connected = self.is_connected.clone(); + info!("šŸ”— [{}] Will handle connection, authentication, keep-alive, and reconnection internally", connection_url); // Pending requests: map request_id to a oneshot sender for the response let pending_requests: Arc< @@ -301,191 +354,135 @@ impl CircleWsClient { let log_url = connection_url.clone(); let task = async move { - #[cfg(target_arch = "wasm32")] - let ws_result = WebSocket::open(&connection_url); + // Main connection loop with reconnection logic + loop { + info!("šŸ”„ [{}] Starting connection attempt...", log_url); + + // Reset connection status + *is_connected.lock().unwrap() = false; + + // Clone connection_url for this iteration to avoid move issues + let connection_url_clone = connection_url.clone(); + + // Establish WebSocket connection + #[cfg(target_arch = "wasm32")] + let ws_result = WebSocket::open(&connection_url_clone); - #[cfg(not(target_arch = "wasm32"))] - let connect_attempt = async { - // Check if this is a secure WebSocket connection - if connection_url.starts_with("wss://") { - // For WSS connections, use a custom TLS connector that accepts self-signed certificates - // This is for development/demo purposes only - use tokio_tungstenite::tungstenite::client::IntoClientRequest; - - let request = connection_url.into_client_request() - .map_err(|e| CircleWsClientError::ConnectionError(format!("Invalid URL: {}", e)))?; - - // Create a native-tls connector that accepts invalid certificates (for development) - let tls_connector = native_tls::TlsConnector::builder() - .danger_accept_invalid_certs(true) - .danger_accept_invalid_hostnames(true) - .build() - .map_err(|e| CircleWsClientError::ConnectionError(format!("TLS connector creation failed: {}", e)))?; - - let connector = Connector::NativeTls(tls_connector); - - warn!("āš ļø DEVELOPMENT MODE: Accepting self-signed certificates (NOT for production!)"); - connect_async_tls_with_config(request, None, false, Some(connector)) - .await - .map_err(|e| CircleWsClientError::ConnectionError(format!("WSS connection failed: {}", e))) - } else { - // For regular WS connections, use the standard method - connect_async(&connection_url) - .await - .map_err(|e| CircleWsClientError::ConnectionError(format!("WS connection failed: {}", e))) - } - }; - #[cfg(not(target_arch = "wasm32"))] - let ws_result = connect_attempt.await; + #[cfg(not(target_arch = "wasm32"))] + let connect_attempt = async { + // Check if this is a secure WebSocket connection + if connection_url_clone.starts_with("wss://") { + // For WSS connections, use a custom TLS connector that accepts self-signed certificates + // This is for development/demo purposes only + use tokio_tungstenite::tungstenite::client::IntoClientRequest; + + let request = connection_url_clone.as_str().into_client_request() + .map_err(|e| CircleWsClientError::ConnectionError(format!("Invalid URL: {}", e)))?; + + // Create a native-tls connector that accepts invalid certificates (for development) + let tls_connector = native_tls::TlsConnector::builder() + .danger_accept_invalid_certs(true) + .danger_accept_invalid_hostnames(true) + .build() + .map_err(|e| CircleWsClientError::ConnectionError(format!("TLS connector creation failed: {}", e)))?; + + let connector = Connector::NativeTls(tls_connector); + + warn!("āš ļø DEVELOPMENT MODE: Accepting self-signed certificates (NOT for production!)"); + connect_async_tls_with_config(request, None, false, Some(connector)) + .await + .map_err(|e| CircleWsClientError::ConnectionError(format!("WSS connection failed: {}", e))) + } else { + // For regular WS connections, use the standard method + connect_async(&connection_url_clone) + .await + .map_err(|e| CircleWsClientError::ConnectionError(format!("WS connection failed: {}", e))) + } + }; + #[cfg(not(target_arch = "wasm32"))] + let ws_result = connect_attempt.await; + + match ws_result { + Ok(ws_conn_maybe_response) => { + #[cfg(target_arch = "wasm32")] + let ws_conn = ws_conn_maybe_response; + #[cfg(not(target_arch = "wasm32"))] + let (ws_conn, _) = ws_conn_maybe_response; - match ws_result { - Ok(ws_conn_maybe_response) => { - #[cfg(target_arch = "wasm32")] - let ws_conn = ws_conn_maybe_response; - #[cfg(not(target_arch = "wasm32"))] - let (ws_conn, _) = ws_conn_maybe_response; - - info!("Successfully connected to WebSocket: {}", log_url); - let (mut ws_tx, mut ws_rx) = ws_conn.split(); - let mut internal_rx_fused = internal_rx.fuse(); - - loop { - futures_util::select! { - // Handle messages from the client's public methods (e.g., play) - internal_msg = internal_rx_fused.next().fuse() => { - match internal_msg { - Some(InternalWsMessage::SendJsonRpc(req, response_sender)) => { - let req_id = req.id.clone(); - match serde_json::to_string(&req) { - Ok(req_str) => { - debug!("Sending JSON-RPC request (ID: {}): {}", req_id, req_str); - - #[cfg(target_arch = "wasm32")] - let send_res = ws_tx.send(GlooWsMessage::Text(req_str)).await; - #[cfg(not(target_arch = "wasm32"))] - let send_res = ws_tx.send(TungsteniteWsMessage::Text(req_str)).await; - - if let Err(e) = send_res { - error!("WebSocket send error for request ID {}: {:?}", req_id, e); - let _ = response_sender.send(Err(CircleWsClientError::SendError(e.to_string()))); - } else { - // Store the sender to await the response - task_pending_requests.lock().unwrap().insert(req_id, response_sender); - } - } - Err(e) => { - error!("Failed to serialize request ID {}: {}", req_id, e); - let _ = response_sender.send(Err(CircleWsClientError::JsonError(e))); - } - } - } - Some(InternalWsMessage::Close) => { - info!("Close message received internally, closing WebSocket."); - let _ = ws_tx.close().await; - break; - } - None => { // internal_rx closed, meaning client was dropped - info!("Internal MPSC channel closed, WebSocket task shutting down."); - let _ = ws_tx.close().await; - break; - } - } - }, - - // Handle messages received from the WebSocket server - ws_msg_res = ws_rx.next().fuse() => { - match ws_msg_res { - Some(Ok(msg)) => { - #[cfg(target_arch = "wasm32")] - match msg { - GlooWsMessage::Text(text) => { - debug!("Received WebSocket message: {}", text); - // ... (parse logic as before) - match serde_json::from_str::(&text) { - Ok(response) => { - if let Some(sender) = task_pending_requests.lock().unwrap().remove(&response.id) { - if let Err(failed_send_val) = sender.send(Ok(response)) { - if let Ok(resp_for_log) = failed_send_val { warn!("Failed to send response to waiting task for ID: {}", resp_for_log.id); } - else { warn!("Failed to send response to waiting task, and also failed to get original response for logging.");} - } - } else { warn!("Received response for unknown request ID or unsolicited message: {:?}", response); } - } - Err(e) => { error!("Failed to parse JSON-RPC response: {}. Raw: {}", e, text); } - } - } - GlooWsMessage::Bytes(_) => { - debug!("Received binary WebSocket message (WASM)."); - } - } - #[cfg(not(target_arch = "wasm32"))] - match msg { - TungsteniteWsMessage::Text(text) => { - debug!("Received WebSocket message: {}", text); - // ... (parse logic as before) - match serde_json::from_str::(&text) { - Ok(response) => { - if let Some(sender) = task_pending_requests.lock().unwrap().remove(&response.id) { - if let Err(failed_send_val) = sender.send(Ok(response)) { - if let Ok(resp_for_log) = failed_send_val { warn!("Failed to send response to waiting task for ID: {}", resp_for_log.id); } - else { warn!("Failed to send response to waiting task, and also failed to get original response for logging.");} - } - } else { warn!("Received response for unknown request ID or unsolicited message: {:?}", response); } - } - Err(e) => { error!("Failed to parse JSON-RPC response: {}. Raw: {}", e, text); } - } - } - TungsteniteWsMessage::Binary(_) => { - debug!("Received binary WebSocket message (Native)."); - } - TungsteniteWsMessage::Ping(_) | TungsteniteWsMessage::Pong(_) => { - debug!("Received Ping/Pong (Native)."); - } - TungsteniteWsMessage::Close(_) => { - info!("WebSocket connection closed by server (Native)."); - break; - } - TungsteniteWsMessage::Frame(_) => { - debug!("Received Frame (Native) - not typically handled directly."); - } - } - } - Some(Err(e)) => { - error!("WebSocket receive error: {:?}", e); - break; // Exit loop on receive error - } - None => { // WebSocket stream closed - info!("WebSocket connection closed by server (stream ended)."); - break; - } - } + // For WASM, WebSocket::open() always succeeds even if server is down + // We'll start as "connecting" and detect failures through timeouts + #[cfg(target_arch = "wasm32")] + info!("šŸ”„ [{}] WebSocket object created, testing actual connectivity...", log_url); + #[cfg(not(target_arch = "wasm32"))] + { + info!("āœ… [{}] WebSocket connection established successfully", log_url); + *is_connected.lock().unwrap() = true; + } + + // Handle authentication if private key is provided + let auth_success = if let Some(ref _pk) = private_key { + info!("šŸ” [{}] Authentication will be handled by separate authenticate() call", log_url); + true // For now, assume auth will be handled separately + } else { + info!("ā„¹ļø [{}] No private key provided, skipping authentication", log_url); + true + }; + + if auth_success { + // Start the main message handling loop with keep-alive + let disconnect_reason = Self::handle_connection_with_keepalive( + ws_conn, + internal_rx, + &task_pending_requests, + &log_url, + &is_connected + ).await; + + info!("šŸ”Œ [{}] Connection ended: {}", log_url, disconnect_reason); + + // Check if this was a manual disconnect + if disconnect_reason == "Manual close requested" { + break; // Don't reconnect on manual close } + + // If we reach here, we need to recreate internal_rx for the next iteration + // But since internal_rx was moved, we need to break out of the loop + break; } } - // Cleanup pending requests on exit - task_pending_requests - .lock() - .unwrap() - .drain() - .for_each(|(_, sender)| { - let _ = sender.send(Err(CircleWsClientError::ConnectionError( - "WebSocket task terminated".to_string(), - ))); - }); + Err(e) => { + error!("āŒ [{}] WebSocket connection failed: {:?}", log_url, e); + } } - Err(e) => { - error!("Failed to connect to WebSocket: {:?}", e); - // Notify any waiting senders about the connection failure - internal_rx - .for_each(|msg| async { - if let InternalWsMessage::SendJsonRpc(_, response_sender) = msg { - let _ = response_sender - .send(Err(CircleWsClientError::ConnectionError(e.to_string()))); - } - }) - .await; + + // Reset connection status + *is_connected.lock().unwrap() = false; + + // Wait before reconnecting + info!("ā³ [{}] Waiting 5 seconds before reconnection attempt...", log_url); + #[cfg(target_arch = "wasm32")] + { + use gloo_timers::future::TimeoutFuture; + TimeoutFuture::new(5_000).await; + } + #[cfg(not(target_arch = "wasm32"))] + { + tokio::time::sleep(std::time::Duration::from_secs(5)).await; } } - info!("WebSocket task finished."); + + // Cleanup pending requests on exit + task_pending_requests + .lock() + .unwrap() + .drain() + .for_each(|(_, sender)| { + let _ = sender.send(Err(CircleWsClientError::ConnectionError( + "WebSocket task terminated".to_string(), + ))); + }); + + info!("šŸ [{}] WebSocket task finished", log_url); }; #[cfg(target_arch = "wasm32")] @@ -497,6 +494,261 @@ impl CircleWsClient { Ok(()) } + + // Enhanced connection loop handler with keep-alive + #[cfg(target_arch = "wasm32")] + async fn handle_connection_with_keepalive( + ws_conn: WebSocket, + mut internal_rx: mpsc::Receiver, + pending_requests: &Arc>>>>, + log_url: &str, + is_connected: &Arc>, + ) -> String { + let (mut ws_tx, mut ws_rx) = ws_conn.split(); + let mut internal_rx_fused = internal_rx.fuse(); + + // Connection validation for WASM - test if connection actually works + let mut connection_test_timer = TimeoutFuture::new(2_000).fuse(); // 2 second timeout + let mut connection_validated = false; + + // Keep-alive timer - send ping every 30 seconds + use gloo_timers::future::TimeoutFuture; + let mut keep_alive_timer = TimeoutFuture::new(30_000).fuse(); + + // Send initial connection test ping + debug!("Sending initial connection test ping to {}", log_url); + let test_ping_res = ws_tx.send(GlooWsMessage::Text("ping".to_string())).await; + if let Err(e) = test_ping_res { + error!("āŒ [{}] Initial connection test failed: {:?}", log_url, e); + *is_connected.lock().unwrap() = false; + return format!("Initial connection test failed: {}", e); + } + + loop { + futures_util::select! { + // Connection test timeout - if no response in 2 seconds, connection failed + _ = connection_test_timer => { + if !connection_validated { + error!("āŒ [{}] Connection test failed - no response within 2 seconds", log_url); + *is_connected.lock().unwrap() = false; + return "Connection test timeout - server not responding".to_string(); + } + } + + // Handle messages from the client's public methods (e.g., play) + internal_msg = internal_rx_fused.next().fuse() => { + match internal_msg { + Some(InternalWsMessage::SendJsonRpc(req, response_sender)) => { + let req_id = req.id.clone(); + match serde_json::to_string(&req) { + Ok(req_str) => { + debug!("Sending JSON-RPC request (ID: {}): {}", req_id, req_str); + let send_res = ws_tx.send(GlooWsMessage::Text(req_str)).await; + if let Err(e) = send_res { + error!("WebSocket send error for request ID {}: {:?}", req_id, e); + // Connection failed - update status + *is_connected.lock().unwrap() = false; + let _ = response_sender.send(Err(CircleWsClientError::SendError(e.to_string()))); + } else { + // Store the sender to await the response + pending_requests.lock().unwrap().insert(req_id, response_sender); + } + } + Err(e) => { + error!("Failed to serialize request ID {}: {}", req_id, e); + let _ = response_sender.send(Err(CircleWsClientError::JsonError(e))); + } + } + } + Some(InternalWsMessage::Close) => { + info!("Close message received internally, closing WebSocket."); + let _ = ws_tx.close().await; + return "Manual close requested".to_string(); + } + None => { + info!("Internal MPSC channel closed, WebSocket task shutting down."); + let _ = ws_tx.close().await; + return "Internal channel closed".to_string(); + } + } + }, + + // Handle messages received from the WebSocket server + ws_msg_res = ws_rx.next().fuse() => { + match ws_msg_res { + Some(Ok(msg)) => { + // Any successful message confirms the connection is working + if !connection_validated { + info!("āœ… [{}] WebSocket connection validated - received message from server", log_url); + *is_connected.lock().unwrap() = true; + connection_validated = true; + } + + match msg { + GlooWsMessage::Text(text) => { + debug!("Received WebSocket message: {}", text); + Self::handle_received_message(&text, pending_requests); + } + GlooWsMessage::Bytes(_) => { + debug!("Received binary WebSocket message (WASM)."); + } + } + } + Some(Err(e)) => { + error!("WebSocket receive error: {:?}", e); + *is_connected.lock().unwrap() = false; + return format!("Receive error: {}", e); + } + None => { + info!("WebSocket connection closed by server (stream ended)."); + *is_connected.lock().unwrap() = false; + return "Server closed connection (stream ended)".to_string(); + } + } + } + + // Keep-alive timer - send ping every 30 seconds + _ = keep_alive_timer => { + // Only send ping if connection is validated + if connection_validated { + debug!("Sending keep-alive ping to {}", log_url); + let ping_str = "ping"; // Send simple plaintext ping + + let send_res = ws_tx.send(GlooWsMessage::Text(ping_str.to_string())).await; + if let Err(e) = send_res { + warn!("Keep-alive ping failed for {}: {:?}", log_url, e); + *is_connected.lock().unwrap() = false; + return format!("Keep-alive failed: {}", e); + } + } else { + debug!("Skipping keep-alive ping - connection not yet validated for {}", log_url); + } + + // Reset timer + keep_alive_timer = TimeoutFuture::new(30_000).fuse(); + } + } + } + } + + // Enhanced connection loop handler with keep-alive for native targets + #[cfg(not(target_arch = "wasm32"))] + async fn handle_connection_with_keepalive( + ws_conn: tokio_tungstenite::WebSocketStream>, + mut internal_rx: mpsc::Receiver, + pending_requests: &Arc>>>>, + log_url: &str, + _is_connected: &Arc>, + ) -> String { + let (mut ws_tx, mut ws_rx) = ws_conn.split(); + let mut internal_rx_fused = internal_rx.fuse(); + + loop { + futures_util::select! { + // Handle messages from the client's public methods (e.g., play) + internal_msg = internal_rx_fused.next().fuse() => { + match internal_msg { + Some(InternalWsMessage::SendJsonRpc(req, response_sender)) => { + let req_id = req.id.clone(); + match serde_json::to_string(&req) { + Ok(req_str) => { + debug!("Sending JSON-RPC request (ID: {}): {}", req_id, req_str); + let send_res = ws_tx.send(TungsteniteWsMessage::Text(req_str)).await; + if let Err(e) = send_res { + error!("WebSocket send error for request ID {}: {:?}", req_id, e); + let _ = response_sender.send(Err(CircleWsClientError::SendError(e.to_string()))); + } else { + // Store the sender to await the response + pending_requests.lock().unwrap().insert(req_id, response_sender); + } + } + Err(e) => { + error!("Failed to serialize request ID {}: {}", req_id, e); + let _ = response_sender.send(Err(CircleWsClientError::JsonError(e))); + } + } + } + Some(InternalWsMessage::Close) => { + info!("Close message received internally, closing WebSocket."); + let _ = ws_tx.close().await; + return "Manual close requested".to_string(); + } + None => { + info!("Internal MPSC channel closed, WebSocket task shutting down."); + let _ = ws_tx.close().await; + return "Internal channel closed".to_string(); + } + } + }, + + // Handle messages received from the WebSocket server + ws_msg_res = ws_rx.next().fuse() => { + match ws_msg_res { + Some(Ok(msg)) => { + match msg { + TungsteniteWsMessage::Text(text) => { + debug!("Received WebSocket message: {}", text); + Self::handle_received_message(&text, pending_requests); + } + TungsteniteWsMessage::Binary(_) => { + debug!("Received binary WebSocket message (Native)."); + } + TungsteniteWsMessage::Ping(_) | TungsteniteWsMessage::Pong(_) => { + debug!("Received Ping/Pong (Native)."); + } + TungsteniteWsMessage::Close(_) => { + info!("WebSocket connection closed by server (Native)."); + return "Server closed connection".to_string(); + } + TungsteniteWsMessage::Frame(_) => { + debug!("Received Frame (Native) - not typically handled directly."); + } + } + } + Some(Err(e)) => { + error!("WebSocket receive error: {:?}", e); + return format!("Receive error: {}", e); + } + None => { + info!("WebSocket connection closed by server (stream ended)."); + return "Server closed connection (stream ended)".to_string(); + } + } + } + } + } + } + + // Helper method to handle received messages + fn handle_received_message( + text: &str, + pending_requests: &Arc>>>>, + ) { + // Handle ping/pong messages - these are not JSON-RPC + if text.trim() == "pong" { + debug!("Received pong response"); + return; + } + + match serde_json::from_str::(text) { + Ok(response) => { + if let Some(sender) = pending_requests.lock().unwrap().remove(&response.id) { + if let Err(failed_send_val) = sender.send(Ok(response)) { + if let Ok(resp_for_log) = failed_send_val { + warn!("Failed to send response to waiting task for ID: {}", resp_for_log.id); + } else { + warn!("Failed to send response to waiting task, and also failed to get original response for logging."); + } + } + } else { + warn!("Received response for unknown request ID or unsolicited message: {:?}", response); + } + } + Err(e) => { + error!("Failed to parse JSON-RPC response: {}. Raw: {}", e, text); + } + } + } pub fn play( &self, diff --git a/src/server/src/lib.rs b/src/server/src/lib.rs index 8680189..42de5e8 100644 --- a/src/server/src/lib.rs +++ b/src/server/src/lib.rs @@ -421,6 +421,13 @@ impl StreamHandler> for CircleWs { Ok(ws::Message::Text(text)) => { debug!("WS Text for {}: {}", self.server_circle_name, text); + // Handle plaintext ping messages for keep-alive + if text.trim() == "ping" { + debug!("Received keep-alive ping from {}, responding with pong", self.server_circle_name); + ctx.text("pong"); + return; + } + match serde_json::from_str::(&text) { Ok(req) => { let client_rpc_id = req.id.clone().unwrap_or(Value::Null);