Compare commits

...

2 Commits

Author SHA1 Message Date
Timur Gordon
70b5b336f1 ws client - server improvements 2025-07-21 20:57:58 +02:00
Timur Gordon
29ff40d1a4 rename rhai client to dispatcher 2025-07-09 23:39:48 +02:00
25 changed files with 968 additions and 402 deletions

35
Cargo.lock generated
View File

@@ -695,7 +695,7 @@ dependencies = [
"rand 0.8.5", "rand 0.8.5",
"redis 0.23.3", "redis 0.23.3",
"redis 0.25.4", "redis 0.25.4",
"rhai_client", "rhai_dispatcher",
"rhailib_engine", "rhailib_engine",
"rhailib_worker", "rhailib_worker",
"rustls", "rustls",
@@ -2834,21 +2834,6 @@ dependencies = [
"thin-vec", "thin-vec",
] ]
[[package]]
name = "rhai_client"
version = "0.1.0"
dependencies = [
"chrono",
"clap",
"env_logger",
"log",
"redis 0.25.4",
"serde",
"serde_json",
"tokio",
"uuid",
]
[[package]] [[package]]
name = "rhai_client_macros" name = "rhai_client_macros"
version = "0.1.0" version = "0.1.0"
@@ -2870,6 +2855,21 @@ dependencies = [
"syn 2.0.103", "syn 2.0.103",
] ]
[[package]]
name = "rhai_dispatcher"
version = "0.1.0"
dependencies = [
"chrono",
"clap",
"env_logger",
"log",
"redis 0.25.4",
"serde",
"serde_json",
"tokio",
"uuid",
]
[[package]] [[package]]
name = "rhailib_dsl" name = "rhailib_dsl"
version = "0.1.0" version = "0.1.0"
@@ -2883,6 +2883,7 @@ dependencies = [
"macros", "macros",
"reqwest", "reqwest",
"rhai", "rhai",
"rhai_dispatcher",
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
@@ -2911,7 +2912,7 @@ dependencies = [
"log", "log",
"redis 0.25.4", "redis 0.25.4",
"rhai", "rhai",
"rhai_client", "rhai_dispatcher",
"rhailib_engine", "rhailib_engine",
"serde", "serde",
"serde_json", "serde_json",

View File

@@ -1,5 +1,5 @@
use clap::Parser; use clap::Parser;
use rhai_client::{RhaiClient, RhaiClientBuilder}; use rhai_dispatcher::{RhaiDispatcher, RhaiDispatcherBuilder};
use log::{error, info}; use log::{error, info};
use std::io::{self, Write}; use std::io::{self, Write};
use std::time::Duration; use std::time::Duration;
@@ -46,8 +46,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Configure logging based on verbosity level // Configure logging based on verbosity level
let log_config = match args.verbose { let log_config = match args.verbose {
0 => "warn,circles_client=info,rhai_client=info", 0 => "warn,circles_client=info,rhai_dispatcher=info",
1 => "info,circles_client=debug,rhai_client=debug", 1 => "info,circles_client=debug,rhai_dispatcher=debug",
2 => "debug", 2 => "debug",
_ => "trace", _ => "trace",
}; };
@@ -68,7 +68,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!(); info!();
// Create the Rhai client // Create the Rhai client
let client = RhaiClientBuilder::new() let client = RhaiDispatcherBuilder::new()
.caller_id(&args.caller_public_key) .caller_id(&args.caller_public_key)
.redis_url(&args.redis_url) .redis_url(&args.redis_url)
.build()?; .build()?;
@@ -97,7 +97,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
} }
async fn execute_script( async fn execute_script(
client: &RhaiClient, client: &RhaiDispatcher,
worker_key: &str, worker_key: &str,
script: String, script: String,
timeout_secs: u64, timeout_secs: u64,
@@ -134,7 +134,7 @@ async fn execute_script(
} }
async fn run_interactive_mode( async fn run_interactive_mode(
client: &RhaiClient, client: &RhaiDispatcher,
worker_key: &str, worker_key: &str,
timeout_secs: u64, timeout_secs: u64,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {

View File

@@ -160,7 +160,7 @@ Modify `setup_and_spawn_circles` to:
Update `handle_play` to route to correct worker: Update `handle_play` to route to correct worker:
```rust ```rust
// Use circle_public_key from URL path for worker routing // Use circle_public_key from URL path for worker routing
rhai_client rhai_dispatcher
.new_play_request() .new_play_request()
.recipient_id(&self.circle_public_key) // From URL path .recipient_id(&self.circle_public_key) // From URL path
.script_path(&script_content) .script_path(&script_content)

View File

@@ -595,7 +595,7 @@ dependencies = [
"once_cell", "once_cell",
"rand 0.8.5", "rand 0.8.5",
"redis", "redis",
"rhai_client", "rhai_dispatcher",
"rustls", "rustls",
"rustls-pemfile", "rustls-pemfile",
"secp256k1", "secp256k1",
@@ -1849,7 +1849,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]] [[package]]
name = "rhai_client" name = "rhai_dispatcher"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"chrono", "chrono",

View File

@@ -521,7 +521,7 @@ dependencies = [
"env_logger", "env_logger",
"log", "log",
"redis", "redis",
"rhai_client", "rhai_dispatcher",
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",
@@ -1669,7 +1669,7 @@ dependencies = [
] ]
[[package]] [[package]]
name = "rhai_client" name = "rhai_dispatcher"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"chrono", "chrono",
@@ -2514,7 +2514,7 @@ dependencies = [
"log", "log",
"redis", "redis",
"rhai", "rhai",
"rhai_client", "rhai_dispatcher",
"serde", "serde",
"serde_json", "serde_json",
"tokio", "tokio",

View File

@@ -28,7 +28,7 @@ rhai = "1.18.0"
heromodels = { path = "../../../db/heromodels" } heromodels = { path = "../../../db/heromodels" }
rhailib_engine = { path = "../../../rhailib/src/engine" } rhailib_engine = { path = "../../../rhailib/src/engine" }
rhailib_worker = { path = "../../../rhailib/src/worker" } rhailib_worker = { path = "../../../rhailib/src/worker" }
rhai_client = { path = "../../../rhailib/src/client" } rhai_dispatcher = { path = "../../../rhailib/src/dispatcher" }
ourdb = { path = "../../../db/ourdb" } # Added for IdSequence ourdb = { path = "../../../db/ourdb" } # Added for IdSequence
sal-service-manager = { path = "../../../sal/service_manager" } sal-service-manager = { path = "../../../sal/service_manager" }
tokio-tungstenite = "0.23" tokio-tungstenite = "0.23"

View File

@@ -128,7 +128,7 @@ When a circle configuration includes an initialization script:
1. Worker starts and connects to Redis 1. Worker starts and connects to Redis
2. Launcher waits 2 seconds for worker startup 2. Launcher waits 2 seconds for worker startup
3. Launcher sends script content via RhaiClient to worker's queue 3. Launcher sends script content via RhaiDispatcher to worker's queue
4. Worker executes the initialization script 4. Worker executes the initialization script
## Configuration ## Configuration

View File

@@ -42,7 +42,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Wait a moment for the launcher to start services // Wait a moment for the launcher to start services
tokio::time::sleep(Duration::from_secs(5)).await; tokio::time::sleep(Duration::from_secs(5)).await;
let client = rhai_client::RhaiClientBuilder::new() let client = rhai_dispatcher::RhaiDispatcherBuilder::new()
.redis_url(REDIS_URL) .redis_url(REDIS_URL)
.caller_id("test_launcher") .caller_id("test_launcher")
.build()?; .build()?;
@@ -78,7 +78,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.await?; .await?;
println!("Received task details: {:?}", task_details_caller_pk); println!("Received task details: {:?}", task_details_caller_pk);
assert_eq!(task_details_caller_pk.status, "completed"); assert_eq!(task_details_caller_pk.status, "completed");
// The caller should be "launcher" as set in the RhaiClient // The caller should be "launcher" as set in the RhaiDispatcher
println!("✅ SUCCESS: Worker correctly reported CALLER_PUBLIC_KEY for init script."); println!("✅ SUCCESS: Worker correctly reported CALLER_PUBLIC_KEY for init script.");
// Test 3: Simple script execution // Test 3: Simple script execution

View File

@@ -1,5 +1,5 @@
use log::{info, debug}; use log::{info, debug};
use rhai_client::RhaiClientBuilder; use rhai_dispatcher::RhaiDispatcherBuilder;
use sal_service_manager::{ServiceConfig as ServiceManagerConfig, ServiceStatus}; use sal_service_manager::{ServiceConfig as ServiceManagerConfig, ServiceStatus};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@@ -217,8 +217,8 @@ async fn send_init_script_to_worker(
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
println!("Sending initialization script '{}' to worker for circle: {}", init_script, public_key); println!("Sending initialization script '{}' to worker for circle: {}", init_script, public_key);
// Create RhaiClient and send script // Create RhaiDispatcher and send script
let client = RhaiClientBuilder::new() let client = RhaiDispatcherBuilder::new()
.redis_url(redis_url) .redis_url(redis_url)
.caller_id("launcher") .caller_id("launcher")
.build()?; .build()?;

View File

@@ -10,6 +10,7 @@ The `client_ws` is built on the following principles:
- **Modularity**: The crate is divided into logical modules, with a clear separation of concerns between the main client logic, authentication procedures, and cryptographic utilities. - **Modularity**: The crate is divided into logical modules, with a clear separation of concerns between the main client logic, authentication procedures, and cryptographic utilities.
- **Asynchronous Operations**: All network I/O is asynchronous, using `async/await` to ensure the client is non-blocking and efficient. - **Asynchronous Operations**: All network I/O is asynchronous, using `async/await` to ensure the client is non-blocking and efficient.
- **Fluent Configuration**: A builder pattern (`CircleWsClientBuilder`) is used for clear and flexible client construction. - **Fluent Configuration**: A builder pattern (`CircleWsClientBuilder`) is used for clear and flexible client construction.
- **Self-Managing Clients**: Each `CircleWsClient` handles its own lifecycle including connection, authentication, keep-alive, and reconnection logic internally.
## 2. Cross-Platform Implementation ## 2. Cross-Platform Implementation
@@ -29,9 +30,19 @@ The `client_ws` crate is organized into the following key modules:
- **`types.rs`**: Defines the core data structures used in authentication, such as `AuthError` and `AuthCredentials`. - **`types.rs`**: Defines the core data structures used in authentication, such as `AuthError` and `AuthCredentials`.
- **`crypto_utils.rs`**: A self-contained utility module for handling all `secp256k1` cryptographic operations, including key generation, public key derivation, and message signing. - **`crypto_utils.rs`**: A self-contained utility module for handling all `secp256k1` cryptographic operations, including key generation, public key derivation, and message signing.
## 4. Authentication Flow Deep Dive ## 4. Self-Managing Client Architecture
The `authenticate` method in `CircleWsClient` orchestrates the entire authentication process over the WebSocket connection. The sequence diagram below illustrates the internal interactions within the client during this flow. Each `CircleWsClient` is designed to be completely self-managing, handling its entire lifecycle internally. This includes:
- **Connection Management**: Establishing and maintaining WebSocket connections
- **Authentication**: Automatic secp256k1 authentication flow when private keys are provided
- **Keep-Alive**: Periodic health checks to ensure connection stability
- **Reconnection**: Automatic reconnection with exponential backoff on connection failures
- **Connection Status Tracking**: Internal state management for connection health
### Connection Flow
The `connect()` method orchestrates the complete connection and authentication process:
```mermaid ```mermaid
sequenceDiagram sequenceDiagram
@@ -46,27 +57,38 @@ sequenceDiagram
User->>+Builder: build() User->>+Builder: build()
Builder-->>-User: client Builder-->>-User: client
User->>+Client: authenticate() User->>+Client: connect()
Client->>Client: Check for private_key
Client->>+CryptoUtils: derive_public_key(private_key) Note over Client: Self-managing connection process
CryptoUtils-->>-Client: public_key Client->>Client: Establish WebSocket connection
Client->>Client: Start keep-alive loop
Client->>Client: Start reconnection handler
alt Has Private Key
Client->>Client: Check for private_key
Client->>+CryptoUtils: derive_public_key(private_key)
CryptoUtils-->>-Client: public_key
Note over Client: Request nonce via WebSocket Note over Client: Request nonce via WebSocket
Client->>+WsActor: JSON-RPC "fetch_nonce" (pubkey) Client->>+WsActor: JSON-RPC "fetch_nonce" (pubkey)
WsActor-->>-Client: JSON-RPC Response (nonce) WsActor-->>-Client: JSON-RPC Response (nonce)
Client->>+CryptoUtils: sign_message(private_key, nonce) Client->>+CryptoUtils: sign_message(private_key, nonce)
CryptoUtils-->>-Client: signature CryptoUtils-->>-Client: signature
Note over Client: Send credentials via WebSocket Note over Client: Send credentials via WebSocket
Client->>+WsActor: JSON-RPC "authenticate" (pubkey, signature) Client->>+WsActor: JSON-RPC "authenticate" (pubkey, signature)
WsActor-->>-Client: JSON-RPC Response (authenticated: true/false) WsActor-->>-Client: JSON-RPC Response (authenticated: true/false)
alt Authentication Successful
Client-->>-User: Ok(true)
else Authentication Fails
Client-->>-User: Ok(false) or Err(...)
end end
Client-->>-User: Connection established and authenticated
``` ```
This architecture ensures that the cryptographic operations are isolated, the platform-specific code is cleanly separated, and the main client struct provides a simple and consistent API to the end-user. ### Self-Management Features
- **Automatic Keep-Alive**: Each client runs its own keep-alive loop to detect connection issues
- **Transparent Reconnection**: Failed connections are automatically retried with exponential backoff
- **Status Monitoring**: Connection status is tracked internally and can be queried via `is_connected()`
- **Resource Cleanup**: Proper cleanup of resources when clients are dropped
This architecture ensures that the cryptographic operations are isolated, the platform-specific code is cleanly separated, and each client is completely autonomous in managing its connection lifecycle.

View File

@@ -595,7 +595,7 @@ dependencies = [
"log", "log",
"once_cell", "once_cell",
"redis", "redis",
"rhai_client", "rhai_dispatcher",
"rustls", "rustls",
"rustls-pemfile", "rustls-pemfile",
"serde", "serde",
@@ -1765,7 +1765,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]] [[package]]
name = "rhai_client" name = "rhai_dispatcher"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"chrono", "chrono",

View File

@@ -21,6 +21,7 @@ http = "0.2"
# Authentication dependencies # Authentication dependencies
hex = { workspace = true } hex = { workspace = true }
rand = { workspace = true } rand = { workspace = true }
getrandom = { version = "0.2", features = ["js"] }
# Optional crypto dependencies (enabled by default) # Optional crypto dependencies (enabled by default)
secp256k1 = { workspace = true, optional = true } secp256k1 = { workspace = true, optional = true }
@@ -32,6 +33,7 @@ circle_ws_lib = { path = "../server", optional = true }
# WASM-specific dependencies # WASM-specific dependencies
[target.'cfg(target_arch = "wasm32")'.dependencies] [target.'cfg(target_arch = "wasm32")'.dependencies]
gloo-net = { version = "0.4.0", features = ["websocket"] } gloo-net = { version = "0.4.0", features = ["websocket"] }
gloo-timers = { version = "0.3.0", features = ["futures"] }
wasm-bindgen-futures = "0.4" wasm-bindgen-futures = "0.4"
gloo-console = "0.3.0" gloo-console = "0.3.0"
wasm-bindgen = "0.2" wasm-bindgen = "0.2"

View File

@@ -1,14 +1,26 @@
# Circle WebSocket Client # Circle WebSocket Client
A Rust library for connecting to Circle WebSocket servers with authentication support. A Rust library for connecting to Circle WebSocket servers with authentication support and self-managing connection lifecycle.
## Features ## Features
- Cross-platform WebSocket client (native and WASM) - **Cross-platform WebSocket client** (native and WASM)
- secp256k1 cryptographic authentication - **secp256k1 cryptographic authentication** with automatic challenge-response flow
- JSON-RPC 2.0 protocol support - **JSON-RPC 2.0 protocol support** for server communication
- Async/await interface with Tokio - **Self-managing connections** with automatic keep-alive and reconnection
- Built on tokio-tungstenite for reliable WebSocket connections - **Async/await interface** with modern Rust async patterns
- **Built on tokio-tungstenite** for reliable WebSocket connections (native)
- **Built on gloo-net** for WASM browser compatibility
## Architecture
Each `CircleWsClient` is completely self-managing:
- **Automatic Connection Management**: Handles WebSocket connection establishment
- **Built-in Authentication**: Seamless secp256k1 authentication when private keys are provided
- **Keep-Alive Monitoring**: Periodic health checks to detect connection issues
- **Transparent Reconnection**: Automatic reconnection with exponential backoff on failures
- **Connection Status Tracking**: Real-time connection state monitoring
## Usage ## Usage
@@ -19,7 +31,7 @@ Add this to your `Cargo.toml`:
circle_client_ws = { path = "../client_ws" } circle_client_ws = { path = "../client_ws" }
``` ```
### Basic Example ### Basic Example (Self-Managing Connection)
```rust ```rust
use circle_client_ws::CircleWsClientBuilder; use circle_client_ws::CircleWsClientBuilder;
@@ -28,27 +40,73 @@ use circle_client_ws::CircleWsClientBuilder;
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create client with private key // Create client with private key
let private_key = "your_private_key_hex"; let private_key = "your_private_key_hex";
let client = CircleWsClientBuilder::new() let mut client = CircleWsClientBuilder::new("ws://localhost:8080".to_string())
.with_private_key(private_key)? .with_keypair(private_key.to_string())
.build(); .build();
// Connect and authenticate // Connect - this handles authentication, keep-alive, and reconnection automatically
client.connect("ws://localhost:8080").await?; client.connect().await?;
// Use the authenticated client... // Check connection status
println!("Connected: {}", client.is_connected());
// Execute scripts on the server
let result = client.play("\"Hello from client!\"".to_string()).await?;
println!("Script result: {:?}", result);
// Client automatically maintains connection in the background
// No manual keep-alive or reconnection logic needed
Ok(()) Ok(())
} }
``` ```
### Authentication Flow ### Self-Managing Features
The client automatically handles the secp256k1 authentication flow: The client automatically handles:
1. Connects to WebSocket server
2. Receives authentication challenge 1. **Connection Establishment**: WebSocket connection to the server
3. Signs challenge with private key 2. **Authentication Flow**: secp256k1 challenge-response authentication
4. Sends signed response 3. **Keep-Alive Monitoring**: Periodic health checks to ensure connection stability
5. Receives authentication confirmation 4. **Automatic Reconnection**: Transparent reconnection on connection failures
5. **Resource Management**: Proper cleanup when the client is dropped
### Connection Status Monitoring
```rust
// Check if the client is currently connected
if client.is_connected() {
println!("Client is connected and healthy");
} else {
println!("Client is disconnected or reconnecting");
}
// Get detailed connection status
let status = client.get_connection_status();
println!("Connection status: {}", status);
```
### WASM Usage
For WASM applications, the client works seamlessly in browsers:
```rust
use circle_client_ws::CircleWsClientBuilder;
use wasm_bindgen_futures::spawn_local;
// In a WASM context
spawn_local(async move {
let mut client = CircleWsClientBuilder::new("ws://localhost:8080".to_string())
.build();
// Self-managing connection works the same in WASM
if let Ok(_) = client.connect().await {
// Client automatically handles keep-alive and reconnection
let result = client.play("\"WASM client connected!\"".to_string()).await;
// Handle result...
}
});
```
## Binary Tool ## Binary Tool
@@ -56,12 +114,28 @@ A command-line binary is also available for interactive use and script execution
## Platform Support ## Platform Support
- **Native**: Full support on all Rust-supported platforms - **Native**: Full support on all Rust-supported platforms with tokio-tungstenite
- **WASM**: Browser support with web-sys bindings - **WASM**: Browser support with gloo-net WebSocket bindings
## Dependencies ## Dependencies
- `tokio-tungstenite`: WebSocket implementation ### Core Dependencies
- `secp256k1`: Cryptographic operations - `serde`: JSON serialization and deserialization
- `serde`: JSON serialization - `uuid`: Request ID generation for JSON-RPC
- `uuid`: Request ID generation - `futures-util`: Async utilities for WebSocket handling
- `thiserror`: Error handling and propagation
### Platform-Specific Dependencies
#### Native (tokio-based)
- `tokio-tungstenite`: Robust WebSocket implementation
- `tokio`: Async runtime for connection management
#### WASM (browser-based)
- `gloo-net`: WebSocket bindings for browsers
- `gloo-timers`: Timer utilities for keep-alive functionality
- `wasm-bindgen-futures`: Async support in WASM
### Cryptographic Dependencies (optional)
- `secp256k1`: Elliptic curve cryptography for authentication
- `sha3`: Hashing for cryptographic operations

View File

@@ -1,12 +1,12 @@
# Circles WebSocket Client Binary # Circles WebSocket Client
A command-line WebSocket client for connecting to Circles servers with authentication support. A WebSocket client for connecting to Circles servers with authentication support. Available in both CLI and WebAssembly (WASM) versions.
## Binary: `circles_client` ## CLI Usage
### Installation ### Installation
Build the binary: Build the CLI binary:
```bash ```bash
cargo build --bin circles_client --release cargo build --bin circles_client --release
``` ```
@@ -49,6 +49,53 @@ circles_client -vv ws://localhost:8080
- **Verbosity Control**: Use `-v` flags to increase logging detail - **Verbosity Control**: Use `-v` flags to increase logging detail
- **Cross-platform**: Works on all platforms supported by Rust and tokio-tungstenite - **Cross-platform**: Works on all platforms supported by Rust and tokio-tungstenite
## WebAssembly (WASM) Usage
### Build and Serve
1. Install Trunk:
```bash
cargo install trunk
```
2. Build the WASM version:
```bash
trunk build --release
```
3. Serve the application:
```bash
trunk serve
```
The application will be available at `http://localhost:8080`
### Usage in Browser
1. Open the served page in your browser
2. Enter the WebSocket server URL
3. Choose either:
- Execute a Rhai script directly
- Enter interactive mode (type 'exit' or 'quit' to leave)
### Features
- **Browser Integration**: Uses browser's WebSocket implementation
- **Interactive Mode**: Browser-based input/output using prompts
- **Error Handling**: Browser console logging
- **Cross-browser**: Works in all modern browsers supporting WebAssembly
## Common Features
Both versions share the same core functionality:
- **WebSocket Connection**: Connects to Circles WebSocket server
- **Authentication**: Handles secp256k1 authentication
- **Script Execution**: Executes Rhai scripts
- **Interactive Mode**: Provides REPL-like interface
- **Error Handling**: Comprehensive error reporting
- **Logging**: Detailed logging at different verbosity levels
### Interactive Mode ### Interactive Mode
When run without `-s` or `-f` flags, the client enters interactive mode where you can: When run without `-s` or `-f` flags, the client enters interactive mode where you can:

View File

@@ -1,12 +1,30 @@
#![cfg_attr(target_arch = "wasm32", no_main)]
use circle_client_ws::CircleWsClientBuilder; use circle_client_ws::CircleWsClientBuilder;
use clap::{Arg, ArgAction, Command}; #[cfg(not(target_arch = "wasm32"))]
use dotenv::dotenv;
use env_logger;
use log::{error, info};
use std::env; use std::env;
use std::io::{self, Write}; #[cfg(not(target_arch = "wasm32"))]
use std::path::Path; use std::path::Path;
#[cfg(not(target_arch = "wasm32"))]
use std::io::{self, Write};
#[cfg(target_arch = "wasm32")]
use wasm_bindgen::prelude::*;
#[cfg(target_arch = "wasm32")]
use web_sys::{console, window};
#[cfg(target_arch = "wasm32")]
use wasm_bindgen_futures::spawn_local;
#[cfg(not(target_arch = "wasm32"))]
use clap::{Arg, ArgAction, Command};
#[cfg(not(target_arch = "wasm32"))]
use dotenv::dotenv;
#[cfg(not(target_arch = "wasm32"))]
use env_logger;
#[cfg(not(target_arch = "wasm32"))]
use tokio; use tokio;
#[cfg(not(target_arch = "wasm32"))]
use log::{error, info};
#[derive(Debug)] #[derive(Debug)]
struct Args { struct Args {
@@ -17,6 +35,7 @@ struct Args {
no_timestamp: bool, no_timestamp: bool,
} }
#[cfg(not(target_arch = "wasm32"))]
fn parse_args() -> Args { fn parse_args() -> Args {
let matches = Command::new("circles_client") let matches = Command::new("circles_client")
.version("0.1.0") .version("0.1.0")
@@ -67,6 +86,7 @@ fn parse_args() -> Args {
} }
} }
#[cfg(not(target_arch = "wasm32"))]
fn setup_logging(verbose: u8, no_timestamp: bool) { fn setup_logging(verbose: u8, no_timestamp: bool) {
let log_level = match verbose { let log_level = match verbose {
0 => "warn,circle_client_ws=info", 0 => "warn,circle_client_ws=info",
@@ -87,6 +107,7 @@ fn setup_logging(verbose: u8, no_timestamp: bool) {
} }
} }
#[cfg(not(target_arch = "wasm32"))]
fn load_private_key() -> Result<String, Box<dyn std::error::Error>> { fn load_private_key() -> Result<String, Box<dyn std::error::Error>> {
// Try to load from .env file first // Try to load from .env file first
if let Ok(_) = dotenv() { if let Ok(_) = dotenv() {
@@ -107,42 +128,82 @@ fn load_private_key() -> Result<String, Box<dyn std::error::Error>> {
Err("PRIVATE_KEY not found in environment or .env files".into()) Err("PRIVATE_KEY not found in environment or .env files".into())
} }
#[cfg(target_arch = "wasm32")]
async fn run_interactive_mode(client: circle_client_ws::CircleWsClient) -> Result<(), Box<dyn std::error::Error>> { async fn run_interactive_mode(client: circle_client_ws::CircleWsClient) -> Result<(), Box<dyn std::error::Error>> {
info!("Entering interactive mode. Type 'exit' or 'quit' to leave."); console::log_1(&"Entering interactive mode. Type 'exit' or 'quit' to leave.".into());
println!("🔄 Interactive mode - Enter Rhai scripts (type 'exit' or 'quit' to leave):"); console::log_1(&"🔄 Interactive mode - Enter Rhai scripts (type 'exit' or 'quit' to leave):\n".into());
loop { // In wasm32, we need to use browser's console for input/output
print!("rhai> "); let window = window().expect("Window not available");
io::stdout().flush()?; let input = window.prompt_with_message("Enter Rhai script (or 'exit' to quit):")
.map_err(|e| format!("Failed to get input: {:#?}", e))? // Use debug formatting
.unwrap_or_default();
let mut input = String::new(); // Handle empty or exit cases
io::stdin().read_line(&mut input)?; if input == "exit" || input == "quit" {
console::log_1(&"👋 Goodbye!".into());
let script = input.trim(); return Ok(());
}
if script.is_empty() {
continue; // Execute the script
match client.play(input).await {
Ok(result) => {
console::log_1(&format!("📤 Result: {}", result.output).into());
} }
Err(e) => {
if script == "exit" || script == "quit" { console::log_1(&format!("❌ Script execution failed: {}", e).into());
println!("👋 Goodbye!");
break;
}
match client.play(script.to_string()).await {
Ok(result) => {
println!("📤 Result: {}", result.output);
}
Err(e) => {
error!("❌ Script execution failed: {}", e);
println!("❌ Error: {}", e);
}
} }
} }
Ok(()) Ok(())
} }
#[cfg(target_arch = "wasm32")]
async fn execute_script(client: circle_client_ws::CircleWsClient, script: String) -> Result<(), Box<dyn std::error::Error>> {
console::log_1(&format!("Executing script: {}", script).into());
match client.play(script).await {
Ok(result) => {
console::log_1(&result.output.into());
Ok(())
}
Err(e) => {
console::log_1(&format!("Script execution failed: {}", e).into());
Err(e.into())
}
}
}
#[cfg(target_arch = "wasm32")]
pub async fn start_client(url: &str, script: Option<String>) -> Result<(), Box<dyn std::error::Error>> {
// Build client
let mut client = CircleWsClientBuilder::new(url.to_string())
.build();
// Connect to WebSocket server
console::log_1(&"🔌 Connecting to WebSocket server...".into());
if let Err(e) = client.connect().await {
console::log_1(&format!("❌ Failed to connect: {}", e).into());
return Err(e.into());
}
console::log_1(&"✅ Connected successfully".into());
// Authenticate with server
if let Err(e) = client.authenticate().await {
console::log_1(&format!("❌ Authentication failed: {}", e).into());
return Err(e.into());
}
console::log_1(&"✅ Authentication successful".into());
// Handle script execution
if let Some(script) = script {
execute_script(client, script).await
} else {
run_interactive_mode(client).await
}
}
#[cfg(not(target_arch = "wasm32"))]
async fn execute_script(client: circle_client_ws::CircleWsClient, script: String) -> Result<(), Box<dyn std::error::Error>> { async fn execute_script(client: circle_client_ws::CircleWsClient, script: String) -> Result<(), Box<dyn std::error::Error>> {
info!("Executing script: {}", script); info!("Executing script: {}", script);
@@ -158,11 +219,43 @@ async fn execute_script(client: circle_client_ws::CircleWsClient, script: String
} }
} }
#[cfg(not(target_arch = "wasm32"))]
async fn load_script_from_file(path: &str) -> Result<String, Box<dyn std::error::Error>> { async fn load_script_from_file(path: &str) -> Result<String, Box<dyn std::error::Error>> {
let script = tokio::fs::read_to_string(path).await?; let script = tokio::fs::read_to_string(path).await?;
Ok(script) Ok(script)
} }
#[cfg(not(target_arch = "wasm32"))]
async fn run_interactive_mode(client: circle_client_ws::CircleWsClient) -> Result<(), Box<dyn std::error::Error>> {
println!("\n🔄 Interactive mode - Enter Rhai scripts (type 'exit' or 'quit' to leave):\n");
loop {
print!("Enter Rhai script (or 'exit' to quit): ");
io::stdout().flush()?;
let mut input = String::new();
io::stdin().read_line(&mut input)?;
let input = input.trim().to_string();
if input == "exit" || input == "quit" {
println!("\n👋 Goodbye!");
return Ok(());
}
match client.play(input).await {
Ok(result) => {
println!("\n📤 Result: {}", result.output);
}
Err(e) => {
error!("❌ Script execution failed: {}", e);
println!("\n❌ Script execution failed: {}", e);
}
}
println!();
}
}
#[cfg(not(target_arch = "wasm32"))]
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = parse_args(); let args = parse_args();

View File

@@ -148,6 +148,7 @@ impl CircleWsClientBuilder {
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
task_handle: None, task_handle: None,
private_key: self.private_key, private_key: self.private_key,
is_connected: Arc::new(Mutex::new(false)),
} }
} }
} }
@@ -158,24 +159,61 @@ pub struct CircleWsClient {
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
task_handle: Option<tokio::task::JoinHandle<()>>, task_handle: Option<tokio::task::JoinHandle<()>>,
private_key: Option<String>, private_key: Option<String>,
is_connected: Arc<Mutex<bool>>,
}
impl CircleWsClient {
/// Get the connection status
pub fn get_connection_status(&self) -> String {
if *self.is_connected.lock().unwrap() {
"Connected".to_string()
} else {
"Disconnected".to_string()
}
}
/// Check if the client is connected
pub fn is_connected(&self) -> bool {
*self.is_connected.lock().unwrap()
}
} }
impl CircleWsClient { impl CircleWsClient {
pub async fn authenticate(&mut self) -> Result<bool, CircleWsClientError> { pub async fn authenticate(&mut self) -> Result<bool, CircleWsClientError> {
info!("🔐 [{}] Starting authentication process...", self.ws_url);
let private_key = self let private_key = self
.private_key .private_key
.as_ref() .as_ref()
.ok_or(CircleWsClientError::AuthNoKeyPair)?; .ok_or(CircleWsClientError::AuthNoKeyPair)?;
info!("🔑 [{}] Deriving public key from private key...", self.ws_url);
let public_key = auth::derive_public_key(private_key)?; let public_key = auth::derive_public_key(private_key)?;
info!("✅ [{}] Public key derived: {}...", self.ws_url, &public_key[..8]);
info!("🎫 [{}] Fetching authentication nonce...", self.ws_url);
let nonce = self.fetch_nonce(&public_key).await?; let nonce = self.fetch_nonce(&public_key).await?;
let signature = auth::sign_message(private_key, &nonce)?; info!("✅ [{}] Nonce received: {}...", self.ws_url, &nonce[..8]);
self.authenticate_with_signature(&public_key, &signature) info!("✍️ [{}] Signing nonce with private key...", self.ws_url);
.await let signature = auth::sign_message(private_key, &nonce)?;
info!("✅ [{}] Signature created: {}...", self.ws_url, &signature[..8]);
info!("🔒 [{}] Submitting authentication credentials...", self.ws_url);
let result = self.authenticate_with_signature(&public_key, &signature).await?;
if result {
info!("🎉 [{}] Authentication successful!", self.ws_url);
} else {
error!("❌ [{}] Authentication failed - server rejected credentials", self.ws_url);
}
Ok(result)
} }
async fn fetch_nonce(&self, pubkey: &str) -> Result<String, CircleWsClientError> { async fn fetch_nonce(&self, pubkey: &str) -> Result<String, CircleWsClientError> {
info!("📡 [{}] Sending fetch_nonce request for pubkey: {}...", self.ws_url, &pubkey[..8]);
let params = FetchNonceParams { let params = FetchNonceParams {
pubkey: pubkey.to_string(), pubkey: pubkey.to_string(),
}; };
@@ -183,6 +221,7 @@ impl CircleWsClient {
let res = self.send_request(req).await?; let res = self.send_request(req).await?;
if let Some(err) = res.error { if let Some(err) = res.error {
error!("❌ [{}] fetch_nonce failed: {} (code: {})", self.ws_url, err.message, err.code);
return Err(CircleWsClientError::JsonRpcError { return Err(CircleWsClientError::JsonRpcError {
code: err.code, code: err.code,
message: err.message, message: err.message,
@@ -191,6 +230,7 @@ impl CircleWsClient {
} }
let nonce_res: FetchNonceResponse = serde_json::from_value(res.result.unwrap_or_default())?; let nonce_res: FetchNonceResponse = serde_json::from_value(res.result.unwrap_or_default())?;
info!("✅ [{}] fetch_nonce successful, nonce length: {}", self.ws_url, nonce_res.nonce.len());
Ok(nonce_res.nonce) Ok(nonce_res.nonce)
} }
@@ -199,6 +239,8 @@ impl CircleWsClient {
pubkey: &str, pubkey: &str,
signature: &str, signature: &str,
) -> Result<bool, CircleWsClientError> { ) -> Result<bool, CircleWsClientError> {
info!("📡 [{}] Sending authenticate request with signature...", self.ws_url);
let params = AuthCredentialsParams { let params = AuthCredentialsParams {
pubkey: pubkey.to_string(), pubkey: pubkey.to_string(),
signature: signature.to_string(), signature: signature.to_string(),
@@ -207,6 +249,7 @@ impl CircleWsClient {
let res = self.send_request(req).await?; let res = self.send_request(req).await?;
if let Some(err) = res.error { if let Some(err) = res.error {
error!("❌ [{}] authenticate failed: {} (code: {})", self.ws_url, err.message, err.code);
return Err(CircleWsClientError::JsonRpcError { return Err(CircleWsClientError::JsonRpcError {
code: err.code, code: err.code,
message: err.message, message: err.message,
@@ -214,10 +257,18 @@ impl CircleWsClient {
}); });
} }
Ok(res let authenticated = res
.result .result
.and_then(|v| v.get("authenticated").and_then(|v| v.as_bool())) .and_then(|v| v.get("authenticated").and_then(|v| v.as_bool()))
.unwrap_or(false)) .unwrap_or(false);
if authenticated {
info!("✅ [{}] authenticate request successful - server confirmed authentication", self.ws_url);
} else {
error!("❌ [{}] authenticate request failed - server returned false", self.ws_url);
}
Ok(authenticated)
} }
fn create_request<T: Serialize>( fn create_request<T: Serialize>(
@@ -275,17 +326,19 @@ impl CircleWsClient {
pub async fn connect(&mut self) -> Result<(), CircleWsClientError> { pub async fn connect(&mut self) -> Result<(), CircleWsClientError> {
if self.internal_tx.is_some() { if self.internal_tx.is_some() {
info!("Client already connected or connecting."); info!("🔄 [{}] Client already connected or connecting", self.ws_url);
return Ok(()); return Ok(());
} }
info!("🚀 [{}] Starting self-managed WebSocket connection with keep-alive and reconnection...", self.ws_url);
let (internal_tx, internal_rx) = mpsc::channel::<InternalWsMessage>(32); let (internal_tx, internal_rx) = mpsc::channel::<InternalWsMessage>(32);
self.internal_tx = Some(internal_tx); self.internal_tx = Some(internal_tx);
// Use the URL as provided - support both ws:// and wss:// // Clone necessary data for the task
let connection_url = self.ws_url.clone(); let connection_url = self.ws_url.clone();
let is_secure = connection_url.starts_with("wss://"); let private_key = self.private_key.clone();
info!("🔗 Connecting to WebSocket: {} ({})", connection_url, if is_secure { "WSS/TLS" } else { "WS/Plain" }); let is_connected = self.is_connected.clone();
info!("🔗 [{}] Will handle connection, authentication, keep-alive, and reconnection internally", connection_url);
// Pending requests: map request_id to a oneshot sender for the response // Pending requests: map request_id to a oneshot sender for the response
let pending_requests: Arc< let pending_requests: Arc<
@@ -301,191 +354,135 @@ impl CircleWsClient {
let log_url = connection_url.clone(); let log_url = connection_url.clone();
let task = async move { let task = async move {
#[cfg(target_arch = "wasm32")] // Main connection loop with reconnection logic
let ws_result = WebSocket::open(&connection_url); loop {
info!("🔄 [{}] Starting connection attempt...", log_url);
// Reset connection status
*is_connected.lock().unwrap() = false;
// Clone connection_url for this iteration to avoid move issues
let connection_url_clone = connection_url.clone();
// Establish WebSocket connection
#[cfg(target_arch = "wasm32")]
let ws_result = WebSocket::open(&connection_url_clone);
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
let connect_attempt = async { let connect_attempt = async {
// Check if this is a secure WebSocket connection // Check if this is a secure WebSocket connection
if connection_url.starts_with("wss://") { if connection_url_clone.starts_with("wss://") {
// For WSS connections, use a custom TLS connector that accepts self-signed certificates // For WSS connections, use a custom TLS connector that accepts self-signed certificates
// This is for development/demo purposes only // This is for development/demo purposes only
use tokio_tungstenite::tungstenite::client::IntoClientRequest; use tokio_tungstenite::tungstenite::client::IntoClientRequest;
let request = connection_url.into_client_request() let request = connection_url_clone.as_str().into_client_request()
.map_err(|e| CircleWsClientError::ConnectionError(format!("Invalid URL: {}", e)))?; .map_err(|e| CircleWsClientError::ConnectionError(format!("Invalid URL: {}", e)))?;
// Create a native-tls connector that accepts invalid certificates (for development) // Create a native-tls connector that accepts invalid certificates (for development)
let tls_connector = native_tls::TlsConnector::builder() let tls_connector = native_tls::TlsConnector::builder()
.danger_accept_invalid_certs(true) .danger_accept_invalid_certs(true)
.danger_accept_invalid_hostnames(true) .danger_accept_invalid_hostnames(true)
.build() .build()
.map_err(|e| CircleWsClientError::ConnectionError(format!("TLS connector creation failed: {}", e)))?; .map_err(|e| CircleWsClientError::ConnectionError(format!("TLS connector creation failed: {}", e)))?;
let connector = Connector::NativeTls(tls_connector); let connector = Connector::NativeTls(tls_connector);
warn!("⚠️ DEVELOPMENT MODE: Accepting self-signed certificates (NOT for production!)"); warn!("⚠️ DEVELOPMENT MODE: Accepting self-signed certificates (NOT for production!)");
connect_async_tls_with_config(request, None, false, Some(connector)) connect_async_tls_with_config(request, None, false, Some(connector))
.await .await
.map_err(|e| CircleWsClientError::ConnectionError(format!("WSS connection failed: {}", e))) .map_err(|e| CircleWsClientError::ConnectionError(format!("WSS connection failed: {}", e)))
} else { } else {
// For regular WS connections, use the standard method // For regular WS connections, use the standard method
connect_async(&connection_url) connect_async(&connection_url_clone)
.await .await
.map_err(|e| CircleWsClientError::ConnectionError(format!("WS connection failed: {}", e))) .map_err(|e| CircleWsClientError::ConnectionError(format!("WS connection failed: {}", e)))
} }
}; };
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
let ws_result = connect_attempt.await; let ws_result = connect_attempt.await;
match ws_result {
Ok(ws_conn_maybe_response) => {
#[cfg(target_arch = "wasm32")]
let ws_conn = ws_conn_maybe_response;
#[cfg(not(target_arch = "wasm32"))]
let (ws_conn, _) = ws_conn_maybe_response;
match ws_result { // For WASM, WebSocket::open() always succeeds even if server is down
Ok(ws_conn_maybe_response) => { // We'll start as "connecting" and detect failures through timeouts
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
let ws_conn = ws_conn_maybe_response; info!("🔄 [{}] WebSocket object created, testing actual connectivity...", log_url);
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
let (ws_conn, _) = ws_conn_maybe_response; {
info!("✅ [{}] WebSocket connection established successfully", log_url);
info!("Successfully connected to WebSocket: {}", log_url); *is_connected.lock().unwrap() = true;
let (mut ws_tx, mut ws_rx) = ws_conn.split(); }
let mut internal_rx_fused = internal_rx.fuse();
// Handle authentication if private key is provided
loop { let auth_success = if let Some(ref _pk) = private_key {
futures_util::select! { info!("🔐 [{}] Authentication will be handled by separate authenticate() call", log_url);
// Handle messages from the client's public methods (e.g., play) true // For now, assume auth will be handled separately
internal_msg = internal_rx_fused.next().fuse() => { } else {
match internal_msg { info!(" [{}] No private key provided, skipping authentication", log_url);
Some(InternalWsMessage::SendJsonRpc(req, response_sender)) => { true
let req_id = req.id.clone(); };
match serde_json::to_string(&req) {
Ok(req_str) => { if auth_success {
debug!("Sending JSON-RPC request (ID: {}): {}", req_id, req_str); // Start the main message handling loop with keep-alive
let disconnect_reason = Self::handle_connection_with_keepalive(
#[cfg(target_arch = "wasm32")] ws_conn,
let send_res = ws_tx.send(GlooWsMessage::Text(req_str)).await; internal_rx,
#[cfg(not(target_arch = "wasm32"))] &task_pending_requests,
let send_res = ws_tx.send(TungsteniteWsMessage::Text(req_str)).await; &log_url,
&is_connected
if let Err(e) = send_res { ).await;
error!("WebSocket send error for request ID {}: {:?}", req_id, e);
let _ = response_sender.send(Err(CircleWsClientError::SendError(e.to_string()))); info!("🔌 [{}] Connection ended: {}", log_url, disconnect_reason);
} else {
// Store the sender to await the response // Check if this was a manual disconnect
task_pending_requests.lock().unwrap().insert(req_id, response_sender); if disconnect_reason == "Manual close requested" {
} break; // Don't reconnect on manual close
}
Err(e) => {
error!("Failed to serialize request ID {}: {}", req_id, e);
let _ = response_sender.send(Err(CircleWsClientError::JsonError(e)));
}
}
}
Some(InternalWsMessage::Close) => {
info!("Close message received internally, closing WebSocket.");
let _ = ws_tx.close().await;
break;
}
None => { // internal_rx closed, meaning client was dropped
info!("Internal MPSC channel closed, WebSocket task shutting down.");
let _ = ws_tx.close().await;
break;
}
}
},
// Handle messages received from the WebSocket server
ws_msg_res = ws_rx.next().fuse() => {
match ws_msg_res {
Some(Ok(msg)) => {
#[cfg(target_arch = "wasm32")]
match msg {
GlooWsMessage::Text(text) => {
debug!("Received WebSocket message: {}", text);
// ... (parse logic as before)
match serde_json::from_str::<JsonRpcResponseClient>(&text) {
Ok(response) => {
if let Some(sender) = task_pending_requests.lock().unwrap().remove(&response.id) {
if let Err(failed_send_val) = sender.send(Ok(response)) {
if let Ok(resp_for_log) = failed_send_val { warn!("Failed to send response to waiting task for ID: {}", resp_for_log.id); }
else { warn!("Failed to send response to waiting task, and also failed to get original response for logging.");}
}
} else { warn!("Received response for unknown request ID or unsolicited message: {:?}", response); }
}
Err(e) => { error!("Failed to parse JSON-RPC response: {}. Raw: {}", e, text); }
}
}
GlooWsMessage::Bytes(_) => {
debug!("Received binary WebSocket message (WASM).");
}
}
#[cfg(not(target_arch = "wasm32"))]
match msg {
TungsteniteWsMessage::Text(text) => {
debug!("Received WebSocket message: {}", text);
// ... (parse logic as before)
match serde_json::from_str::<JsonRpcResponseClient>(&text) {
Ok(response) => {
if let Some(sender) = task_pending_requests.lock().unwrap().remove(&response.id) {
if let Err(failed_send_val) = sender.send(Ok(response)) {
if let Ok(resp_for_log) = failed_send_val { warn!("Failed to send response to waiting task for ID: {}", resp_for_log.id); }
else { warn!("Failed to send response to waiting task, and also failed to get original response for logging.");}
}
} else { warn!("Received response for unknown request ID or unsolicited message: {:?}", response); }
}
Err(e) => { error!("Failed to parse JSON-RPC response: {}. Raw: {}", e, text); }
}
}
TungsteniteWsMessage::Binary(_) => {
debug!("Received binary WebSocket message (Native).");
}
TungsteniteWsMessage::Ping(_) | TungsteniteWsMessage::Pong(_) => {
debug!("Received Ping/Pong (Native).");
}
TungsteniteWsMessage::Close(_) => {
info!("WebSocket connection closed by server (Native).");
break;
}
TungsteniteWsMessage::Frame(_) => {
debug!("Received Frame (Native) - not typically handled directly.");
}
}
}
Some(Err(e)) => {
error!("WebSocket receive error: {:?}", e);
break; // Exit loop on receive error
}
None => { // WebSocket stream closed
info!("WebSocket connection closed by server (stream ended).");
break;
}
}
} }
// If we reach here, we need to recreate internal_rx for the next iteration
// But since internal_rx was moved, we need to break out of the loop
break;
} }
} }
// Cleanup pending requests on exit Err(e) => {
task_pending_requests error!("❌ [{}] WebSocket connection failed: {:?}", log_url, e);
.lock() }
.unwrap()
.drain()
.for_each(|(_, sender)| {
let _ = sender.send(Err(CircleWsClientError::ConnectionError(
"WebSocket task terminated".to_string(),
)));
});
} }
Err(e) => {
error!("Failed to connect to WebSocket: {:?}", e); // Reset connection status
// Notify any waiting senders about the connection failure *is_connected.lock().unwrap() = false;
internal_rx
.for_each(|msg| async { // Wait before reconnecting
if let InternalWsMessage::SendJsonRpc(_, response_sender) = msg { info!("⏳ [{}] Waiting 5 seconds before reconnection attempt...", log_url);
let _ = response_sender #[cfg(target_arch = "wasm32")]
.send(Err(CircleWsClientError::ConnectionError(e.to_string()))); {
} use gloo_timers::future::TimeoutFuture;
}) TimeoutFuture::new(5_000).await;
.await; }
#[cfg(not(target_arch = "wasm32"))]
{
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
} }
} }
info!("WebSocket task finished.");
// Cleanup pending requests on exit
task_pending_requests
.lock()
.unwrap()
.drain()
.for_each(|(_, sender)| {
let _ = sender.send(Err(CircleWsClientError::ConnectionError(
"WebSocket task terminated".to_string(),
)));
});
info!("🏁 [{}] WebSocket task finished", log_url);
}; };
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
@@ -497,6 +494,261 @@ impl CircleWsClient {
Ok(()) Ok(())
} }
// Enhanced connection loop handler with keep-alive
#[cfg(target_arch = "wasm32")]
async fn handle_connection_with_keepalive(
ws_conn: WebSocket,
mut internal_rx: mpsc::Receiver<InternalWsMessage>,
pending_requests: &Arc<Mutex<HashMap<String, oneshot::Sender<Result<JsonRpcResponseClient, CircleWsClientError>>>>>,
log_url: &str,
is_connected: &Arc<Mutex<bool>>,
) -> String {
let (mut ws_tx, mut ws_rx) = ws_conn.split();
let mut internal_rx_fused = internal_rx.fuse();
// Connection validation for WASM - test if connection actually works
let mut connection_test_timer = TimeoutFuture::new(2_000).fuse(); // 2 second timeout
let mut connection_validated = false;
// Keep-alive timer - send ping every 30 seconds
use gloo_timers::future::TimeoutFuture;
let mut keep_alive_timer = TimeoutFuture::new(30_000).fuse();
// Send initial connection test ping
debug!("Sending initial connection test ping to {}", log_url);
let test_ping_res = ws_tx.send(GlooWsMessage::Text("ping".to_string())).await;
if let Err(e) = test_ping_res {
error!("❌ [{}] Initial connection test failed: {:?}", log_url, e);
*is_connected.lock().unwrap() = false;
return format!("Initial connection test failed: {}", e);
}
loop {
futures_util::select! {
// Connection test timeout - if no response in 2 seconds, connection failed
_ = connection_test_timer => {
if !connection_validated {
error!("❌ [{}] Connection test failed - no response within 2 seconds", log_url);
*is_connected.lock().unwrap() = false;
return "Connection test timeout - server not responding".to_string();
}
}
// Handle messages from the client's public methods (e.g., play)
internal_msg = internal_rx_fused.next().fuse() => {
match internal_msg {
Some(InternalWsMessage::SendJsonRpc(req, response_sender)) => {
let req_id = req.id.clone();
match serde_json::to_string(&req) {
Ok(req_str) => {
debug!("Sending JSON-RPC request (ID: {}): {}", req_id, req_str);
let send_res = ws_tx.send(GlooWsMessage::Text(req_str)).await;
if let Err(e) = send_res {
error!("WebSocket send error for request ID {}: {:?}", req_id, e);
// Connection failed - update status
*is_connected.lock().unwrap() = false;
let _ = response_sender.send(Err(CircleWsClientError::SendError(e.to_string())));
} else {
// Store the sender to await the response
pending_requests.lock().unwrap().insert(req_id, response_sender);
}
}
Err(e) => {
error!("Failed to serialize request ID {}: {}", req_id, e);
let _ = response_sender.send(Err(CircleWsClientError::JsonError(e)));
}
}
}
Some(InternalWsMessage::Close) => {
info!("Close message received internally, closing WebSocket.");
let _ = ws_tx.close().await;
return "Manual close requested".to_string();
}
None => {
info!("Internal MPSC channel closed, WebSocket task shutting down.");
let _ = ws_tx.close().await;
return "Internal channel closed".to_string();
}
}
},
// Handle messages received from the WebSocket server
ws_msg_res = ws_rx.next().fuse() => {
match ws_msg_res {
Some(Ok(msg)) => {
// Any successful message confirms the connection is working
if !connection_validated {
info!("✅ [{}] WebSocket connection validated - received message from server", log_url);
*is_connected.lock().unwrap() = true;
connection_validated = true;
}
match msg {
GlooWsMessage::Text(text) => {
debug!("Received WebSocket message: {}", text);
Self::handle_received_message(&text, pending_requests);
}
GlooWsMessage::Bytes(_) => {
debug!("Received binary WebSocket message (WASM).");
}
}
}
Some(Err(e)) => {
error!("WebSocket receive error: {:?}", e);
*is_connected.lock().unwrap() = false;
return format!("Receive error: {}", e);
}
None => {
info!("WebSocket connection closed by server (stream ended).");
*is_connected.lock().unwrap() = false;
return "Server closed connection (stream ended)".to_string();
}
}
}
// Keep-alive timer - send ping every 30 seconds
_ = keep_alive_timer => {
// Only send ping if connection is validated
if connection_validated {
debug!("Sending keep-alive ping to {}", log_url);
let ping_str = "ping"; // Send simple plaintext ping
let send_res = ws_tx.send(GlooWsMessage::Text(ping_str.to_string())).await;
if let Err(e) = send_res {
warn!("Keep-alive ping failed for {}: {:?}", log_url, e);
*is_connected.lock().unwrap() = false;
return format!("Keep-alive failed: {}", e);
}
} else {
debug!("Skipping keep-alive ping - connection not yet validated for {}", log_url);
}
// Reset timer
keep_alive_timer = TimeoutFuture::new(30_000).fuse();
}
}
}
}
// Enhanced connection loop handler with keep-alive for native targets
#[cfg(not(target_arch = "wasm32"))]
async fn handle_connection_with_keepalive(
ws_conn: tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
mut internal_rx: mpsc::Receiver<InternalWsMessage>,
pending_requests: &Arc<Mutex<HashMap<String, oneshot::Sender<Result<JsonRpcResponseClient, CircleWsClientError>>>>>,
log_url: &str,
_is_connected: &Arc<Mutex<bool>>,
) -> String {
let (mut ws_tx, mut ws_rx) = ws_conn.split();
let mut internal_rx_fused = internal_rx.fuse();
loop {
futures_util::select! {
// Handle messages from the client's public methods (e.g., play)
internal_msg = internal_rx_fused.next().fuse() => {
match internal_msg {
Some(InternalWsMessage::SendJsonRpc(req, response_sender)) => {
let req_id = req.id.clone();
match serde_json::to_string(&req) {
Ok(req_str) => {
debug!("Sending JSON-RPC request (ID: {}): {}", req_id, req_str);
let send_res = ws_tx.send(TungsteniteWsMessage::Text(req_str)).await;
if let Err(e) = send_res {
error!("WebSocket send error for request ID {}: {:?}", req_id, e);
let _ = response_sender.send(Err(CircleWsClientError::SendError(e.to_string())));
} else {
// Store the sender to await the response
pending_requests.lock().unwrap().insert(req_id, response_sender);
}
}
Err(e) => {
error!("Failed to serialize request ID {}: {}", req_id, e);
let _ = response_sender.send(Err(CircleWsClientError::JsonError(e)));
}
}
}
Some(InternalWsMessage::Close) => {
info!("Close message received internally, closing WebSocket.");
let _ = ws_tx.close().await;
return "Manual close requested".to_string();
}
None => {
info!("Internal MPSC channel closed, WebSocket task shutting down.");
let _ = ws_tx.close().await;
return "Internal channel closed".to_string();
}
}
},
// Handle messages received from the WebSocket server
ws_msg_res = ws_rx.next().fuse() => {
match ws_msg_res {
Some(Ok(msg)) => {
match msg {
TungsteniteWsMessage::Text(text) => {
debug!("Received WebSocket message: {}", text);
Self::handle_received_message(&text, pending_requests);
}
TungsteniteWsMessage::Binary(_) => {
debug!("Received binary WebSocket message (Native).");
}
TungsteniteWsMessage::Ping(_) | TungsteniteWsMessage::Pong(_) => {
debug!("Received Ping/Pong (Native).");
}
TungsteniteWsMessage::Close(_) => {
info!("WebSocket connection closed by server (Native).");
return "Server closed connection".to_string();
}
TungsteniteWsMessage::Frame(_) => {
debug!("Received Frame (Native) - not typically handled directly.");
}
}
}
Some(Err(e)) => {
error!("WebSocket receive error: {:?}", e);
return format!("Receive error: {}", e);
}
None => {
info!("WebSocket connection closed by server (stream ended).");
return "Server closed connection (stream ended)".to_string();
}
}
}
}
}
}
// Helper method to handle received messages
fn handle_received_message(
text: &str,
pending_requests: &Arc<Mutex<HashMap<String, oneshot::Sender<Result<JsonRpcResponseClient, CircleWsClientError>>>>>,
) {
// Handle ping/pong messages - these are not JSON-RPC
if text.trim() == "pong" {
debug!("Received pong response");
return;
}
match serde_json::from_str::<JsonRpcResponseClient>(text) {
Ok(response) => {
if let Some(sender) = pending_requests.lock().unwrap().remove(&response.id) {
if let Err(failed_send_val) = sender.send(Ok(response)) {
if let Ok(resp_for_log) = failed_send_val {
warn!("Failed to send response to waiting task for ID: {}", resp_for_log.id);
} else {
warn!("Failed to send response to waiting task, and also failed to get original response for logging.");
}
}
} else {
warn!("Received response for unknown request ID or unsolicited message: {:?}", response);
}
}
Err(e) => {
error!("Failed to parse JSON-RPC response: {}. Raw: {}", e, text);
}
}
}
pub fn play( pub fn play(
&self, &self,

4
src/server/Cargo.lock generated
View File

@@ -584,7 +584,7 @@ dependencies = [
"once_cell", "once_cell",
"rand 0.8.5", "rand 0.8.5",
"redis", "redis",
"rhai_client", "rhai_dispatcher",
"rustls", "rustls",
"rustls-pemfile", "rustls-pemfile",
"secp256k1", "secp256k1",
@@ -1769,7 +1769,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]] [[package]]
name = "rhai_client" name = "rhai_dispatcher"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"chrono", "chrono",

View File

@@ -44,7 +44,7 @@ redis = { workspace = true }
uuid = { workspace = true } uuid = { workspace = true }
tokio = { workspace = true } tokio = { workspace = true }
chrono = { workspace = true } chrono = { workspace = true }
rhai_client = { path = "../../../rhailib/src/client" } # Corrected relative path rhai_dispatcher = { path = "../../../rhailib/src/dispatcher" } # Corrected relative path
thiserror = { workspace = true } thiserror = { workspace = true }
heromodels = { path = "../../../db/heromodels" } heromodels = { path = "../../../db/heromodels" }

View File

@@ -37,6 +37,9 @@ struct Args {
#[clap(long, help = "Enable webhook handling")] #[clap(long, help = "Enable webhook handling")]
webhooks: bool, webhooks: bool,
#[clap(long, value_parser, help = "Worker ID for the server")]
worker_id: String,
} }
#[actix_web::main] #[actix_web::main]
@@ -90,17 +93,35 @@ async fn main() -> std::io::Result<()> {
std::process::exit(1); std::process::exit(1);
} }
let config = ServerConfig { let mut builder = ServerConfig::builder(
host: args.host, args.host,
port: args.port, args.port,
redis_url: args.redis_url, args.redis_url,
enable_auth: args.auth, args.worker_id,
enable_tls: args.tls, );
cert_path: args.cert,
key_path: args.key, if args.auth {
tls_port: args.tls_port, builder = builder.with_auth();
enable_webhooks: args.webhooks, }
};
if args.webhooks {
builder = builder.with_webhooks();
}
if args.tls {
if let (Some(cert), Some(key)) = (args.cert, args.key) {
builder = builder.with_tls(cert, key);
} else {
eprintln!("Error: TLS is enabled but --cert or --key is missing.");
std::process::exit(1);
}
}
if let Some(tls_port) = args.tls_port {
builder = builder.with_tls_port(tls_port);
}
let config = builder.build();
println!("🚀 Starting Circles WebSocket Server"); println!("🚀 Starting Circles WebSocket Server");
println!("📋 Configuration:"); println!("📋 Configuration:");

View File

@@ -90,7 +90,7 @@ sequenceDiagram
participant HS as HttpServer participant HS as HttpServer
participant WH as Webhook Handler participant WH as Webhook Handler
participant WV as Webhook Verifier participant WV as Webhook Verifier
participant RC as RhaiClient participant RC as RhaiDispatcher
participant Redis as Redis participant Redis as Redis
WS->>+HS: POST /webhooks/{provider}/{circle_pk} WS->>+HS: POST /webhooks/{provider}/{circle_pk}
@@ -102,7 +102,7 @@ sequenceDiagram
alt Signature Valid alt Signature Valid
WH->>WH: Parse webhook payload (heromodels types) WH->>WH: Parse webhook payload (heromodels types)
WH->>+RC: Create RhaiClient with caller_id WH->>+RC: Create RhaiDispatcher with caller_id
RC->>+Redis: Execute webhook script RC->>+Redis: Execute webhook script
Redis-->>-RC: Script result Redis-->>-RC: Script result
RC-->>-WH: Execution result RC-->>-WH: Execution result
@@ -128,6 +128,6 @@ sequenceDiagram
| **Connection Type** | Persistent, bidirectional | HTTP request/response | | **Connection Type** | Persistent, bidirectional | HTTP request/response |
| **Authentication** | secp256k1 signature-based | HMAC signature verification | | **Authentication** | secp256k1 signature-based | HMAC signature verification |
| **State Management** | Stateful sessions via CircleWs actor | Stateless HTTP requests | | **State Management** | Stateful sessions via CircleWs actor | Stateless HTTP requests |
| **Script Execution** | Direct via authenticated session | Via RhaiClient with provider caller_id | | **Script Execution** | Direct via authenticated session | Via RhaiDispatcher with provider caller_id |
| **Use Case** | Interactive client applications | External service notifications | | **Use Case** | Interactive client applications | External service notifications |
| **Data Types** | JSON-RPC messages | Provider-specific webhook payloads (heromodels) | | **Data Types** | JSON-RPC messages | Provider-specific webhook payloads (heromodels) |

View File

@@ -20,7 +20,7 @@ graph TB
F[Stripe Verifier] F[Stripe Verifier]
G[iDenfy Verifier] G[iDenfy Verifier]
H[Script Dispatcher] H[Script Dispatcher]
I[RhaiClientBuilder] I[RhaiDispatcherBuilder]
end end
subgraph "Configuration" subgraph "Configuration"
@@ -93,7 +93,7 @@ sequenceDiagram
participant CS as Circle Server participant CS as Circle Server
participant WV as Webhook Verifier participant WV as Webhook Verifier
participant SD as Script Dispatcher participant SD as Script Dispatcher
participant RC as RhaiClient participant RC as RhaiDispatcher
participant RW as Rhai Worker participant RW as Rhai Worker
WS->>CS: POST /webhooks/stripe/{circle_pk} WS->>CS: POST /webhooks/stripe/{circle_pk}
@@ -113,7 +113,7 @@ sequenceDiagram
alt Verification Success alt Verification Success
CS->>SD: Dispatch appropriate script CS->>SD: Dispatch appropriate script
SD->>RC: Create RhaiClientBuilder SD->>RC: Create RhaiDispatcherBuilder
RC->>RC: Set caller_id="stripe" or "idenfy" RC->>RC: Set caller_id="stripe" or "idenfy"
RC->>RC: Set recipient_id=circle_pk RC->>RC: Set recipient_id=circle_pk
RC->>RC: Set script="stripe_webhook_received" or "idenfy_webhook_received" RC->>RC: Set script="stripe_webhook_received" or "idenfy_webhook_received"
@@ -249,7 +249,7 @@ heromodels/src/models/
- **Type Organization**: Webhook payload types moved to `heromodels` library for reusability - **Type Organization**: Webhook payload types moved to `heromodels` library for reusability
- **Modular Handlers**: Separate handler files for each webhook provider - **Modular Handlers**: Separate handler files for each webhook provider
- **Simplified Architecture**: Removed unnecessary dispatcher complexity - **Simplified Architecture**: Removed unnecessary dispatcher complexity
- **Direct Script Execution**: Handlers directly use `RhaiClient` for script execution - **Direct Script Execution**: Handlers directly use `RhaiDispatcher` for script execution
### Modified Files ### Modified Files
- `src/lib.rs` - Add webhook routes and module imports - `src/lib.rs` - Add webhook routes and module imports

View File

@@ -3,7 +3,7 @@ use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws; use actix_web_actors::ws;
use log::{debug, info, error}; // Added error for better logging use log::{debug, info, error}; // Added error for better logging
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use rhai_client::{RhaiClientBuilder, RhaiClientError}; use rhai_dispatcher::{RhaiDispatcherBuilder, RhaiDispatcherError};
use rustls::pki_types::PrivateKeyDer; use rustls::pki_types::PrivateKeyDer;
use rustls::ServerConfig as RustlsServerConfig; use rustls::ServerConfig as RustlsServerConfig;
use rustls_pemfile::{certs, pkcs8_private_keys}; use rustls_pemfile::{certs, pkcs8_private_keys};
@@ -106,6 +106,7 @@ struct CircleWs {
nonce_store: HashMap<String, NonceResponse>, nonce_store: HashMap<String, NonceResponse>,
auth_enabled_on_server: bool, auth_enabled_on_server: bool,
authenticated_pubkey: Option<String>, authenticated_pubkey: Option<String>,
circle_worker_id: String,
} }
impl CircleWs { impl CircleWs {
@@ -114,6 +115,7 @@ impl CircleWs {
server_circle_public_key: String, server_circle_public_key: String,
redis_url_for_client: String, redis_url_for_client: String,
auth_enabled_on_server: bool, auth_enabled_on_server: bool,
circle_worker_id: String,
) -> Self { ) -> Self {
Self { Self {
server_circle_name, server_circle_name,
@@ -122,6 +124,7 @@ impl CircleWs {
nonce_store: HashMap::new(), nonce_store: HashMap::new(),
auth_enabled_on_server, auth_enabled_on_server,
authenticated_pubkey: None, authenticated_pubkey: None,
circle_worker_id,
} }
} }
@@ -284,17 +287,19 @@ impl CircleWs {
let redis_url_clone = self.redis_url_for_client.clone(); let redis_url_clone = self.redis_url_for_client.clone();
let _rpc_id_clone = client_rpc_id.clone(); let _rpc_id_clone = client_rpc_id.clone();
let public_key = self.authenticated_pubkey.clone(); let public_key = self.authenticated_pubkey.clone();
let worker_id_clone = self.circle_worker_id.clone();
let fut = async move { let fut = async move {
let caller_id = public_key.unwrap_or_else(|| "anonymous".to_string()); let caller_id = public_key.unwrap_or_else(|| "anonymous".to_string());
match RhaiClientBuilder::new() match RhaiDispatcherBuilder::new()
.redis_url(&redis_url_clone) .redis_url(&redis_url_clone)
.caller_id(&caller_id) .caller_id(&caller_id)
.build() { .build() {
Ok(rhai_client) => { Ok(rhai_dispatcher) => {
rhai_client rhai_dispatcher
.new_play_request() .new_play_request()
.recipient_id(&circle_pk_clone) .context_id(&circle_pk_clone)
.worker_id(&worker_id_clone)
.script(&script_content) .script(&script_content)
.timeout(TASK_TIMEOUT_DURATION) .timeout(TASK_TIMEOUT_DURATION)
.await_response() .await_response()
@@ -339,7 +344,7 @@ impl CircleWs {
} }
Err(e) => { Err(e) => {
let (code, message) = match e { let (code, message) = match e {
RhaiClientError::Timeout(task_id) => ( RhaiDispatcherError::Timeout(task_id) => (
-32002, -32002,
format!( format!(
"Timeout waiting for Rhai script (task: {})", "Timeout waiting for Rhai script (task: {})",
@@ -416,6 +421,13 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for CircleWs {
Ok(ws::Message::Text(text)) => { Ok(ws::Message::Text(text)) => {
debug!("WS Text for {}: {}", self.server_circle_name, text); debug!("WS Text for {}: {}", self.server_circle_name, text);
// Handle plaintext ping messages for keep-alive
if text.trim() == "ping" {
debug!("Received keep-alive ping from {}, responding with pong", self.server_circle_name);
ctx.text("pong");
return;
}
match serde_json::from_str::<JsonRpcRequest>(&text) { match serde_json::from_str::<JsonRpcRequest>(&text) {
Ok(req) => { Ok(req) => {
let client_rpc_id = req.id.clone().unwrap_or(Value::Null); let client_rpc_id = req.id.clone().unwrap_or(Value::Null);
@@ -490,58 +502,23 @@ pub struct ServerConfig {
pub host: String, pub host: String,
pub port: u16, pub port: u16,
pub redis_url: String, pub redis_url: String,
pub enable_auth: bool,
pub enable_tls: bool, pub enable_tls: bool,
pub cert_path: Option<String>, pub cert_path: Option<String>,
pub key_path: Option<String>, pub key_path: Option<String>,
pub tls_port: Option<u16>, pub tls_port: Option<u16>,
pub enable_auth: bool,
pub enable_webhooks: bool, pub enable_webhooks: bool,
pub circle_worker_id: String,
} }
impl ServerConfig { impl ServerConfig {
/// Create a new server configuration with TLS disabled pub fn builder(
pub fn new(
host: String, host: String,
port: u16, port: u16,
redis_url: String, redis_url: String,
) -> Self { worker_id: String,
Self { ) -> ServerConfigBuilder {
host, ServerConfigBuilder::new(host, port, redis_url, worker_id)
port,
redis_url,
enable_auth: false,
enable_tls: false,
cert_path: None,
key_path: None,
tls_port: None,
enable_webhooks: false,
}
}
/// Enable TLS with certificate and key paths
pub fn with_tls(mut self, cert_path: String, key_path: String) -> Self {
self.enable_tls = true;
self.cert_path = Some(cert_path);
self.key_path = Some(key_path);
self
}
/// Set a separate port for TLS connections
pub fn with_tls_port(mut self, tls_port: u16) -> Self {
self.tls_port = Some(tls_port);
self
}
/// Enable authentication
pub fn with_auth(mut self) -> Self {
self.enable_auth = true;
self
}
/// Enable webhooks
pub fn with_webhooks(mut self) -> Self {
self.enable_webhooks = true;
self
} }
/// Get the effective port for TLS connections /// Get the effective port for TLS connections
@@ -551,7 +528,75 @@ impl ServerConfig {
/// Check if TLS is properly configured /// Check if TLS is properly configured
pub fn is_tls_configured(&self) -> bool { pub fn is_tls_configured(&self) -> bool {
self.enable_tls && self.cert_path.is_some() && self.key_path.is_some() self.cert_path.is_some() && self.key_path.is_some()
}
}
/// ServerConfigBuilder
pub struct ServerConfigBuilder {
host: String,
port: u16,
redis_url: String,
enable_tls: bool,
cert_path: Option<String>,
key_path: Option<String>,
tls_port: Option<u16>,
enable_auth: bool,
enable_webhooks: bool,
circle_worker_id: String,
}
impl ServerConfigBuilder {
pub fn new(host: String, port: u16, redis_url: String, worker_id: String) -> Self {
Self {
host,
port,
redis_url,
enable_tls: false,
cert_path: None,
key_path: None,
tls_port: None,
enable_auth: false,
enable_webhooks: false,
circle_worker_id: worker_id,
}
}
pub fn with_tls(mut self, cert_path: String, key_path: String) -> Self {
self.enable_tls = true;
self.cert_path = Some(cert_path);
self.key_path = Some(key_path);
self
}
pub fn with_tls_port(mut self, tls_port: u16) -> Self {
self.tls_port = Some(tls_port);
self
}
pub fn with_auth(mut self) -> Self {
self.enable_auth = true;
self
}
pub fn with_webhooks(mut self) -> Self {
self.enable_webhooks = true;
self
}
pub fn build(self) -> ServerConfig {
ServerConfig {
host: self.host,
port: self.port,
redis_url: self.redis_url,
enable_tls: self.enable_tls,
cert_path: self.cert_path,
key_path: self.key_path,
tls_port: self.tls_port,
enable_auth: self.enable_auth,
enable_webhooks: self.enable_webhooks,
circle_worker_id: self.circle_worker_id,
}
} }
} }
@@ -613,23 +658,23 @@ fn load_rustls_config(
async fn ws_handler( async fn ws_handler(
req: HttpRequest, req: HttpRequest,
stream: web::Payload, stream: web::Payload,
path: web::Path<String>, config: web::Data<ServerConfig>,
server_config: web::Data<ServerConfig>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let circle_pk = path.into_inner(); let server_circle_name = req.match_info().get("circle_pk").unwrap_or("unknown").to_string();
let server_circle_public_key = server_circle_name.clone(); // Assuming pk is the name for now
info!(
"Incoming WebSocket connection for circle: {} (auth_enabled: {})",
circle_pk, server_config.enable_auth
);
let ws_actor = CircleWs::new_configured( // Create and start the WebSocket actor
format!("circle-{}", &circle_pk[..8]), // Use first 8 chars as display name ws::start(
circle_pk, CircleWs::new_configured(
server_config.redis_url.clone(), server_circle_name,
server_config.enable_auth, server_circle_public_key,
); config.redis_url.clone(),
ws::start(ws_actor, &req, stream) config.enable_auth,
config.circle_worker_id.clone(),
),
&req,
stream,
)
} }
pub fn spawn_circle_server( pub fn spawn_circle_server(
@@ -657,7 +702,7 @@ pub fn spawn_circle_server(
let webhook_app_state = create_webhook_app_state( let webhook_app_state = create_webhook_app_state(
webhook_config, webhook_config,
config.redis_url.clone(), config.redis_url.clone(),
"webhook_system".to_string() config.circle_worker_id.clone(),
); );
Some(web::Data::new(webhook_app_state)) Some(web::Data::new(webhook_app_state))
} }

View File

@@ -9,18 +9,24 @@ pub struct WebhookAppState {
pub config: WebhookConfig, pub config: WebhookConfig,
pub redis_url: String, pub redis_url: String,
pub caller_id: String, pub caller_id: String,
pub worker_id: String,
} }
/// Create webhook application state /// Create webhook application state
pub fn create_webhook_app_state( pub fn create_webhook_app_state(
config: WebhookConfig, config: WebhookConfig,
redis_url: String, redis_url: String,
caller_id: String, worker_id: String,
) -> WebhookAppState { ) -> WebhookAppState {
// For now, we'll use the worker_id as the caller_id for webhooks.
// This can be changed if a more specific caller_id is needed.
let caller_id = worker_id.clone();
WebhookAppState { WebhookAppState {
config, config,
redis_url, redis_url,
caller_id, caller_id,
worker_id,
} }
} }

View File

@@ -10,7 +10,7 @@ use actix_web::{web, HttpRequest, HttpResponse, Result as ActixResult};
use bytes::Bytes; use bytes::Bytes;
use log::{debug, error, info, warn}; use log::{debug, error, info, warn};
use serde_json; use serde_json;
use rhai_client::RhaiClientBuilder; use rhai_dispatcher::RhaiDispatcherBuilder;
/// Execute an iDenfy webhook script /// Execute an iDenfy webhook script
async fn execute_idenfy_webhook_script( async fn execute_idenfy_webhook_script(
@@ -24,12 +24,13 @@ async fn execute_idenfy_webhook_script(
circle_id, event.client_id, event.status circle_id, event.client_id, event.status
); );
// Create RhaiClient // Create RhaiDispatcher
let rhai_client = RhaiClientBuilder::new() let rhai_dispatcher = RhaiDispatcherBuilder::new()
.redis_url(redis_url) .redis_url(redis_url)
.caller_id(caller_id) .caller_id(caller_id)
.context_id(circle_id)
.build() .build()
.map_err(|e| WebhookError::ScriptExecutionError(format!("Failed to create RhaiClient: {}", e)))?; .map_err(|e| WebhookError::ScriptExecutionError(format!("Failed to create RhaiDispatcher: {}", e)))?;
// Serialize the event as JSON payload // Serialize the event as JSON payload
let event_json = serde_json::to_string(event) let event_json = serde_json::to_string(event)
@@ -43,9 +44,8 @@ async fn execute_idenfy_webhook_script(
debug!("Executing script: {}", script); debug!("Executing script: {}", script);
match rhai_client match rhai_dispatcher
.new_play_request() .new_play_request()
.recipient_id(circle_id)
.script(&script) .script(&script)
.timeout(std::time::Duration::from_secs(30)) .timeout(std::time::Duration::from_secs(30))
.await_response() .await_response()

View File

@@ -10,13 +10,14 @@ use actix_web::{web, HttpRequest, HttpResponse, Result as ActixResult};
use bytes::Bytes; use bytes::Bytes;
use log::{debug, error, info, warn}; use log::{debug, error, info, warn};
use serde_json; use serde_json;
use rhai_client::RhaiClientBuilder; use rhai_dispatcher::RhaiDispatcherBuilder;
/// Execute a Stripe webhook script /// Execute a Stripe webhook script
async fn execute_stripe_webhook_script( async fn execute_stripe_webhook_script(
redis_url: &str, redis_url: &str,
caller_id: &str, caller_id: &str,
circle_id: &str, circle_id: &str,
worker_id: &str,
event: &StripeWebhookEvent, event: &StripeWebhookEvent,
) -> Result<serde_json::Value, WebhookError> { ) -> Result<serde_json::Value, WebhookError> {
info!( info!(
@@ -24,12 +25,14 @@ async fn execute_stripe_webhook_script(
circle_id, event.event_type circle_id, event.event_type
); );
// Create RhaiClient // Create RhaiDispatcher
let rhai_client = RhaiClientBuilder::new() let rhai_dispatcher = RhaiDispatcherBuilder::new()
.redis_url(redis_url) .redis_url(redis_url)
.caller_id(caller_id) .caller_id(caller_id)
.worker_id(worker_id)
.context_id(circle_id)
.build() .build()
.map_err(|e| WebhookError::ScriptExecutionError(format!("Failed to create RhaiClient: {}", e)))?; .map_err(|e| WebhookError::ScriptExecutionError(format!("Failed to create RhaiDispatcher: {}", e)))?;
// Serialize the event as JSON payload // Serialize the event as JSON payload
let event_json = serde_json::to_string(event) let event_json = serde_json::to_string(event)
@@ -43,9 +46,8 @@ async fn execute_stripe_webhook_script(
debug!("Executing script: {}", script); debug!("Executing script: {}", script);
match rhai_client match rhai_dispatcher
.new_play_request() .new_play_request()
.recipient_id(circle_id)
.script(&script) .script(&script)
.timeout(std::time::Duration::from_secs(30)) .timeout(std::time::Duration::from_secs(30))
.await_response() .await_response()
@@ -161,6 +163,7 @@ pub async fn handle_stripe_webhook(
&data.redis_url, &data.redis_url,
&verification_result.caller_id, &verification_result.caller_id,
&circle_pk, &circle_pk,
&data.worker_id,
&stripe_event, &stripe_event,
).await { ).await {
Ok(script_result) => { Ok(script_result) => {