Compare commits
2 Commits
d059af9a18
...
main
Author | SHA1 | Date | |
---|---|---|---|
|
70b5b336f1 | ||
|
29ff40d1a4 |
35
Cargo.lock
generated
35
Cargo.lock
generated
@@ -695,7 +695,7 @@ dependencies = [
|
||||
"rand 0.8.5",
|
||||
"redis 0.23.3",
|
||||
"redis 0.25.4",
|
||||
"rhai_client",
|
||||
"rhai_dispatcher",
|
||||
"rhailib_engine",
|
||||
"rhailib_worker",
|
||||
"rustls",
|
||||
@@ -2834,21 +2834,6 @@ dependencies = [
|
||||
"thin-vec",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rhai_client"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"clap",
|
||||
"env_logger",
|
||||
"log",
|
||||
"redis 0.25.4",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rhai_client_macros"
|
||||
version = "0.1.0"
|
||||
@@ -2870,6 +2855,21 @@ dependencies = [
|
||||
"syn 2.0.103",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rhai_dispatcher"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"clap",
|
||||
"env_logger",
|
||||
"log",
|
||||
"redis 0.25.4",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rhailib_dsl"
|
||||
version = "0.1.0"
|
||||
@@ -2883,6 +2883,7 @@ dependencies = [
|
||||
"macros",
|
||||
"reqwest",
|
||||
"rhai",
|
||||
"rhai_dispatcher",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
@@ -2911,7 +2912,7 @@ dependencies = [
|
||||
"log",
|
||||
"redis 0.25.4",
|
||||
"rhai",
|
||||
"rhai_client",
|
||||
"rhai_dispatcher",
|
||||
"rhailib_engine",
|
||||
"serde",
|
||||
"serde_json",
|
||||
|
@@ -1,5 +1,5 @@
|
||||
use clap::Parser;
|
||||
use rhai_client::{RhaiClient, RhaiClientBuilder};
|
||||
use rhai_dispatcher::{RhaiDispatcher, RhaiDispatcherBuilder};
|
||||
use log::{error, info};
|
||||
use std::io::{self, Write};
|
||||
use std::time::Duration;
|
||||
@@ -46,8 +46,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
|
||||
// Configure logging based on verbosity level
|
||||
let log_config = match args.verbose {
|
||||
0 => "warn,circles_client=info,rhai_client=info",
|
||||
1 => "info,circles_client=debug,rhai_client=debug",
|
||||
0 => "warn,circles_client=info,rhai_dispatcher=info",
|
||||
1 => "info,circles_client=debug,rhai_dispatcher=debug",
|
||||
2 => "debug",
|
||||
_ => "trace",
|
||||
};
|
||||
@@ -68,7 +68,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
info!();
|
||||
|
||||
// Create the Rhai client
|
||||
let client = RhaiClientBuilder::new()
|
||||
let client = RhaiDispatcherBuilder::new()
|
||||
.caller_id(&args.caller_public_key)
|
||||
.redis_url(&args.redis_url)
|
||||
.build()?;
|
||||
@@ -97,7 +97,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
}
|
||||
|
||||
async fn execute_script(
|
||||
client: &RhaiClient,
|
||||
client: &RhaiDispatcher,
|
||||
worker_key: &str,
|
||||
script: String,
|
||||
timeout_secs: u64,
|
||||
@@ -134,7 +134,7 @@ async fn execute_script(
|
||||
}
|
||||
|
||||
async fn run_interactive_mode(
|
||||
client: &RhaiClient,
|
||||
client: &RhaiDispatcher,
|
||||
worker_key: &str,
|
||||
timeout_secs: u64,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
|
@@ -160,7 +160,7 @@ Modify `setup_and_spawn_circles` to:
|
||||
Update `handle_play` to route to correct worker:
|
||||
```rust
|
||||
// Use circle_public_key from URL path for worker routing
|
||||
rhai_client
|
||||
rhai_dispatcher
|
||||
.new_play_request()
|
||||
.recipient_id(&self.circle_public_key) // From URL path
|
||||
.script_path(&script_content)
|
||||
|
4
examples/wss_demo/Cargo.lock
generated
4
examples/wss_demo/Cargo.lock
generated
@@ -595,7 +595,7 @@ dependencies = [
|
||||
"once_cell",
|
||||
"rand 0.8.5",
|
||||
"redis",
|
||||
"rhai_client",
|
||||
"rhai_dispatcher",
|
||||
"rustls",
|
||||
"rustls-pemfile",
|
||||
"secp256k1",
|
||||
@@ -1849,7 +1849,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
|
||||
|
||||
[[package]]
|
||||
name = "rhai_client"
|
||||
name = "rhai_dispatcher"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
|
6
research/launcher/Cargo.lock
generated
6
research/launcher/Cargo.lock
generated
@@ -521,7 +521,7 @@ dependencies = [
|
||||
"env_logger",
|
||||
"log",
|
||||
"redis",
|
||||
"rhai_client",
|
||||
"rhai_dispatcher",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
@@ -1669,7 +1669,7 @@ dependencies = [
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rhai_client"
|
||||
name = "rhai_dispatcher"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
@@ -2514,7 +2514,7 @@ dependencies = [
|
||||
"log",
|
||||
"redis",
|
||||
"rhai",
|
||||
"rhai_client",
|
||||
"rhai_dispatcher",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
|
@@ -28,7 +28,7 @@ rhai = "1.18.0"
|
||||
heromodels = { path = "../../../db/heromodels" }
|
||||
rhailib_engine = { path = "../../../rhailib/src/engine" }
|
||||
rhailib_worker = { path = "../../../rhailib/src/worker" }
|
||||
rhai_client = { path = "../../../rhailib/src/client" }
|
||||
rhai_dispatcher = { path = "../../../rhailib/src/dispatcher" }
|
||||
ourdb = { path = "../../../db/ourdb" } # Added for IdSequence
|
||||
sal-service-manager = { path = "../../../sal/service_manager" }
|
||||
tokio-tungstenite = "0.23"
|
||||
|
@@ -128,7 +128,7 @@ When a circle configuration includes an initialization script:
|
||||
|
||||
1. Worker starts and connects to Redis
|
||||
2. Launcher waits 2 seconds for worker startup
|
||||
3. Launcher sends script content via RhaiClient to worker's queue
|
||||
3. Launcher sends script content via RhaiDispatcher to worker's queue
|
||||
4. Worker executes the initialization script
|
||||
|
||||
## Configuration
|
||||
|
@@ -42,7 +42,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Wait a moment for the launcher to start services
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
|
||||
let client = rhai_client::RhaiClientBuilder::new()
|
||||
let client = rhai_dispatcher::RhaiDispatcherBuilder::new()
|
||||
.redis_url(REDIS_URL)
|
||||
.caller_id("test_launcher")
|
||||
.build()?;
|
||||
@@ -78,7 +78,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
.await?;
|
||||
println!("Received task details: {:?}", task_details_caller_pk);
|
||||
assert_eq!(task_details_caller_pk.status, "completed");
|
||||
// The caller should be "launcher" as set in the RhaiClient
|
||||
// The caller should be "launcher" as set in the RhaiDispatcher
|
||||
println!("✅ SUCCESS: Worker correctly reported CALLER_PUBLIC_KEY for init script.");
|
||||
|
||||
// Test 3: Simple script execution
|
||||
|
@@ -1,5 +1,5 @@
|
||||
use log::{info, debug};
|
||||
use rhai_client::RhaiClientBuilder;
|
||||
use rhai_dispatcher::RhaiDispatcherBuilder;
|
||||
use sal_service_manager::{ServiceConfig as ServiceManagerConfig, ServiceStatus};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
@@ -217,8 +217,8 @@ async fn send_init_script_to_worker(
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
println!("Sending initialization script '{}' to worker for circle: {}", init_script, public_key);
|
||||
|
||||
// Create RhaiClient and send script
|
||||
let client = RhaiClientBuilder::new()
|
||||
// Create RhaiDispatcher and send script
|
||||
let client = RhaiDispatcherBuilder::new()
|
||||
.redis_url(redis_url)
|
||||
.caller_id("launcher")
|
||||
.build()?;
|
||||
|
@@ -10,6 +10,7 @@ The `client_ws` is built on the following principles:
|
||||
- **Modularity**: The crate is divided into logical modules, with a clear separation of concerns between the main client logic, authentication procedures, and cryptographic utilities.
|
||||
- **Asynchronous Operations**: All network I/O is asynchronous, using `async/await` to ensure the client is non-blocking and efficient.
|
||||
- **Fluent Configuration**: A builder pattern (`CircleWsClientBuilder`) is used for clear and flexible client construction.
|
||||
- **Self-Managing Clients**: Each `CircleWsClient` handles its own lifecycle including connection, authentication, keep-alive, and reconnection logic internally.
|
||||
|
||||
## 2. Cross-Platform Implementation
|
||||
|
||||
@@ -29,9 +30,19 @@ The `client_ws` crate is organized into the following key modules:
|
||||
- **`types.rs`**: Defines the core data structures used in authentication, such as `AuthError` and `AuthCredentials`.
|
||||
- **`crypto_utils.rs`**: A self-contained utility module for handling all `secp256k1` cryptographic operations, including key generation, public key derivation, and message signing.
|
||||
|
||||
## 4. Authentication Flow Deep Dive
|
||||
## 4. Self-Managing Client Architecture
|
||||
|
||||
The `authenticate` method in `CircleWsClient` orchestrates the entire authentication process over the WebSocket connection. The sequence diagram below illustrates the internal interactions within the client during this flow.
|
||||
Each `CircleWsClient` is designed to be completely self-managing, handling its entire lifecycle internally. This includes:
|
||||
|
||||
- **Connection Management**: Establishing and maintaining WebSocket connections
|
||||
- **Authentication**: Automatic secp256k1 authentication flow when private keys are provided
|
||||
- **Keep-Alive**: Periodic health checks to ensure connection stability
|
||||
- **Reconnection**: Automatic reconnection with exponential backoff on connection failures
|
||||
- **Connection Status Tracking**: Internal state management for connection health
|
||||
|
||||
### Connection Flow
|
||||
|
||||
The `connect()` method orchestrates the complete connection and authentication process:
|
||||
|
||||
```mermaid
|
||||
sequenceDiagram
|
||||
@@ -46,27 +57,38 @@ sequenceDiagram
|
||||
User->>+Builder: build()
|
||||
Builder-->>-User: client
|
||||
|
||||
User->>+Client: authenticate()
|
||||
Client->>Client: Check for private_key
|
||||
Client->>+CryptoUtils: derive_public_key(private_key)
|
||||
CryptoUtils-->>-Client: public_key
|
||||
User->>+Client: connect()
|
||||
|
||||
Note over Client: Self-managing connection process
|
||||
Client->>Client: Establish WebSocket connection
|
||||
Client->>Client: Start keep-alive loop
|
||||
Client->>Client: Start reconnection handler
|
||||
|
||||
alt Has Private Key
|
||||
Client->>Client: Check for private_key
|
||||
Client->>+CryptoUtils: derive_public_key(private_key)
|
||||
CryptoUtils-->>-Client: public_key
|
||||
|
||||
Note over Client: Request nonce via WebSocket
|
||||
Client->>+WsActor: JSON-RPC "fetch_nonce" (pubkey)
|
||||
WsActor-->>-Client: JSON-RPC Response (nonce)
|
||||
Note over Client: Request nonce via WebSocket
|
||||
Client->>+WsActor: JSON-RPC "fetch_nonce" (pubkey)
|
||||
WsActor-->>-Client: JSON-RPC Response (nonce)
|
||||
|
||||
Client->>+CryptoUtils: sign_message(private_key, nonce)
|
||||
CryptoUtils-->>-Client: signature
|
||||
Client->>+CryptoUtils: sign_message(private_key, nonce)
|
||||
CryptoUtils-->>-Client: signature
|
||||
|
||||
Note over Client: Send credentials via WebSocket
|
||||
Client->>+WsActor: JSON-RPC "authenticate" (pubkey, signature)
|
||||
WsActor-->>-Client: JSON-RPC Response (authenticated: true/false)
|
||||
|
||||
alt Authentication Successful
|
||||
Client-->>-User: Ok(true)
|
||||
else Authentication Fails
|
||||
Client-->>-User: Ok(false) or Err(...)
|
||||
Note over Client: Send credentials via WebSocket
|
||||
Client->>+WsActor: JSON-RPC "authenticate" (pubkey, signature)
|
||||
WsActor-->>-Client: JSON-RPC Response (authenticated: true/false)
|
||||
end
|
||||
|
||||
Client-->>-User: Connection established and authenticated
|
||||
```
|
||||
|
||||
This architecture ensures that the cryptographic operations are isolated, the platform-specific code is cleanly separated, and the main client struct provides a simple and consistent API to the end-user.
|
||||
### Self-Management Features
|
||||
|
||||
- **Automatic Keep-Alive**: Each client runs its own keep-alive loop to detect connection issues
|
||||
- **Transparent Reconnection**: Failed connections are automatically retried with exponential backoff
|
||||
- **Status Monitoring**: Connection status is tracked internally and can be queried via `is_connected()`
|
||||
- **Resource Cleanup**: Proper cleanup of resources when clients are dropped
|
||||
|
||||
This architecture ensures that the cryptographic operations are isolated, the platform-specific code is cleanly separated, and each client is completely autonomous in managing its connection lifecycle.
|
4
src/client_ws/Cargo.lock
generated
4
src/client_ws/Cargo.lock
generated
@@ -595,7 +595,7 @@ dependencies = [
|
||||
"log",
|
||||
"once_cell",
|
||||
"redis",
|
||||
"rhai_client",
|
||||
"rhai_dispatcher",
|
||||
"rustls",
|
||||
"rustls-pemfile",
|
||||
"serde",
|
||||
@@ -1765,7 +1765,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
|
||||
|
||||
[[package]]
|
||||
name = "rhai_client"
|
||||
name = "rhai_dispatcher"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
|
@@ -21,6 +21,7 @@ http = "0.2"
|
||||
# Authentication dependencies
|
||||
hex = { workspace = true }
|
||||
rand = { workspace = true }
|
||||
getrandom = { version = "0.2", features = ["js"] }
|
||||
|
||||
# Optional crypto dependencies (enabled by default)
|
||||
secp256k1 = { workspace = true, optional = true }
|
||||
@@ -32,6 +33,7 @@ circle_ws_lib = { path = "../server", optional = true }
|
||||
# WASM-specific dependencies
|
||||
[target.'cfg(target_arch = "wasm32")'.dependencies]
|
||||
gloo-net = { version = "0.4.0", features = ["websocket"] }
|
||||
gloo-timers = { version = "0.3.0", features = ["futures"] }
|
||||
wasm-bindgen-futures = "0.4"
|
||||
gloo-console = "0.3.0"
|
||||
wasm-bindgen = "0.2"
|
||||
|
@@ -1,14 +1,26 @@
|
||||
# Circle WebSocket Client
|
||||
|
||||
A Rust library for connecting to Circle WebSocket servers with authentication support.
|
||||
A Rust library for connecting to Circle WebSocket servers with authentication support and self-managing connection lifecycle.
|
||||
|
||||
## Features
|
||||
|
||||
- Cross-platform WebSocket client (native and WASM)
|
||||
- secp256k1 cryptographic authentication
|
||||
- JSON-RPC 2.0 protocol support
|
||||
- Async/await interface with Tokio
|
||||
- Built on tokio-tungstenite for reliable WebSocket connections
|
||||
- **Cross-platform WebSocket client** (native and WASM)
|
||||
- **secp256k1 cryptographic authentication** with automatic challenge-response flow
|
||||
- **JSON-RPC 2.0 protocol support** for server communication
|
||||
- **Self-managing connections** with automatic keep-alive and reconnection
|
||||
- **Async/await interface** with modern Rust async patterns
|
||||
- **Built on tokio-tungstenite** for reliable WebSocket connections (native)
|
||||
- **Built on gloo-net** for WASM browser compatibility
|
||||
|
||||
## Architecture
|
||||
|
||||
Each `CircleWsClient` is completely self-managing:
|
||||
|
||||
- **Automatic Connection Management**: Handles WebSocket connection establishment
|
||||
- **Built-in Authentication**: Seamless secp256k1 authentication when private keys are provided
|
||||
- **Keep-Alive Monitoring**: Periodic health checks to detect connection issues
|
||||
- **Transparent Reconnection**: Automatic reconnection with exponential backoff on failures
|
||||
- **Connection Status Tracking**: Real-time connection state monitoring
|
||||
|
||||
## Usage
|
||||
|
||||
@@ -19,7 +31,7 @@ Add this to your `Cargo.toml`:
|
||||
circle_client_ws = { path = "../client_ws" }
|
||||
```
|
||||
|
||||
### Basic Example
|
||||
### Basic Example (Self-Managing Connection)
|
||||
|
||||
```rust
|
||||
use circle_client_ws::CircleWsClientBuilder;
|
||||
@@ -28,27 +40,73 @@ use circle_client_ws::CircleWsClientBuilder;
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Create client with private key
|
||||
let private_key = "your_private_key_hex";
|
||||
let client = CircleWsClientBuilder::new()
|
||||
.with_private_key(private_key)?
|
||||
let mut client = CircleWsClientBuilder::new("ws://localhost:8080".to_string())
|
||||
.with_keypair(private_key.to_string())
|
||||
.build();
|
||||
|
||||
// Connect and authenticate
|
||||
client.connect("ws://localhost:8080").await?;
|
||||
// Connect - this handles authentication, keep-alive, and reconnection automatically
|
||||
client.connect().await?;
|
||||
|
||||
// Use the authenticated client...
|
||||
// Check connection status
|
||||
println!("Connected: {}", client.is_connected());
|
||||
|
||||
// Execute scripts on the server
|
||||
let result = client.play("\"Hello from client!\"".to_string()).await?;
|
||||
println!("Script result: {:?}", result);
|
||||
|
||||
// Client automatically maintains connection in the background
|
||||
// No manual keep-alive or reconnection logic needed
|
||||
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
### Authentication Flow
|
||||
### Self-Managing Features
|
||||
|
||||
The client automatically handles the secp256k1 authentication flow:
|
||||
1. Connects to WebSocket server
|
||||
2. Receives authentication challenge
|
||||
3. Signs challenge with private key
|
||||
4. Sends signed response
|
||||
5. Receives authentication confirmation
|
||||
The client automatically handles:
|
||||
|
||||
1. **Connection Establishment**: WebSocket connection to the server
|
||||
2. **Authentication Flow**: secp256k1 challenge-response authentication
|
||||
3. **Keep-Alive Monitoring**: Periodic health checks to ensure connection stability
|
||||
4. **Automatic Reconnection**: Transparent reconnection on connection failures
|
||||
5. **Resource Management**: Proper cleanup when the client is dropped
|
||||
|
||||
### Connection Status Monitoring
|
||||
|
||||
```rust
|
||||
// Check if the client is currently connected
|
||||
if client.is_connected() {
|
||||
println!("Client is connected and healthy");
|
||||
} else {
|
||||
println!("Client is disconnected or reconnecting");
|
||||
}
|
||||
|
||||
// Get detailed connection status
|
||||
let status = client.get_connection_status();
|
||||
println!("Connection status: {}", status);
|
||||
```
|
||||
|
||||
### WASM Usage
|
||||
|
||||
For WASM applications, the client works seamlessly in browsers:
|
||||
|
||||
```rust
|
||||
use circle_client_ws::CircleWsClientBuilder;
|
||||
use wasm_bindgen_futures::spawn_local;
|
||||
|
||||
// In a WASM context
|
||||
spawn_local(async move {
|
||||
let mut client = CircleWsClientBuilder::new("ws://localhost:8080".to_string())
|
||||
.build();
|
||||
|
||||
// Self-managing connection works the same in WASM
|
||||
if let Ok(_) = client.connect().await {
|
||||
// Client automatically handles keep-alive and reconnection
|
||||
let result = client.play("\"WASM client connected!\"".to_string()).await;
|
||||
// Handle result...
|
||||
}
|
||||
});
|
||||
```
|
||||
|
||||
## Binary Tool
|
||||
|
||||
@@ -56,12 +114,28 @@ A command-line binary is also available for interactive use and script execution
|
||||
|
||||
## Platform Support
|
||||
|
||||
- **Native**: Full support on all Rust-supported platforms
|
||||
- **WASM**: Browser support with web-sys bindings
|
||||
- **Native**: Full support on all Rust-supported platforms with tokio-tungstenite
|
||||
- **WASM**: Browser support with gloo-net WebSocket bindings
|
||||
|
||||
## Dependencies
|
||||
|
||||
- `tokio-tungstenite`: WebSocket implementation
|
||||
- `secp256k1`: Cryptographic operations
|
||||
- `serde`: JSON serialization
|
||||
- `uuid`: Request ID generation
|
||||
### Core Dependencies
|
||||
- `serde`: JSON serialization and deserialization
|
||||
- `uuid`: Request ID generation for JSON-RPC
|
||||
- `futures-util`: Async utilities for WebSocket handling
|
||||
- `thiserror`: Error handling and propagation
|
||||
|
||||
### Platform-Specific Dependencies
|
||||
|
||||
#### Native (tokio-based)
|
||||
- `tokio-tungstenite`: Robust WebSocket implementation
|
||||
- `tokio`: Async runtime for connection management
|
||||
|
||||
#### WASM (browser-based)
|
||||
- `gloo-net`: WebSocket bindings for browsers
|
||||
- `gloo-timers`: Timer utilities for keep-alive functionality
|
||||
- `wasm-bindgen-futures`: Async support in WASM
|
||||
|
||||
### Cryptographic Dependencies (optional)
|
||||
- `secp256k1`: Elliptic curve cryptography for authentication
|
||||
- `sha3`: Hashing for cryptographic operations
|
||||
|
@@ -1,12 +1,12 @@
|
||||
# Circles WebSocket Client Binary
|
||||
# Circles WebSocket Client
|
||||
|
||||
A command-line WebSocket client for connecting to Circles servers with authentication support.
|
||||
A WebSocket client for connecting to Circles servers with authentication support. Available in both CLI and WebAssembly (WASM) versions.
|
||||
|
||||
## Binary: `circles_client`
|
||||
## CLI Usage
|
||||
|
||||
### Installation
|
||||
|
||||
Build the binary:
|
||||
Build the CLI binary:
|
||||
```bash
|
||||
cargo build --bin circles_client --release
|
||||
```
|
||||
@@ -49,6 +49,53 @@ circles_client -vv ws://localhost:8080
|
||||
- **Verbosity Control**: Use `-v` flags to increase logging detail
|
||||
- **Cross-platform**: Works on all platforms supported by Rust and tokio-tungstenite
|
||||
|
||||
## WebAssembly (WASM) Usage
|
||||
|
||||
### Build and Serve
|
||||
|
||||
1. Install Trunk:
|
||||
```bash
|
||||
cargo install trunk
|
||||
```
|
||||
|
||||
2. Build the WASM version:
|
||||
```bash
|
||||
trunk build --release
|
||||
```
|
||||
|
||||
3. Serve the application:
|
||||
```bash
|
||||
trunk serve
|
||||
```
|
||||
|
||||
The application will be available at `http://localhost:8080`
|
||||
|
||||
### Usage in Browser
|
||||
|
||||
1. Open the served page in your browser
|
||||
2. Enter the WebSocket server URL
|
||||
3. Choose either:
|
||||
- Execute a Rhai script directly
|
||||
- Enter interactive mode (type 'exit' or 'quit' to leave)
|
||||
|
||||
### Features
|
||||
|
||||
- **Browser Integration**: Uses browser's WebSocket implementation
|
||||
- **Interactive Mode**: Browser-based input/output using prompts
|
||||
- **Error Handling**: Browser console logging
|
||||
- **Cross-browser**: Works in all modern browsers supporting WebAssembly
|
||||
|
||||
## Common Features
|
||||
|
||||
Both versions share the same core functionality:
|
||||
|
||||
- **WebSocket Connection**: Connects to Circles WebSocket server
|
||||
- **Authentication**: Handles secp256k1 authentication
|
||||
- **Script Execution**: Executes Rhai scripts
|
||||
- **Interactive Mode**: Provides REPL-like interface
|
||||
- **Error Handling**: Comprehensive error reporting
|
||||
- **Logging**: Detailed logging at different verbosity levels
|
||||
|
||||
### Interactive Mode
|
||||
|
||||
When run without `-s` or `-f` flags, the client enters interactive mode where you can:
|
||||
|
@@ -1,12 +1,30 @@
|
||||
#![cfg_attr(target_arch = "wasm32", no_main)]
|
||||
|
||||
use circle_client_ws::CircleWsClientBuilder;
|
||||
use clap::{Arg, ArgAction, Command};
|
||||
use dotenv::dotenv;
|
||||
use env_logger;
|
||||
use log::{error, info};
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use std::env;
|
||||
use std::io::{self, Write};
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use std::path::Path;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use std::io::{self, Write};
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
use wasm_bindgen::prelude::*;
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
use web_sys::{console, window};
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
use wasm_bindgen_futures::spawn_local;
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use clap::{Arg, ArgAction, Command};
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use dotenv::dotenv;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use env_logger;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use tokio;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use log::{error, info};
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Args {
|
||||
@@ -17,6 +35,7 @@ struct Args {
|
||||
no_timestamp: bool,
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
fn parse_args() -> Args {
|
||||
let matches = Command::new("circles_client")
|
||||
.version("0.1.0")
|
||||
@@ -67,6 +86,7 @@ fn parse_args() -> Args {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
fn setup_logging(verbose: u8, no_timestamp: bool) {
|
||||
let log_level = match verbose {
|
||||
0 => "warn,circle_client_ws=info",
|
||||
@@ -87,6 +107,7 @@ fn setup_logging(verbose: u8, no_timestamp: bool) {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
fn load_private_key() -> Result<String, Box<dyn std::error::Error>> {
|
||||
// Try to load from .env file first
|
||||
if let Ok(_) = dotenv() {
|
||||
@@ -107,42 +128,82 @@ fn load_private_key() -> Result<String, Box<dyn std::error::Error>> {
|
||||
Err("PRIVATE_KEY not found in environment or .env files".into())
|
||||
}
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
async fn run_interactive_mode(client: circle_client_ws::CircleWsClient) -> Result<(), Box<dyn std::error::Error>> {
|
||||
info!("Entering interactive mode. Type 'exit' or 'quit' to leave.");
|
||||
println!("🔄 Interactive mode - Enter Rhai scripts (type 'exit' or 'quit' to leave):");
|
||||
console::log_1(&"Entering interactive mode. Type 'exit' or 'quit' to leave.".into());
|
||||
console::log_1(&"🔄 Interactive mode - Enter Rhai scripts (type 'exit' or 'quit' to leave):\n".into());
|
||||
|
||||
loop {
|
||||
print!("rhai> ");
|
||||
io::stdout().flush()?;
|
||||
// In wasm32, we need to use browser's console for input/output
|
||||
let window = window().expect("Window not available");
|
||||
let input = window.prompt_with_message("Enter Rhai script (or 'exit' to quit):")
|
||||
.map_err(|e| format!("Failed to get input: {:#?}", e))? // Use debug formatting
|
||||
.unwrap_or_default();
|
||||
|
||||
let mut input = String::new();
|
||||
io::stdin().read_line(&mut input)?;
|
||||
|
||||
let script = input.trim();
|
||||
|
||||
if script.is_empty() {
|
||||
continue;
|
||||
// Handle empty or exit cases
|
||||
if input == "exit" || input == "quit" {
|
||||
console::log_1(&"👋 Goodbye!".into());
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Execute the script
|
||||
match client.play(input).await {
|
||||
Ok(result) => {
|
||||
console::log_1(&format!("📤 Result: {}", result.output).into());
|
||||
}
|
||||
|
||||
if script == "exit" || script == "quit" {
|
||||
println!("👋 Goodbye!");
|
||||
break;
|
||||
}
|
||||
|
||||
match client.play(script.to_string()).await {
|
||||
Ok(result) => {
|
||||
println!("📤 Result: {}", result.output);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("❌ Script execution failed: {}", e);
|
||||
println!("❌ Error: {}", e);
|
||||
}
|
||||
Err(e) => {
|
||||
console::log_1(&format!("❌ Script execution failed: {}", e).into());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
async fn execute_script(client: circle_client_ws::CircleWsClient, script: String) -> Result<(), Box<dyn std::error::Error>> {
|
||||
console::log_1(&format!("Executing script: {}", script).into());
|
||||
|
||||
match client.play(script).await {
|
||||
Ok(result) => {
|
||||
console::log_1(&result.output.into());
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
console::log_1(&format!("Script execution failed: {}", e).into());
|
||||
Err(e.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
pub async fn start_client(url: &str, script: Option<String>) -> Result<(), Box<dyn std::error::Error>> {
|
||||
// Build client
|
||||
let mut client = CircleWsClientBuilder::new(url.to_string())
|
||||
.build();
|
||||
|
||||
// Connect to WebSocket server
|
||||
console::log_1(&"🔌 Connecting to WebSocket server...".into());
|
||||
if let Err(e) = client.connect().await {
|
||||
console::log_1(&format!("❌ Failed to connect: {}", e).into());
|
||||
return Err(e.into());
|
||||
}
|
||||
console::log_1(&"✅ Connected successfully".into());
|
||||
|
||||
// Authenticate with server
|
||||
if let Err(e) = client.authenticate().await {
|
||||
console::log_1(&format!("❌ Authentication failed: {}", e).into());
|
||||
return Err(e.into());
|
||||
}
|
||||
console::log_1(&"✅ Authentication successful".into());
|
||||
|
||||
// Handle script execution
|
||||
if let Some(script) = script {
|
||||
execute_script(client, script).await
|
||||
} else {
|
||||
run_interactive_mode(client).await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
async fn execute_script(client: circle_client_ws::CircleWsClient, script: String) -> Result<(), Box<dyn std::error::Error>> {
|
||||
info!("Executing script: {}", script);
|
||||
|
||||
@@ -158,11 +219,43 @@ async fn execute_script(client: circle_client_ws::CircleWsClient, script: String
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
async fn load_script_from_file(path: &str) -> Result<String, Box<dyn std::error::Error>> {
|
||||
let script = tokio::fs::read_to_string(path).await?;
|
||||
Ok(script)
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
async fn run_interactive_mode(client: circle_client_ws::CircleWsClient) -> Result<(), Box<dyn std::error::Error>> {
|
||||
println!("\n🔄 Interactive mode - Enter Rhai scripts (type 'exit' or 'quit' to leave):\n");
|
||||
|
||||
loop {
|
||||
print!("Enter Rhai script (or 'exit' to quit): ");
|
||||
io::stdout().flush()?;
|
||||
|
||||
let mut input = String::new();
|
||||
io::stdin().read_line(&mut input)?;
|
||||
let input = input.trim().to_string();
|
||||
|
||||
if input == "exit" || input == "quit" {
|
||||
println!("\n👋 Goodbye!");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
match client.play(input).await {
|
||||
Ok(result) => {
|
||||
println!("\n📤 Result: {}", result.output);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("❌ Script execution failed: {}", e);
|
||||
println!("\n❌ Script execution failed: {}", e);
|
||||
}
|
||||
}
|
||||
println!();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let args = parse_args();
|
||||
|
@@ -148,6 +148,7 @@ impl CircleWsClientBuilder {
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
task_handle: None,
|
||||
private_key: self.private_key,
|
||||
is_connected: Arc::new(Mutex::new(false)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -158,24 +159,61 @@ pub struct CircleWsClient {
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
task_handle: Option<tokio::task::JoinHandle<()>>,
|
||||
private_key: Option<String>,
|
||||
is_connected: Arc<Mutex<bool>>,
|
||||
}
|
||||
|
||||
impl CircleWsClient {
|
||||
/// Get the connection status
|
||||
pub fn get_connection_status(&self) -> String {
|
||||
if *self.is_connected.lock().unwrap() {
|
||||
"Connected".to_string()
|
||||
} else {
|
||||
"Disconnected".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if the client is connected
|
||||
pub fn is_connected(&self) -> bool {
|
||||
*self.is_connected.lock().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl CircleWsClient {
|
||||
pub async fn authenticate(&mut self) -> Result<bool, CircleWsClientError> {
|
||||
info!("🔐 [{}] Starting authentication process...", self.ws_url);
|
||||
|
||||
let private_key = self
|
||||
.private_key
|
||||
.as_ref()
|
||||
.ok_or(CircleWsClientError::AuthNoKeyPair)?;
|
||||
|
||||
info!("🔑 [{}] Deriving public key from private key...", self.ws_url);
|
||||
let public_key = auth::derive_public_key(private_key)?;
|
||||
info!("✅ [{}] Public key derived: {}...", self.ws_url, &public_key[..8]);
|
||||
|
||||
info!("🎫 [{}] Fetching authentication nonce...", self.ws_url);
|
||||
let nonce = self.fetch_nonce(&public_key).await?;
|
||||
let signature = auth::sign_message(private_key, &nonce)?;
|
||||
info!("✅ [{}] Nonce received: {}...", self.ws_url, &nonce[..8]);
|
||||
|
||||
self.authenticate_with_signature(&public_key, &signature)
|
||||
.await
|
||||
info!("✍️ [{}] Signing nonce with private key...", self.ws_url);
|
||||
let signature = auth::sign_message(private_key, &nonce)?;
|
||||
info!("✅ [{}] Signature created: {}...", self.ws_url, &signature[..8]);
|
||||
|
||||
info!("🔒 [{}] Submitting authentication credentials...", self.ws_url);
|
||||
let result = self.authenticate_with_signature(&public_key, &signature).await?;
|
||||
|
||||
if result {
|
||||
info!("🎉 [{}] Authentication successful!", self.ws_url);
|
||||
} else {
|
||||
error!("❌ [{}] Authentication failed - server rejected credentials", self.ws_url);
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn fetch_nonce(&self, pubkey: &str) -> Result<String, CircleWsClientError> {
|
||||
info!("📡 [{}] Sending fetch_nonce request for pubkey: {}...", self.ws_url, &pubkey[..8]);
|
||||
|
||||
let params = FetchNonceParams {
|
||||
pubkey: pubkey.to_string(),
|
||||
};
|
||||
@@ -183,6 +221,7 @@ impl CircleWsClient {
|
||||
let res = self.send_request(req).await?;
|
||||
|
||||
if let Some(err) = res.error {
|
||||
error!("❌ [{}] fetch_nonce failed: {} (code: {})", self.ws_url, err.message, err.code);
|
||||
return Err(CircleWsClientError::JsonRpcError {
|
||||
code: err.code,
|
||||
message: err.message,
|
||||
@@ -191,6 +230,7 @@ impl CircleWsClient {
|
||||
}
|
||||
|
||||
let nonce_res: FetchNonceResponse = serde_json::from_value(res.result.unwrap_or_default())?;
|
||||
info!("✅ [{}] fetch_nonce successful, nonce length: {}", self.ws_url, nonce_res.nonce.len());
|
||||
Ok(nonce_res.nonce)
|
||||
}
|
||||
|
||||
@@ -199,6 +239,8 @@ impl CircleWsClient {
|
||||
pubkey: &str,
|
||||
signature: &str,
|
||||
) -> Result<bool, CircleWsClientError> {
|
||||
info!("📡 [{}] Sending authenticate request with signature...", self.ws_url);
|
||||
|
||||
let params = AuthCredentialsParams {
|
||||
pubkey: pubkey.to_string(),
|
||||
signature: signature.to_string(),
|
||||
@@ -207,6 +249,7 @@ impl CircleWsClient {
|
||||
let res = self.send_request(req).await?;
|
||||
|
||||
if let Some(err) = res.error {
|
||||
error!("❌ [{}] authenticate failed: {} (code: {})", self.ws_url, err.message, err.code);
|
||||
return Err(CircleWsClientError::JsonRpcError {
|
||||
code: err.code,
|
||||
message: err.message,
|
||||
@@ -214,10 +257,18 @@ impl CircleWsClient {
|
||||
});
|
||||
}
|
||||
|
||||
Ok(res
|
||||
let authenticated = res
|
||||
.result
|
||||
.and_then(|v| v.get("authenticated").and_then(|v| v.as_bool()))
|
||||
.unwrap_or(false))
|
||||
.unwrap_or(false);
|
||||
|
||||
if authenticated {
|
||||
info!("✅ [{}] authenticate request successful - server confirmed authentication", self.ws_url);
|
||||
} else {
|
||||
error!("❌ [{}] authenticate request failed - server returned false", self.ws_url);
|
||||
}
|
||||
|
||||
Ok(authenticated)
|
||||
}
|
||||
|
||||
fn create_request<T: Serialize>(
|
||||
@@ -275,17 +326,19 @@ impl CircleWsClient {
|
||||
|
||||
pub async fn connect(&mut self) -> Result<(), CircleWsClientError> {
|
||||
if self.internal_tx.is_some() {
|
||||
info!("Client already connected or connecting.");
|
||||
info!("🔄 [{}] Client already connected or connecting", self.ws_url);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
info!("🚀 [{}] Starting self-managed WebSocket connection with keep-alive and reconnection...", self.ws_url);
|
||||
let (internal_tx, internal_rx) = mpsc::channel::<InternalWsMessage>(32);
|
||||
self.internal_tx = Some(internal_tx);
|
||||
|
||||
// Use the URL as provided - support both ws:// and wss://
|
||||
// Clone necessary data for the task
|
||||
let connection_url = self.ws_url.clone();
|
||||
let is_secure = connection_url.starts_with("wss://");
|
||||
info!("🔗 Connecting to WebSocket: {} ({})", connection_url, if is_secure { "WSS/TLS" } else { "WS/Plain" });
|
||||
let private_key = self.private_key.clone();
|
||||
let is_connected = self.is_connected.clone();
|
||||
info!("🔗 [{}] Will handle connection, authentication, keep-alive, and reconnection internally", connection_url);
|
||||
|
||||
// Pending requests: map request_id to a oneshot sender for the response
|
||||
let pending_requests: Arc<
|
||||
@@ -301,191 +354,135 @@ impl CircleWsClient {
|
||||
let log_url = connection_url.clone();
|
||||
|
||||
let task = async move {
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
let ws_result = WebSocket::open(&connection_url);
|
||||
// Main connection loop with reconnection logic
|
||||
loop {
|
||||
info!("🔄 [{}] Starting connection attempt...", log_url);
|
||||
|
||||
// Reset connection status
|
||||
*is_connected.lock().unwrap() = false;
|
||||
|
||||
// Clone connection_url for this iteration to avoid move issues
|
||||
let connection_url_clone = connection_url.clone();
|
||||
|
||||
// Establish WebSocket connection
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
let ws_result = WebSocket::open(&connection_url_clone);
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
let connect_attempt = async {
|
||||
// Check if this is a secure WebSocket connection
|
||||
if connection_url.starts_with("wss://") {
|
||||
// For WSS connections, use a custom TLS connector that accepts self-signed certificates
|
||||
// This is for development/demo purposes only
|
||||
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
|
||||
|
||||
let request = connection_url.into_client_request()
|
||||
.map_err(|e| CircleWsClientError::ConnectionError(format!("Invalid URL: {}", e)))?;
|
||||
|
||||
// Create a native-tls connector that accepts invalid certificates (for development)
|
||||
let tls_connector = native_tls::TlsConnector::builder()
|
||||
.danger_accept_invalid_certs(true)
|
||||
.danger_accept_invalid_hostnames(true)
|
||||
.build()
|
||||
.map_err(|e| CircleWsClientError::ConnectionError(format!("TLS connector creation failed: {}", e)))?;
|
||||
|
||||
let connector = Connector::NativeTls(tls_connector);
|
||||
|
||||
warn!("⚠️ DEVELOPMENT MODE: Accepting self-signed certificates (NOT for production!)");
|
||||
connect_async_tls_with_config(request, None, false, Some(connector))
|
||||
.await
|
||||
.map_err(|e| CircleWsClientError::ConnectionError(format!("WSS connection failed: {}", e)))
|
||||
} else {
|
||||
// For regular WS connections, use the standard method
|
||||
connect_async(&connection_url)
|
||||
.await
|
||||
.map_err(|e| CircleWsClientError::ConnectionError(format!("WS connection failed: {}", e)))
|
||||
}
|
||||
};
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
let ws_result = connect_attempt.await;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
let connect_attempt = async {
|
||||
// Check if this is a secure WebSocket connection
|
||||
if connection_url_clone.starts_with("wss://") {
|
||||
// For WSS connections, use a custom TLS connector that accepts self-signed certificates
|
||||
// This is for development/demo purposes only
|
||||
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
|
||||
|
||||
let request = connection_url_clone.as_str().into_client_request()
|
||||
.map_err(|e| CircleWsClientError::ConnectionError(format!("Invalid URL: {}", e)))?;
|
||||
|
||||
// Create a native-tls connector that accepts invalid certificates (for development)
|
||||
let tls_connector = native_tls::TlsConnector::builder()
|
||||
.danger_accept_invalid_certs(true)
|
||||
.danger_accept_invalid_hostnames(true)
|
||||
.build()
|
||||
.map_err(|e| CircleWsClientError::ConnectionError(format!("TLS connector creation failed: {}", e)))?;
|
||||
|
||||
let connector = Connector::NativeTls(tls_connector);
|
||||
|
||||
warn!("⚠️ DEVELOPMENT MODE: Accepting self-signed certificates (NOT for production!)");
|
||||
connect_async_tls_with_config(request, None, false, Some(connector))
|
||||
.await
|
||||
.map_err(|e| CircleWsClientError::ConnectionError(format!("WSS connection failed: {}", e)))
|
||||
} else {
|
||||
// For regular WS connections, use the standard method
|
||||
connect_async(&connection_url_clone)
|
||||
.await
|
||||
.map_err(|e| CircleWsClientError::ConnectionError(format!("WS connection failed: {}", e)))
|
||||
}
|
||||
};
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
let ws_result = connect_attempt.await;
|
||||
|
||||
match ws_result {
|
||||
Ok(ws_conn_maybe_response) => {
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
let ws_conn = ws_conn_maybe_response;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
let (ws_conn, _) = ws_conn_maybe_response;
|
||||
|
||||
match ws_result {
|
||||
Ok(ws_conn_maybe_response) => {
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
let ws_conn = ws_conn_maybe_response;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
let (ws_conn, _) = ws_conn_maybe_response;
|
||||
|
||||
info!("Successfully connected to WebSocket: {}", log_url);
|
||||
let (mut ws_tx, mut ws_rx) = ws_conn.split();
|
||||
let mut internal_rx_fused = internal_rx.fuse();
|
||||
|
||||
loop {
|
||||
futures_util::select! {
|
||||
// Handle messages from the client's public methods (e.g., play)
|
||||
internal_msg = internal_rx_fused.next().fuse() => {
|
||||
match internal_msg {
|
||||
Some(InternalWsMessage::SendJsonRpc(req, response_sender)) => {
|
||||
let req_id = req.id.clone();
|
||||
match serde_json::to_string(&req) {
|
||||
Ok(req_str) => {
|
||||
debug!("Sending JSON-RPC request (ID: {}): {}", req_id, req_str);
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
let send_res = ws_tx.send(GlooWsMessage::Text(req_str)).await;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
let send_res = ws_tx.send(TungsteniteWsMessage::Text(req_str)).await;
|
||||
|
||||
if let Err(e) = send_res {
|
||||
error!("WebSocket send error for request ID {}: {:?}", req_id, e);
|
||||
let _ = response_sender.send(Err(CircleWsClientError::SendError(e.to_string())));
|
||||
} else {
|
||||
// Store the sender to await the response
|
||||
task_pending_requests.lock().unwrap().insert(req_id, response_sender);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to serialize request ID {}: {}", req_id, e);
|
||||
let _ = response_sender.send(Err(CircleWsClientError::JsonError(e)));
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(InternalWsMessage::Close) => {
|
||||
info!("Close message received internally, closing WebSocket.");
|
||||
let _ = ws_tx.close().await;
|
||||
break;
|
||||
}
|
||||
None => { // internal_rx closed, meaning client was dropped
|
||||
info!("Internal MPSC channel closed, WebSocket task shutting down.");
|
||||
let _ = ws_tx.close().await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
// Handle messages received from the WebSocket server
|
||||
ws_msg_res = ws_rx.next().fuse() => {
|
||||
match ws_msg_res {
|
||||
Some(Ok(msg)) => {
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
match msg {
|
||||
GlooWsMessage::Text(text) => {
|
||||
debug!("Received WebSocket message: {}", text);
|
||||
// ... (parse logic as before)
|
||||
match serde_json::from_str::<JsonRpcResponseClient>(&text) {
|
||||
Ok(response) => {
|
||||
if let Some(sender) = task_pending_requests.lock().unwrap().remove(&response.id) {
|
||||
if let Err(failed_send_val) = sender.send(Ok(response)) {
|
||||
if let Ok(resp_for_log) = failed_send_val { warn!("Failed to send response to waiting task for ID: {}", resp_for_log.id); }
|
||||
else { warn!("Failed to send response to waiting task, and also failed to get original response for logging.");}
|
||||
}
|
||||
} else { warn!("Received response for unknown request ID or unsolicited message: {:?}", response); }
|
||||
}
|
||||
Err(e) => { error!("Failed to parse JSON-RPC response: {}. Raw: {}", e, text); }
|
||||
}
|
||||
}
|
||||
GlooWsMessage::Bytes(_) => {
|
||||
debug!("Received binary WebSocket message (WASM).");
|
||||
}
|
||||
}
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
match msg {
|
||||
TungsteniteWsMessage::Text(text) => {
|
||||
debug!("Received WebSocket message: {}", text);
|
||||
// ... (parse logic as before)
|
||||
match serde_json::from_str::<JsonRpcResponseClient>(&text) {
|
||||
Ok(response) => {
|
||||
if let Some(sender) = task_pending_requests.lock().unwrap().remove(&response.id) {
|
||||
if let Err(failed_send_val) = sender.send(Ok(response)) {
|
||||
if let Ok(resp_for_log) = failed_send_val { warn!("Failed to send response to waiting task for ID: {}", resp_for_log.id); }
|
||||
else { warn!("Failed to send response to waiting task, and also failed to get original response for logging.");}
|
||||
}
|
||||
} else { warn!("Received response for unknown request ID or unsolicited message: {:?}", response); }
|
||||
}
|
||||
Err(e) => { error!("Failed to parse JSON-RPC response: {}. Raw: {}", e, text); }
|
||||
}
|
||||
}
|
||||
TungsteniteWsMessage::Binary(_) => {
|
||||
debug!("Received binary WebSocket message (Native).");
|
||||
}
|
||||
TungsteniteWsMessage::Ping(_) | TungsteniteWsMessage::Pong(_) => {
|
||||
debug!("Received Ping/Pong (Native).");
|
||||
}
|
||||
TungsteniteWsMessage::Close(_) => {
|
||||
info!("WebSocket connection closed by server (Native).");
|
||||
break;
|
||||
}
|
||||
TungsteniteWsMessage::Frame(_) => {
|
||||
debug!("Received Frame (Native) - not typically handled directly.");
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
error!("WebSocket receive error: {:?}", e);
|
||||
break; // Exit loop on receive error
|
||||
}
|
||||
None => { // WebSocket stream closed
|
||||
info!("WebSocket connection closed by server (stream ended).");
|
||||
break;
|
||||
}
|
||||
}
|
||||
// For WASM, WebSocket::open() always succeeds even if server is down
|
||||
// We'll start as "connecting" and detect failures through timeouts
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
info!("🔄 [{}] WebSocket object created, testing actual connectivity...", log_url);
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
{
|
||||
info!("✅ [{}] WebSocket connection established successfully", log_url);
|
||||
*is_connected.lock().unwrap() = true;
|
||||
}
|
||||
|
||||
// Handle authentication if private key is provided
|
||||
let auth_success = if let Some(ref _pk) = private_key {
|
||||
info!("🔐 [{}] Authentication will be handled by separate authenticate() call", log_url);
|
||||
true // For now, assume auth will be handled separately
|
||||
} else {
|
||||
info!("ℹ️ [{}] No private key provided, skipping authentication", log_url);
|
||||
true
|
||||
};
|
||||
|
||||
if auth_success {
|
||||
// Start the main message handling loop with keep-alive
|
||||
let disconnect_reason = Self::handle_connection_with_keepalive(
|
||||
ws_conn,
|
||||
internal_rx,
|
||||
&task_pending_requests,
|
||||
&log_url,
|
||||
&is_connected
|
||||
).await;
|
||||
|
||||
info!("🔌 [{}] Connection ended: {}", log_url, disconnect_reason);
|
||||
|
||||
// Check if this was a manual disconnect
|
||||
if disconnect_reason == "Manual close requested" {
|
||||
break; // Don't reconnect on manual close
|
||||
}
|
||||
|
||||
// If we reach here, we need to recreate internal_rx for the next iteration
|
||||
// But since internal_rx was moved, we need to break out of the loop
|
||||
break;
|
||||
}
|
||||
}
|
||||
// Cleanup pending requests on exit
|
||||
task_pending_requests
|
||||
.lock()
|
||||
.unwrap()
|
||||
.drain()
|
||||
.for_each(|(_, sender)| {
|
||||
let _ = sender.send(Err(CircleWsClientError::ConnectionError(
|
||||
"WebSocket task terminated".to_string(),
|
||||
)));
|
||||
});
|
||||
Err(e) => {
|
||||
error!("❌ [{}] WebSocket connection failed: {:?}", log_url, e);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to connect to WebSocket: {:?}", e);
|
||||
// Notify any waiting senders about the connection failure
|
||||
internal_rx
|
||||
.for_each(|msg| async {
|
||||
if let InternalWsMessage::SendJsonRpc(_, response_sender) = msg {
|
||||
let _ = response_sender
|
||||
.send(Err(CircleWsClientError::ConnectionError(e.to_string())));
|
||||
}
|
||||
})
|
||||
.await;
|
||||
|
||||
// Reset connection status
|
||||
*is_connected.lock().unwrap() = false;
|
||||
|
||||
// Wait before reconnecting
|
||||
info!("⏳ [{}] Waiting 5 seconds before reconnection attempt...", log_url);
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
{
|
||||
use gloo_timers::future::TimeoutFuture;
|
||||
TimeoutFuture::new(5_000).await;
|
||||
}
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
{
|
||||
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
|
||||
}
|
||||
}
|
||||
info!("WebSocket task finished.");
|
||||
|
||||
// Cleanup pending requests on exit
|
||||
task_pending_requests
|
||||
.lock()
|
||||
.unwrap()
|
||||
.drain()
|
||||
.for_each(|(_, sender)| {
|
||||
let _ = sender.send(Err(CircleWsClientError::ConnectionError(
|
||||
"WebSocket task terminated".to_string(),
|
||||
)));
|
||||
});
|
||||
|
||||
info!("🏁 [{}] WebSocket task finished", log_url);
|
||||
};
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
@@ -497,6 +494,261 @@ impl CircleWsClient {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Enhanced connection loop handler with keep-alive
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
async fn handle_connection_with_keepalive(
|
||||
ws_conn: WebSocket,
|
||||
mut internal_rx: mpsc::Receiver<InternalWsMessage>,
|
||||
pending_requests: &Arc<Mutex<HashMap<String, oneshot::Sender<Result<JsonRpcResponseClient, CircleWsClientError>>>>>,
|
||||
log_url: &str,
|
||||
is_connected: &Arc<Mutex<bool>>,
|
||||
) -> String {
|
||||
let (mut ws_tx, mut ws_rx) = ws_conn.split();
|
||||
let mut internal_rx_fused = internal_rx.fuse();
|
||||
|
||||
// Connection validation for WASM - test if connection actually works
|
||||
let mut connection_test_timer = TimeoutFuture::new(2_000).fuse(); // 2 second timeout
|
||||
let mut connection_validated = false;
|
||||
|
||||
// Keep-alive timer - send ping every 30 seconds
|
||||
use gloo_timers::future::TimeoutFuture;
|
||||
let mut keep_alive_timer = TimeoutFuture::new(30_000).fuse();
|
||||
|
||||
// Send initial connection test ping
|
||||
debug!("Sending initial connection test ping to {}", log_url);
|
||||
let test_ping_res = ws_tx.send(GlooWsMessage::Text("ping".to_string())).await;
|
||||
if let Err(e) = test_ping_res {
|
||||
error!("❌ [{}] Initial connection test failed: {:?}", log_url, e);
|
||||
*is_connected.lock().unwrap() = false;
|
||||
return format!("Initial connection test failed: {}", e);
|
||||
}
|
||||
|
||||
loop {
|
||||
futures_util::select! {
|
||||
// Connection test timeout - if no response in 2 seconds, connection failed
|
||||
_ = connection_test_timer => {
|
||||
if !connection_validated {
|
||||
error!("❌ [{}] Connection test failed - no response within 2 seconds", log_url);
|
||||
*is_connected.lock().unwrap() = false;
|
||||
return "Connection test timeout - server not responding".to_string();
|
||||
}
|
||||
}
|
||||
|
||||
// Handle messages from the client's public methods (e.g., play)
|
||||
internal_msg = internal_rx_fused.next().fuse() => {
|
||||
match internal_msg {
|
||||
Some(InternalWsMessage::SendJsonRpc(req, response_sender)) => {
|
||||
let req_id = req.id.clone();
|
||||
match serde_json::to_string(&req) {
|
||||
Ok(req_str) => {
|
||||
debug!("Sending JSON-RPC request (ID: {}): {}", req_id, req_str);
|
||||
let send_res = ws_tx.send(GlooWsMessage::Text(req_str)).await;
|
||||
if let Err(e) = send_res {
|
||||
error!("WebSocket send error for request ID {}: {:?}", req_id, e);
|
||||
// Connection failed - update status
|
||||
*is_connected.lock().unwrap() = false;
|
||||
let _ = response_sender.send(Err(CircleWsClientError::SendError(e.to_string())));
|
||||
} else {
|
||||
// Store the sender to await the response
|
||||
pending_requests.lock().unwrap().insert(req_id, response_sender);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to serialize request ID {}: {}", req_id, e);
|
||||
let _ = response_sender.send(Err(CircleWsClientError::JsonError(e)));
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(InternalWsMessage::Close) => {
|
||||
info!("Close message received internally, closing WebSocket.");
|
||||
let _ = ws_tx.close().await;
|
||||
return "Manual close requested".to_string();
|
||||
}
|
||||
None => {
|
||||
info!("Internal MPSC channel closed, WebSocket task shutting down.");
|
||||
let _ = ws_tx.close().await;
|
||||
return "Internal channel closed".to_string();
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
// Handle messages received from the WebSocket server
|
||||
ws_msg_res = ws_rx.next().fuse() => {
|
||||
match ws_msg_res {
|
||||
Some(Ok(msg)) => {
|
||||
// Any successful message confirms the connection is working
|
||||
if !connection_validated {
|
||||
info!("✅ [{}] WebSocket connection validated - received message from server", log_url);
|
||||
*is_connected.lock().unwrap() = true;
|
||||
connection_validated = true;
|
||||
}
|
||||
|
||||
match msg {
|
||||
GlooWsMessage::Text(text) => {
|
||||
debug!("Received WebSocket message: {}", text);
|
||||
Self::handle_received_message(&text, pending_requests);
|
||||
}
|
||||
GlooWsMessage::Bytes(_) => {
|
||||
debug!("Received binary WebSocket message (WASM).");
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
error!("WebSocket receive error: {:?}", e);
|
||||
*is_connected.lock().unwrap() = false;
|
||||
return format!("Receive error: {}", e);
|
||||
}
|
||||
None => {
|
||||
info!("WebSocket connection closed by server (stream ended).");
|
||||
*is_connected.lock().unwrap() = false;
|
||||
return "Server closed connection (stream ended)".to_string();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Keep-alive timer - send ping every 30 seconds
|
||||
_ = keep_alive_timer => {
|
||||
// Only send ping if connection is validated
|
||||
if connection_validated {
|
||||
debug!("Sending keep-alive ping to {}", log_url);
|
||||
let ping_str = "ping"; // Send simple plaintext ping
|
||||
|
||||
let send_res = ws_tx.send(GlooWsMessage::Text(ping_str.to_string())).await;
|
||||
if let Err(e) = send_res {
|
||||
warn!("Keep-alive ping failed for {}: {:?}", log_url, e);
|
||||
*is_connected.lock().unwrap() = false;
|
||||
return format!("Keep-alive failed: {}", e);
|
||||
}
|
||||
} else {
|
||||
debug!("Skipping keep-alive ping - connection not yet validated for {}", log_url);
|
||||
}
|
||||
|
||||
// Reset timer
|
||||
keep_alive_timer = TimeoutFuture::new(30_000).fuse();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Enhanced connection loop handler with keep-alive for native targets
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
async fn handle_connection_with_keepalive(
|
||||
ws_conn: tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
|
||||
mut internal_rx: mpsc::Receiver<InternalWsMessage>,
|
||||
pending_requests: &Arc<Mutex<HashMap<String, oneshot::Sender<Result<JsonRpcResponseClient, CircleWsClientError>>>>>,
|
||||
log_url: &str,
|
||||
_is_connected: &Arc<Mutex<bool>>,
|
||||
) -> String {
|
||||
let (mut ws_tx, mut ws_rx) = ws_conn.split();
|
||||
let mut internal_rx_fused = internal_rx.fuse();
|
||||
|
||||
loop {
|
||||
futures_util::select! {
|
||||
// Handle messages from the client's public methods (e.g., play)
|
||||
internal_msg = internal_rx_fused.next().fuse() => {
|
||||
match internal_msg {
|
||||
Some(InternalWsMessage::SendJsonRpc(req, response_sender)) => {
|
||||
let req_id = req.id.clone();
|
||||
match serde_json::to_string(&req) {
|
||||
Ok(req_str) => {
|
||||
debug!("Sending JSON-RPC request (ID: {}): {}", req_id, req_str);
|
||||
let send_res = ws_tx.send(TungsteniteWsMessage::Text(req_str)).await;
|
||||
if let Err(e) = send_res {
|
||||
error!("WebSocket send error for request ID {}: {:?}", req_id, e);
|
||||
let _ = response_sender.send(Err(CircleWsClientError::SendError(e.to_string())));
|
||||
} else {
|
||||
// Store the sender to await the response
|
||||
pending_requests.lock().unwrap().insert(req_id, response_sender);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to serialize request ID {}: {}", req_id, e);
|
||||
let _ = response_sender.send(Err(CircleWsClientError::JsonError(e)));
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(InternalWsMessage::Close) => {
|
||||
info!("Close message received internally, closing WebSocket.");
|
||||
let _ = ws_tx.close().await;
|
||||
return "Manual close requested".to_string();
|
||||
}
|
||||
None => {
|
||||
info!("Internal MPSC channel closed, WebSocket task shutting down.");
|
||||
let _ = ws_tx.close().await;
|
||||
return "Internal channel closed".to_string();
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
// Handle messages received from the WebSocket server
|
||||
ws_msg_res = ws_rx.next().fuse() => {
|
||||
match ws_msg_res {
|
||||
Some(Ok(msg)) => {
|
||||
match msg {
|
||||
TungsteniteWsMessage::Text(text) => {
|
||||
debug!("Received WebSocket message: {}", text);
|
||||
Self::handle_received_message(&text, pending_requests);
|
||||
}
|
||||
TungsteniteWsMessage::Binary(_) => {
|
||||
debug!("Received binary WebSocket message (Native).");
|
||||
}
|
||||
TungsteniteWsMessage::Ping(_) | TungsteniteWsMessage::Pong(_) => {
|
||||
debug!("Received Ping/Pong (Native).");
|
||||
}
|
||||
TungsteniteWsMessage::Close(_) => {
|
||||
info!("WebSocket connection closed by server (Native).");
|
||||
return "Server closed connection".to_string();
|
||||
}
|
||||
TungsteniteWsMessage::Frame(_) => {
|
||||
debug!("Received Frame (Native) - not typically handled directly.");
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
error!("WebSocket receive error: {:?}", e);
|
||||
return format!("Receive error: {}", e);
|
||||
}
|
||||
None => {
|
||||
info!("WebSocket connection closed by server (stream ended).");
|
||||
return "Server closed connection (stream ended)".to_string();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Helper method to handle received messages
|
||||
fn handle_received_message(
|
||||
text: &str,
|
||||
pending_requests: &Arc<Mutex<HashMap<String, oneshot::Sender<Result<JsonRpcResponseClient, CircleWsClientError>>>>>,
|
||||
) {
|
||||
// Handle ping/pong messages - these are not JSON-RPC
|
||||
if text.trim() == "pong" {
|
||||
debug!("Received pong response");
|
||||
return;
|
||||
}
|
||||
|
||||
match serde_json::from_str::<JsonRpcResponseClient>(text) {
|
||||
Ok(response) => {
|
||||
if let Some(sender) = pending_requests.lock().unwrap().remove(&response.id) {
|
||||
if let Err(failed_send_val) = sender.send(Ok(response)) {
|
||||
if let Ok(resp_for_log) = failed_send_val {
|
||||
warn!("Failed to send response to waiting task for ID: {}", resp_for_log.id);
|
||||
} else {
|
||||
warn!("Failed to send response to waiting task, and also failed to get original response for logging.");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
warn!("Received response for unknown request ID or unsolicited message: {:?}", response);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to parse JSON-RPC response: {}. Raw: {}", e, text);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn play(
|
||||
&self,
|
||||
|
4
src/server/Cargo.lock
generated
4
src/server/Cargo.lock
generated
@@ -584,7 +584,7 @@ dependencies = [
|
||||
"once_cell",
|
||||
"rand 0.8.5",
|
||||
"redis",
|
||||
"rhai_client",
|
||||
"rhai_dispatcher",
|
||||
"rustls",
|
||||
"rustls-pemfile",
|
||||
"secp256k1",
|
||||
@@ -1769,7 +1769,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
|
||||
|
||||
[[package]]
|
||||
name = "rhai_client"
|
||||
name = "rhai_dispatcher"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
|
@@ -44,7 +44,7 @@ redis = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
rhai_client = { path = "../../../rhailib/src/client" } # Corrected relative path
|
||||
rhai_dispatcher = { path = "../../../rhailib/src/dispatcher" } # Corrected relative path
|
||||
thiserror = { workspace = true }
|
||||
heromodels = { path = "../../../db/heromodels" }
|
||||
|
||||
|
@@ -37,6 +37,9 @@ struct Args {
|
||||
|
||||
#[clap(long, help = "Enable webhook handling")]
|
||||
webhooks: bool,
|
||||
|
||||
#[clap(long, value_parser, help = "Worker ID for the server")]
|
||||
worker_id: String,
|
||||
}
|
||||
|
||||
#[actix_web::main]
|
||||
@@ -90,17 +93,35 @@ async fn main() -> std::io::Result<()> {
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
let config = ServerConfig {
|
||||
host: args.host,
|
||||
port: args.port,
|
||||
redis_url: args.redis_url,
|
||||
enable_auth: args.auth,
|
||||
enable_tls: args.tls,
|
||||
cert_path: args.cert,
|
||||
key_path: args.key,
|
||||
tls_port: args.tls_port,
|
||||
enable_webhooks: args.webhooks,
|
||||
};
|
||||
let mut builder = ServerConfig::builder(
|
||||
args.host,
|
||||
args.port,
|
||||
args.redis_url,
|
||||
args.worker_id,
|
||||
);
|
||||
|
||||
if args.auth {
|
||||
builder = builder.with_auth();
|
||||
}
|
||||
|
||||
if args.webhooks {
|
||||
builder = builder.with_webhooks();
|
||||
}
|
||||
|
||||
if args.tls {
|
||||
if let (Some(cert), Some(key)) = (args.cert, args.key) {
|
||||
builder = builder.with_tls(cert, key);
|
||||
} else {
|
||||
eprintln!("Error: TLS is enabled but --cert or --key is missing.");
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(tls_port) = args.tls_port {
|
||||
builder = builder.with_tls_port(tls_port);
|
||||
}
|
||||
|
||||
let config = builder.build();
|
||||
|
||||
println!("🚀 Starting Circles WebSocket Server");
|
||||
println!("📋 Configuration:");
|
||||
|
@@ -90,7 +90,7 @@ sequenceDiagram
|
||||
participant HS as HttpServer
|
||||
participant WH as Webhook Handler
|
||||
participant WV as Webhook Verifier
|
||||
participant RC as RhaiClient
|
||||
participant RC as RhaiDispatcher
|
||||
participant Redis as Redis
|
||||
|
||||
WS->>+HS: POST /webhooks/{provider}/{circle_pk}
|
||||
@@ -102,7 +102,7 @@ sequenceDiagram
|
||||
|
||||
alt Signature Valid
|
||||
WH->>WH: Parse webhook payload (heromodels types)
|
||||
WH->>+RC: Create RhaiClient with caller_id
|
||||
WH->>+RC: Create RhaiDispatcher with caller_id
|
||||
RC->>+Redis: Execute webhook script
|
||||
Redis-->>-RC: Script result
|
||||
RC-->>-WH: Execution result
|
||||
@@ -128,6 +128,6 @@ sequenceDiagram
|
||||
| **Connection Type** | Persistent, bidirectional | HTTP request/response |
|
||||
| **Authentication** | secp256k1 signature-based | HMAC signature verification |
|
||||
| **State Management** | Stateful sessions via CircleWs actor | Stateless HTTP requests |
|
||||
| **Script Execution** | Direct via authenticated session | Via RhaiClient with provider caller_id |
|
||||
| **Script Execution** | Direct via authenticated session | Via RhaiDispatcher with provider caller_id |
|
||||
| **Use Case** | Interactive client applications | External service notifications |
|
||||
| **Data Types** | JSON-RPC messages | Provider-specific webhook payloads (heromodels) |
|
@@ -20,7 +20,7 @@ graph TB
|
||||
F[Stripe Verifier]
|
||||
G[iDenfy Verifier]
|
||||
H[Script Dispatcher]
|
||||
I[RhaiClientBuilder]
|
||||
I[RhaiDispatcherBuilder]
|
||||
end
|
||||
|
||||
subgraph "Configuration"
|
||||
@@ -93,7 +93,7 @@ sequenceDiagram
|
||||
participant CS as Circle Server
|
||||
participant WV as Webhook Verifier
|
||||
participant SD as Script Dispatcher
|
||||
participant RC as RhaiClient
|
||||
participant RC as RhaiDispatcher
|
||||
participant RW as Rhai Worker
|
||||
|
||||
WS->>CS: POST /webhooks/stripe/{circle_pk}
|
||||
@@ -113,7 +113,7 @@ sequenceDiagram
|
||||
|
||||
alt Verification Success
|
||||
CS->>SD: Dispatch appropriate script
|
||||
SD->>RC: Create RhaiClientBuilder
|
||||
SD->>RC: Create RhaiDispatcherBuilder
|
||||
RC->>RC: Set caller_id="stripe" or "idenfy"
|
||||
RC->>RC: Set recipient_id=circle_pk
|
||||
RC->>RC: Set script="stripe_webhook_received" or "idenfy_webhook_received"
|
||||
@@ -249,7 +249,7 @@ heromodels/src/models/
|
||||
- **Type Organization**: Webhook payload types moved to `heromodels` library for reusability
|
||||
- **Modular Handlers**: Separate handler files for each webhook provider
|
||||
- **Simplified Architecture**: Removed unnecessary dispatcher complexity
|
||||
- **Direct Script Execution**: Handlers directly use `RhaiClient` for script execution
|
||||
- **Direct Script Execution**: Handlers directly use `RhaiDispatcher` for script execution
|
||||
|
||||
### Modified Files
|
||||
- `src/lib.rs` - Add webhook routes and module imports
|
||||
|
@@ -3,7 +3,7 @@ use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer};
|
||||
use actix_web_actors::ws;
|
||||
use log::{debug, info, error}; // Added error for better logging
|
||||
use once_cell::sync::Lazy;
|
||||
use rhai_client::{RhaiClientBuilder, RhaiClientError};
|
||||
use rhai_dispatcher::{RhaiDispatcherBuilder, RhaiDispatcherError};
|
||||
use rustls::pki_types::PrivateKeyDer;
|
||||
use rustls::ServerConfig as RustlsServerConfig;
|
||||
use rustls_pemfile::{certs, pkcs8_private_keys};
|
||||
@@ -106,6 +106,7 @@ struct CircleWs {
|
||||
nonce_store: HashMap<String, NonceResponse>,
|
||||
auth_enabled_on_server: bool,
|
||||
authenticated_pubkey: Option<String>,
|
||||
circle_worker_id: String,
|
||||
}
|
||||
|
||||
impl CircleWs {
|
||||
@@ -114,6 +115,7 @@ impl CircleWs {
|
||||
server_circle_public_key: String,
|
||||
redis_url_for_client: String,
|
||||
auth_enabled_on_server: bool,
|
||||
circle_worker_id: String,
|
||||
) -> Self {
|
||||
Self {
|
||||
server_circle_name,
|
||||
@@ -122,6 +124,7 @@ impl CircleWs {
|
||||
nonce_store: HashMap::new(),
|
||||
auth_enabled_on_server,
|
||||
authenticated_pubkey: None,
|
||||
circle_worker_id,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -284,17 +287,19 @@ impl CircleWs {
|
||||
let redis_url_clone = self.redis_url_for_client.clone();
|
||||
let _rpc_id_clone = client_rpc_id.clone();
|
||||
let public_key = self.authenticated_pubkey.clone();
|
||||
let worker_id_clone = self.circle_worker_id.clone();
|
||||
|
||||
let fut = async move {
|
||||
let caller_id = public_key.unwrap_or_else(|| "anonymous".to_string());
|
||||
match RhaiClientBuilder::new()
|
||||
match RhaiDispatcherBuilder::new()
|
||||
.redis_url(&redis_url_clone)
|
||||
.caller_id(&caller_id)
|
||||
.build() {
|
||||
Ok(rhai_client) => {
|
||||
rhai_client
|
||||
Ok(rhai_dispatcher) => {
|
||||
rhai_dispatcher
|
||||
.new_play_request()
|
||||
.recipient_id(&circle_pk_clone)
|
||||
.context_id(&circle_pk_clone)
|
||||
.worker_id(&worker_id_clone)
|
||||
.script(&script_content)
|
||||
.timeout(TASK_TIMEOUT_DURATION)
|
||||
.await_response()
|
||||
@@ -339,7 +344,7 @@ impl CircleWs {
|
||||
}
|
||||
Err(e) => {
|
||||
let (code, message) = match e {
|
||||
RhaiClientError::Timeout(task_id) => (
|
||||
RhaiDispatcherError::Timeout(task_id) => (
|
||||
-32002,
|
||||
format!(
|
||||
"Timeout waiting for Rhai script (task: {})",
|
||||
@@ -416,6 +421,13 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for CircleWs {
|
||||
Ok(ws::Message::Text(text)) => {
|
||||
debug!("WS Text for {}: {}", self.server_circle_name, text);
|
||||
|
||||
// Handle plaintext ping messages for keep-alive
|
||||
if text.trim() == "ping" {
|
||||
debug!("Received keep-alive ping from {}, responding with pong", self.server_circle_name);
|
||||
ctx.text("pong");
|
||||
return;
|
||||
}
|
||||
|
||||
match serde_json::from_str::<JsonRpcRequest>(&text) {
|
||||
Ok(req) => {
|
||||
let client_rpc_id = req.id.clone().unwrap_or(Value::Null);
|
||||
@@ -490,58 +502,23 @@ pub struct ServerConfig {
|
||||
pub host: String,
|
||||
pub port: u16,
|
||||
pub redis_url: String,
|
||||
pub enable_auth: bool,
|
||||
pub enable_tls: bool,
|
||||
pub cert_path: Option<String>,
|
||||
pub key_path: Option<String>,
|
||||
pub tls_port: Option<u16>,
|
||||
pub enable_auth: bool,
|
||||
pub enable_webhooks: bool,
|
||||
pub circle_worker_id: String,
|
||||
}
|
||||
|
||||
impl ServerConfig {
|
||||
/// Create a new server configuration with TLS disabled
|
||||
pub fn new(
|
||||
pub fn builder(
|
||||
host: String,
|
||||
port: u16,
|
||||
redis_url: String,
|
||||
) -> Self {
|
||||
Self {
|
||||
host,
|
||||
port,
|
||||
redis_url,
|
||||
enable_auth: false,
|
||||
enable_tls: false,
|
||||
cert_path: None,
|
||||
key_path: None,
|
||||
tls_port: None,
|
||||
enable_webhooks: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Enable TLS with certificate and key paths
|
||||
pub fn with_tls(mut self, cert_path: String, key_path: String) -> Self {
|
||||
self.enable_tls = true;
|
||||
self.cert_path = Some(cert_path);
|
||||
self.key_path = Some(key_path);
|
||||
self
|
||||
}
|
||||
|
||||
/// Set a separate port for TLS connections
|
||||
pub fn with_tls_port(mut self, tls_port: u16) -> Self {
|
||||
self.tls_port = Some(tls_port);
|
||||
self
|
||||
}
|
||||
|
||||
/// Enable authentication
|
||||
pub fn with_auth(mut self) -> Self {
|
||||
self.enable_auth = true;
|
||||
self
|
||||
}
|
||||
|
||||
/// Enable webhooks
|
||||
pub fn with_webhooks(mut self) -> Self {
|
||||
self.enable_webhooks = true;
|
||||
self
|
||||
worker_id: String,
|
||||
) -> ServerConfigBuilder {
|
||||
ServerConfigBuilder::new(host, port, redis_url, worker_id)
|
||||
}
|
||||
|
||||
/// Get the effective port for TLS connections
|
||||
@@ -551,7 +528,75 @@ impl ServerConfig {
|
||||
|
||||
/// Check if TLS is properly configured
|
||||
pub fn is_tls_configured(&self) -> bool {
|
||||
self.enable_tls && self.cert_path.is_some() && self.key_path.is_some()
|
||||
self.cert_path.is_some() && self.key_path.is_some()
|
||||
}
|
||||
}
|
||||
|
||||
/// ServerConfigBuilder
|
||||
pub struct ServerConfigBuilder {
|
||||
host: String,
|
||||
port: u16,
|
||||
redis_url: String,
|
||||
enable_tls: bool,
|
||||
cert_path: Option<String>,
|
||||
key_path: Option<String>,
|
||||
tls_port: Option<u16>,
|
||||
enable_auth: bool,
|
||||
enable_webhooks: bool,
|
||||
circle_worker_id: String,
|
||||
}
|
||||
|
||||
impl ServerConfigBuilder {
|
||||
pub fn new(host: String, port: u16, redis_url: String, worker_id: String) -> Self {
|
||||
Self {
|
||||
host,
|
||||
port,
|
||||
redis_url,
|
||||
enable_tls: false,
|
||||
cert_path: None,
|
||||
key_path: None,
|
||||
tls_port: None,
|
||||
enable_auth: false,
|
||||
enable_webhooks: false,
|
||||
circle_worker_id: worker_id,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_tls(mut self, cert_path: String, key_path: String) -> Self {
|
||||
self.enable_tls = true;
|
||||
self.cert_path = Some(cert_path);
|
||||
self.key_path = Some(key_path);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_tls_port(mut self, tls_port: u16) -> Self {
|
||||
self.tls_port = Some(tls_port);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_auth(mut self) -> Self {
|
||||
self.enable_auth = true;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_webhooks(mut self) -> Self {
|
||||
self.enable_webhooks = true;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(self) -> ServerConfig {
|
||||
ServerConfig {
|
||||
host: self.host,
|
||||
port: self.port,
|
||||
redis_url: self.redis_url,
|
||||
enable_tls: self.enable_tls,
|
||||
cert_path: self.cert_path,
|
||||
key_path: self.key_path,
|
||||
tls_port: self.tls_port,
|
||||
enable_auth: self.enable_auth,
|
||||
enable_webhooks: self.enable_webhooks,
|
||||
circle_worker_id: self.circle_worker_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -613,23 +658,23 @@ fn load_rustls_config(
|
||||
async fn ws_handler(
|
||||
req: HttpRequest,
|
||||
stream: web::Payload,
|
||||
path: web::Path<String>,
|
||||
server_config: web::Data<ServerConfig>,
|
||||
config: web::Data<ServerConfig>,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
let circle_pk = path.into_inner();
|
||||
|
||||
info!(
|
||||
"Incoming WebSocket connection for circle: {} (auth_enabled: {})",
|
||||
circle_pk, server_config.enable_auth
|
||||
);
|
||||
let server_circle_name = req.match_info().get("circle_pk").unwrap_or("unknown").to_string();
|
||||
let server_circle_public_key = server_circle_name.clone(); // Assuming pk is the name for now
|
||||
|
||||
let ws_actor = CircleWs::new_configured(
|
||||
format!("circle-{}", &circle_pk[..8]), // Use first 8 chars as display name
|
||||
circle_pk,
|
||||
server_config.redis_url.clone(),
|
||||
server_config.enable_auth,
|
||||
);
|
||||
ws::start(ws_actor, &req, stream)
|
||||
// Create and start the WebSocket actor
|
||||
ws::start(
|
||||
CircleWs::new_configured(
|
||||
server_circle_name,
|
||||
server_circle_public_key,
|
||||
config.redis_url.clone(),
|
||||
config.enable_auth,
|
||||
config.circle_worker_id.clone(),
|
||||
),
|
||||
&req,
|
||||
stream,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn spawn_circle_server(
|
||||
@@ -657,7 +702,7 @@ pub fn spawn_circle_server(
|
||||
let webhook_app_state = create_webhook_app_state(
|
||||
webhook_config,
|
||||
config.redis_url.clone(),
|
||||
"webhook_system".to_string()
|
||||
config.circle_worker_id.clone(),
|
||||
);
|
||||
Some(web::Data::new(webhook_app_state))
|
||||
}
|
||||
|
@@ -9,18 +9,24 @@ pub struct WebhookAppState {
|
||||
pub config: WebhookConfig,
|
||||
pub redis_url: String,
|
||||
pub caller_id: String,
|
||||
pub worker_id: String,
|
||||
}
|
||||
|
||||
/// Create webhook application state
|
||||
pub fn create_webhook_app_state(
|
||||
config: WebhookConfig,
|
||||
redis_url: String,
|
||||
caller_id: String,
|
||||
worker_id: String,
|
||||
) -> WebhookAppState {
|
||||
// For now, we'll use the worker_id as the caller_id for webhooks.
|
||||
// This can be changed if a more specific caller_id is needed.
|
||||
let caller_id = worker_id.clone();
|
||||
|
||||
WebhookAppState {
|
||||
config,
|
||||
redis_url,
|
||||
caller_id,
|
||||
worker_id,
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -10,7 +10,7 @@ use actix_web::{web, HttpRequest, HttpResponse, Result as ActixResult};
|
||||
use bytes::Bytes;
|
||||
use log::{debug, error, info, warn};
|
||||
use serde_json;
|
||||
use rhai_client::RhaiClientBuilder;
|
||||
use rhai_dispatcher::RhaiDispatcherBuilder;
|
||||
|
||||
/// Execute an iDenfy webhook script
|
||||
async fn execute_idenfy_webhook_script(
|
||||
@@ -24,12 +24,13 @@ async fn execute_idenfy_webhook_script(
|
||||
circle_id, event.client_id, event.status
|
||||
);
|
||||
|
||||
// Create RhaiClient
|
||||
let rhai_client = RhaiClientBuilder::new()
|
||||
// Create RhaiDispatcher
|
||||
let rhai_dispatcher = RhaiDispatcherBuilder::new()
|
||||
.redis_url(redis_url)
|
||||
.caller_id(caller_id)
|
||||
.context_id(circle_id)
|
||||
.build()
|
||||
.map_err(|e| WebhookError::ScriptExecutionError(format!("Failed to create RhaiClient: {}", e)))?;
|
||||
.map_err(|e| WebhookError::ScriptExecutionError(format!("Failed to create RhaiDispatcher: {}", e)))?;
|
||||
|
||||
// Serialize the event as JSON payload
|
||||
let event_json = serde_json::to_string(event)
|
||||
@@ -43,9 +44,8 @@ async fn execute_idenfy_webhook_script(
|
||||
|
||||
debug!("Executing script: {}", script);
|
||||
|
||||
match rhai_client
|
||||
match rhai_dispatcher
|
||||
.new_play_request()
|
||||
.recipient_id(circle_id)
|
||||
.script(&script)
|
||||
.timeout(std::time::Duration::from_secs(30))
|
||||
.await_response()
|
||||
|
@@ -10,13 +10,14 @@ use actix_web::{web, HttpRequest, HttpResponse, Result as ActixResult};
|
||||
use bytes::Bytes;
|
||||
use log::{debug, error, info, warn};
|
||||
use serde_json;
|
||||
use rhai_client::RhaiClientBuilder;
|
||||
use rhai_dispatcher::RhaiDispatcherBuilder;
|
||||
|
||||
/// Execute a Stripe webhook script
|
||||
async fn execute_stripe_webhook_script(
|
||||
redis_url: &str,
|
||||
caller_id: &str,
|
||||
circle_id: &str,
|
||||
worker_id: &str,
|
||||
event: &StripeWebhookEvent,
|
||||
) -> Result<serde_json::Value, WebhookError> {
|
||||
info!(
|
||||
@@ -24,12 +25,14 @@ async fn execute_stripe_webhook_script(
|
||||
circle_id, event.event_type
|
||||
);
|
||||
|
||||
// Create RhaiClient
|
||||
let rhai_client = RhaiClientBuilder::new()
|
||||
// Create RhaiDispatcher
|
||||
let rhai_dispatcher = RhaiDispatcherBuilder::new()
|
||||
.redis_url(redis_url)
|
||||
.caller_id(caller_id)
|
||||
.worker_id(worker_id)
|
||||
.context_id(circle_id)
|
||||
.build()
|
||||
.map_err(|e| WebhookError::ScriptExecutionError(format!("Failed to create RhaiClient: {}", e)))?;
|
||||
.map_err(|e| WebhookError::ScriptExecutionError(format!("Failed to create RhaiDispatcher: {}", e)))?;
|
||||
|
||||
// Serialize the event as JSON payload
|
||||
let event_json = serde_json::to_string(event)
|
||||
@@ -43,9 +46,8 @@ async fn execute_stripe_webhook_script(
|
||||
|
||||
debug!("Executing script: {}", script);
|
||||
|
||||
match rhai_client
|
||||
match rhai_dispatcher
|
||||
.new_play_request()
|
||||
.recipient_id(circle_id)
|
||||
.script(&script)
|
||||
.timeout(std::time::Duration::from_secs(30))
|
||||
.await_response()
|
||||
@@ -161,6 +163,7 @@ pub async fn handle_stripe_webhook(
|
||||
&data.redis_url,
|
||||
&verification_result.caller_id,
|
||||
&circle_pk,
|
||||
&data.worker_id,
|
||||
&stripe_event,
|
||||
).await {
|
||||
Ok(script_result) => {
|
||||
|
Reference in New Issue
Block a user