implement stripe and idenfy webhooks support

This commit is contained in:
Timur Gordon
2025-07-08 22:49:47 +02:00
parent 7dfd54a20a
commit 93977bad7a
129 changed files with 9655 additions and 3640 deletions

View File

@@ -0,0 +1,153 @@
use sal_service_manager::create_service_manager;
use crate::Circle;
use crate::Launcher;
use std::sync::{Arc, Mutex};
const DEFAULT_REDIS_URL: &str = "redis://127.0.0.1:6379";
const DEFAULT_PORT: u16 = 8443;
pub struct LauncherBuilder {
circles: Vec<CircleBuilder>, // circle pk's and their init scripts
worker_binary: String, // path to worker binary
server_binary: String, // path to server binary
redis_url: String, // redis url
port: u16, // port to bind to
}
/// Creates a new launcher builder
pub fn new_launcher() -> LauncherBuilder {
LauncherBuilder::new()
}
impl LauncherBuilder {
/// Creates a new launcher builder
pub fn new() -> Self {
let server_binary = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.parent() // Go up one level from lib.rs
.unwrap()
.parent() // Go up one level from src
.unwrap()
.join("target/release/circles_server")
.to_string_lossy()
.to_string();
let worker_binary = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.parent() // Go up one level from lib.rs
.unwrap()
.parent() // Go up one level from src
.unwrap()
.join("target/release/worker")
.to_string_lossy()
.to_string();
Self {
circles: vec![],
worker_binary: worker_binary,
server_binary: server_binary,
redis_url: DEFAULT_REDIS_URL.to_string(),
port: DEFAULT_PORT,
}
}
/// Adds a circle by public key
pub fn add_circle(mut self, public_key: impl ToString) -> Self {
self.circles.push(CircleBuilder::new().public_key(public_key.to_string()));
self
}
/// Sets initialization script for the last added circle
pub fn add_init_script(mut self, script_path: impl ToString) -> Self {
if let Some(last_circle) = self.circles.last_mut() {
last_circle.init_script = Some(script_path.to_string());
}
self
}
/// Sets the worker binary path
pub fn worker_binary(mut self, path: impl ToString) -> Self {
// TODO: Validate path
self.worker_binary = path.to_string();
self
}
/// Sets the server binary path
pub fn server_binary(mut self, path: impl ToString) -> Self {
// TODO: Validate path
self.server_binary = path.to_string();
self
}
/// Sets the Redis URL
pub fn redis_url(mut self, url: impl ToString) -> Self {
// TODO: Validate URL
self.redis_url = url.to_string();
self
}
/// Sets the port
pub fn port(mut self, port: u16) -> Self {
// TODO: Validate port
self.port = port;
self
}
pub async fn build(self) -> Result<Launcher, Box<dyn std::error::Error>> {
if self.circles.is_empty() {
return Err("No circles configured. Use add_circle() to add circles.".into());
}
let service_manager = tokio::task::spawn_blocking(|| create_service_manager(None))
.await??;
Ok(Launcher {
service_manager: Arc::new(Mutex::new(service_manager)),
circles: self.circles.iter().map(|circle| circle.build()).collect(),
worker_binary: self.worker_binary,
server_binary: self.server_binary,
redis_url: self.redis_url,
port: self.port,
})
}
}
/// Check if a port is in use by any process.
/// Note: This only indicates that *something* is using the port,
/// not necessarily our WebSocket server. Should only be used as a fallback
/// when service manager status is unavailable.
async fn is_port_in_use(port: u16) -> bool {
use std::net::{TcpListener, SocketAddr};
let addr = SocketAddr::from(([127, 0, 0, 1], port));
TcpListener::bind(addr).is_err()
}
pub struct CircleBuilder {
public_key: String,
init_script: Option<String>,
}
impl CircleBuilder {
pub fn new() -> Self {
Self {
public_key: String::new(),
init_script: None,
}
}
pub fn public_key(mut self, public_key: String) -> Self {
self.public_key = public_key;
self
}
pub fn init_script(mut self, init_script: String) -> Self {
self.init_script = Some(init_script);
self
}
pub fn build(&self) -> Circle {
Circle {
public_key: self.public_key.clone(),
init_script: self.init_script.clone(),
}
}
}

View File

