end to end job management support

This commit is contained in:
Timur Gordon 2025-07-30 08:36:55 +02:00
parent 7d7ff0f0ab
commit 32c2cbe0cc
20 changed files with 2686 additions and 442 deletions

5
Cargo.lock generated
View File

@ -1428,6 +1428,7 @@ version = "0.1.0"
name = "hero-websocket-examples"
version = "0.1.0"
dependencies = [
"env_logger",
"hero_websocket_client",
"hero_websocket_server",
"hex",
@ -1497,6 +1498,7 @@ dependencies = [
"gloo-console",
"gloo-net",
"gloo-timers",
"hero_job",
"hex",
"http 0.2.12",
"js-sys",
@ -1530,6 +1532,8 @@ dependencies = [
"dotenv",
"env_logger",
"futures-util",
"hero_dispatcher",
"hero_job",
"heromodels",
"hex",
"hmac",
@ -1539,7 +1543,6 @@ dependencies = [
"rand 0.8.5",
"redis 0.23.3",
"redis 0.25.4",
"rhai_dispatcher",
"rustls",
"rustls-pemfile 2.2.0",
"secp256k1",

View File

@ -19,6 +19,8 @@ pub enum DispatcherError {
ContextIdMissing,
/// Invalid input provided
InvalidInput(String),
/// Job operation error
JobError(hero_job::JobError),
}
impl From<redis::RedisError> for DispatcherError {
@ -33,6 +35,12 @@ impl From<serde_json::Error> for DispatcherError {
}
}
impl From<hero_job::JobError> for DispatcherError {
fn from(err: hero_job::JobError) -> Self {
DispatcherError::JobError(err)
}
}
impl std::fmt::Display for DispatcherError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
@ -50,6 +58,9 @@ impl std::fmt::Display for DispatcherError {
DispatcherError::InvalidInput(msg) => {
write!(f, "Invalid input: {}", msg)
}
DispatcherError::JobError(e) => {
write!(f, "Job error: {}", e)
}
}
}
}

View File

@ -11,6 +11,7 @@ pub use crate::job::JobBuilder;
// Re-export types from hero_job for public API
pub use hero_job::{Job, JobStatus, ScriptType};
#[derive(Clone)]
pub struct Dispatcher {
redis_client: redis::Client,
caller_id: String,
@ -218,6 +219,23 @@ impl Dispatcher {
Ok(())
}
// Method to start a previously created job
pub async fn start_job(
&self,
job_id: &str,
) -> Result<(), DispatcherError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await?;
// Load the job to get its script type
let job = Job::load_from_redis(&mut conn, job_id).await?;
// Select worker based on script type
let worker_id = self.select_worker_for_script_type(&job.script_type)?;
self.start_job_using_connection(&mut conn, job_id.to_string(), worker_id).await?;
Ok(())
}
// New method using dedicated reply queue with automatic worker selection
pub async fn run_job_and_await_result(
&self,

View File

@ -2,13 +2,76 @@
"openrpc": "1.2.6",
"info": {
"title": "Circle WebSocket Server API",
"version": "0.1.0",
"description": "API for interacting with a Circle's WebSocket server, primarily for Rhai script execution."
"version": "0.2.0",
"description": "API for interacting with a Circle's WebSocket server, supporting script execution and comprehensive job management."
},
"methods": [
{
"name": "fetch_nonce",
"summary": "Fetches a nonce for authentication purposes.",
"params": [
{
"name": "pubkey",
"description": "The public key to fetch a nonce for.",
"required": true,
"schema": {
"type": "string"
}
}
],
"result": {
"name": "nonceResult",
"description": "The nonce string for authentication.",
"schema": {
"type": "string"
}
}
},
{
"name": "authenticate",
"summary": "Authenticates a user with public key and signature.",
"params": [
{
"name": "pubkey",
"description": "The public key of the user.",
"required": true,
"schema": {
"type": "string"
}
},
{
"name": "signature",
"description": "The signature for authentication.",
"required": true,
"schema": {
"type": "string"
}
}
],
"result": {
"name": "authResult",
"description": "Authentication result.",
"schema": {
"type": "boolean"
}
}
},
{
"name": "whoami",
"summary": "Gets authentication status and user information.",
"params": [],
"result": {
"name": "whoamiResult",
"description": "User authentication information.",
"schema": {
"type": "object",
"additionalProperties": true
}
}
},
{
"name": "play",
"summary": "Executes a Rhai script on the server.",
"summary": "Executes a Rhai script on the server and returns the result immediately.",
"params": [
{
"name": "script",
@ -43,6 +106,227 @@
}
}
]
},
{
"name": "create_job",
"summary": "Creates a new job without starting it.",
"params": [
{
"name": "job",
"description": "The job to create.",
"required": true,
"schema": {
"$ref": "#/components/schemas/Job"
}
}
],
"result": {
"name": "createJobResult",
"description": "The ID of the created job.",
"schema": {
"type": "string"
}
}
},
{
"name": "start_job",
"summary": "Starts a previously created job.",
"params": [
{
"name": "job_id",
"description": "The ID of the job to start.",
"required": true,
"schema": {
"type": "string"
}
}
],
"result": {
"name": "startJobResult",
"description": "Confirmation that the job was started.",
"schema": {
"type": "object",
"properties": {
"success": {
"type": "boolean",
"description": "Whether the job was successfully started."
}
},
"required": ["success"]
}
}
},
{
"name": "run_job",
"summary": "Creates and runs a job, returning the result when complete.",
"params": [
{
"name": "script",
"description": "The script content to execute.",
"required": true,
"schema": {
"type": "string"
}
},
{
"name": "script_type",
"description": "The type of script (HeroScript, RhaiSAL, or RhaiDSL).",
"required": true,
"schema": {
"$ref": "#/components/schemas/ScriptType"
}
},
{
"name": "prerequisites",
"description": "List of job IDs that must complete before this job can run.",
"required": false,
"schema": {
"type": "array",
"items": {
"type": "string"
}
}
}
],
"result": {
"name": "runJobResult",
"description": "The job execution result.",
"schema": {
"type": "string"
}
}
},
{
"name": "get_job_status",
"summary": "Gets the current status of a job.",
"params": [
{
"name": "job_id",
"description": "The ID of the job to check.",
"required": true,
"schema": {
"type": "string"
}
}
],
"result": {
"name": "jobStatus",
"description": "The current job status.",
"schema": {
"$ref": "#/components/schemas/JobStatus"
}
}
},
{
"name": "get_job_output",
"summary": "Gets the output of a completed job.",
"params": [
{
"name": "job_id",
"description": "The ID of the job to get output for.",
"required": true,
"schema": {
"type": "string"
}
}
],
"result": {
"name": "jobOutput",
"description": "The job output, if available.",
"schema": {
"type": "string"
}
}
},
{
"name": "get_job_logs",
"summary": "Gets the logs of a job.",
"params": [
{
"name": "job_id",
"description": "The ID of the job to get logs for.",
"required": true,
"schema": {
"type": "string"
}
}
],
"result": {
"name": "jobLogs",
"description": "The job logs, if available.",
"schema": {
"$ref": "#/components/schemas/JobLogsResult"
}
}
},
{
"name": "list_jobs",
"summary": "Lists all job IDs in the system.",
"params": [],
"result": {
"name": "jobList",
"description": "List of all jobs.",
"schema": {
"type": "array",
"items": {
"$ref": "#/components/schemas/Job"
}
}
}
},
{
"name": "stop_job",
"summary": "Stops a running job.",
"params": [
{
"name": "job_id",
"description": "The ID of the job to stop.",
"required": true,
"schema": {
"type": "string"
}
}
],
"result": {
"name": "stopJobResult",
"description": "Confirmation that the stop request was sent.",
"schema": {
"type": "null"
}
}
},
{
"name": "delete_job",
"summary": "Deletes a job from the system.",
"params": [
{
"name": "job_id",
"description": "The ID of the job to delete.",
"required": true,
"schema": {
"type": "string"
}
}
],
"result": {
"name": "deleteJobResult",
"description": "Confirmation that the job was deleted.",
"schema": {
"type": "null"
}
}
},
{
"name": "clear_all_jobs",
"summary": "Clears all jobs from the system.",
"params": [],
"result": {
"name": "clearJobsResult",
"description": "Information about the cleared jobs.",
"schema": {
"type": "null"
}
}
}
],
"components": {
@ -56,6 +340,98 @@
}
},
"required": ["output"]
},
"ScriptType": {
"type": "string",
"enum": ["HeroScript", "RhaiSAL", "RhaiDSL"],
"description": "The type of script to execute."
},
"JobStatus": {
"type": "string",
"enum": ["Dispatched", "WaitingForPrerequisites", "Started", "Error", "Finished"],
"description": "The current status of a job."
},
"Job": {
"type": "object",
"properties": {
"id": {
"type": "string",
"description": "Unique identifier for the job."
},
"caller_id": {
"type": "string",
"description": "ID of the caller who created this job."
},
"context_id": {
"type": "string",
"description": "Context ID for the job execution."
},
"script": {
"type": "string",
"description": "The script content to execute."
},
"script_type": {
"$ref": "#/components/schemas/ScriptType"
},
"timeout": {
"type": "integer",
"description": "Timeout in seconds for script execution."
},
"retries": {
"type": "integer",
"description": "Number of retries on script execution failure."
},
"concurrent": {
"type": "boolean",
"description": "Whether to execute script in separate thread."
},
"log_path": {
"type": "string",
"description": "Path to write logs of script execution to."
},
"env_vars": {
"type": "object",
"additionalProperties": {
"type": "string"
},
"description": "Environment variables for script execution."
},
"prerequisites": {
"type": "array",
"items": {
"type": "string"
},
"description": "Job IDs that must complete before this job can run."
},
"dependents": {
"type": "array",
"items": {
"type": "string"
},
"description": "Job IDs that depend on this job completing."
},
"created_at": {
"type": "string",
"format": "date-time",
"description": "ISO 8601 timestamp when the job was created."
},
"updated_at": {
"type": "string",
"format": "date-time",
"description": "ISO 8601 timestamp when the job was last updated."
}
},
"required": ["id", "caller_id", "context_id", "script", "script_type", "timeout", "retries", "concurrent", "env_vars", "prerequisites", "dependents", "created_at", "updated_at"]
},
"JobLogsResult": {
"type": "object",
"properties": {
"logs": {
"type": ["string", "null"],
"description": "The job logs, null if not available."
}
},
"required": ["logs"]
}
}
}

