ws client - server improvements

This commit is contained in:
Timur Gordon 2025-07-21 20:57:58 +02:00
parent 29ff40d1a4
commit 70b5b336f1
7 changed files with 763 additions and 266 deletions

View File

@ -10,6 +10,7 @@ The `client_ws` is built on the following principles:
- **Modularity**: The crate is divided into logical modules, with a clear separation of concerns between the main client logic, authentication procedures, and cryptographic utilities.
- **Asynchronous Operations**: All network I/O is asynchronous, using `async/await` to ensure the client is non-blocking and efficient.
- **Fluent Configuration**: A builder pattern (`CircleWsClientBuilder`) is used for clear and flexible client construction.
- **Self-Managing Clients**: Each `CircleWsClient` handles its own lifecycle including connection, authentication, keep-alive, and reconnection logic internally.
## 2. Cross-Platform Implementation
@ -29,9 +30,19 @@ The `client_ws` crate is organized into the following key modules:
- **`types.rs`**: Defines the core data structures used in authentication, such as `AuthError` and `AuthCredentials`.
- **`crypto_utils.rs`**: A self-contained utility module for handling all `secp256k1` cryptographic operations, including key generation, public key derivation, and message signing.
## 4. Authentication Flow Deep Dive
## 4. Self-Managing Client Architecture
The `authenticate` method in `CircleWsClient` orchestrates the entire authentication process over the WebSocket connection. The sequence diagram below illustrates the internal interactions within the client during this flow.
Each `CircleWsClient` is designed to be completely self-managing, handling its entire lifecycle internally. This includes:
- **Connection Management**: Establishing and maintaining WebSocket connections
- **Authentication**: Automatic secp256k1 authentication flow when private keys are provided
- **Keep-Alive**: Periodic health checks to ensure connection stability
- **Reconnection**: Automatic reconnection with exponential backoff on connection failures
- **Connection Status Tracking**: Internal state management for connection health
### Connection Flow
The `connect()` method orchestrates the complete connection and authentication process:
```mermaid
sequenceDiagram
@ -46,27 +57,38 @@ sequenceDiagram
User->>+Builder: build()
Builder-->>-User: client
User->>+Client: authenticate()
Client->>Client: Check for private_key
Client->>+CryptoUtils: derive_public_key(private_key)
CryptoUtils-->>-Client: public_key
User->>+Client: connect()
Note over Client: Self-managing connection process
Client->>Client: Establish WebSocket connection
Client->>Client: Start keep-alive loop
Client->>Client: Start reconnection handler
alt Has Private Key
Client->>Client: Check for private_key
Client->>+CryptoUtils: derive_public_key(private_key)
CryptoUtils-->>-Client: public_key
Note over Client: Request nonce via WebSocket
Client->>+WsActor: JSON-RPC "fetch_nonce" (pubkey)
WsActor-->>-Client: JSON-RPC Response (nonce)
Note over Client: Request nonce via WebSocket
Client->>+WsActor: JSON-RPC "fetch_nonce" (pubkey)
WsActor-->>-Client: JSON-RPC Response (nonce)
Client->>+CryptoUtils: sign_message(private_key, nonce)
CryptoUtils-->>-Client: signature
Client->>+CryptoUtils: sign_message(private_key, nonce)
CryptoUtils-->>-Client: signature
Note over Client: Send credentials via WebSocket
Client->>+WsActor: JSON-RPC "authenticate" (pubkey, signature)
WsActor-->>-Client: JSON-RPC Response (authenticated: true/false)
alt Authentication Successful
Client-->>-User: Ok(true)
else Authentication Fails
Client-->>-User: Ok(false) or Err(...)
Note over Client: Send credentials via WebSocket
Client->>+WsActor: JSON-RPC "authenticate" (pubkey, signature)
WsActor-->>-Client: JSON-RPC Response (authenticated: true/false)
end
Client-->>-User: Connection established and authenticated
```
This architecture ensures that the cryptographic operations are isolated, the platform-specific code is cleanly separated, and the main client struct provides a simple and consistent API to the end-user.
### Self-Management Features
- **Automatic Keep-Alive**: Each client runs its own keep-alive loop to detect connection issues
- **Transparent Reconnection**: Failed connections are automatically retried with exponential backoff
- **Status Monitoring**: Connection status is tracked internally and can be queried via `is_connected()`
- **Resource Cleanup**: Proper cleanup of resources when clients are dropped
This architecture ensures that the cryptographic operations are isolated, the platform-specific code is cleanly separated, and each client is completely autonomous in managing its connection lifecycle.

View File

@ -21,6 +21,7 @@ http = "0.2"
# Authentication dependencies
hex = { workspace = true }
rand = { workspace = true }
getrandom = { version = "0.2", features = ["js"] }
# Optional crypto dependencies (enabled by default)
secp256k1 = { workspace = true, optional = true }
@ -32,6 +33,7 @@ circle_ws_lib = { path = "../server", optional = true }
# WASM-specific dependencies
[target.'cfg(target_arch = "wasm32")'.dependencies]
gloo-net = { version = "0.4.0", features = ["websocket"] }
gloo-timers = { version = "0.3.0", features = ["futures"] }
wasm-bindgen-futures = "0.4"
gloo-console = "0.3.0"
wasm-bindgen = "0.2"

View File