@@ -0,0 +1,98 @@
use clap::Parser;
use circles_launcher::{Circle, Launcher};
use sal_service_manager::create_service_manager;
use std::error::Error as StdError;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
const DEFAULT_REDIS_URL: &str = "redis://127.0.0.1/";
// Newtype wrapper to satisfy the orphan rule
#[derive(Clone, Debug)]
pub struct CircleArg(Circle);
impl FromStr for CircleArg {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let parts: Vec<&str> = s.split(':').collect();
let circle = match parts.len() {
1 => {
// Validate public key
secp256k1::PublicKey::from_str(parts[0])
.map_err(|e| format!("Invalid public key '{}': {}", parts[0], e))?;
Circle {
public_key: parts[0].to_string(),
init_script: None,
}
}
2 => {
// Validate public key
secp256k1::PublicKey::from_str(parts[0])
.map_err(|e| format!("Invalid public key '{}': {}", parts[0], e))?;
Circle {
public_key: parts[0].to_string(),
init_script: Some(parts[1].to_string()),
}
}
_ => return Err(format!("Invalid circle format '{}'. Expected 'public_key' or 'public_key:init_script.rhai'", s)),
};
Ok(CircleArg(circle))
}
}
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
pub struct Args {
/// Port for the WebSocket server
#[arg(short, long, default_value = "443")]
pub port: u16,
/// Circle configurations: public_key[:init_script.rhai] (can be specified multiple times)
#[arg(short = 'c', long = "circle", required = true)]
pub circles: Vec<CircleArg>,
/// Redis URL
#[arg(long, default_value = DEFAULT_REDIS_URL)]
pub redis_url: String,
/// Worker binary path
#[arg(long, default_value = "./target/release/worker")]
pub worker_binary: PathBuf,
/// Server binary path
#[arg(long, default_value = "./target/release/server")]
pub server_binary: PathBuf,
/// Enable debug mode
#[arg(short, long)]
pub debug: bool,
/// Verbosity level
#[arg(short, long, action = clap::ArgAction::Count)]
pub verbose: u8,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn StdError>> {
let args = Args::parse();
// To build the launcher, its fields must be public, or a public constructor
// must be provided from the `circles_launcher` library.
let service_manager = tokio::task::spawn_blocking(|| create_service_manager(None))
.await??;
let launcher = Launcher {
service_manager: Arc::new(Mutex::new(service_manager)),
circles: args.circles.into_iter().map(|c| c.0).collect(),
worker_binary: args.worker_binary.to_string_lossy().into_owned(),
server_binary: args.server_binary.to_string_lossy().into_owned(),
redis_url: args.redis_url,
port: args.port,
};
launcher.launch().await?;
Ok(())
}

View File

@@ -0,0 +1,236 @@
use log::{info, debug};
use rhai_client::RhaiClientBuilder;
use sal_service_manager::{ServiceConfig as ServiceManagerConfig, ServiceStatus};
use std::sync::{Arc, Mutex};
mod builder;
pub use builder::*;
const SERVER_SERVICE_NAME: &str = "circle-ws-server";
pub struct Launcher {
pub service_manager: Arc<Mutex<Box<dyn sal_service_manager::ServiceManager>>>,
pub circles: Vec<Circle>,
pub worker_binary: String,
pub server_binary: String,
pub redis_url: String,
pub port: u16,
}
#[derive(Debug, Clone)]
pub struct Circle {
pub public_key: String,
pub init_script: Option<String>,
}
impl Circle {
pub fn service_name(&self) -> String {
format!("circle-worker-{}", self.public_key)
}
}
impl Launcher {
/// Launches all configured circles
pub async fn launch(&self) -> Result<(), Box<dyn std::error::Error>> {
self.launch_server().await?;
for circle in &self.circles {
println!("Launching circle {}", circle.public_key);
self.launch_circle(circle).await?;
}
Ok(())
}
// Launches the circles WebSocket server
async fn launch_server(&self) -> Result<(), Box<dyn std::error::Error>> {
// Check if service exists
let exists = tokio::task::spawn_blocking({
let service_manager = Arc::clone(&self.service_manager);
move || service_manager.lock().unwrap().exists(SERVER_SERVICE_NAME)
}).await??;
if !exists {
self.create_circle_server_service().await?;
}
// Check if the WebSocket server service is already running via service manager
let status = tokio::task::spawn_blocking({
let service_manager = Arc::clone(&self.service_manager);
move || service_manager.lock().unwrap().status(SERVER_SERVICE_NAME)
}).await??;
match status {
ServiceStatus::Running => {
println!("✓ WebSocket server service '{}' is already running", SERVER_SERVICE_NAME);
return Ok(());
}
ServiceStatus::Failed => {
println!("WebSocket server service '{}' exists but failed, removing it", SERVER_SERVICE_NAME);
if let Err(e) = tokio::task::spawn_blocking({
let service_manager = Arc::clone(&self.service_manager);
move || service_manager.lock().unwrap().remove(SERVER_SERVICE_NAME)
}).await? {
println!("Warning: Failed to remove failed service '{}': {}", SERVER_SERVICE_NAME, e);
return Err(e.into());
}
}
ServiceStatus::Unknown => {
println!("WebSocket server service '{}' exists but is in an unknown state, removing it", SERVER_SERVICE_NAME);
if let Err(e) = tokio::task::spawn_blocking({
let service_manager = Arc::clone(&self.service_manager);
move || service_manager.lock().unwrap().remove(SERVER_SERVICE_NAME)
}).await? {
println!("Warning: Failed to remove failed service '{}': {}", SERVER_SERVICE_NAME, e);
return Err(e.into());
}
}
ServiceStatus::Stopped => {
println!("WebSocket server service '{}' exists but is stopped, starting it", SERVER_SERVICE_NAME);
match tokio::task::spawn_blocking({
let service_manager = Arc::clone(&self.service_manager);
move || service_manager.lock().unwrap().start(SERVER_SERVICE_NAME)
}).await? {
Ok(_) => {
println!("✓ WebSocket server service '{}' started", SERVER_SERVICE_NAME);
return Ok(());
}
Err(e) => {
println!("Failed to start existing service, removing and recreating: {}", e);
if let Err(e) = tokio::task::spawn_blocking({
let service_manager = Arc::clone(&self.service_manager);
move || service_manager.lock().unwrap().remove(SERVER_SERVICE_NAME)
}).await? {
println!("Warning: Failed to remove problematic service '{}': {}", SERVER_SERVICE_NAME, e);
return Err(e.into());
}
}
}
}
}
// This part is reached if the service was Failed, Unknown or Stopped and then removed/failed to start.
// We need to create and start it.
println!("Creating and starting new WebSocket server service '{}'", SERVER_SERVICE_NAME);
self.create_circle_server_service().await?;
tokio::task::spawn_blocking({
let service_manager = Arc::clone(&self.service_manager);
move || service_manager.lock().unwrap().start(SERVER_SERVICE_NAME)
}).await??;
println!("✓ WebSocket server service '{}' started successfully", SERVER_SERVICE_NAME);
Ok(())
}
// Creates circles server service
async fn create_circle_server_service(&self) -> Result<(), Box<dyn std::error::Error>> {
let config = ServiceManagerConfig {
name: SERVER_SERVICE_NAME.to_string(),
binary_path: self.server_binary.clone(),
args: vec![
"--port".to_string(),
self.port.to_string(),
"--redis-url".to_string(),
self.redis_url.clone(),
],
working_directory: Some(std::env::current_dir()?.to_string_lossy().to_string()),
environment: std::env::vars().collect(),
auto_restart: true,
};
// Use spawn_blocking to avoid runtime conflicts
tokio::task::spawn_blocking({
let service_manager = Arc::clone(&self.service_manager);
move || service_manager.lock().unwrap().create(&config)
}).await??;
Ok(())
}
pub async fn launch_circle(&self, circle: &Circle) -> Result<(), Box<dyn std::error::Error>> {
info!("Launching circle {}", circle.public_key);
let config = ServiceManagerConfig {
name: circle.service_name(),
binary_path: self.worker_binary.clone(),
args: vec![circle.public_key.clone()],
auto_restart: true,
environment: std::env::vars().collect(),
working_directory: Some(std::env::current_dir()?.to_string_lossy().to_string()),
};
// Use spawn_blocking for service manager operations
let service_name = circle.service_name();
tokio::task::spawn_blocking({
let service_manager = Arc::clone(&self.service_manager);
let config = config.clone();
move || service_manager.lock().unwrap().create(&config)
}).await?
.map_err(|e| format!("Failed to create service manager: {}", e))?;
tokio::task::spawn_blocking({
let service_manager = Arc::clone(&self.service_manager);
let service_name = service_name.clone();
move || service_manager.lock().unwrap().start(&service_name)
}).await?
.map_err(|e| format!("Failed to start service manager: {}", e))?;
if let Some(init_script) = &circle.init_script {
send_init_script_to_worker(&circle.public_key, &init_script, &self.redis_url)
.await?;
}
Ok(())
}
/// Cleanup all services created by the launcher
pub async fn clean(&self) -> Result<(), Box<dyn std::error::Error>> {
tokio::task::spawn_blocking({
let service_manager = Arc::clone(&self.service_manager);
move || service_manager.lock().unwrap().remove(SERVER_SERVICE_NAME)
}).await??;
for circle in &self.circles {
self.clean_circle(&circle.public_key).await?;
}
println!("Cleanup completed.");
Ok(())
}
/// Cleanup all services created by the launcher
pub async fn clean_circle(&self, circle_public_key: &str) -> Result<(), Box<dyn std::error::Error>> {
let circle_key = circle_public_key.to_string();
match tokio::task::spawn_blocking({
let service_manager = Arc::clone(&self.service_manager);
move || service_manager.lock().unwrap().remove(&circle_key)
}).await? {
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
}
}
}
async fn send_init_script_to_worker(
public_key: &str,
init_script: &str,
redis_url: &str,
) -> Result<(), Box<dyn std::error::Error>> {
println!("Sending initialization script '{}' to worker for circle: {}", init_script, public_key);
// Create RhaiClient and send script
let client = RhaiClientBuilder::new()
.redis_url(redis_url)
.caller_id("launcher")
.build()?;
client.new_play_request()
.recipient_id(&format!("rhai_tasks:{}", public_key))
.script(init_script)
.submit()
.await?;
println!("Successfully sent initialization script to worker for circle: {}", public_key);
Ok(())
}