View File

@ -17,6 +17,7 @@ futures-util = { workspace = true, features = ["sink"] }
thiserror = { workspace = true }
url = { workspace = true }
http = "0.2"
hero_job = { path = "../../../core/job" }
# Authentication dependencies
hex = { workspace = true }

View File

@ -148,7 +148,7 @@ async fn run_interactive_mode(client: hero_websocket_client::CircleWsClient) ->
// Execute the script
match client.play(input).await {
Ok(result) => {
console::log_1(&format!("📤 Result: {}", result.output).into());
console::log_1(&format!("📤 Result: {}", result).into());
}
Err(e) => {
console::log_1(&format!("❌ Script execution failed: {}", e).into());
@ -164,7 +164,7 @@ async fn execute_script(client: hero_websocket_client::CircleWsClient, script: S
match client.play(script).await {
Ok(result) => {
console::log_1(&result.output.into());
console::log_1(&result.into());
Ok(())
}
Err(e) => {
@ -209,7 +209,7 @@ async fn execute_script(client: hero_websocket_client::CircleWsClient, script: S
match client.play(script).await {
Ok(result) => {
println!("{}", result.output);
println!("{}", result);
Ok(())
}
Err(e) => {
@ -244,7 +244,7 @@ async fn run_interactive_mode(client: hero_websocket_client::CircleWsClient) ->
match client.play(input).await {
Ok(result) => {
println!("\n📤 Result: {}", result.output);
println!("\n📤 Result: {}", result);
}
Err(e) => {
error!("❌ Script execution failed: {}", e);

View File

@ -0,0 +1,34 @@
use std::sync::{Arc, Mutex};
use crate::CircleWsClient;
// Platform-specific imports removed - not needed in builder.rs
pub struct CircleWsClientBuilder {
ws_url: String,
private_key: Option<String>,
}
impl CircleWsClientBuilder {
pub fn new(ws_url: String) -> Self {
Self {
ws_url,
private_key: None,
}
}
pub fn with_keypair(mut self, private_key: String) -> Self {
self.private_key = Some(private_key);
self
}
pub fn build(self) -> CircleWsClient {
CircleWsClient {
ws_url: self.ws_url,
internal_tx: None,
#[cfg(not(target_arch = "wasm32"))]
task_handle: None,
private_key: self.private_key,
is_connected: Arc::new(Mutex::new(false)),
}
}
}

View File

@ -0,0 +1,40 @@
use serde::Deserialize;
use serde_json::Value;
use thiserror::Error;
#[derive(Deserialize, Debug, Clone)]
pub struct JsonRpcErrorClient {
pub code: i32,
pub message: String,
pub data: Option<Value>,
}
#[derive(Error, Debug)]
pub enum CircleWsClientError {
#[error("WebSocket connection error: {0}")]
ConnectionError(String),
#[error("WebSocket send error: {0}")]
SendError(String),
#[error("WebSocket receive error: {0}")]
ReceiveError(String),
#[error("JSON serialization/deserialization error: {0}")]
JsonError(#[from] serde_json::Error),
#[error("Request timed out for request ID: {0}")]
Timeout(String),
#[error("JSON-RPC error response: {code} - {message}")]
JsonRpcError {
code: i32,
message: String,
data: Option<Value>,
},
#[error("No response received for request ID: {0}")]
NoResponse(String),
#[error("Client is not connected")]
NotConnected,
#[error("Internal channel error: {0}")]
ChannelError(String),
#[error("Authentication error: {0}")]
Auth(#[from] crate::auth::AuthError),
#[error("Authentication requires a keypair, but none was provided.")]
AuthNoKeyPair,
}

View File

@ -5,12 +5,15 @@ use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use thiserror::Error;
use uuid::Uuid;
// Authentication module
pub mod auth;
pub mod builder;
pub mod methods;
pub mod error;
pub use error::{JsonRpcErrorClient, CircleWsClientError};
pub use auth::{AuthCredentials, AuthError, AuthResult};
// Platform-specific WebSocket imports and spawn function
@ -51,69 +54,6 @@ pub struct JsonRpcResponseClient {
pub id: String,
}
#[derive(Deserialize, Debug, Clone)]
pub struct JsonRpcErrorClient {
pub code: i32,
pub message: String,
pub data: Option<Value>,
}
#[derive(Serialize, Debug, Clone)]
pub struct PlayParamsClient {
pub script: String,
}
#[derive(Deserialize, Debug, Clone)]
pub struct PlayResultClient {
pub output: String,
}
#[derive(Serialize, Debug, Clone)]
pub struct AuthCredentialsParams {
pub pubkey: String,
pub signature: String,
}
#[derive(Serialize, Debug, Clone)]
pub struct FetchNonceParams {
pub pubkey: String,
}
#[derive(Deserialize, Debug, Clone)]
pub struct FetchNonceResponse {
pub nonce: String,
}
#[derive(Error, Debug)]
pub enum CircleWsClientError {
#[error("WebSocket connection error: {0}")]
ConnectionError(String),
#[error("WebSocket send error: {0}")]
SendError(String),
#[error("WebSocket receive error: {0}")]
ReceiveError(String),
#[error("JSON serialization/deserialization error: {0}")]
JsonError(#[from] serde_json::Error),
#[error("Request timed out for request ID: {0}")]
Timeout(String),
#[error("JSON-RPC error response: {code} - {message}")]
JsonRpcError {
code: i32,
message: String,
data: Option<Value>,
},
#[error("No response received for request ID: {0}")]
NoResponse(String),
#[error("Client is not connected")]
NotConnected,
#[error("Internal channel error: {0}")]
ChannelError(String),
#[error("Authentication error: {0}")]
Auth(#[from] auth::AuthError),
#[error("Authentication requires a keypair, but none was provided.")]
AuthNoKeyPair,
}
// Wrapper for messages sent to the WebSocket task
enum InternalWsMessage {
SendJsonRpc(
@ -183,116 +123,6 @@ impl CircleWsClient {
}
impl CircleWsClient {
pub async fn authenticate(&mut self) -> Result<bool, CircleWsClientError> {
info!("🔐 [{}] Starting authentication process...", self.ws_url);
let private_key = self
.private_key
.as_ref()
.ok_or(CircleWsClientError::AuthNoKeyPair)?;
info!("🔑 [{}] Deriving public key from private key...", self.ws_url);
let public_key = auth::derive_public_key(private_key)?;
info!("✅ [{}] Public key derived: {}...", self.ws_url, &public_key[..8]);
info!("🎫 [{}] Fetching authentication nonce...", self.ws_url);
let nonce = self.fetch_nonce(&public_key).await?;
info!("✅ [{}] Nonce received: {}...", self.ws_url, &nonce[..8]);
info!("✍️ [{}] Signing nonce with private key...", self.ws_url);
let signature = auth::sign_message(private_key, &nonce)?;
info!("✅ [{}] Signature created: {}...", self.ws_url, &signature[..8]);
info!("🔒 [{}] Submitting authentication credentials...", self.ws_url);
let result = self.authenticate_with_signature(&public_key, &signature).await?;
if result {
info!("🎉 [{}] Authentication successful!", self.ws_url);
} else {
error!("❌ [{}] Authentication failed - server rejected credentials", self.ws_url);
}
Ok(result)
}
async fn fetch_nonce(&self, pubkey: &str) -> Result<String, CircleWsClientError> {
info!("📡 [{}] Sending fetch_nonce request for pubkey: {}...", self.ws_url, &pubkey[..8]);
let params = FetchNonceParams {
pubkey: pubkey.to_string(),
};
let req = self.create_request("fetch_nonce", params)?;
let res = self.send_request(req).await?;
if let Some(err) = res.error {
error!("❌ [{}] fetch_nonce failed: {} (code: {})", self.ws_url, err.message, err.code);
return Err(CircleWsClientError::JsonRpcError {
code: err.code,
message: err.message,
data: err.data,
});
}
let nonce_res: FetchNonceResponse = serde_json::from_value(res.result.unwrap_or_default())?;
info!("✅ [{}] fetch_nonce successful, nonce length: {}", self.ws_url, nonce_res.nonce.len());
Ok(nonce_res.nonce)
}
async fn authenticate_with_signature(
&self,
pubkey: &str,
signature: &str,
) -> Result<bool, CircleWsClientError> {
info!("📡 [{}] Sending authenticate request with signature...", self.ws_url);
let params = AuthCredentialsParams {
pubkey: pubkey.to_string(),
signature: signature.to_string(),
};
let req = self.create_request("authenticate", params)?;
let res = self.send_request(req).await?;
if let Some(err) = res.error {
error!("❌ [{}] authenticate failed: {} (code: {})", self.ws_url, err.message, err.code);
return Err(CircleWsClientError::JsonRpcError {
code: err.code,
message: err.message,
data: err.data,
});
}
let authenticated = res
.result
.and_then(|v| v.get("authenticated").and_then(|v| v.as_bool()))
.unwrap_or(false);
if authenticated {
info!("✅ [{}] authenticate request successful - server confirmed authentication", self.ws_url);
} else {
error!("❌ [{}] authenticate request failed - server returned false", self.ws_url);
}
Ok(authenticated)
}
/// Call the whoami method to get authentication status and user information
pub async fn whoami(&self) -> Result<Value, CircleWsClientError> {
let req = self.create_request("whoami", serde_json::json!({}))?;
let response = self.send_request(req).await?;
if let Some(result) = response.result {
Ok(result)
} else if let Some(error) = response.error {
Err(CircleWsClientError::JsonRpcError {
code: error.code,
message: error.message,
data: error.data,
})
} else {
Err(CircleWsClientError::NoResponse("whoami".to_string()))
}
}
fn create_request<T: Serialize>(
&self,
method: &str,
@ -676,7 +506,7 @@ impl CircleWsClient {
ws_conn: tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>,
mut internal_rx: mpsc::Receiver<InternalWsMessage>,
pending_requests: &Arc<Mutex<HashMap<String, oneshot::Sender<Result<JsonRpcResponseClient, CircleWsClientError>>>>>,
log_url: &str,
_log_url: &str,
_is_connected: &Arc<Mutex<bool>>,
) -> String {
let (mut ws_tx, mut ws_rx) = ws_conn.split();
@ -811,141 +641,6 @@ impl CircleWsClient {
}
}
pub fn play(
&self,
script: String,
) -> impl std::future::Future<Output = Result<PlayResultClient, CircleWsClientError>> + Send + 'static
{
let req_id_outer = Uuid::new_v4().to_string();
// Clone the sender option. The sender itself (mpsc::Sender) is also Clone.
let internal_tx_clone_opt = self.internal_tx.clone();
async move {
let req_id = req_id_outer; // Move req_id into the async block
let params = PlayParamsClient { script }; // script is moved in
let request = match serde_json::to_value(params) {
Ok(p_val) => JsonRpcRequestClient {
jsonrpc: "2.0".to_string(),
method: "play".to_string(),
params: p_val,
id: req_id.clone(),
},
Err(e) => return Err(CircleWsClientError::JsonError(e)),
};
let (response_tx, response_rx) = oneshot::channel();
if let Some(mut internal_tx) = internal_tx_clone_opt {
internal_tx
.send(InternalWsMessage::SendJsonRpc(request, response_tx))
.await
.map_err(|e| {
CircleWsClientError::ChannelError(format!(
"Failed to send request to internal task: {}",
e
))
})?;
} else {
return Err(CircleWsClientError::NotConnected);
}
// Add a timeout for waiting for the response
// For simplicity, using a fixed timeout here. Could be configurable.
#[cfg(target_arch = "wasm32")]
{
match response_rx.await {
Ok(Ok(rpc_response)) => {
if let Some(json_rpc_error) = rpc_response.error {
Err(CircleWsClientError::JsonRpcError {
code: json_rpc_error.code,
message: json_rpc_error.message,
data: json_rpc_error.data,
})
} else if let Some(result_value) = rpc_response.result {
serde_json::from_value(result_value)
.map_err(CircleWsClientError::JsonError)
} else {
Err(CircleWsClientError::NoResponse(req_id.clone()))
}
}
Ok(Err(e)) => Err(e), // Error propagated from the ws task
Err(_) => Err(CircleWsClientError::Timeout(req_id.clone())), // oneshot channel cancelled
}
}
#[cfg(not(target_arch = "wasm32"))]
{
use tokio::time::timeout as tokio_timeout;
match tokio_timeout(std::time::Duration::from_secs(10), response_rx).await {
Ok(Ok(Ok(rpc_response))) => {
// Timeout -> Result<ChannelRecvResult, Error>
if let Some(json_rpc_error) = rpc_response.error {
Err(CircleWsClientError::JsonRpcError {
code: json_rpc_error.code,
message: json_rpc_error.message,
data: json_rpc_error.data,
})
} else if let Some(result_value) = rpc_response.result {
serde_json::from_value(result_value)
.map_err(CircleWsClientError::JsonError)
} else {
Err(CircleWsClientError::NoResponse(req_id.clone()))
}
}
Ok(Ok(Err(e))) => Err(e), // Error propagated from the ws task
Ok(Err(_)) => Err(CircleWsClientError::ChannelError(
"Response channel cancelled".to_string(),
)), // oneshot cancelled
Err(_) => Err(CircleWsClientError::Timeout(req_id.clone())), // tokio_timeout expired
}
}
}
}
/// Send a plaintext ping message and wait for pong response
pub async fn ping(&mut self) -> Result<String, CircleWsClientError> {
if let Some(mut tx) = self.internal_tx.clone() {
let (response_tx, response_rx) = oneshot::channel();
// Send plaintext ping message
tx.send(InternalWsMessage::SendPlaintext("ping".to_string(), response_tx))
.await
.map_err(|e| {
CircleWsClientError::ChannelError(format!(
"Failed to send ping request to internal task: {}",
e
))
})?;
// Wait for pong response with timeout
#[cfg(target_arch = "wasm32")]
{
match response_rx.await {
Ok(Ok(response)) => Ok(response),
Ok(Err(e)) => Err(e),
Err(_) => Err(CircleWsClientError::ChannelError(
"Ping response channel cancelled".to_string(),
)),
}
}
#[cfg(not(target_arch = "wasm32"))]
{
use tokio::time::timeout as tokio_timeout;
match tokio_timeout(std::time::Duration::from_secs(10), response_rx).await {
Ok(Ok(Ok(response))) => Ok(response),
Ok(Ok(Err(e))) => Err(e),
Ok(Err(_)) => Err(CircleWsClientError::ChannelError(
"Ping response channel cancelled".to_string(),
)),
Err(_) => Err(CircleWsClientError::Timeout("ping".to_string())),
}
}
} else {
Err(CircleWsClientError::NotConnected)
}
}
pub async fn disconnect(&mut self) {
if let Some(mut tx) = self.internal_tx.take() {
info!("Sending close signal to internal WebSocket task.");

View File

@ -0,0 +1,500 @@
use futures_channel::oneshot;
use futures_util::SinkExt;
use log::{error, info};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;
use hero_job::{Job, JobStatus};
use crate::{CircleWsClient, CircleWsClientError, InternalWsMessage, JsonRpcRequestClient, auth};
// Platform-specific WebSocket imports removed - not needed in methods.rs
// JSON-RPC structures are imported from crate root
#[derive(Serialize, Debug, Clone)]
pub struct PlayParamsClient {
pub script: String,
}
#[derive(Deserialize, Debug, Clone)]
pub struct PlayResultClient {
pub output: String,
}
#[derive(Serialize, Debug, Clone)]
pub struct AuthCredentialsParams {
pub pubkey: String,
pub signature: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct JobLogsResult {
pub logs: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct ClearJobsResult {
pub deleted_count: usize,
}
impl CircleWsClient {
pub async fn authenticate(&mut self) -> Result<bool, CircleWsClientError> {
info!("🔐 [{}] Starting authentication process...", self.ws_url);
let private_key = self
.private_key
.as_ref()
.ok_or(CircleWsClientError::AuthNoKeyPair)?;
info!("🔑 [{}] Deriving public key from private key...", self.ws_url);
let public_key = auth::derive_public_key(private_key)?;
info!("✅ [{}] Public key derived: {}...", self.ws_url, &public_key[..8]);
info!("🎫 [{}] Fetching authentication nonce...", self.ws_url);
let nonce = self.fetch_nonce(&public_key).await?;
info!("✅ [{}] Nonce received: {}...", self.ws_url, &nonce[..8]);
info!("✍️ [{}] Signing nonce with private key...", self.ws_url);
let signature = auth::sign_message(private_key, &nonce)?;
info!("✅ [{}] Signature created: {}...", self.ws_url, &signature[..8]);
info!("🔒 [{}] Submitting authentication credentials...", self.ws_url);
let result = self.authenticate_with_signature(&public_key, &signature).await?;
if result {
info!("🎉 [{}] Authentication successful!", self.ws_url);
} else {
error!("❌ [{}] Authentication failed - server rejected credentials", self.ws_url);
}
Ok(result)
}
async fn fetch_nonce(&self, pubkey: &str) -> Result<String, CircleWsClientError> {
info!("📡 [{}] Sending fetch_nonce request for pubkey: {}...", self.ws_url, &pubkey[..8]);
let req = self.create_request("fetch_nonce", pubkey.to_string())?;
let res = self.send_request(req).await?;
if let Some(err) = res.error {
error!("❌ [{}] fetch_nonce failed: {} (code: {})", self.ws_url, err.message, err.code);
return Err(CircleWsClientError::JsonRpcError {
code: err.code,
message: err.message,
data: err.data,
});
}
let nonce = res.result
.and_then(|v| v.as_str().map(|s| s.to_string()))
.ok_or_else(|| CircleWsClientError::JsonRpcError {
code: -32603,
message: "Invalid nonce response format".to_string(),
data: None,
})?;
info!("✅ [{}] fetch_nonce successful, nonce length: {}", self.ws_url, nonce.len());
Ok(nonce)
}
async fn authenticate_with_signature(
&self,
pubkey: &str,
signature: &str,
) -> Result<bool, CircleWsClientError> {
info!("📡 [{}] Sending authenticate request with signature...", self.ws_url);
let params = AuthCredentialsParams {
pubkey: pubkey.to_string(),
signature: signature.to_string(),
};
let req = self.create_request("authenticate", params)?;
let res = self.send_request(req).await?;
if let Some(err) = res.error {
error!("❌ [{}] authenticate failed: {} (code: {})", self.ws_url, err.message, err.code);
return Err(CircleWsClientError::JsonRpcError {
code: err.code,
message: err.message,
data: err.data,
});
}
let authenticated = res
.result
.and_then(|v| v.get("authenticated").and_then(|v| v.as_bool()))
.unwrap_or(false);
if authenticated {
info!("✅ [{}] authenticate request successful - server confirmed authentication", self.ws_url);
} else {
error!("❌ [{}] authenticate request failed - server returned false", self.ws_url);
}
Ok(authenticated)
}
/// Call the whoami method to get authentication status and user information
pub async fn whoami(&self) -> Result<Value, CircleWsClientError> {
let req = self.create_request("whoami", serde_json::json!({}))?;
let response = self.send_request(req).await?;
if let Some(result) = response.result {
Ok(result)
} else if let Some(error) = response.error {
Err(CircleWsClientError::JsonRpcError {
code: error.code,
message: error.message,
data: error.data,
})
} else {
Err(CircleWsClientError::NoResponse("whoami".to_string()))
}
}
pub fn play(
&self,
script: String,
) -> impl std::future::Future<Output = Result<String, CircleWsClientError>> + Send + 'static
{
let req_id_outer = Uuid::new_v4().to_string();
// Clone the sender option. The sender itself (mpsc::Sender) is also Clone.
let internal_tx_clone_opt = self.internal_tx.clone();
async move {
let req_id = req_id_outer; // Move req_id into the async block
let params = PlayParamsClient { script }; // script is moved in
let request = match serde_json::to_value(params) {
Ok(p_val) => JsonRpcRequestClient {
jsonrpc: "2.0".to_string(),
method: "play".to_string(),
params: p_val,
id: req_id.clone(),
},
Err(e) => return Err(CircleWsClientError::JsonError(e)),
};
let (response_tx, response_rx) = oneshot::channel();
if let Some(mut internal_tx) = internal_tx_clone_opt {
internal_tx
.send(InternalWsMessage::SendJsonRpc(request, response_tx))
.await
.map_err(|e| {
CircleWsClientError::ChannelError(format!(
"Failed to send request to internal task: {}",
e
))
})?;
} else {
return Err(CircleWsClientError::NotConnected);
}
// Add a timeout for waiting for the response
// For simplicity, using a fixed timeout here. Could be configurable.
#[cfg(target_arch = "wasm32")]
{
match response_rx.await {
Ok(Ok(rpc_response)) => {
if let Some(json_rpc_error) = rpc_response.error {
Err(CircleWsClientError::JsonRpcError {
code: json_rpc_error.code,
message: json_rpc_error.message,
data: json_rpc_error.data,
})
} else if let Some(result_value) = rpc_response.result {
// Extract output string directly from the result
if let Some(output) = result_value.get("output").and_then(|v| v.as_str()) {
Ok(output.to_string())
} else {
Err(CircleWsClientError::JsonRpcError {
code: -32603,
message: "Invalid play response format - missing output field".to_string(),
data: None,
})
}
} else {
Err(CircleWsClientError::NoResponse(req_id.clone()))
}
}
Ok(Err(e)) => Err(e), // Error propagated from the ws task
Err(_) => Err(CircleWsClientError::Timeout(req_id.clone())), // oneshot channel cancelled
}
}
#[cfg(not(target_arch = "wasm32"))]
{
use tokio::time::timeout as tokio_timeout;
match tokio_timeout(std::time::Duration::from_secs(10), response_rx).await {
Ok(Ok(Ok(rpc_response))) => {
// Timeout -> Result<ChannelRecvResult, Error>
if let Some(json_rpc_error) = rpc_response.error {
Err(CircleWsClientError::JsonRpcError {
code: json_rpc_error.code,
message: json_rpc_error.message,
data: json_rpc_error.data,
})
} else if let Some(result_value) = rpc_response.result {
// Extract output string directly from the result
if let Some(output) = result_value.get("output").and_then(|v| v.as_str()) {
Ok(output.to_string())
} else {
Err(CircleWsClientError::JsonRpcError {
code: -32603,
message: "Invalid play response format - missing output field".to_string(),
data: None,
})
}
} else {
Err(CircleWsClientError::NoResponse(req_id.clone()))
}
}
Ok(Ok(Err(e))) => Err(e), // Error propagated from the ws task
Ok(Err(_)) => Err(CircleWsClientError::ChannelError(
"Response channel cancelled".to_string(),
)), // oneshot cancelled
Err(_) => Err(CircleWsClientError::Timeout(req_id.clone())), // tokio_timeout expired
}
}
}
}
/// Send a plaintext ping message and wait for pong response
pub async fn ping(&mut self) -> Result<String, CircleWsClientError> {
if let Some(mut tx) = self.internal_tx.clone() {
let (response_tx, response_rx) = oneshot::channel();
// Send plaintext ping message
tx.send(InternalWsMessage::SendPlaintext("ping".to_string(), response_tx))
.await
.map_err(|e| {
CircleWsClientError::ChannelError(format!(
"Failed to send ping request to internal task: {}",
e
))
})?;
// Wait for pong response with timeout
#[cfg(target_arch = "wasm32")]
{
match response_rx.await {
Ok(Ok(response)) => Ok(response),
Ok(Err(e)) => Err(e),
Err(_) => Err(CircleWsClientError::ChannelError(
"Ping response channel cancelled".to_string(),
)),
}
}
#[cfg(not(target_arch = "wasm32"))]
{
use tokio::time::timeout as tokio_timeout;
match tokio_timeout(std::time::Duration::from_secs(10), response_rx).await {
Ok(Ok(Ok(response))) => Ok(response),
Ok(Ok(Err(e))) => Err(e),
Ok(Err(_)) => Err(CircleWsClientError::ChannelError(
"Ping response channel cancelled".to_string(),
)),
Err(_) => Err(CircleWsClientError::Timeout("ping".to_string())),
}
}
} else {
Err(CircleWsClientError::NotConnected)
}
}
/// Create a new job without starting it
pub async fn create_job(
&self,
job: Job,
) -> Result<String, CircleWsClientError> {
let req = self.create_request("create_job", job)?;
let response = self.send_request(req).await?;
if let Some(result) = response.result {
serde_json::from_value(result).map_err(CircleWsClientError::JsonError)
} else if let Some(error) = response.error {
Err(CircleWsClientError::JsonRpcError {
code: error.code,
message: error.message,
data: error.data,
})
} else {
Err(CircleWsClientError::ReceiveError(
"No result or error in response".to_string(),
))
}
}
/// Create and run a job, returning the result when complete
pub async fn run_job(
&self,
job: &Job,
) -> Result<String, CircleWsClientError> {
let req = self.create_request("run_job", job)?;
let response = self.send_request(req).await?;
if let Some(result) = response.result {
serde_json::from_value(result).map_err(CircleWsClientError::JsonError)
} else if let Some(error) = response.error {
Err(CircleWsClientError::JsonRpcError {
code: error.code,
message: error.message,
data: error.data,
})
} else {
Err(CircleWsClientError::ReceiveError(
"No result or error in response".to_string(),
))
}
}
/// Get the status of a job
pub async fn get_job_status(&self, job_id: String) -> Result<JobStatus, CircleWsClientError> {
let params = job_id;
let req = self.create_request("get_job_status", params)?;
let response = self.send_request(req).await?;
if let Some(result) = response.result {
serde_json::from_value(result).map_err(CircleWsClientError::JsonError)
} else if let Some(error) = response.error {
Err(CircleWsClientError::JsonRpcError {
code: error.code,
message: error.message,
data: error.data,
})
} else {
Err(CircleWsClientError::ReceiveError(
"No result or error in response".to_string(),
))
}
}
/// Get the output of a job
pub async fn get_job_output(&self, job_id: String) -> Result<String, CircleWsClientError> {
let params = job_id;
let req = self.create_request("get_job_output", params)?;
let response = self.send_request(req).await?;
if let Some(result) = response.result {
serde_json::from_value(result).map_err(CircleWsClientError::JsonError)
} else if let Some(error) = response.error {
Err(CircleWsClientError::JsonRpcError {
code: error.code,
message: error.message,
data: error.data,
})
} else {
Err(CircleWsClientError::ReceiveError(
"No result or error in response".to_string(),
))
}
}
/// Get the logs of a job
pub async fn get_job_logs(&self, job_id: String) -> Result<JobLogsResult, CircleWsClientError> {
let params = job_id;
let req = self.create_request("get_job_logs", params)?;
let response = self.send_request(req).await?;
if let Some(result) = response.result {
serde_json::from_value(result).map_err(CircleWsClientError::JsonError)
} else if let Some(error) = response.error {
Err(CircleWsClientError::JsonRpcError {
code: error.code,
message: error.message,
data: error.data,
})
} else {
Err(CircleWsClientError::ReceiveError(
"No result or error in response".to_string(),
))
}
}
/// List all job IDs in the system
pub async fn list_jobs(&self) -> Result<Vec<Job>, CircleWsClientError> {
let req = self.create_request("list_jobs", serde_json::Value::Null)?;
let response = self.send_request(req).await?;
if let Some(result) = response.result {
serde_json::from_value(result).map_err(CircleWsClientError::JsonError)
} else if let Some(error) = response.error {
Err(CircleWsClientError::JsonRpcError {
code: error.code,
message: error.message,
data: error.data,
})
} else {
Err(CircleWsClientError::ReceiveError(
"No result or error in response".to_string(),
))
}
}
/// Stop a running job
pub async fn stop_job(&self, job_id: String) -> Result<(), CircleWsClientError> {
let req = self.create_request("stop_job", job_id)?;
let response = self.send_request(req).await?;
if let Some(result) = response.result {
serde_json::from_value(result).map_err(CircleWsClientError::JsonError)
} else if let Some(error) = response.error {
Err(CircleWsClientError::JsonRpcError {
code: error.code,
message: error.message,
data: error.data,
})
} else {
Err(CircleWsClientError::ReceiveError(
"No result or error in response".to_string(),
))
}
}
/// Delete a job from the system
pub async fn delete_job(&self, job_id: String) -> Result<(), CircleWsClientError> {
let req = self.create_request("delete_job", job_id)?;
let response = self.send_request(req).await?;
if let Some(error) = response.error {
Err(CircleWsClientError::JsonRpcError {
code: error.code,
message: error.message,
data: error.data,
})
} else if response.result.is_some() {
// Success - return void
Ok(())
} else {
Err(CircleWsClientError::ReceiveError(
"No result or error in response".to_string(),
))
}
}
/// Clear all jobs from the system
pub async fn clear_all_jobs(&self) -> Result<(), CircleWsClientError> {
let req = self.create_request("clear_all_jobs", serde_json::Value::Null)?;
let response = self.send_request(req).await?;
if let Some(error) = response.error {
Err(CircleWsClientError::JsonRpcError {
code: error.code,
message: error.message,
data: error.data,
})
} else if response.result.is_some() {
// Success - return void
Ok(())
} else {
Err(CircleWsClientError::ReceiveError(
"No result or error in response".to_string(),
))
}
}
}

View File

@ -10,6 +10,7 @@ tokio = { version = "1.0", features = ["full"] }
k256 = { version = "0.13", features = ["ecdsa", "sha256"] }
rand = "0.8"
hex = "0.4"
env_logger = "0.10"
[[bin]]
name = "ping"
@ -19,6 +20,10 @@ path = "src/ping.rs"
name = "auth"
path = "src/auth.rs"
[[bin]]
name = "circle_auth"
path = "src/circle_auth.rs"
[[bin]]
name = "play"
path = "src/play.rs"

View File

@ -0,0 +1,234 @@
use hero_websocket_client::CircleWsClientBuilder;
use hero_websocket_server::ServerBuilder;
use tokio::signal;
use tokio::time::{sleep, Duration};
use k256::ecdsa::SigningKey;
use k256::elliptic_curve::sec1::ToEncodedPoint;
use rand::rngs::OsRng;
use std::collections::HashMap;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
println!("🔗 Circle-based Authentication Example");
println!("=====================================");
// Generate keypairs for different users
let (alice_pubkey, alice_privkey) = generate_keypair();
let (bob_pubkey, bob_privkey) = generate_keypair();
let (charlie_pubkey, charlie_privkey) = generate_keypair();
let (eve_pubkey, eve_privkey) = generate_keypair(); // Not in any circle
println!("👥 Generated test users:");
println!(" Alice: {}...{}", &alice_pubkey[..10], &alice_pubkey[alice_pubkey.len()-10..]);
println!(" Bob: {}...{}", &bob_pubkey[..10], &bob_pubkey[bob_pubkey.len()-10..]);
println!(" Charlie: {}...{}", &charlie_pubkey[..10], &charlie_pubkey[charlie_pubkey.len()-10..]);
println!(" Eve: {}...{} (unauthorized)", &eve_pubkey[..10], &eve_pubkey[eve_pubkey.len()-10..]);
// Define circles and their members
let mut circles = HashMap::new();
// Circle "alpha" - Alice and Bob are members
circles.insert("alpha".to_string(), vec![
alice_pubkey.clone(),
bob_pubkey.clone(),
]);
// Circle "beta" - Only Charlie is a member
circles.insert("beta".to_string(), vec![
charlie_pubkey.clone(),
]);
// Circle "gamma" - Alice and Charlie are members
circles.insert("gamma".to_string(), vec![
alice_pubkey.clone(),
charlie_pubkey.clone(),
]);
println!("\n🔐 Circle memberships:");
for (circle_id, members) in &circles {
println!(" Circle '{}': {} members", circle_id, members.len());
for member in members {
let name = if member == &alice_pubkey { "Alice" }
else if member == &bob_pubkey { "Bob" }
else if member == &charlie_pubkey { "Charlie" }
else { "Unknown" };
println!(" - {} ({}...{})", name, &member[..10], &member[member.len()-10..]);
}
}
// Build server with circle-based authentication
let server = ServerBuilder::new()
.host("127.0.0.1")
.port(8443)
.redis_url("redis://localhost:6379")
.worker_id("circle_test")
.with_auth()
.circles(circles)
.build()?;
// Start server
println!("\n🚀 Starting server with circle-based authentication...");
let (server_task, server_handle) = server.spawn_circle_server().map_err(|e| {
eprintln!("Failed to start server: {}", e);
e
})?;
// Setup signal handling for clean shutdown
let server_handle_clone = server_handle.clone();
tokio::spawn(async move {
signal::ctrl_c().await.expect("Failed to listen for Ctrl+C");
println!("\n🔌 Shutting down...");
server_handle_clone.stop(true).await;
std::process::exit(0);
});
// Brief pause for server startup
sleep(Duration::from_millis(500)).await;
println!("\n🧪 Testing authentication scenarios:");
println!("===================================");
// Test 1: Alice connecting to circle "alpha" (should succeed)
println!("\n📋 Test 1: Alice → Circle 'alpha' (authorized)");
test_authentication("Alice", "alpha", &alice_privkey).await?;
// Test 2: Bob connecting to circle "alpha" (should succeed)
println!("\n📋 Test 2: Bob → Circle 'alpha' (authorized)");
test_authentication("Bob", "alpha", &bob_privkey).await?;
// Test 3: Charlie connecting to circle "beta" (should succeed)
println!("\n📋 Test 3: Charlie → Circle 'beta' (authorized)");
test_authentication("Charlie", "beta", &charlie_privkey).await?;
// Test 4: Alice connecting to circle "beta" (should fail - not a member)
println!("\n📋 Test 4: Alice → Circle 'beta' (unauthorized - not a member)");
test_authentication_failure("Alice", "beta", &alice_privkey).await;
// Test 5: Eve connecting to circle "alpha" (should fail - not a member)
println!("\n📋 Test 5: Eve → Circle 'alpha' (unauthorized - not a member)");
test_authentication_failure("Eve", "alpha", &eve_privkey).await;
// Test 6: Charlie connecting to circle "gamma" (should succeed)
println!("\n📋 Test 6: Charlie → Circle 'gamma' (authorized)");
test_authentication("Charlie", "gamma", &charlie_privkey).await?;
// Test 7: Bob connecting to non-existent circle (should fail)
println!("\n📋 Test 7: Bob → Circle 'nonexistent' (unauthorized - circle doesn't exist)");
test_authentication_failure("Bob", "nonexistent", &bob_privkey).await;
println!("\n✅ All tests completed!");
println!("\n💡 Summary:");
println!(" - Users can only authenticate to circles they are members of");
println!(" - Circle membership is checked after signature verification");
println!(" - Unauthorized access attempts are properly rejected");
println!(" - Each circle operates independently with its own member list");
// Clean shutdown
server_handle.stop(true).await;
println!("\n🔌 Server stopped. Example complete!");
Ok(())
}
fn generate_keypair() -> (String, String) {
let signing_key = SigningKey::random(&mut OsRng);
let verifying_key = signing_key.verifying_key();
let public_key_bytes = verifying_key.to_encoded_point(false).as_bytes().to_vec();
let private_key_bytes = signing_key.to_bytes().to_vec();
(hex::encode(public_key_bytes), hex::encode(private_key_bytes))
}
async fn test_authentication(
user_name: &str,
circle_name: &str,
private_key: &str,
) -> Result<(), Box<dyn std::error::Error>> {
println!(" 📤 {} connecting to circle '{}'...", user_name, circle_name);
let mut client = CircleWsClientBuilder::new(format!("ws://localhost:8443/{}", circle_name))
.with_keypair(private_key.to_string())
.build();
// Connect
match client.connect().await {
Ok(_) => println!(" ✅ Connection established"),
Err(e) => {
println!(" ❌ Connection failed: {}", e);
return Err(e.into());
}
}
// Authenticate
print!(" 📤 Authenticating... ");
match client.authenticate().await {
Ok(response) => {
println!("✅ Success!");
println!(" Response: {}", response);
}
Err(e) => {
println!("❌ Failed: {}", e);
client.disconnect().await;
return Err(e.into());
}
}
// Test whoami to confirm authentication
print!(" 📤 Testing whoami... ");
match client.whoami().await {
Ok(response) => {
println!("✅ Success!");
println!(" Response: {}", response);
}
Err(e) => {
println!("❌ Failed: {}", e);
client.disconnect().await;
return Err(e.into());
}
}
// Disconnect
client.disconnect().await;
println!(" 🔌 Disconnected");
Ok(())
}
async fn test_authentication_failure(
user_name: &str,
circle_name: &str,
private_key: &str,
) {
println!(" 📤 {} connecting to circle '{}'...", user_name, circle_name);
let mut client = CircleWsClientBuilder::new(format!("ws://localhost:8443/{}", circle_name))
.with_keypair(private_key.to_string())
.build();
// Connect
match client.connect().await {
Ok(_) => println!(" ✅ Connection established"),
Err(e) => {
println!(" ❌ Connection failed: {}", e);
return;
}
}
// Authenticate (expecting failure)
print!(" 📤 Authenticating... ");
match client.authenticate().await {
Ok(response) => {
println!("❌ Unexpected success: {}", response);
println!(" This should have failed!");
}
Err(e) => {
println!("✅ Expected failure!");
println!(" Error: {}", e);
}
}
// Disconnect
client.disconnect().await;
println!(" 🔌 Disconnected");
}

View File

@ -44,7 +44,8 @@ redis = { workspace = true }
uuid = { workspace = true }
tokio = { workspace = true }
chrono = { workspace = true }
rhai_dispatcher = { path = "../../../../rhailib/src/dispatcher" } # Corrected relative path
hero_dispatcher = { path = "../../../core/dispatcher" }
hero_job = { path = "../../../core/job" }
thiserror = { workspace = true }
heromodels = { path = "../../../../db/heromodels" }

View File

@ -1,29 +1,11 @@
# `server`: The Circles WebSocket Server
# `server`: The Hero WebSocket Server
The `server` crate provides a secure, high-performance WebSocket server built with `Actix`. It is the core backend component of the `circles` ecosystem, responsible for handling client connections, processing JSON-RPC requests, and executing Rhai scripts in a secure manner.
An OpenRPC WebSocket Server to interface with the [cores](../../core) of authorized circles.
## Features
- **`Actix` Framework**: Built on `Actix`, a powerful and efficient actor-based web framework.
- **WebSocket Management**: Uses `actix-web-actors` to manage each client connection in its own isolated actor (`CircleWs`), ensuring robust and concurrent session handling.
- **JSON-RPC 2.0 API**: Implements a JSON-RPC 2.0 API for all client-server communication. The API is formally defined in the root [openrpc.json](../../openrpc.json) file.
- **Secure Authentication**: Features a built-in `secp256k1` signature-based authentication system to protect sensitive endpoints.
- **Stateful Session Management**: The `CircleWs` actor maintains the authentication state for each client, granting or denying access to protected methods like `play`.
- **Webhook Integration**: Supports HTTP webhook endpoints for external services (Stripe, iDenfy) with signature verification and script execution capabilities.
## Core Components
### `spawn_circle_server`
This is the main entry point function for the server. It configures and starts the `Actix` HTTP server and sets up the WebSocket route with path-based routing (`/{circle_pk}`).
### `CircleWs` Actor
This `Actix` actor is the heart of the server's session management. A new instance of `CircleWs` is created for each client that connects. Its responsibilities include:
- Handling the WebSocket connection lifecycle.
- Parsing incoming JSON-RPC messages.
- Managing the authentication state of the session (i.e., whether the client is authenticated or not).
- Dispatching requests to the appropriate handlers (`fetch_nonce`, `authenticate`, and `play`).
- [OpenRPC Specification](openrpc.json) defines the API.
- There are RPC Operations specified to authorize a websocket connection.
- Authorized clients can execute Rhai scripts on the server.
- The server uses the [supervisor] to dispatch [jobs] to the [workers].
## Authentication
@ -34,43 +16,6 @@ The server provides a robust authentication mechanism to ensure that only author
For a more detailed breakdown of the authentication architecture, please see the [ARCHITECTURE.md](docs/ARCHITECTURE.md) file.
## Webhook Integration
The server also provides HTTP webhook endpoints for external services alongside the WebSocket functionality:
- **Stripe Webhooks**: `POST /webhooks/stripe/{circle_pk}` - Handles Stripe payment events
- **iDenfy Webhooks**: `POST /webhooks/idenfy/{circle_pk}` - Handles iDenfy KYC verification events
### Webhook Features
- **Signature Verification**: All webhooks use HMAC signature verification for security
- **Script Execution**: Webhook events trigger Rhai script execution via the same Redis-based system
- **Type Safety**: Webhook payload types are defined in the `heromodels` library for reusability
- **Modular Architecture**: Separate handlers for each webhook provider with common utilities
For detailed webhook architecture and configuration, see [WEBHOOK_ARCHITECTURE.md](WEBHOOK_ARCHITECTURE.md).
## How to Run
### As a Library
The `server` is designed to be used as a library by the `launcher`, which is responsible for spawning a single multi-circle server instance that can handle multiple circles via path-based routing.
To run the server via the launcher with circle public keys:
```bash
cargo run --package launcher -- -k <circle_public_key1> -k <circle_public_key2> [options]
```
The launcher will start a single `server` instance that can handle multiple circles through path-based WebSocket connections at `/{circle_pk}`.
### Standalone Binary
A standalone binary is also available for development and testing purposes. See [`cmd/README.md`](cmd/README.md) for detailed usage instructions.
```bash
# Basic standalone server
cargo run
# With authentication and TLS
cargo run -- --auth --tls --cert cert.pem --key key.pem
```
cargo run

View File

@ -0,0 +1,73 @@
use std::collections::HashMap;
use hero_websocket_server::ServerBuilder;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
// Define circles and their members
let mut circles = HashMap::new();
// Circle "alpha" with two members
circles.insert("alpha".to_string(), vec![
"04a1b2c3d4e5f6789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789ab".to_string(),
"04b2c3d4e5f6789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcd".to_string(),
]);
// Circle "beta" with one member
circles.insert("beta".to_string(), vec![
"04c3d4e5f6789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef".to_string(),
]);
// Circle "gamma" with three members
circles.insert("gamma".to_string(), vec![
"04d4e5f6789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef01".to_string(),
"04e5f6789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123".to_string(),
"04f6789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef012345".to_string(),
]);
// Build server with circle-based authentication
let server = ServerBuilder::new()
.host("127.0.0.1")
.port(8080)
.redis_url("redis://localhost:6379")
.with_auth()
.circles(circles)
.build()?;
println!("Starting WebSocket server with circle-based authentication...");
println!("Available circles:");
for (circle_id, members) in &server.circles {
println!(" Circle '{}' has {} members:", circle_id, members.len());
for (i, member) in members.iter().enumerate() {
println!(" Member {}: {}...{}", i + 1, &member[..10], &member[member.len()-10..]);
}
}
println!("\nTo connect to a specific circle, use URLs like:");
println!(" ws://127.0.0.1:8080/ws/alpha (for circle 'alpha')");
println!(" ws://127.0.0.1:8080/ws/beta (for circle 'beta')");
println!(" ws://127.0.0.1:8080/ws/gamma (for circle 'gamma')");
println!("\nAuthentication flow:");
println!("1. Connect to WebSocket URL for specific circle");
println!("2. Call 'fetch_nonce' method to get a nonce");
println!("3. Sign the nonce with your private key");
println!("4. Call 'authenticate' with your public key and signature");
println!("5. Server will verify signature AND check circle membership");
println!("6. Only members of the target circle will be authenticated");
// Start the server
let (task_handle, server_handle) = server.spawn_circle_server()?;
println!("\nServer started! Press Ctrl+C to stop.");
// Wait for the server to complete
match task_handle.await {
Ok(Ok(())) => println!("Server stopped successfully"),
Ok(Err(e)) => eprintln!("Server error: {}", e),
Err(e) => eprintln!("Task join error: {}", e),
}
Ok(())
}

View File

@ -13,6 +13,7 @@ pub struct ServerBuilder {
enable_auth: bool,
enable_webhooks: bool,
circle_worker_id: String,
circles: HashMap<String, Vec<String>>,
}
impl ServerBuilder {
@ -28,6 +29,7 @@ impl ServerBuilder {
enable_auth: false,
enable_webhooks: false,
circle_worker_id: "default".to_string(),
circles: HashMap::new(),
}
}
@ -72,6 +74,11 @@ impl ServerBuilder {
self.enable_webhooks = true;
self
}
pub fn circles(mut self, circles: HashMap<String, Vec<String>>) -> Self {
self.circles = circles;
self
}
pub fn build(self) -> Result<Server, TlsConfigError> {
Ok(Server {
@ -87,8 +94,10 @@ impl ServerBuilder {
circle_worker_id: self.circle_worker_id,
circle_name: "default".to_string(),
circle_public_key: "default".to_string(),
circles: self.circles,
nonce_store: HashMap::new(),
authenticated_pubkey: None,
dispatcher: None,
})
}
}

View File

@ -0,0 +1,220 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs;
use std::path::Path;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
/// Server host address
#[serde(default = "default_host")]
pub host: String,
/// Server port
#[serde(default = "default_port")]
pub port: u16,
/// Redis connection URL
#[serde(default = "default_redis_url")]
pub redis_url: String,
/// Enable authentication
#[serde(default)]
pub auth: bool,
/// Enable TLS/WSS
#[serde(default)]
pub tls: bool,
/// Path to TLS certificate file
pub cert: Option<String>,
/// Path to TLS private key file
pub key: Option<String>,
/// Separate port for TLS connections
pub tls_port: Option<u16>,
/// Enable webhook handling
#[serde(default)]
pub webhooks: bool,
/// Circles configuration - maps circle names to lists of member public keys
#[serde(default)]
pub circles: HashMap<String, Vec<String>>,
}
impl Default for ServerConfig {
fn default() -> Self {
Self {
host: default_host(),
port: default_port(),
redis_url: default_redis_url(),
auth: false,
tls: false,
cert: None,
key: None,
tls_port: None,
webhooks: false,
circles: HashMap::new(),
}
}
}
impl ServerConfig {
/// Load configuration from a JSON file
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self, ConfigError> {
let content = fs::read_to_string(path.as_ref())
.map_err(|e| ConfigError::FileRead(path.as_ref().to_path_buf(), e))?;
let config: ServerConfig = serde_json::from_str(&content)
.map_err(|e| ConfigError::JsonParse(e))?;
config.validate()?;
Ok(config)
}
/// Save configuration to a JSON file
pub fn to_file<P: AsRef<Path>>(&self, path: P) -> Result<(), ConfigError> {
let content = serde_json::to_string_pretty(self)
.map_err(|e| ConfigError::JsonSerialize(e))?;
fs::write(path.as_ref(), content)
.map_err(|e| ConfigError::FileWrite(path.as_ref().to_path_buf(), e))?;
Ok(())
}
/// Validate the configuration
pub fn validate(&self) -> Result<(), ConfigError> {
// Validate TLS configuration
if self.tls && (self.cert.is_none() || self.key.is_none()) {
return Err(ConfigError::InvalidTlsConfig(
"TLS is enabled but certificate or key path is missing".to_string()
));
}
// Validate that circles are not empty if auth is enabled
if self.auth && self.circles.is_empty() {
return Err(ConfigError::InvalidAuthConfig(
"Authentication is enabled but no circles are configured".to_string()
));
}
Ok(())
}
/// Create a sample configuration file
pub fn create_sample() -> Self {
let mut circles = HashMap::new();
circles.insert(
"example_circle".to_string(),
vec![
"0x1234567890abcdef1234567890abcdef12345678".to_string(),
"0xabcdef1234567890abcdef1234567890abcdef12".to_string(),
]
);
Self {
host: "127.0.0.1".to_string(),
port: 8443,
redis_url: "redis://127.0.0.1/".to_string(),
auth: true,
tls: false,
cert: Some("cert.pem".to_string()),
key: Some("key.pem".to_string()),
tls_port: Some(8444),
webhooks: false,
circles,
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
#[error("Failed to read config file {0}: {1}")]
FileRead(std::path::PathBuf, std::io::Error),
#[error("Failed to write config file {0}: {1}")]
FileWrite(std::path::PathBuf, std::io::Error),
#[error("Failed to parse JSON config: {0}")]
JsonParse(serde_json::Error),
#[error("Failed to serialize JSON config: {0}")]
JsonSerialize(serde_json::Error),
#[error("Invalid TLS configuration: {0}")]
InvalidTlsConfig(String),
#[error("Invalid authentication configuration: {0}")]
InvalidAuthConfig(String),
}
// Default value functions
fn default_host() -> String {
"127.0.0.1".to_string()
}
fn default_port() -> u16 {
8443
}
fn default_redis_url() -> String {
"redis://127.0.0.1/".to_string()
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::NamedTempFile;
#[test]
fn test_config_serialization() {
let config = ServerConfig::create_sample();
let json = serde_json::to_string_pretty(&config).unwrap();
let deserialized: ServerConfig = serde_json::from_str(&json).unwrap();
assert_eq!(config.host, deserialized.host);
assert_eq!(config.port, deserialized.port);
assert_eq!(config.circles.len(), deserialized.circles.len());
}
#[test]
fn test_config_file_operations() {
let config = ServerConfig::create_sample();
let temp_file = NamedTempFile::new().unwrap();
// Test writing
config.to_file(temp_file.path()).unwrap();
// Test reading
let loaded_config = ServerConfig::from_file(temp_file.path()).unwrap();
assert_eq!(config.host, loaded_config.host);
assert_eq!(config.circles.len(), loaded_config.circles.len());
}
#[test]
fn test_config_validation() {
let mut config = ServerConfig::default();
// Valid config should pass
assert!(config.validate().is_ok());
// TLS enabled without cert/key should fail
config.tls = true;
assert!(config.validate().is_err());
// Fix TLS config
config.cert = Some("cert.pem".to_string());
config.key = Some("key.pem".to_string());
assert!(config.validate().is_ok());
// Auth enabled without circles should fail
config.auth = true;
assert!(config.validate().is_err());
// Add circles
config.circles.insert("test".to_string(), vec!["pubkey".to_string()]);
assert!(config.validate().is_ok());
}
}

View File

@ -31,6 +31,16 @@ impl actix::StreamHandler<Result<ws::Message, ws::ProtocolError>> for Server {
self.handle_whoami(req.params, client_rpc_id, ctx)
}
"play" => self.handle_play(req.params, client_rpc_id, ctx),
"create_job" => self.handle_create_job(req.params, client_rpc_id, ctx),
"start_job" => self.handle_start_job(req.params, client_rpc_id, ctx),
"run_job" => self.handle_run_job(req.params, client_rpc_id, ctx),
"get_job_status" => self.handle_get_job_status(req.params, client_rpc_id, ctx),
"get_job_output" => self.handle_get_job_output(req.params, client_rpc_id, ctx),
"get_job_logs" => self.handle_get_job_logs(req.params, client_rpc_id, ctx),
"list_jobs" => self.handle_list_jobs(req.params, client_rpc_id, ctx),
"stop_job" => self.handle_stop_job(req.params, client_rpc_id, ctx),
"delete_job" => self.handle_delete_job(req.params, client_rpc_id, ctx),
"clear_all_jobs" => self.handle_clear_all_jobs(req.params, client_rpc_id, ctx),
_ => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),

View File

@ -0,0 +1,999 @@
use crate::Server;
use actix::prelude::*;
use actix_web_actors::ws;
use hero_dispatcher::{Dispatcher, ScriptType};
use serde_json::{json, Value};
use std::time::Duration;
const TASK_TIMEOUT_DURATION: Duration = Duration::from_secs(30);
#[derive(serde::Serialize)]
struct SuccessResult {
success: bool,
}
#[derive(serde::Serialize)]
struct JobResult {
job_id: String,
}
#[derive(serde::Serialize)]
struct JsonRpcResponse {
jsonrpc: String,
result: Option<Value>,
error: Option<JsonRpcError>,
id: Value,
}
#[derive(serde::Serialize)]
struct JsonRpcError {
code: i32,
message: String,
data: Option<Value>,
}
impl Server {
pub fn handle_create_job(
&mut self,
params: Value,
client_rpc_id: Value,
ctx: &mut ws::WebsocketContext<Self>,
) {
// For now, create_job is the same as run_job
self.handle_run_job(params, client_rpc_id, ctx);
}
pub fn handle_start_job(
&mut self,
params: Value,
client_rpc_id: Value,
ctx: &mut ws::WebsocketContext<Self>,
) {
if self.enable_auth && !self.is_connection_authenticated() {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: "Authentication required".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
let job_id = match params.get("job_id").and_then(|v| v.as_str()) {
Some(id) => id.to_string(),
None => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32602,
message: "Missing required parameter: job_id".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
};
let dispatcher = match self.dispatcher.clone() {
Some(d) => d,
None => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32603,
message: "Internal error: dispatcher not available".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
};
let client_rpc_id_clone = client_rpc_id.clone();
let fut = async move {
dispatcher.start_job(&job_id).await
};
ctx.spawn(
fut.into_actor(self)
.map(move |res, _act, ctx_inner| match res {
Ok(_) => {
let result = SuccessResult { success: true };
let resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: Some(serde_json::to_value(result).unwrap()),
error: None,
id: client_rpc_id_clone.clone(),
};
ctx_inner.text(serde_json::to_string(&resp).unwrap());
}
Err(e) => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: format!("Failed to start job: {}", e),
data: None,
}),
id: client_rpc_id_clone,
};
ctx_inner.text(serde_json::to_string(&err_resp).unwrap());
}
})
.timeout(TASK_TIMEOUT_DURATION)
.map(move |res, _act, ctx_inner| {
if res.is_err() {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: "Request timed out".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx_inner.text(serde_json::to_string(&err_resp).unwrap());
}
}),
);
}
pub fn handle_get_job_status(
&mut self,
params: Value,
client_rpc_id: Value,
ctx: &mut ws::WebsocketContext<Self>,
) {
if self.enable_auth && !self.is_connection_authenticated() {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: "Authentication required".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
let job_id = match params.get("job_id").and_then(|v| v.as_str()) {
Some(id) => id.to_string(),
None => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32602,
message: "Missing required parameter: job_id".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
};
let dispatcher = match self.dispatcher.clone() {
Some(d) => d,
None => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32603,
message: "Internal error: dispatcher not available".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
};
let client_rpc_id_clone = client_rpc_id.clone();
let fut = async move {
dispatcher.get_job_status(&job_id).await
};
ctx.spawn(
fut.into_actor(self)
.map(move |res, _act, ctx_inner| match res {
Ok(status) => {
let resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: Some(json!(status)),
error: None,
id: client_rpc_id_clone.clone(),
};
ctx_inner.text(serde_json::to_string(&resp).unwrap());
}
Err(e) => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: format!("Failed to get job status: {}", e),
data: None,
}),
id: client_rpc_id_clone,
};
ctx_inner.text(serde_json::to_string(&err_resp).unwrap());
}
})
.timeout(TASK_TIMEOUT_DURATION)
.map(move |res, _act, ctx_inner| {
if res.is_err() {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: "Request timed out".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx_inner.text(serde_json::to_string(&err_resp).unwrap());
}
}),
);
}
pub fn handle_list_jobs(
&mut self,
_params: Value,
client_rpc_id: Value,
ctx: &mut ws::WebsocketContext<Self>,
) {
if self.enable_auth && !self.is_connection_authenticated() {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: "Authentication required".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
let dispatcher = match self.dispatcher.clone() {
Some(d) => d,
None => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32603,
message: "Internal error: dispatcher not available".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
};
let client_rpc_id_clone = client_rpc_id.clone();
let fut = async move {
dispatcher.list_jobs().await
};
ctx.spawn(
fut.into_actor(self)
.map(move |res, _act, ctx_inner| match res {
Ok(jobs) => {
let resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: Some(json!(jobs)),
error: None,
id: client_rpc_id_clone.clone(),
};
ctx_inner.text(serde_json::to_string(&resp).unwrap());
}
Err(e) => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: format!("Failed to list jobs: {}", e),
data: None,
}),
id: client_rpc_id_clone,
};
ctx_inner.text(serde_json::to_string(&err_resp).unwrap());
}
})
.timeout(TASK_TIMEOUT_DURATION)
.map(move |res, _act, ctx_inner| {
if res.is_err() {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: "Request timed out".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx_inner.text(serde_json::to_string(&err_resp).unwrap());
}
}),
);
}
pub fn handle_run_job(
&mut self,
params: Value,
client_rpc_id: Value,
ctx: &mut ws::WebsocketContext<Self>,
) {
if self.enable_auth && !self.is_connection_authenticated() {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: "Authentication required".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
let circle_pk = match params.get("circle_pk").and_then(|v| v.as_str()) {
Some(pk) => pk.to_string(),
None => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32602,
message: "Missing required parameter: circle_pk".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
};
let script_content = match params.get("script_content").and_then(|v| v.as_str()) {
Some(script) => script.to_string(),
None => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32602,
message: "Missing required parameter: script_content".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
};
let dispatcher = match self.dispatcher.clone() {
Some(d) => d,
None => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32603,
message: "Internal error: dispatcher not available".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
};
let client_rpc_id_clone = client_rpc_id.clone();
let fut = async move {
dispatcher
.new_job()
.context_id(&circle_pk)
.script_type(ScriptType::RhaiSAL)
.script(&script_content)
.timeout(TASK_TIMEOUT_DURATION)
.await_response()
.await
};
ctx.spawn(
fut.into_actor(self)
.map(move |res, _act, ctx_inner| match res {
Ok(job_id) => {
let result = JobResult { job_id };
let resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: Some(serde_json::to_value(result).unwrap()),
error: None,
id: client_rpc_id_clone.clone(),
};
ctx_inner.text(serde_json::to_string(&resp).unwrap());
}
Err(e) => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: format!("Failed to run job: {}", e),
data: None,
}),
id: client_rpc_id_clone,
};
ctx_inner.text(serde_json::to_string(&err_resp).unwrap());
}
})
.timeout(TASK_TIMEOUT_DURATION)
.map(move |res, _act, ctx_inner| {
if res.is_err() {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: "Request timed out".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx_inner.text(serde_json::to_string(&err_resp).unwrap());
}
}),
);
}
pub fn handle_get_job_output(
&mut self,
params: Value,
client_rpc_id: Value,
ctx: &mut ws::WebsocketContext<Self>,
) {
if self.enable_auth && !self.is_connection_authenticated() {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: "Authentication required".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
let job_id = match params.get("job_id").and_then(|v| v.as_str()) {
Some(id) => id.to_string(),
None => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32602,
message: "Missing required parameter: job_id".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
};
let dispatcher = match self.dispatcher.clone() {
Some(d) => d,
None => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32603,
message: "Internal error: dispatcher not available".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
};
let client_rpc_id_clone = client_rpc_id.clone();
let fut = async move {
dispatcher.get_job_output(&job_id).await
};
ctx.spawn(
fut.into_actor(self)
.map(move |res, _act, ctx_inner| match res {
Ok(output) => {
let resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: Some(json!(output)),
error: None,
id: client_rpc_id_clone.clone(),
};
ctx_inner.text(serde_json::to_string(&resp).unwrap());
}
Err(e) => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: format!("Failed to get job output: {}", e),
data: None,
}),
id: client_rpc_id_clone,
};
ctx_inner.text(serde_json::to_string(&err_resp).unwrap());
}
})
.timeout(TASK_TIMEOUT_DURATION)
.map(move |res, _act, ctx_inner| {
if res.is_err() {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: "Request timed out".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx_inner.text(serde_json::to_string(&err_resp).unwrap());
}
}),
);
}
pub fn handle_get_job_logs(
&mut self,
params: Value,
client_rpc_id: Value,
ctx: &mut ws::WebsocketContext<Self>,
) {
if self.enable_auth && !self.is_connection_authenticated() {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: "Authentication required".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
let job_id = match params.get("job_id").and_then(|v| v.as_str()) {
Some(id) => id.to_string(),
None => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32602,
message: "Missing required parameter: job_id".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
};
let dispatcher = match self.dispatcher.clone() {
Some(d) => d,
None => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32603,
message: "Internal error: dispatcher not available".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
};
let client_rpc_id_clone = client_rpc_id.clone();
let fut = async move {
dispatcher.get_job_logs(&job_id).await
};
ctx.spawn(
fut.into_actor(self)
.map(move |res, _act, ctx_inner| match res {
Ok(logs) => {
let result = json!({ "logs": logs });
let resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: Some(result),
error: None,
id: client_rpc_id_clone.clone(),
};
ctx_inner.text(serde_json::to_string(&resp).unwrap());
}
Err(e) => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: format!("Failed to get job logs: {}", e),
data: None,
}),
id: client_rpc_id_clone,
};
ctx_inner.text(serde_json::to_string(&err_resp).unwrap());
}
})
.timeout(TASK_TIMEOUT_DURATION)
.map(move |res, _act, ctx_inner| {
if res.is_err() {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: "Request timed out".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx_inner.text(serde_json::to_string(&err_resp).unwrap());
}
}),
);
}
pub fn handle_stop_job(
&mut self,
params: Value,
client_rpc_id: Value,
ctx: &mut ws::WebsocketContext<Self>,
) {
if self.enable_auth && !self.is_connection_authenticated() {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: "Authentication required".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
let job_id = match params.get("job_id").and_then(|v| v.as_str()) {
Some(id) => id.to_string(),
None => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32602,
message: "Missing required parameter: job_id".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
};
let dispatcher = match self.dispatcher.clone() {
Some(d) => d,
None => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32603,
message: "Internal error: dispatcher not available".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
};
let client_rpc_id_clone = client_rpc_id.clone();
let fut = async move {
dispatcher.stop_job(&job_id).await
};
ctx.spawn(
fut.into_actor(self)
.map(move |res, _act, ctx_inner| match res {
Ok(_) => {
let resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: Some(json!(null)),
error: None,
id: client_rpc_id_clone.clone(),
};
ctx_inner.text(serde_json::to_string(&resp).unwrap());
}
Err(e) => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: format!("Failed to stop job: {}", e),
data: None,
}),
id: client_rpc_id_clone,
};
ctx_inner.text(serde_json::to_string(&err_resp).unwrap());
}
})
.timeout(TASK_TIMEOUT_DURATION)
.map(move |res, _act, ctx_inner| {
if res.is_err() {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: "Request timed out".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx_inner.text(serde_json::to_string(&err_resp).unwrap());
}
}),
);
}
pub fn handle_delete_job(
&mut self,
params: Value,
client_rpc_id: Value,
ctx: &mut ws::WebsocketContext<Self>,
) {
if self.enable_auth && !self.is_connection_authenticated() {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: "Authentication required".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
let job_id = match params.get("job_id").and_then(|v| v.as_str()) {
Some(id) => id.to_string(),
None => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32602,
message: "Missing required parameter: job_id".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
};
let dispatcher = match self.dispatcher.clone() {
Some(d) => d,
None => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32603,
message: "Internal error: dispatcher not available".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
};
let client_rpc_id_clone = client_rpc_id.clone();
let fut = async move {
dispatcher.delete_job(&job_id).await
};
ctx.spawn(
fut.into_actor(self)
.map(move |res, _act, ctx_inner| match res {
Ok(_) => {
let resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: Some(json!(null)),
error: None,
id: client_rpc_id_clone.clone(),
};
ctx_inner.text(serde_json::to_string(&resp).unwrap());
}
Err(e) => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: format!("Failed to delete job: {}", e),
data: None,
}),
id: client_rpc_id_clone,
};
ctx_inner.text(serde_json::to_string(&err_resp).unwrap());
}
})
.timeout(TASK_TIMEOUT_DURATION)
.map(move |res, _act, ctx_inner| {
if res.is_err() {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: "Request timed out".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx_inner.text(serde_json::to_string(&err_resp).unwrap());
}
}),
);
}
pub fn handle_clear_all_jobs(
&mut self,
_params: Value,
client_rpc_id: Value,
ctx: &mut ws::WebsocketContext<Self>,
) {
if self.enable_auth && !self.is_connection_authenticated() {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: "Authentication required".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
let dispatcher = match self.dispatcher.clone() {
Some(d) => d,
None => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32603,
message: "Internal error: dispatcher not available".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
};
let client_rpc_id_clone = client_rpc_id.clone();
let fut = async move {
dispatcher.clear_all_jobs().await
};
ctx.spawn(
fut.into_actor(self)
.map(move |res, _act, ctx_inner| match res {
Ok(_) => {
let resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: Some(json!(null)),
error: None,
id: client_rpc_id_clone.clone(),
};
ctx_inner.text(serde_json::to_string(&resp).unwrap());
}
Err(e) => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: format!("Failed to clear jobs: {}", e),
data: None,
}),
id: client_rpc_id_clone,
};
ctx_inner.text(serde_json::to_string(&err_resp).unwrap());
}
})
.timeout(TASK_TIMEOUT_DURATION)
.map(move |res, _act, ctx_inner| {
if res.is_err() {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: "Request timed out".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx_inner.text(serde_json::to_string(&err_resp).unwrap());
}
}),
);
}
}