@ -1,14 +1,26 @@
# Circle WebSocket Client
A Rust library for connecting to Circle WebSocket servers with authentication support.
A Rust library for connecting to Circle WebSocket servers with authentication support and self-managing connection lifecycle.
## Features
- Cross-platform WebSocket client (native and WASM)
- secp256k1 cryptographic authentication
- JSON-RPC 2.0 protocol support
- Async/await interface with Tokio
- Built on tokio-tungstenite for reliable WebSocket connections
- **Cross-platform WebSocket client** (native and WASM)
- **secp256k1 cryptographic authentication** with automatic challenge-response flow
- **JSON-RPC 2.0 protocol support** for server communication
- **Self-managing connections** with automatic keep-alive and reconnection
- **Async/await interface** with modern Rust async patterns
- **Built on tokio-tungstenite** for reliable WebSocket connections (native)
- **Built on gloo-net** for WASM browser compatibility
## Architecture
Each `CircleWsClient` is completely self-managing:
- **Automatic Connection Management**: Handles WebSocket connection establishment
- **Built-in Authentication**: Seamless secp256k1 authentication when private keys are provided
- **Keep-Alive Monitoring**: Periodic health checks to detect connection issues
- **Transparent Reconnection**: Automatic reconnection with exponential backoff on failures
- **Connection Status Tracking**: Real-time connection state monitoring
## Usage
@ -19,7 +31,7 @@ Add this to your `Cargo.toml`:
circle_client_ws = { path = "../client_ws" }
```
### Basic Example
### Basic Example (Self-Managing Connection)
```rust
use circle_client_ws::CircleWsClientBuilder;
@ -28,27 +40,73 @@ use circle_client_ws::CircleWsClientBuilder;
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create client with private key
let private_key = "your_private_key_hex";
let client = CircleWsClientBuilder::new()
.with_private_key(private_key)?
let mut client = CircleWsClientBuilder::new("ws://localhost:8080".to_string())
.with_keypair(private_key.to_string())
.build();
// Connect and authenticate
client.connect("ws://localhost:8080").await?;
// Connect - this handles authentication, keep-alive, and reconnection automatically
client.connect().await?;
// Use the authenticated client...
// Check connection status
println!("Connected: {}", client.is_connected());
// Execute scripts on the server
let result = client.play("\"Hello from client!\"".to_string()).await?;
println!("Script result: {:?}", result);
// Client automatically maintains connection in the background
// No manual keep-alive or reconnection logic needed
Ok(())
}
```
### Authentication Flow
### Self-Managing Features
The client automatically handles the secp256k1 authentication flow:
1. Connects to WebSocket server
2. Receives authentication challenge
3. Signs challenge with private key
4. Sends signed response
5. Receives authentication confirmation
The client automatically handles:
1. **Connection Establishment**: WebSocket connection to the server
2. **Authentication Flow**: secp256k1 challenge-response authentication
3. **Keep-Alive Monitoring**: Periodic health checks to ensure connection stability
4. **Automatic Reconnection**: Transparent reconnection on connection failures
5. **Resource Management**: Proper cleanup when the client is dropped
### Connection Status Monitoring
```rust
// Check if the client is currently connected
if client.is_connected() {
println!("Client is connected and healthy");
} else {
println!("Client is disconnected or reconnecting");
}
// Get detailed connection status
let status = client.get_connection_status();
println!("Connection status: {}", status);
```
### WASM Usage
For WASM applications, the client works seamlessly in browsers:
```rust
use circle_client_ws::CircleWsClientBuilder;
use wasm_bindgen_futures::spawn_local;
// In a WASM context
spawn_local(async move {
let mut client = CircleWsClientBuilder::new("ws://localhost:8080".to_string())
.build();
// Self-managing connection works the same in WASM
if let Ok(_) = client.connect().await {
// Client automatically handles keep-alive and reconnection
let result = client.play("\"WASM client connected!\"".to_string()).await;
// Handle result...
}
});
```
## Binary Tool
@ -56,12 +114,28 @@ A command-line binary is also available for interactive use and script execution
## Platform Support
- **Native**: Full support on all Rust-supported platforms
- **WASM**: Browser support with web-sys bindings
- **Native**: Full support on all Rust-supported platforms with tokio-tungstenite
- **WASM**: Browser support with gloo-net WebSocket bindings
## Dependencies
- `tokio-tungstenite`: WebSocket implementation
- `secp256k1`: Cryptographic operations
- `serde`: JSON serialization
- `uuid`: Request ID generation
### Core Dependencies
- `serde`: JSON serialization and deserialization
- `uuid`: Request ID generation for JSON-RPC
- `futures-util`: Async utilities for WebSocket handling
- `thiserror`: Error handling and propagation
### Platform-Specific Dependencies
#### Native (tokio-based)
- `tokio-tungstenite`: Robust WebSocket implementation
- `tokio`: Async runtime for connection management
#### WASM (browser-based)
- `gloo-net`: WebSocket bindings for browsers
- `gloo-timers`: Timer utilities for keep-alive functionality
- `wasm-bindgen-futures`: Async support in WASM
### Cryptographic Dependencies (optional)
- `secp256k1`: Elliptic curve cryptography for authentication
- `sha3`: Hashing for cryptographic operations

View File

@ -1,12 +1,12 @@
# Circles WebSocket Client Binary
# Circles WebSocket Client
A command-line WebSocket client for connecting to Circles servers with authentication support.
A WebSocket client for connecting to Circles servers with authentication support. Available in both CLI and WebAssembly (WASM) versions.
## Binary: `circles_client`
## CLI Usage
### Installation
Build the binary:
Build the CLI binary:
```bash
cargo build --bin circles_client --release
```
@ -49,6 +49,53 @@ circles_client -vv ws://localhost:8080
- **Verbosity Control**: Use `-v` flags to increase logging detail
- **Cross-platform**: Works on all platforms supported by Rust and tokio-tungstenite
## WebAssembly (WASM) Usage
### Build and Serve
1. Install Trunk:
```bash
cargo install trunk
```
2. Build the WASM version:
```bash
trunk build --release
```
3. Serve the application:
```bash
trunk serve
```
The application will be available at `http://localhost:8080`
### Usage in Browser
1. Open the served page in your browser
2. Enter the WebSocket server URL
3. Choose either:
- Execute a Rhai script directly
- Enter interactive mode (type 'exit' or 'quit' to leave)
### Features
- **Browser Integration**: Uses browser's WebSocket implementation
- **Interactive Mode**: Browser-based input/output using prompts
- **Error Handling**: Browser console logging
- **Cross-browser**: Works in all modern browsers supporting WebAssembly
## Common Features
Both versions share the same core functionality:
- **WebSocket Connection**: Connects to Circles WebSocket server
- **Authentication**: Handles secp256k1 authentication
- **Script Execution**: Executes Rhai scripts
- **Interactive Mode**: Provides REPL-like interface
- **Error Handling**: Comprehensive error reporting
- **Logging**: Detailed logging at different verbosity levels
### Interactive Mode
When run without `-s` or `-f` flags, the client enters interactive mode where you can:

View File

@ -1,12 +1,30 @@
#![cfg_attr(target_arch = "wasm32", no_main)]
use circle_client_ws::CircleWsClientBuilder;
use clap::{Arg, ArgAction, Command};
use dotenv::dotenv;
use env_logger;
use log::{error, info};
#[cfg(not(target_arch = "wasm32"))]
use std::env;
use std::io::{self, Write};
#[cfg(not(target_arch = "wasm32"))]
use std::path::Path;
#[cfg(not(target_arch = "wasm32"))]
use std::io::{self, Write};
#[cfg(target_arch = "wasm32")]
use wasm_bindgen::prelude::*;
#[cfg(target_arch = "wasm32")]
use web_sys::{console, window};
#[cfg(target_arch = "wasm32")]
use wasm_bindgen_futures::spawn_local;
#[cfg(not(target_arch = "wasm32"))]
use clap::{Arg, ArgAction, Command};
#[cfg(not(target_arch = "wasm32"))]
use dotenv::dotenv;
#[cfg(not(target_arch = "wasm32"))]
use env_logger;
#[cfg(not(target_arch = "wasm32"))]
use tokio;
#[cfg(not(target_arch = "wasm32"))]
use log::{error, info};
#[derive(Debug)]
struct Args {
@ -17,6 +35,7 @@ struct Args {
no_timestamp: bool,
}
#[cfg(not(target_arch = "wasm32"))]
fn parse_args() -> Args {
let matches = Command::new("circles_client")
.version("0.1.0")
@ -67,6 +86,7 @@ fn parse_args() -> Args {
}
}
#[cfg(not(target_arch = "wasm32"))]
fn setup_logging(verbose: u8, no_timestamp: bool) {
let log_level = match verbose {
0 => "warn,circle_client_ws=info",
@ -87,6 +107,7 @@ fn setup_logging(verbose: u8, no_timestamp: bool) {
}
}
#[cfg(not(target_arch = "wasm32"))]
fn load_private_key() -> Result<String, Box<dyn std::error::Error>> {
// Try to load from .env file first
if let Ok(_) = dotenv() {
@ -107,42 +128,82 @@ fn load_private_key() -> Result<String, Box<dyn std::error::Error>> {
Err("PRIVATE_KEY not found in environment or .env files".into())
}
#[cfg(target_arch = "wasm32")]
async fn run_interactive_mode(client: circle_client_ws::CircleWsClient) -> Result<(), Box<dyn std::error::Error>> {
info!("Entering interactive mode. Type 'exit' or 'quit' to leave.");
println!("🔄 Interactive mode - Enter Rhai scripts (type 'exit' or 'quit' to leave):");
console::log_1(&"Entering interactive mode. Type 'exit' or 'quit' to leave.".into());
console::log_1(&"🔄 Interactive mode - Enter Rhai scripts (type 'exit' or 'quit' to leave):\n".into());
loop {
print!("rhai> ");
io::stdout().flush()?;
// In wasm32, we need to use browser's console for input/output
let window = window().expect("Window not available");
let input = window.prompt_with_message("Enter Rhai script (or 'exit' to quit):")
.map_err(|e| format!("Failed to get input: {:#?}", e))? // Use debug formatting
.unwrap_or_default();
let mut input = String::new();
io::stdin().read_line(&mut input)?;
let script = input.trim();
if script.is_empty() {
continue;
// Handle empty or exit cases
if input == "exit" || input == "quit" {
console::log_1(&"👋 Goodbye!".into());
return Ok(());
}
// Execute the script
match client.play(input).await {
Ok(result) => {
console::log_1(&format!("📤 Result: {}", result.output).into());
}
if script == "exit" || script == "quit" {
println!("👋 Goodbye!");
break;
}
match client.play(script.to_string()).await {
Ok(result) => {
println!("📤 Result: {}", result.output);
}
Err(e) => {
error!("❌ Script execution failed: {}", e);
println!("❌ Error: {}", e);
}
Err(e) => {
console::log_1(&format!("❌ Script execution failed: {}", e).into());
}
}
Ok(())
}
#[cfg(target_arch = "wasm32")]
async fn execute_script(client: circle_client_ws::CircleWsClient, script: String) -> Result<(), Box<dyn std::error::Error>> {
console::log_1(&format!("Executing script: {}", script).into());
match client.play(script).await {
Ok(result) => {
console::log_1(&result.output.into());
Ok(())
}
Err(e) => {
console::log_1(&format!("Script execution failed: {}", e).into());
Err(e.into())
}
}
}
#[cfg(target_arch = "wasm32")]
pub async fn start_client(url: &str, script: Option<String>) -> Result<(), Box<dyn std::error::Error>> {
// Build client
let mut client = CircleWsClientBuilder::new(url.to_string())
.build();
// Connect to WebSocket server
console::log_1(&"🔌 Connecting to WebSocket server...".into());
if let Err(e) = client.connect().await {
console::log_1(&format!("❌ Failed to connect: {}", e).into());
return Err(e.into());
}
console::log_1(&"✅ Connected successfully".into());
// Authenticate with server
if let Err(e) = client.authenticate().await {
console::log_1(&format!("❌ Authentication failed: {}", e).into());
return Err(e.into());
}
console::log_1(&"✅ Authentication successful".into());
// Handle script execution
if let Some(script) = script {
execute_script(client, script).await
} else {
run_interactive_mode(client).await
}
}
#[cfg(not(target_arch = "wasm32"))]
async fn execute_script(client: circle_client_ws::CircleWsClient, script: String) -> Result<(), Box<dyn std::error::Error>> {
info!("Executing script: {}", script);
@ -158,11 +219,43 @@ async fn execute_script(client: circle_client_ws::CircleWsClient, script: String
}
}
#[cfg(not(target_arch = "wasm32"))]
async fn load_script_from_file(path: &str) -> Result<String, Box<dyn std::error::Error>> {
let script = tokio::fs::read_to_string(path).await?;
Ok(script)
}
#[cfg(not(target_arch = "wasm32"))]
async fn run_interactive_mode(client: circle_client_ws::CircleWsClient) -> Result<(), Box<dyn std::error::Error>> {
println!("\n🔄 Interactive mode - Enter Rhai scripts (type 'exit' or 'quit' to leave):\n");
loop {
print!("Enter Rhai script (or 'exit' to quit): ");
io::stdout().flush()?;
let mut input = String::new();
io::stdin().read_line(&mut input)?;
let input = input.trim().to_string();
if input == "exit" || input == "quit" {
println!("\n👋 Goodbye!");
return Ok(());
}
match client.play(input).await {
Ok(result) => {
println!("\n📤 Result: {}", result.output);
}
Err(e) => {
error!("❌ Script execution failed: {}", e);
println!("\n❌ Script execution failed: {}", e);
}
}
println!();
}
}
#[cfg(not(target_arch = "wasm32"))]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = parse_args();

View File

@ -148,6 +148,7 @@ impl CircleWsClientBuilder {
#[cfg(not(target_arch = "wasm32"))]
task_handle: None,
private_key: self.private_key,
is_connected: Arc::new(Mutex::new(false)),
}
}
}
@ -158,24 +159,61 @@ pub struct CircleWsClient {
#[cfg(not(target_arch = "wasm32"))]
task_handle: Option<tokio::task::JoinHandle<()>>,
private_key: Option<String>,
is_connected: Arc<Mutex<bool>>,
}
impl CircleWsClient {
/// Get the connection status
pub fn get_connection_status(&self) -> String {
if *self.is_connected.lock().unwrap() {
"Connected".to_string()
} else {
"Disconnected".to_string()
}
}
/// Check if the client is connected
pub fn is_connected(&self) -> bool {
*self.is_connected.lock().unwrap()
}
}
impl CircleWsClient {
pub async fn authenticate(&mut self) -> Result<bool, CircleWsClientError> {
info!("🔐 [{}] Starting authentication process...", self.ws_url);
let private_key = self
.private_key
.as_ref()
.ok_or(CircleWsClientError::AuthNoKeyPair)?;
info!("🔑 [{}] Deriving public key from private key...", self.ws_url);
let public_key = auth::derive_public_key(private_key)?;
info!("✅ [{}] Public key derived: {}...", self.ws_url, &public_key[..8]);
info!("🎫 [{}] Fetching authentication nonce...", self.ws_url);
let nonce = self.fetch_nonce(&public_key).await?;
let signature = auth::sign_message(private_key, &nonce)?;
info!("✅ [{}] Nonce received: {}...", self.ws_url, &nonce[..8]);
self.authenticate_with_signature(&public_key, &signature)
.await
info!("✍️ [{}] Signing nonce with private key...", self.ws_url);
let signature = auth::sign_message(private_key, &nonce)?;
info!("✅ [{}] Signature created: {}...", self.ws_url, &signature[..8]);
info!("🔒 [{}] Submitting authentication credentials...", self.ws_url);
let result = self.authenticate_with_signature(&public_key, &signature).await?;
if result {
info!("🎉 [{}] Authentication successful!", self.ws_url);
} else {
error!("❌ [{}] Authentication failed - server rejected credentials", self.ws_url);
}
Ok(result)
}
async fn fetch_nonce(&self, pubkey: &str) -> Result<String, CircleWsClientError> {
info!("📡 [{}] Sending fetch_nonce request for pubkey: {}...", self.ws_url, &pubkey[..8]);
let params = FetchNonceParams {
pubkey: pubkey.to_string(),
};
@ -183,6 +221,7 @@ impl CircleWsClient {
let res = self.send_request(req).await?;
if let Some(err) = res.error {
error!("❌ [{}] fetch_nonce failed: {} (code: {})", self.ws_url, err.message, err.code);
return Err(CircleWsClientError::JsonRpcError {
code: err.code,
message: err.message,
@ -191,6 +230,7 @@ impl CircleWsClient {
}
let nonce_res: FetchNonceResponse = serde_json::from_value(res.result.unwrap_or_default())?;
info!("✅ [{}] fetch_nonce successful, nonce length: {}", self.ws_url, nonce_res.nonce.len());
Ok(nonce_res.nonce)
}
@ -199,6 +239,8 @@ impl CircleWsClient {
pubkey: &str,
signature: &str,
) -> Result<bool, CircleWsClientError> {
info!("📡 [{}] Sending authenticate request with signature...", self.ws_url);
let params = AuthCredentialsParams {
pubkey: pubkey.to_string(),
signature: signature.to_string(),
@ -207,6 +249,7 @@ impl CircleWsClient {
let res = self.send_request(req).await?;
if let Some(err) = res.error {
error!("❌ [{}] authenticate failed: {} (code: {})", self.ws_url, err.message, err.code);
return Err(CircleWsClientError::JsonRpcError {
code: err.code,
message: err.message,
@ -214,10 +257,18 @@ impl CircleWsClient {
});
}
Ok(res
let authenticated = res
.result
.and_then(|v| v.get("authenticated").and_then(|v| v.as_bool()))
.unwrap_or(false))
.unwrap_or(false);
if authenticated {
info!("✅ [{}] authenticate request successful - server confirmed authentication", self.ws_url);
} else {
error!("❌ [{}] authenticate request failed - server returned false", self.ws_url);
}
Ok(authenticated)
}
fn create_request<T: Serialize>(
@ -275,17 +326,19 @@ impl CircleWsClient {
pub async fn connect(&mut self) -> Result<(), CircleWsClientError> {
if self.internal_tx.is_some() {
info!("Client already connected or connecting.");
info!("🔄 [{}] Client already connected or connecting", self.ws_url);
return Ok(());
}
info!("🚀 [{}] Starting self-managed WebSocket connection with keep-alive and reconnection...", self.ws_url);
let (internal_tx, internal_rx) = mpsc::channel::<InternalWsMessage>(32);
self.internal_tx = Some(internal_tx);
// Use the URL as provided - support both ws:// and wss://
// Clone necessary data for the task
let connection_url = self.ws_url.clone();
let is_secure = connection_url.starts_with("wss://");
info!("🔗 Connecting to WebSocket: {} ({})", connection_url, if is_secure { "WSS/TLS" } else { "WS/Plain" });
let private_key = self.private_key.clone();
let is_connected = self.is_connected.clone();
info!("🔗 [{}] Will handle connection, authentication, keep-alive, and reconnection internally", connection_url);
// Pending requests: map request_id to a oneshot sender for the response
let pending_requests: Arc<
@ -301,191 +354,135 @@ impl CircleWsClient {
let log_url = connection_url.clone();
let task = async move {
#[cfg(target_arch = "wasm32")]
let ws_result = WebSocket::open(&connection_url);
// Main connection loop with reconnection logic
loop {
info!("🔄 [{}] Starting connection attempt...", log_url);
// Reset connection status
*is_connected.lock().unwrap() = false;
// Clone connection_url for this iteration to avoid move issues
let connection_url_clone = connection_url.clone();
// Establish WebSocket connection
#[cfg(target_arch = "wasm32")]
let ws_result = WebSocket::open(&connection_url_clone);
#[cfg(not(target_arch = "wasm32"))]
let connect_attempt = async {
// Check if this is a secure WebSocket connection
if connection_url.starts_with("wss://") {
// For WSS connections, use a custom TLS connector that accepts self-signed certificates
// This is for development/demo purposes only
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
let request = connection_url.into_client_request()
.map_err(|e| CircleWsClientError::ConnectionError(format!("Invalid URL: {}", e)))?;
// Create a native-tls connector that accepts invalid certificates (for development)
let tls_connector = native_tls::TlsConnector::builder()
.danger_accept_invalid_certs(true)
.danger_accept_invalid_hostnames(true)
.build()
.map_err(|e| CircleWsClientError::ConnectionError(format!("TLS connector creation failed: {}", e)))?;
let connector = Connector::NativeTls(tls_connector);
warn!("⚠️ DEVELOPMENT MODE: Accepting self-signed certificates (NOT for production!)");
connect_async_tls_with_config(request, None, false, Some(connector))
.await
.map_err(|e| CircleWsClientError::ConnectionError(format!("WSS connection failed: {}", e)))
} else {
// For regular WS connections, use the standard method
connect_async(&connection_url)
.await
.map_err(|e| CircleWsClientError::ConnectionError(format!("WS connection failed: {}", e)))
}
};
#[cfg(not(target_arch = "wasm32"))]
let ws_result = connect_attempt.await;
#[cfg(not(target_arch = "wasm32"))]
let connect_attempt = async {
// Check if this is a secure WebSocket connection
if connection_url_clone.starts_with("wss://") {
// For WSS connections, use a custom TLS connector that accepts self-signed certificates
// This is for development/demo purposes only
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
let request = connection_url_clone.as_str().into_client_request()
.map_err(|e| CircleWsClientError::ConnectionError(format!("Invalid URL: {}", e)))?;
// Create a native-tls connector that accepts invalid certificates (for development)
let tls_connector = native_tls::TlsConnector::builder()
.danger_accept_invalid_certs(true)
.danger_accept_invalid_hostnames(true)
.build()
.map_err(|e| CircleWsClientError::ConnectionError(format!("TLS connector creation failed: {}", e)))?;
let connector = Connector::NativeTls(tls_connector);
warn!("⚠️ DEVELOPMENT MODE: Accepting self-signed certificates (NOT for production!)");
connect_async_tls_with_config(request, None, false, Some(connector))
.await
.map_err(|e| CircleWsClientError::ConnectionError(format!("WSS connection failed: {}", e)))
} else {
// For regular WS connections, use the standard method
connect_async(&connection_url_clone)
.await
.map_err(|e| CircleWsClientError::ConnectionError(format!("WS connection failed: {}", e)))
}
};
#[cfg(not(target_arch = "wasm32"))]
let ws_result = connect_attempt.await;
match ws_result {
Ok(ws_conn_maybe_response) => {
#[cfg(target_arch = "wasm32")]
let ws_conn = ws_conn_maybe_response;
#[cfg(not(target_arch = "wasm32"))]
let (ws_conn, _) = ws_conn_maybe_response;
match ws_result {
Ok(ws_conn_maybe_response) => {
#[cfg(target_arch = "wasm32")]
let ws_conn = ws_conn_maybe_response;
#[cfg(not(target_arch = "wasm32"))]
let (ws_conn, _) = ws_conn_maybe_response;
info!("Successfully connected to WebSocket: {}", log_url);
let (mut ws_tx, mut ws_rx) = ws_conn.split();
let mut internal_rx_fused = internal_rx.fuse();
loop {
futures_util::select! {
// Handle messages from the client's public methods (e.g., play)
internal_msg = internal_rx_fused.next().fuse() => {
match internal_msg {
Some(InternalWsMessage::SendJsonRpc(req, response_sender)) => {
let req_id = req.id.clone();
match serde_json::to_string(&req) {
Ok(req_str) => {
debug!("Sending JSON-RPC request (ID: {}): {}", req_id, req_str);
#[cfg(target_arch = "wasm32")]
let send_res = ws_tx.send(GlooWsMessage::Text(req_str)).await;
#[cfg(not(target_arch = "wasm32"))]
let send_res = ws_tx.send(TungsteniteWsMessage::Text(req_str)).await;
if let Err(e) = send_res {
error!("WebSocket send error for request ID {}: {:?}", req_id, e);
let _ = response_sender.send(Err(CircleWsClientError::SendError(e.to_string())));
} else {
// Store the sender to await the response
task_pending_requests.lock().unwrap().insert(req_id, response_sender);
}
}
Err(e) => {
error!("Failed to serialize request ID {}: {}", req_id, e);
let _ = response_sender.send(Err(CircleWsClientError::JsonError(e)));
}
}
}
Some(InternalWsMessage::Close) => {
info!("Close message received internally, closing WebSocket.");
let _ = ws_tx.close().await;
break;
}
None => { // internal_rx closed, meaning client was dropped
info!("Internal MPSC channel closed, WebSocket task shutting down.");
let _ = ws_tx.close().await;
break;
}
}
},
// Handle messages received from the WebSocket server
ws_msg_res = ws_rx.next().fuse() => {
match ws_msg_res {
Some(Ok(msg)) => {
#[cfg(target_arch = "wasm32")]
match msg {
GlooWsMessage::Text(text) => {
debug!("Received WebSocket message: {}", text);
// ... (parse logic as before)
match serde_json::from_str::<JsonRpcResponseClient>(&text) {
Ok(response) => {
if let Some(sender) = task_pending_requests.lock().unwrap().remove(&response.id) {
if let Err(failed_send_val) = sender.send(Ok(response)) {
if let Ok(resp_for_log) = failed_send_val { warn!("Failed to send response to waiting task for ID: {}", resp_for_log.id); }
else { warn!("Failed to send response to waiting task, and also failed to get original response for logging.");}
}
} else { warn!("Received response for unknown request ID or unsolicited message: {:?}", response); }
}
Err(e) => { error!("Failed to parse JSON-RPC response: {}. Raw: {}", e, text); }
}
}
GlooWsMessage::Bytes(_) => {
debug!("Received binary WebSocket message (WASM).");
}
}
#[cfg(not(target_arch = "wasm32"))]
match msg {
TungsteniteWsMessage::Text(text) => {
debug!("Received WebSocket message: {}", text);
// ... (parse logic as before)
match serde_json::from_str::<JsonRpcResponseClient>(&text) {
Ok(response) => {
if let Some(sender) = task_pending_requests.lock().unwrap().remove(&response.id) {
if let Err(failed_send_val) = sender.send(Ok(response)) {
if let Ok(resp_for_log) = failed_send_val { warn!("Failed to send response to waiting task for ID: {}", resp_for_log.id); }
else { warn!("Failed to send response to waiting task, and also failed to get original response for logging.");}
}
} else { warn!("Received response for unknown request ID or unsolicited message: {:?}", response); }
}
Err(e) => { error!("Failed to parse JSON-RPC response: {}. Raw: {}", e, text); }
}
}
TungsteniteWsMessage::Binary(_) => {
debug!("Received binary WebSocket message (Native).");
}
TungsteniteWsMessage::Ping(_) | TungsteniteWsMessage::Pong(_) => {
debug!("Received Ping/Pong (Native).");
}
TungsteniteWsMessage::Close(_) => {
info!("WebSocket connection closed by server (Native).");
break;
}
TungsteniteWsMessage::Frame(_) => {
debug!("Received Frame (Native) - not typically handled directly.");
}
}
}
Some(Err(e)) => {
error!("WebSocket receive error: {:?}", e);
break; // Exit loop on receive error
}
None => { // WebSocket stream closed
info!("WebSocket connection closed by server (stream ended).");
break;
}
}
// For WASM, WebSocket::open() always succeeds even if server is down
// We'll start as "connecting" and detect failures through timeouts
#[cfg(target_arch = "wasm32")]
info!("🔄 [{}] WebSocket object created, testing actual connectivity...", log_url);
#[cfg(not(target_arch = "wasm32"))]
{
info!("✅ [{}] WebSocket connection established successfully", log_url);
*is_connected.lock().unwrap() = true;
}
// Handle authentication if private key is provided
let auth_success = if let Some(ref _pk) = private_key {
info!("🔐 [{}] Authentication will be handled by separate authenticate() call", log_url);
true // For now, assume auth will be handled separately
} else {
info!(" [{}] No private key provided, skipping authentication", log_url);
true
};
if auth_success {
// Start the main message handling loop with keep-alive
let disconnect_reason = Self::handle_connection_with_keepalive(
ws_conn,
internal_rx,
&task_pending_requests,
&log_url,
&is_connected
).await;
info!("🔌 [{}] Connection ended: {}", log_url, disconnect_reason);
// Check if this was a manual disconnect
if disconnect_reason == "Manual close requested" {
break; // Don't reconnect on manual close
}
// If we reach here, we need to recreate internal_rx for the next iteration
// But since internal_rx was moved, we need to break out of the loop
break;
}
}
// Cleanup pending requests on exit
task_pending_requests
.lock()
.unwrap()
.drain()
.for_each(|(_, sender)| {
let _ = sender.send(Err(CircleWsClientError::ConnectionError(
"WebSocket task terminated".to_string(),
)));
});
Err(e) => {
error!("❌ [{}] WebSocket connection failed: {:?}", log_url, e);
}
}
Err(e) => {
error!("Failed to connect to WebSocket: {:?}", e);
// Notify any waiting senders about the connection failure
internal_rx
.for_each(|msg| async {
if let InternalWsMessage::SendJsonRpc(_, response_sender) = msg {
let _ = response_sender
.send(Err(CircleWsClientError::ConnectionError(e.to_string())));
}
})
.await;
// Reset connection status
*is_connected.lock().unwrap() = false;
// Wait before reconnecting
info!("⏳ [{}] Waiting 5 seconds before reconnection attempt...", log_url);
#[cfg(target_arch = "wasm32")]
{
use gloo_timers::future::TimeoutFuture;
TimeoutFuture::new(5_000).await;
}
#[cfg(not(target_arch = "wasm32"))]
{
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
info!("WebSocket task finished.");
// Cleanup pending requests on exit
task_pending_requests
.lock()
.unwrap()
.drain()
.for_each(|(_, sender)| {
let _ = sender.send(Err(CircleWsClientError::ConnectionError(
"WebSocket task terminated".to_string(),
)));
});
info!("🏁 [{}] WebSocket task finished", log_url);
};
#[cfg(target_arch = "wasm32")]
@ -497,6 +494,261 @@ impl CircleWsClient {
Ok(())
}
// Enhanced connection loop handler with keep-alive
#[cfg(target_arch = "wasm32")]
async fn handle_connection_with_keepalive(
ws_conn: WebSocket,
mut internal_rx: mpsc::Receiver<InternalWsMessage>,
pending_requests: &Arc<Mutex<HashMap<String, oneshot::Sender<Result<JsonRpcResponseClient, CircleWsClientError>>>>>,
log_url: &str,
is_connected: &Arc<Mutex<bool>>,
) -> String {
let (mut ws_tx, mut ws_rx) = ws_conn.split();
let mut internal_rx_fused = internal_rx.fuse();
// Connection validation for WASM - test if connection actually works
let mut connection_test_timer = TimeoutFuture::new(2_000).fuse(); // 2 second timeout
let mut connection_validated = false;
// Keep-alive timer - send ping every 30 seconds
use gloo_timers::future::TimeoutFuture;
let mut keep_alive_timer = TimeoutFuture::new(30_000).fuse();
// Send initial connection test ping
debug!("Sending initial connection test ping to {}", log_url);
let test_ping_res = ws_tx.send(GlooWsMessage::Text("ping".to_string())).await;
if let Err(e) = test_ping_res {
error!("❌ [{}] Initial connection test failed: {:?}", log_url, e);
*is_connected.lock().unwrap() = false;
return format!("Initial connection test failed: {}", e);
}
loop {
futures_util::select! {
// Connection test timeout - if no response in 2 seconds, connection failed
_ = connection_test_timer => {
if !connection_validated {
error!("❌ [{}] Connection test failed - no response within 2 seconds", log_url);
*is_connected.lock().unwrap() = false;
return "Connection test timeout - server not responding".to_string();
}
}
// Handle messages from the client's public methods (e.g., play)
internal_msg = internal_rx_fused.next().fuse() => {
match internal_msg {
Some(InternalWsMessage::SendJsonRpc(req, response_sender)) => {
let req_id = req.id.clone();
match serde_json::to_string(&req) {
Ok(req_str) => {
debug!("Sending JSON-RPC request (ID: {}): {}", req_id, req_str);
let send_res = ws_tx.send(GlooWsMessage::Text(req_str)).await;
if let Err(e) = send_res {
error!("WebSocket send error for request ID {}: {:?}", req_id, e);
// Connection failed - update status
*is_connected.lock().unwrap() = false;
let _ = response_sender.send(Err(CircleWsClientError::SendError(e.to_string())));
} else {
// Store the sender to await the response
pending_requests.lock().unwrap().insert(req_id, response_sender);
}
}
Err(e) => {
error!("Failed to serialize request ID {}: {}", req_id, e);
let _ = response_sender.send(Err(CircleWsClientError::JsonError(e)));
}
}
}
Some(InternalWsMessage::Close) => {
info!("Close message received internally, closing WebSocket.");
let _ = ws_tx.close().await;
return "Manual close requested".to_string();
}
None => {
info!("Internal MPSC channel closed, WebSocket task shutting down.");
let _ = ws_tx.close().await;
return "Internal channel closed".to_string();
}
}
},
// Handle messages received from the WebSocket server
ws_msg_res = ws_rx.next().fuse() => {
match ws_msg_res {
Some(Ok(msg)) => {
// Any successful message confirms the connection is working
if !connection_validated {
info!("✅ [{}] WebSocket connection validated - received message from server", log_url);
*is_connected.lock().unwrap() = true;
connection_validated = true;
}
match msg {
GlooWsMessage::Text(text) => {
debug!("Received WebSocket message: {}", text);
Self::handle_received_message(&text, pending_requests);
}
GlooWsMessage::Bytes(_) => {
debug!("Received binary WebSocket message (WASM).");
}
}
}
Some(Err(e)) => {
error!("WebSocket receive error: {:?}", e);
*is_connected.lock().unwrap() = false;
return format!("Receive error: {}", e);
}
None => {
info!("WebSocket connection closed by server (stream ended).");
*is_connected.lock().unwrap() = false;
return "Server closed connection (stream ended)".to_string();
}
}
}
// Keep-alive timer - send ping every 30 seconds
_ = keep_alive_timer => {
// Only send ping if connection is validated
if connection_validated {
debug!("Sending keep-alive ping to {}", log_url);
let ping_str = "ping"; // Send simple plaintext ping
let send_res = ws_tx.send(GlooWsMessage::Text(ping_str.to_string())).await;
if let Err(e) = send_res {
warn!("Keep-alive ping failed for {}: {:?}", log_url, e);
*is_connected.lock().unwrap() = false;
return format!("Keep-alive failed: {}", e);
}
} else {
debug!("Skipping keep-alive ping - connection not yet validated for {}", log_url);
}
// Reset timer
keep_alive_timer = TimeoutFuture::new(30_000).fuse();
}
}
}
}
// Enhanced connection loop handler with keep-alive for native targets
#[cfg(not(target_arch = "wasm32"))]
async fn handle_connection_with_keepalive(
ws_conn: tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
mut internal_rx: mpsc::Receiver<InternalWsMessage>,
pending_requests: &Arc<Mutex<HashMap<String, oneshot::Sender<Result<JsonRpcResponseClient, CircleWsClientError>>>>>,
log_url: &str,
_is_connected: &Arc<Mutex<bool>>,
) -> String {
let (mut ws_tx, mut ws_rx) = ws_conn.split();
let mut internal_rx_fused = internal_rx.fuse();
loop {
futures_util::select! {
// Handle messages from the client's public methods (e.g., play)
internal_msg = internal_rx_fused.next().fuse() => {
match internal_msg {
Some(InternalWsMessage::SendJsonRpc(req, response_sender)) => {
let req_id = req.id.clone();
match serde_json::to_string(&req) {
Ok(req_str) => {
debug!("Sending JSON-RPC request (ID: {}): {}", req_id, req_str);
let send_res = ws_tx.send(TungsteniteWsMessage::Text(req_str)).await;
if let Err(e) = send_res {
error!("WebSocket send error for request ID {}: {:?}", req_id, e);
let _ = response_sender.send(Err(CircleWsClientError::SendError(e.to_string())));
} else {
// Store the sender to await the response
pending_requests.lock().unwrap().insert(req_id, response_sender);
}
}
Err(e) => {
error!("Failed to serialize request ID {}: {}", req_id, e);
let _ = response_sender.send(Err(CircleWsClientError::JsonError(e)));
}
}
}
Some(InternalWsMessage::Close) => {
info!("Close message received internally, closing WebSocket.");
let _ = ws_tx.close().await;
return "Manual close requested".to_string();
}
None => {
info!("Internal MPSC channel closed, WebSocket task shutting down.");
let _ = ws_tx.close().await;
return "Internal channel closed".to_string();
}
}
},
// Handle messages received from the WebSocket server
ws_msg_res = ws_rx.next().fuse() => {
match ws_msg_res {
Some(Ok(msg)) => {
match msg {
TungsteniteWsMessage::Text(text) => {
debug!("Received WebSocket message: {}", text);
Self::handle_received_message(&text, pending_requests);
}
TungsteniteWsMessage::Binary(_) => {
debug!("Received binary WebSocket message (Native).");
}
TungsteniteWsMessage::Ping(_) | TungsteniteWsMessage::Pong(_) => {
debug!("Received Ping/Pong (Native).");
}
TungsteniteWsMessage::Close(_) => {
info!("WebSocket connection closed by server (Native).");
return "Server closed connection".to_string();
}
TungsteniteWsMessage::Frame(_) => {
debug!("Received Frame (Native) - not typically handled directly.");
}
}
}
Some(Err(e)) => {
error!("WebSocket receive error: {:?}", e);
return format!("Receive error: {}", e);
}
None => {
info!("WebSocket connection closed by server (stream ended).");
return "Server closed connection (stream ended)".to_string();
}
}
}
}
}
}
// Helper method to handle received messages
fn handle_received_message(
text: &str,
pending_requests: &Arc<Mutex<HashMap<String, oneshot::Sender<Result<JsonRpcResponseClient, CircleWsClientError>>>>>,
) {
// Handle ping/pong messages - these are not JSON-RPC
if text.trim() == "pong" {
debug!("Received pong response");
return;
}
match serde_json::from_str::<JsonRpcResponseClient>(text) {
Ok(response) => {
if let Some(sender) = pending_requests.lock().unwrap().remove(&response.id) {
if let Err(failed_send_val) = sender.send(Ok(response)) {
if let Ok(resp_for_log) = failed_send_val {
warn!("Failed to send response to waiting task for ID: {}", resp_for_log.id);
} else {
warn!("Failed to send response to waiting task, and also failed to get original response for logging.");
}
}
} else {
warn!("Received response for unknown request ID or unsolicited message: {:?}", response);
}
}
Err(e) => {
error!("Failed to parse JSON-RPC response: {}. Raw: {}", e, text);
}
}
}
pub fn play(
&self,

View File

@ -421,6 +421,13 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for CircleWs {
Ok(ws::Message::Text(text)) => {
debug!("WS Text for {}: {}", self.server_circle_name, text);
// Handle plaintext ping messages for keep-alive
if text.trim() == "ping" {
debug!("Received keep-alive ping from {}, responding with pong", self.server_circle_name);
ctx.text("pong");
return;
}
match serde_json::from_str::<JsonRpcRequest>(&text) {
Ok(req) => {
let client_rpc_id = req.id.clone().unwrap_or(Value::Null);