View File

@ -3,7 +3,8 @@ use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws;
use log::{info, error}; // Added error for better logging
use once_cell::sync::Lazy;
use hero_dispatcher::{DispatcherBuilder, DispatcherError};
use hero_dispatcher::{Dispatcher, DispatcherBuilder, DispatcherError};
use hero_job::{Job, JobStatus};
use rustls::pki_types::PrivateKeyDer;
use rustls::ServerConfig as RustlsServerConfig;
use rustls_pemfile::{certs, pkcs8_private_keys};
@ -29,10 +30,13 @@ static AUTHENTICATED_CONNECTIONS: Lazy<Mutex<HashMap<Addr<Server>, String>>> =
mod auth;
mod builder;
mod config;
mod handler;
mod job_handlers;
use crate::auth::{generate_nonce, NonceResponse};
pub use crate::builder::ServerBuilder;
pub use crate::config::{ServerConfig, ConfigError};
// Re-export server handle type for external use
pub type ServerHandle = actix_web::dev::ServerHandle;
@ -100,6 +104,64 @@ struct FetchNonceParams {
pubkey: String,
}
// Job management parameter structures
#[derive(Debug, Serialize, Deserialize)]
struct CreateJobParams {
job: Job,
}
#[derive(Debug, Serialize, Deserialize)]
struct RunJobParams {
job: Job,
}
#[derive(Debug, Serialize, Deserialize)]
struct JobIdParams {
job_id: String,
}
// Job management result structures
#[derive(Debug, Serialize, Deserialize)]
struct CreateJobResult {
job_id: String,
}
#[derive(Debug, Serialize, Deserialize)]
struct RunJobResult {
job_id: String,
output: String,
}
#[derive(Debug, Serialize, Deserialize)]
struct JobStatusResult {
status: JobStatus,
}
#[derive(Debug, Serialize, Deserialize)]
struct JobOutputResult {
output: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
struct JobLogsResult {
logs: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
struct ListJobsResult {
jobs: Vec<Job>,
}
#[derive(Debug, Serialize, Deserialize)]
struct SuccessResult {
success: bool,
}
#[derive(Debug, Serialize, Deserialize)]
struct ClearJobsResult {
deleted_count: usize,
}
impl Actor for Server {
type Context = ws::WebsocketContext<Self>;
@ -142,11 +204,14 @@ pub struct Server {
pub tls_port: Option<u16>,
pub enable_auth: bool,
pub enable_webhooks: bool,
pub circle_worker_id: String,
pub circle_name: String,
pub circle_public_key: String,
/// Map of circle IDs to vectors of public keys that are members of that circle
pub circles: HashMap<String, Vec<String>>,
nonce_store: HashMap<String, NonceResponse>,
authenticated_pubkey: Option<String>,
pub dispatcher: Option<Dispatcher>,
}
impl Server {
@ -250,33 +315,34 @@ impl Server {
client_rpc_id: Value,
ctx: &mut ws::WebsocketContext<Self>,
) {
match serde_json::from_value::<FetchNonceParams>(params) {
Ok(params) => {
let nonce_response = generate_nonce();
self.nonce_store
.insert(params.pubkey, nonce_response.clone());
let resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: Some(serde_json::to_value(nonce_response).unwrap()),
error: None,
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&resp).unwrap());
}
Err(e) => {
// Extract pubkey string directly from params
let pubkey = match params.as_str() {
Some(pk) => pk.to_string(),
None => {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32602,
message: format!("Invalid parameters for fetch_nonce: {}", e),
message: "Invalid parameters for fetch_nonce: expected string pubkey".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
return;
}
}
};
let nonce_response = generate_nonce();
self.nonce_store.insert(pubkey, nonce_response.clone());
let resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: Some(serde_json::to_value(&nonce_response.nonce).unwrap()),
error: None,
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&resp).unwrap());
}
fn handle_authenticate(
@ -327,18 +393,41 @@ impl Server {
};
if is_valid {
self.authenticated_pubkey = Some(auth_params.pubkey.clone());
AUTHENTICATED_CONNECTIONS
.lock()
.unwrap()
.insert(ctx.address(), auth_params.pubkey);
let resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: Some(serde_json::json!({ "authenticated": true })),
error: None,
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&resp).unwrap());
// Check if the authenticated public key belongs to the circle
let is_circle_member = self.circles
.get(&self.circle_name)
.map(|members| members.contains(&auth_params.pubkey))
.unwrap_or(false);
if is_circle_member {
self.authenticated_pubkey = Some(auth_params.pubkey.clone());
AUTHENTICATED_CONNECTIONS
.lock()
.unwrap()
.insert(ctx.address(), auth_params.pubkey);
let resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: Some(serde_json::json!({ "authenticated": true })),
error: None,
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&resp).unwrap());
} else {
log::warn!("Auth failed for {}: Public key {} not a member of circle {}",
self.circle_name, auth_params.pubkey, self.circle_name);
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32003,
message: "Public key not authorized for this circle".to_string(),
data: None,
}),
id: client_rpc_id,
};
ctx.text(serde_json::to_string(&err_resp).unwrap());
ctx.stop();
}
} else {
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
@ -459,7 +548,7 @@ impl Server {
let redis_url_clone = self.redis_url.clone();
let _rpc_id_clone = client_rpc_id.clone();
let public_key = self.authenticated_pubkey.clone();
let worker_id_clone = self.circle_worker_id.clone();
let fut = async move {
let caller_id = public_key.unwrap_or_else(|| "anonymous".to_string());
@ -471,7 +560,7 @@ impl Server {
hero_dispatcher
.new_job()
.context_id(&circle_pk_clone)
.worker_id(&worker_id_clone)
.script_type(hero_dispatcher::ScriptType::RhaiSAL)
.script(&script_content)
.timeout(TASK_TIMEOUT_DURATION)
.await_response()
@ -484,35 +573,16 @@ impl Server {
ctx.spawn(
fut.into_actor(self)
.map(move |res, _act, ctx_inner| match res {
Ok(task_details) => {
if task_details.status == "completed" {
let output = task_details
.output
.unwrap_or_else(|| "No output".to_string());
let result_value = PlayResult { output };
let resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: Some(serde_json::to_value(result_value).unwrap()),
error: None,
id: client_rpc_id,
};
ctx_inner.text(serde_json::to_string(&resp).unwrap());
} else {
let error_message = task_details.error.unwrap_or_else(|| {
"Rhai script execution failed".to_string()
});
let err_resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: None,
error: Some(JsonRpcError {
code: -32000,
message: error_message,
data: None,
}),
id: client_rpc_id,
};
ctx_inner.text(serde_json::to_string(&err_resp).unwrap());
}
Ok(output) => {
// The dispatcher returns the actual string output from job execution
let result_value = PlayResult { output };
let resp = JsonRpcResponse {
jsonrpc: "2.0".to_string(),
result: Some(serde_json::to_value(result_value).unwrap()),
error: None,
id: client_rpc_id,
};
ctx_inner.text(serde_json::to_string(&resp).unwrap());
}
Err(e) => {
let (code, message) = match e {