multisig rhai flow POC app

This commit is contained in:
timurgordon 2025-05-20 22:08:00 +03:00
parent 795c04fc5a
commit 123dfc606c
16 changed files with 4021 additions and 0 deletions

2782
flowbroker/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

27
flowbroker/Cargo.toml Normal file
View File

@ -0,0 +1,27 @@
[package]
name = "flowbroker"
version = "0.1.0"
edition = "2024"
[dependencies]
sigsocket = { path = "../sigsocket" } # Path relative to flowbroker directory
actix-web = "4.3.1"
actix-rt = "2.8.0"
actix-files = "0.6.2"
actix-web-actors = "4.2.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
env_logger = "0.10.0"
log = "0.4.0"
tera = "1.19.0"
tokio = { version = "1.28.0", features = ["full"] }
dotenv = "0.15.0"
hex = "0.4.3"
uuid = { version = "1.4", features = ["v4", "serde"] }
chrono = { version = "0.4", features = ["serde"] } # For timestamps
rhai = "1.18.0"
serde_urlencoded = "0.7"
# Database models and ORM-like functionality
heromodels = { path = "../../db/heromodels" }
# Note: heromodels pulls in 'ourdb', 'heromodels_core', 'heromodels_derive'

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1 @@
148

Binary file not shown.

690
flowbroker/src/main.rs Normal file
View File

@ -0,0 +1,690 @@
use actix_files as fs;
use actix_web::{web, App, HttpResponse, HttpServer, Responder, Result as ActixResult};
use std::fs as std_fs;
use std::path::PathBuf;
use actix_web_actors::ws;
use serde::{Deserialize, Serialize};
use serde_urlencoded; // Added for from_str
use tera::{Tera, Context};
use std::sync::{Arc, Mutex, RwLock};
use sigsocket::service::SigSocketService;
use sigsocket::registry::ConnectionRegistry;
use log::{info, error};
use uuid::Uuid;
use rhai::{Engine, EvalAltResult, Position};
use serde_json::Value as JsonValue;
// use std::collections::HashMap; // Removed as no longer used
use heromodels; // Added for database models
use heromodels::db::hero::OurDB;
use heromodels::db::{Db, Collection}; // Import Db trait for .collection() and Collection trait for .set()/.get_all()
use heromodels::models::flowbroker_models::{Flow, FlowStep, SignatureRequirement}; // Import the models
use dotenv::dotenv;
use std::env;
// --- Flowbroker Specific Enums (to be used by application logic) ---
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum FlowStepStatus {
Pending, // Step created, not yet processed
InProgress, // Step is actively being processed (e.g., waiting for signatures)
Completed, // All requirements for this step are met
Failed, // Step failed (e.g., a signature requirement failed or timed out)
Skipped, // Step was skipped (e.g., due to conditional logic not yet implemented)
}
impl FlowStepStatus {
pub fn to_db_string(&self) -> String {
format!("{:?}", self)
}
pub fn from_db_string(s: &str) -> Result<Self, String> {
match s {
"Pending" => Ok(FlowStepStatus::Pending),
"InProgress" => Ok(FlowStepStatus::InProgress),
"Completed" => Ok(FlowStepStatus::Completed),
"Failed" => Ok(FlowStepStatus::Failed),
"Skipped" => Ok(FlowStepStatus::Skipped),
_ => Err(format!("Invalid FlowStepStatus string: {}", s)),
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum SignatureRequirementStatus {
Pending, // Not yet processed or sent for signing
SentToClient, // Sent to client via SigSocket, awaiting signature
Signed, // Successfully signed
Failed, // Signing failed (e.g., client rejected, timeout, error)
Error, // An internal error occurred processing this requirement
}
impl SignatureRequirementStatus {
pub fn to_db_string(&self) -> String {
format!("{:?}", self)
}
pub fn from_db_string(s: &str) -> Result<Self, String> {
match s {
"Pending" => Ok(SignatureRequirementStatus::Pending),
"SentToClient" => Ok(SignatureRequirementStatus::SentToClient),
"Signed" => Ok(SignatureRequirementStatus::Signed),
"Failed" => Ok(SignatureRequirementStatus::Failed),
"Error" => Ok(SignatureRequirementStatus::Error),
_ => Err(format!("Invalid SignatureRequirementStatus string: {}", s)),
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum FlowStatus {
Pending, // Flow created, no steps initiated
InProgress, // Flow started, steps are being processed
Completed, // All steps successfully signed
Failed, // A step failed or timed out
}
impl FlowStatus {
pub fn to_db_string(&self) -> String {
format!("{:?}", self)
}
pub fn from_db_string(s: &str) -> Result<Self, String> {
match s {
"Pending" => Ok(FlowStatus::Pending),
"InProgress" => Ok(FlowStatus::InProgress),
"Completed" => Ok(FlowStatus::Completed),
"Failed" => Ok(FlowStatus::Failed),
_ => Err(format!("Invalid FlowStatus string: {}", s)),
}
}
}
// NOTE: The old Flow, FlowStep, and SignatureRequirement structs previously here
// have been removed. Their definitions are now in the heromodels crate.
// --- AppState ---
pub struct AppState {
templates: Tera,
sigsocket_service: Arc<SigSocketService>,
db: Arc<OurDB>, // Using OurDB from heromodels
next_id_counter: Arc<Mutex<u32>>, // For generating temporary primary keys
}
// --- Form Deserialization (for new dynamic form) ---
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
pub struct RequirementRealFormData {
// The name attributes in HTML are like: steps[0][requirements][0][message]
pub message: String, // Made fields public for external construction in tests
pub public_key: String, // Made fields public
}
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
pub struct FlowStepFormData {
description: Option<String>, // If description field is optional and might not be present
requirements: Vec<RequirementRealFormData>,
}
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
pub struct CreateFlowRealFormData { // Renamed to avoid confusion with heromodels::Flow
flow_name: String,
steps: Vec<FlowStepFormData>,
}
#[derive(serde::Deserialize, Debug)]
pub struct RhaiScriptFormData {
rhai_script: String,
}
// --- Handlers ---
// Display list of flows
async fn list_flows(data: web::Data<AppState>) -> ActixResult<HttpResponse> {
let mut context = Context::new();
match data.db.collection::<Flow>() {
Ok(flow_collection) => {
match flow_collection.get_all() {
Ok(mut flows_vec) => {
// Sort by creation date, newest first
flows_vec.sort_by(|a, b| b.base_data.created_at.cmp(&a.base_data.created_at));
context.insert("flows", &flows_vec);
},
Err(e) => {
error!("Failed to retrieve flows from database: {:?}", e);
// Optionally, insert an empty vec or an error message for the template
context.insert("flows", &Vec::<Flow>::new());
context.insert("db_error", "Failed to load flows.");
}
}
},
Err(e) => {
error!("Failed to get flow collection from database: {}", e);
context.insert("flows", &Vec::<Flow>::new());
context.insert("db_error", "Database collection error.");
}
}
let rendered = data.templates.render("index.html", &context)
.map_err(|e| {
error!("Template error (index.html): {}", e);
actix_web::error::ErrorInternalServerError("Template error rendering index.html")
})?;
Ok(HttpResponse::Ok().content_type("text/html").body(rendered))
}
// Show form to create a new flow
#[derive(Serialize, Clone)] // Clone is for the context, Serialize for Tera
struct RhaiExampleScript {
name: String,
content: String,
}
async fn new_flow_form(data: web::Data<AppState>) -> impl Responder {
let mut context = Context::new();
let mut example_scripts = Vec::new();
let examples_path = PathBuf::from("templates/rhai_examples");
if examples_path.is_dir() {
match std_fs::read_dir(examples_path) {
Ok(entries) => {
for entry in entries {
if let Ok(entry) = entry {
let path = entry.path();
if path.is_file() && path.extension().and_then(|s| s.to_str()) == Some("rhai") {
match std_fs::read_to_string(&path) {
Ok(content) => {
let file_stem = path.file_stem().and_then(|s| s.to_str()).unwrap_or("Unknown Script");
// Convert filename (e.g., simple_two_step) to a nicer name (e.g., Simple Two Step)
let name = file_stem.replace("_", " ")
.split_whitespace()
.map(|word| {
let mut c = word.chars();
match c.next() {
None => String::new(),
Some(f) => f.to_uppercase().collect::<String>() + c.as_str(),
}
})
.collect::<Vec<String>>().join(" ");
example_scripts.push(RhaiExampleScript { name, content });
}
Err(e) => {
error!("Failed to read Rhai example script {}: {}", path.display(), e);
}
}
}
}
}
}
Err(e) => {
error!("Failed to read rhai_examples directory: {}", e);
}
}
}
context.insert("example_scripts", &example_scripts);
info!("Rendering new flow form with {} examples from files.", example_scripts.len());
match data.templates.render("new_flow_form.html", &context) {
Ok(rendered) => HttpResponse::Ok().body(rendered),
Err(e) => {
error!("Template error in new_flow_form: {}", e);
HttpResponse::InternalServerError().body(format!("Template error: {}", e))
}
}
}
// Handle creation of a new flow
async fn create_flow(
data: web::Data<AppState>,
raw_form_data: String, // Changed to accept raw String
) -> impl Responder {
info!("Received raw form data for create_flow: {}", raw_form_data);
// Attempt to parse the raw form data
let form_parse_result: Result<CreateFlowRealFormData, serde_urlencoded::de::Error> = serde_urlencoded::from_str(&raw_form_data);
let form = match form_parse_result {
Ok(parsed_form_data) => {
info!("Successfully parsed form data: {:?}", parsed_form_data);
parsed_form_data // Use the successfully parsed data
}
Err(e) => {
error!("Failed to parse form data from string: {}. Raw data: {}", e, raw_form_data);
return HttpResponse::BadRequest().body(format!("Form parsing error: {}. Please check input and logs.", e));
}
};
// --- Logic starts here, using `form` which is now CreateFlowRealFormData ---
info!("Processing create_flow request for: {}", form.flow_name);
let db = &data.db;
let mut id_counter = match data.next_id_counter.lock() {
Ok(guard) => guard,
Err(poisoned) => {
error!("Mutex for next_id_counter was poisoned: {}. Recovering.", poisoned);
poisoned.into_inner() // Attempt to recover
}
};
// 1. Create and save the main Flow object
*id_counter += 1;
let flow_db_id = *id_counter;
let flow_uuid = Uuid::new_v4().to_string();
let flow_instance = Flow::new(
flow_db_id,
&flow_uuid,
&form.flow_name,
FlowStatus::Pending.to_db_string() // Use local enum's string representation
);
match db.collection::<Flow>() {
Ok(flow_collection) => {
if let Err(e) = flow_collection.set(&flow_instance) {
error!("Failed to save Flow (name: {}): {:?}. Aborting flow creation.", form.flow_name, e);
return HttpResponse::InternalServerError().body(format!("Failed to save main flow data: {:?}", e));
}
info!("Saved Flow object for '{}', UUID: {}, DB_ID: {}", flow_instance.name, flow_instance.flow_uuid, flow_instance.base_data.id);
}
Err(e) => {
error!("Failed to get Flow collection: {:?}. Aborting flow creation.", e);
return HttpResponse::InternalServerError().body(format!("Database error getting flow collection: {:?}", e));
}
}
// 2. Create and save FlowStep and SignatureRequirement objects
for (step_idx, step_form_data) in form.steps.into_iter().enumerate() {
*id_counter += 1;
let flow_step_db_id = *id_counter;
let mut flow_step_instance = FlowStep::new(
flow_step_db_id,
flow_instance.base_data.id, // Use ID from the saved Flow instance
step_idx as u32, // step_order
FlowStepStatus::Pending.to_db_string() // Use local enum's string representation
);
if let Some(desc) = step_form_data.description {
if !desc.is_empty() { // Only set if description is not empty
flow_step_instance = flow_step_instance.description(desc);
}
}
match db.collection::<FlowStep>() {
Ok(step_collection) => {
if let Err(e) = step_collection.set(&flow_step_instance) {
error!("Failed to save FlowStep (flow: {}, step_idx: {}): {:?}", flow_instance.name, step_idx, e);
return HttpResponse::InternalServerError().body(format!("Failed to save flow step: {:?}", e));
}
info!("Saved FlowStep {} for flow '{}', DB_ID: {}", step_idx + 1, flow_instance.name, flow_step_instance.base_data.id);
}
Err(e) => {
error!("Failed to get FlowStep collection: {:?}. Aborting.", e);
return HttpResponse::InternalServerError().body(format!("Database error getting step collection: {:?}", e));
}
}
for (req_idx, req_form_data) in step_form_data.requirements.into_iter().enumerate() {
*id_counter += 1;
let sig_req_db_id = *id_counter;
let sig_req_instance = SignatureRequirement::new(
sig_req_db_id,
flow_step_instance.base_data.id, // Use ID from the saved FlowStep instance
&req_form_data.public_key,
&req_form_data.message,
SignatureRequirementStatus::Pending.to_db_string() // Use local enum's string representation
);
match db.collection::<SignatureRequirement>() {
Ok(req_collection) => {
if let Err(e) = req_collection.set(&sig_req_instance) {
error!("Failed to save SignatureRequirement (flow: {}, step: {}, req_idx: {}): {:?}", flow_instance.name, step_idx, req_idx, e);
return HttpResponse::InternalServerError().body(format!("Failed to save signature requirement: {:?}", e));
}
info!(
"Saved SignatureRequirement {} for step {} of flow '{}', DB_ID: {}",
req_idx + 1, step_idx + 1, flow_instance.name, sig_req_instance.base_data.id
);
}
Err(e) => {
error!("Failed to get SignatureRequirement collection: {:?}. Aborting.", e);
return HttpResponse::InternalServerError().body(format!("Database error getting requirement collection: {:?}", e));
}
}
}
}
info!("Finished processing all steps for flow '{}', UUID: {}", flow_instance.name, flow_instance.flow_uuid);
HttpResponse::SeeOther()
.append_header((actix_web::http::header::LOCATION, "/"))
.finish()
}
// --- Rhai-Callable Helper Functions ---
fn rhai_create_flow_entry(
db_arc: Arc<OurDB>,
id_counter_arc: Arc<Mutex<u32>>,
name: String,
) -> Result<u32, Box<rhai::EvalAltResult>> {
info!("Rhai: Attempting to create flow entry with name: {}", name);
let mut id_counter = match id_counter_arc.lock() {
Ok(guard) => guard,
Err(poisoned) => {
let err_msg = format!("Rhai: Mutex for next_id_counter was poisoned: {}", poisoned);
error!("{}", err_msg);
return Err(Box::new(rhai::EvalAltResult::ErrorRuntime(err_msg.into(), Position::NONE)));
}
};
*id_counter += 1;
let flow_db_id = *id_counter;
let flow_uuid = Uuid::new_v4().to_string();
let flow_instance = Flow::new(
flow_db_id,
&flow_uuid,
&name,
FlowStatus::Pending.to_db_string(),
);
match db_arc.collection::<Flow>() {
Ok(flow_collection) => {
if let Err(e) = flow_collection.set(&flow_instance) {
let err_msg = format!("Rhai: Failed to save Flow (name: {}): {:?}", name, e);
error!("{}", err_msg);
return Err(Box::new(rhai::EvalAltResult::ErrorRuntime(err_msg.into(), Position::NONE)));
}
info!("Rhai: Saved Flow object for '{}', UUID: {}, DB_ID: {}", flow_instance.name, flow_instance.flow_uuid, flow_instance.base_data.id);
Ok(flow_instance.base_data.id)
}
Err(e) => {
let err_msg = format!("Rhai: Failed to get Flow collection: {:?}", e);
error!("{}", err_msg);
return Err(Box::new(rhai::EvalAltResult::ErrorRuntime(err_msg.into(), Position::NONE)));
}
}
}
fn rhai_add_step_entry(
db_arc: Arc<OurDB>,
id_counter_arc: Arc<Mutex<u32>>,
flow_db_id: u32, // ID of the parent flow
description: String,
order: u32,
) -> Result<u32, Box<rhai::EvalAltResult>> {
info!(
"Rhai: Adding step to flow ID {}, order {}, description: '{}'",
flow_db_id, order, description
);
let mut id_counter = match id_counter_arc.lock() {
Ok(guard) => guard,
Err(poisoned) => {
let err_msg = format!("Rhai: Mutex for next_id_counter was poisoned: {}", poisoned);
error!("{}", err_msg);
return Err(Box::new(rhai::EvalAltResult::ErrorRuntime(err_msg.into(), Position::NONE)));
}
};
*id_counter += 1;
let flow_step_db_id = *id_counter;
let mut flow_step_instance = FlowStep::new(
flow_step_db_id,
flow_db_id,
order,
FlowStepStatus::Pending.to_db_string(),
);
if !description.is_empty() {
flow_step_instance = flow_step_instance.description(description);
}
match db_arc.collection::<FlowStep>() {
Ok(step_collection) => {
if let Err(e) = step_collection.set(&flow_step_instance) {
let err_msg = format!(
"Rhai: Failed to save FlowStep (flow_id: {}, order: {}): {:?}",
flow_db_id, order, e
);
error!("{}", err_msg);
return Err(Box::new(rhai::EvalAltResult::ErrorRuntime(err_msg.into(), Position::NONE)));
}
info!(
"Rhai: Saved FlowStep for flow_id {}, order {}, DB_ID: {}",
flow_db_id, order, flow_step_instance.base_data.id
);
Ok(flow_step_instance.base_data.id)
}
Err(e) => {
let err_msg = format!("Rhai: Failed to get FlowStep collection: {:?}", e);
error!("{}", err_msg);
return Err(Box::new(rhai::EvalAltResult::ErrorRuntime(err_msg.into(), Position::NONE)));
}
}
}
fn rhai_add_requirement_entry(
db_arc: Arc<OurDB>,
id_counter_arc: Arc<Mutex<u32>>,
step_db_id: u32, // ID of the parent step
public_key: String,
message: String,
) -> Result<u32, Box<rhai::EvalAltResult>> {
info!(
"Rhai: Adding requirement to step ID {}, pk: '{}', msg: '{}'",
step_db_id, public_key, message
);
let mut id_counter = match id_counter_arc.lock() {
Ok(guard) => guard,
Err(poisoned) => {
let err_msg = format!("Rhai: Mutex for next_id_counter was poisoned: {}", poisoned);
error!("{}", err_msg);
return Err(Box::new(rhai::EvalAltResult::ErrorRuntime(err_msg.into(), Position::NONE)));
}
};
*id_counter += 1;
let sig_req_db_id = *id_counter;
let sig_req_instance = SignatureRequirement::new(
sig_req_db_id,
step_db_id,
&public_key,
&message,
SignatureRequirementStatus::Pending.to_db_string(),
);
match db_arc.collection::<SignatureRequirement>() {
Ok(req_collection) => {
if let Err(e) = req_collection.set(&sig_req_instance) {
let err_msg = format!(
"Rhai: Failed to save SigRequirement (step_id: {}): {:?}",
step_db_id, e
);
error!("{}", err_msg);
return Err(Box::new(rhai::EvalAltResult::ErrorRuntime(err_msg.into(), Position::NONE)));
}
info!(
"Rhai: Saved SigRequirement for step_id {}, DB_ID: {}",
step_db_id, sig_req_instance.base_data.id
);
Ok(sig_req_instance.base_data.id)
}
Err(e) => {
let err_msg = format!("Rhai: Failed to get SigRequirement collection: {:?}", e);
error!("{}", err_msg);
return Err(Box::new(rhai::EvalAltResult::ErrorRuntime(err_msg.into(), Position::NONE)));
}
}
}
// Handle creation of a new flow from a Rhai script
async fn create_flow_from_script(
data: web::Data<AppState>,
form: web::Form<RhaiScriptFormData>,
) -> impl Responder {
info!("Received Rhai script for flow creation:\n{}", form.rhai_script);
let mut engine = Engine::new();
// Clone Arcs for capturing in closures
let db_clone_for_flow = data.db.clone();
let id_clone_for_flow = data.next_id_counter.clone();
let db_clone_for_step = data.db.clone();
let id_clone_for_step = data.next_id_counter.clone();
let db_clone_for_req = data.db.clone();
let id_clone_for_req = data.next_id_counter.clone();
engine
.register_fn("create_flow", move |name: String| {
crate::rhai_create_flow_entry(db_clone_for_flow.clone(), id_clone_for_flow.clone(), name)
})
.register_fn("add_step", move |flow_id: u32, desc: String, order: i64| {
if order < 0 || order > u32::MAX as i64 {
return Err(Box::new(EvalAltResult::ErrorRuntime(format!("Order {} is out of range for u32", order).into(), Position::NONE)));
}
crate::rhai_add_step_entry(db_clone_for_step.clone(), id_clone_for_step.clone(), flow_id, desc, order as u32)
})
.register_fn("add_requirement", move |step_id: u32, pk: String, msg: String| {
crate::rhai_add_requirement_entry(db_clone_for_req.clone(), id_clone_for_req.clone(), step_id, pk, msg)
});
match engine.eval::<()>(&form.rhai_script) { // Expecting () as successful script execution doesn't need to return a value to Rust here.
Ok(_) => {
info!("Rhai script executed successfully.");
HttpResponse::SeeOther()
.append_header((actix_web::http::header::LOCATION, "/"))
.finish()
}
Err(e) => {
error!("Rhai script execution failed: {}", e.to_string());
HttpResponse::BadRequest().body(format!("Rhai script error: {}\n\nYour script was:\n{}", e.to_string(), form.rhai_script))
}
}
}
// Placeholder for SigSocket WebSocket handler
async fn websocket_handler(
req: actix_web::HttpRequest,
stream: actix_web::web::Payload,
service: web::Data<Arc<SigSocketService>>,
) -> ActixResult<HttpResponse> {
info!("WebSocket connection attempt");
let handler = service.create_websocket_handler();
ws::start(handler, &req, stream)
}
// --- Extracted Helper Functions for App Setup and Configuration ---
/// Sets up the shared application data (AppState).
/// Allows overriding the database path for testing purposes.
pub async fn setup_app_data(db_path_override: Option<String>) -> Result<web::Data<AppState>, std::io::Error> {
// Initialize templates
let tera = match Tera::new("templates/**/*") {
Ok(t) => t,
Err(e) => {
error!("Critical: Tera template parsing error(s): {}", e);
// Convert tera::Error to std::io::Error
return Err(std::io::Error::new(std::io::ErrorKind::Other, format!("Tera init error: {}", e)));
}
};
// Initialize SigSocket registry and service
let registry = Arc::new(RwLock::new(ConnectionRegistry::new()));
let sigsocket_service = Arc::new(SigSocketService::new(registry.clone()));
// Load environment variables from .env file
dotenv().ok();
// Initialize Database
let database_path = db_path_override.unwrap_or_else(||
env::var("DATABASE_PATH").unwrap_or_else(|_|
{
info!("DATABASE_PATH not set, defaulting to ./flowbroker_db");
"./flowbroker_db".to_string()
})
);
let db = match OurDB::new(&database_path, true) { // true for create_if_missing
Ok(db_instance) => Arc::new(db_instance),
Err(e) => {
error!("Failed to initialize database at '{}': {}. Please ensure the path is writable.", database_path, e);
// Convert heromodels::Error to std::io::Error (assuming Error impls std::error::Error)
return Err(std::io::Error::new(std::io::ErrorKind::Other, format!("DB init error: {}", e)));
}
};
info!("Database initialized at: {}", database_path);
// Initialize ID counter for temporary primary keys
let next_id_counter = Arc::new(Mutex::new(0_u32));
// TODO: Replace this with a robust primary key generation strategy from the database itself if possible.
// Create shared application state
Ok(web::Data::new(AppState {
templates: tera,
sigsocket_service: sigsocket_service.clone(), // Clone for AppState
db,
next_id_counter,
}))
}
/// Configures the application routes.
pub fn configure_app_routes(cfg: &mut web::ServiceConfig) {
// Note: AppState should be added via .app_data() before calling this configure function.
// The websocket_handler specifically needs web::Data<Arc<SigSocketService>>.
// The main HttpServer setup will add AppState (which includes an Arc<SigSocketService>)
// and also the specific web::Data<Arc<SigSocketService>> for handlers like websocket_handler that expect it directly.
cfg.route("/", web::get().to(list_flows))
.service(
web::scope("/flows") // Group flow-related routes under /flows
// .route("", web::get().to(list_flows)) // If you want /flows to also list flows
.route("/new", web::get().to(new_flow_form))
.route("/create", web::post().to(create_flow))
.route("/create_script", web::post().to(create_flow_from_script)) // Moved inside /flows scope
)
.service(web::resource("/ws/").route(web::get().to(websocket_handler)))
.service(fs::Files::new("/static", "./static").show_files_listing()); // Static files
}
// --- Main Function ---
#[actix_web::main]
async fn main() -> std::io::Result<()> {
env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
let app_data = match setup_app_data(None).await {
Ok(data) => data,
Err(e) => {
error!("Failed to setup application data: {}", e);
std::process::exit(1);
}
};
// The AppState (app_data) already contains an Arc<SigSocketService>.
// Handlers like websocket_handler that take web::Data<Arc<SigSocketService>> directly
// will be able to access it if AppState is correctly registered and the handler signature matches.
// Alternatively, if a handler needs *only* the SigSocketService, it can be added separately.
// For the websocket_handler as defined (taking web::Data<Arc<SigSocketService>>),
// it needs this specific type registered with app_data.
let sigsocket_service_for_ws_handler_data = web::Data::new(app_data.sigsocket_service.clone());
info!("Flowbroker server starting on http://127.0.0.1:8081");
info!("SigSocket WebSocket endpoint available at ws://127.0.0.1:8081/ws");
HttpServer::new(move || {
App::new()
.app_data(app_data.clone()) // Main app state (includes SigSocketService)
.app_data(sigsocket_service_for_ws_handler_data.clone()) // Specifically for handlers expecting web::Data<Arc<SigSocketService>>
.configure(configure_app_routes)
})
.bind("127.0.0.1:8081")? // Using a different port for now
.run()
.await
}

34
flowbroker/start.sh Executable file
View File

@ -0,0 +1,34 @@
#!/bin/zsh
FORCE_KILL=false
# Parse command line options
while getopts ":f" opt; do
case ${opt} in
f )
FORCE_KILL=true
;;
\? )
echo "Usage: cmd [-f]"
exit 1
;;
esac
done
if [ "$FORCE_KILL" = true ] ; then
echo "Attempting to kill process on port 8081..."
# Get PID of process using port 8081 and kill it
# -t option for lsof outputs only the PID
# xargs -r ensures kill is only run if lsof finds a PID
lsof -t -i:8081 | xargs -r kill -9
if [ $? -eq 0 ]; then
echo "Process(es) on port 8081 killed."
else
echo "No process found on port 8081 or failed to kill."
fi
# Give a moment for the port to be released
sleep 1
fi
echo "Starting Flowbroker server..."
cargo run

127
flowbroker/static/style.css Normal file
View File

@ -0,0 +1,127 @@
body {
font-family: sans-serif;
margin: 20px;
line-height: 1.6;
}
h1, h2 {
color: #333;
}
a {
color: #007bff;
text-decoration: none;
}
a:hover {
text-decoration: underline;
}
form div {
margin-bottom: 10px;
}
label {
display: block;
margin-bottom: 5px;
}
input[type="text"], textarea {
width: 100%;
padding: 8px;
box-sizing: border-box;
border: 1px solid #ccc;
border-radius: 4px;
}
button {
background-color: #007bff;
color: white;
padding: 10px 15px;
border: none;
border-radius: 4px;
cursor: pointer;
}
button:hover {
background-color: #0056b3;
}
hr {
margin: 20px 0;
}
#flows-list ul {
list-style-type: none;
padding: 0;
}
#flows-list li {
border: 1px solid #eee;
padding: 10px;
margin-bottom: 10px;
border-radius: 4px;
}
/* Styles for dynamic form elements from create_flow.html */
.step, .requirement {
border: 1px solid #ddd;
padding: 15px; /* Increased padding */
margin-bottom: 15px;
border-radius: 4px;
background-color: #f9f9f9;
}
.step h3, .step h4, .requirement h5 {
margin-top: 0;
color: #555; /* Slightly softer color */
}
.step .requirementsContainer {
margin-left: 20px;
border-left: 3px solid #007bff; /* Thicker border */
padding-left: 20px; /* Increased padding */
margin-top: 10px;
margin-bottom: 10px;
}
button.removeStepBtn, button.removeRequirementBtn {
background-color: #dc3545;
color: white;
padding: 5px 10px; /* Adjusted padding */
border: none;
border-radius: 4px;
cursor: pointer;
margin-top: 10px; /* Increased margin */
float: right; /* Align to the right */
}
button.removeStepBtn:hover, button.removeRequirementBtn:hover {
background-color: #c82333;
}
/* Clearfix for floated remove buttons */
.step::after, .requirement::after {
content: "";
clear: both;
display: table;
}
.addBtn { /* Style for Add Step / Add Requirement buttons */
background-color: #28a745;
color: white;
padding: 8px 12px;
border: none;
border-radius: 4px;
cursor: pointer;
margin-top: 10px;
margin-bottom: 10px;
}
.addBtn:hover {
background-color: #218838;
}
/* General styling for form elements within steps/requirements for consistency */
.step input[type="text"], .step textarea,
.requirement input[type="text"], .requirement textarea {
margin-bottom: 8px; /* Add some space below inputs */
}

View File

@ -0,0 +1,187 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Flowbroker - Create Flow</title>
<link rel="stylesheet" href="/static/style.css">
<style>
.step, .requirement {
border: 1px solid #ddd;
padding: 10px;
margin-bottom: 15px;
border-radius: 4px;
background-color: #f9f9f9;
}
.step h3, .step h4, .requirement h5 {
margin-top: 0;
}
.step .requirementsContainer {
margin-left: 20px;
border-left: 2px solid #007bff;
padding-left: 15px;
}
button.removeStepBtn, button.removeRequirementBtn {
background-color: #dc3545;
margin-top: 5px;
}
button.removeStepBtn:hover, button.removeRequirementBtn:hover {
background-color: #c82333;
}
</style>
</head>
<body>
<h1>Create New Flow</h1>
<form id="createFlowForm" action="/flows" method="post">
<div>
<label for="flow_name">Flow Name:</label>
<input type="text" id="flow_name" name="flow_name" required>
</div>
<hr>
<div id="stepsContainer">
<!-- Steps will be added here by JavaScript -->
</div>
<button type="button" id="addStepBtn" class="addBtn">Add Step</button>
<hr>
<button type="submit">Create Flow</button>
</form>
<p><a href="/">Back to Flows List</a></p>
<!-- Template for a new step -->
<template id="stepTemplate">
<div class="step" data-step-index="">
<h3>Step <span class="step-number"></span></h3>
<button type="button" class="removeStepBtn">Remove This Step</button>
<div>
<label>Step Description (Optional):</label>
<input type="text" name="steps[X].description" class="step-description">
</div>
<h4>Signature Requirements for Step <span class="step-number"></span></h4>
<div class="requirementsContainer" data-step-index="">
<!-- Requirements will be added here -->
</div>
<button type="button" class="addRequirementBtn addBtn" data-step-index="">Add Signature Requirement</button>
</div>
</template>
<!-- Template for a new signature requirement -->
<template id="requirementTemplate">
<div class="requirement" data-req-index="">
<h5>Requirement <span class="req-number"></span></h5>
<button type="button" class="removeRequirementBtn">Remove Requirement</button>
<div>
<label>Message to Sign:</label>
<textarea name="steps[X].requirements[Y].message" rows="2" required class="req-message"></textarea>
</div>
<div>
<label>Required Public Key:</label>
<input type="text" name="steps[X].requirements[Y].public_key" required class="req-pubkey">
</div>
</div>
</template>
<script>
document.addEventListener('DOMContentLoaded', () => {
const stepsContainer = document.getElementById('stepsContainer');
const addStepBtn = document.getElementById('addStepBtn');
const stepTemplate = document.getElementById('stepTemplate');
const requirementTemplate = document.getElementById('requirementTemplate');
const form = document.getElementById('createFlowForm');
const updateIndices = () => {
const steps = stepsContainer.querySelectorAll('.step');
steps.forEach((step, stepIdx) => {
// Update step-level attributes and text
step.dataset.stepIndex = stepIdx;
step.querySelector('.step-number').textContent = stepIdx + 1;
step.querySelector('.step-description').name = `steps[${stepIdx}].description`;
const addReqBtn = step.querySelector('.addRequirementBtn');
if (addReqBtn) addReqBtn.dataset.stepIndex = stepIdx;
const requirements = step.querySelectorAll('.requirementsContainer .requirement');
requirements.forEach((req, reqIdx) => {
// Update requirement-level attributes and text
req.dataset.reqIndex = reqIdx;
req.querySelector('.req-number').textContent = reqIdx + 1;
req.querySelector('.req-message').name = `steps[${stepIdx}].requirements[${reqIdx}].message`;
req.querySelector('.req-pubkey').name = `steps[${stepIdx}].requirements[${reqIdx}].public_key`;
});
});
};
const addRequirement = (currentStepElement, stepIndex) => {
const requirementsContainer = currentStepElement.querySelector('.requirementsContainer');
const reqFragment = requirementTemplate.content.cloneNode(true);
const newRequirement = reqFragment.querySelector('.requirement');
requirementsContainer.appendChild(newRequirement);
updateIndices(); // Update all indices after adding
};
const addStep = () => {
const stepFragment = stepTemplate.content.cloneNode(true);
const newStep = stepFragment.querySelector('.step');
stepsContainer.appendChild(newStep);
// Add at least one requirement to the new step automatically
const currentStepIndex = stepsContainer.querySelectorAll('.step').length - 1;
addRequirement(newStep, currentStepIndex);
updateIndices(); // Update all indices after adding
};
// Event delegation for remove buttons and add requirement button
stepsContainer.addEventListener('click', (event) => {
if (event.target.classList.contains('removeStepBtn')) {
event.target.closest('.step').remove();
if (stepsContainer.querySelectorAll('.step').length === 0) { // Ensure at least one step
addStep();
}
updateIndices();
} else if (event.target.classList.contains('addRequirementBtn')) {
const stepElement = event.target.closest('.step');
const stepIndex = parseInt(stepElement.dataset.stepIndex, 10);
addRequirement(stepElement, stepIndex);
} else if (event.target.classList.contains('removeRequirementBtn')) {
const requirementElement = event.target.closest('.requirement');
const stepElement = event.target.closest('.step');
const requirementsContainer = stepElement.querySelector('.requirementsContainer');
requirementElement.remove();
// Ensure at least one requirement per step
if (requirementsContainer.querySelectorAll('.requirement').length === 0) {
const stepIndex = parseInt(stepElement.dataset.stepIndex, 10);
addRequirement(stepElement, stepIndex);
}
updateIndices();
}
});
addStepBtn.addEventListener('click', addStep);
// Add one step by default when the page loads
if (stepsContainer.children.length === 0) {
addStep();
}
// Optional: Validate that there's at least one step and one requirement before submit
form.addEventListener('submit', (event) => {
if (stepsContainer.querySelectorAll('.step').length === 0) {
alert('Please add at least one step to the flow.');
event.preventDefault();
return;
}
const steps = stepsContainer.querySelectorAll('.step');
for (let i = 0; i < steps.length; i++) {
if (steps[i].querySelectorAll('.requirementsContainer .requirement').length === 0) {
alert(`Step ${i + 1} must have at least one signature requirement.`);
event.preventDefault();
return;
}
}
});
});
</script>
</body>
</html>

View File

@ -0,0 +1,28 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Flowbroker - Flows</title>
<link rel="stylesheet" href="/static/style.css">
</head>
<body>
<h1>Active Flows</h1>
<a href="/flows/new">Create New Flow</a>
<div id="flows-list">
{% if flows %}
<ul>
{% for flow in flows %}
<li>
<strong>{{ flow.name }}</strong> (UUID: {{ flow.flow_uuid }}) - Status: {{ flow.status }}
<br>
Created: {{ flow.base_data.created_at | date(format="%Y-%m-%d %H:%M:%S") }} <!-- Assuming created_at is a Unix timestamp -->
<p><a href="/flows/{{ flow.flow_uuid }}">View Details</a></p> <!-- Link uses flow_uuid -->
</li>
{% endfor %}
</ul>
{% else %}
<p>No active flows. <a href="/flows/new">Create one?</a></p>
{% endif %}
</div>
</body>
</html>

View File

@ -0,0 +1,105 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Create Flow from Rhai Script</title>
<link rel="stylesheet" href="/static/style.css">
<style>
body {
font-family: sans-serif;
margin: 20px;
background-color: #f4f4f9;
color: #333;
}
.container {
background-color: #fff;
padding: 20px;
border-radius: 8px;
box-shadow: 0 0 10px rgba(0,0,0,0.1);
}
h1 {
color: #333;
}
textarea {
width: 100%;
min-height: 300px;
padding: 10px;
border: 1px solid #ccc;
border-radius: 4px;
box-sizing: border-box;
margin-bottom: 15px;
font-family: monospace;
font-size: 14px;
}
button {
background-color: #007bff;
color: white;
padding: 10px 15px;
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 16px;
}
button:hover {
background-color: #0056b3;
}
a {
color: #007bff;
text-decoration: none;
}
a:hover {
text-decoration: underline;
}
.back-link {
display: block;
margin-bottom: 20px;
}
</style>
</head>
<body>
<div class="container">
<a href="/" class="back-link">&larr; Back to Flow List</a>
<h1>Create Flow from Rhai Script</h1>
<div id="rhai_script_examples_data" style="display: none;">
{% for example in example_scripts %}
<div id="rhai_example_content_{{ loop.index }}">{{ example.content }}</div>
{% endfor %}
</div>
<div>
<label for="example_script_selector">Load Example Script:</label>
<select id="example_script_selector">
<option value="">-- Select an Example --</option>
{% for example in example_scripts %}
<option value="{{ example.name }}" data-example-id="rhai_example_content_{{ loop.index }}">{{ example.name }}</option>
{% endfor %}
</select>
</div>
<form action="/flows/create_script" method="POST" style="margin-top: 15px;">
<div>
<label for="rhai_script">Rhai Script:</label>
</div>
<div>
<textarea id="rhai_script" name="rhai_script" placeholder="Enter your Rhai script here or select an example above..."></textarea>
</div>
<button type="submit">Create Flow</button>
</form>
<script>
document.getElementById('example_script_selector').addEventListener('change', function() {
var selectedOption = this.options[this.selectedIndex];
var exampleId = selectedOption.getAttribute('data-example-id');
if (exampleId) {
var scriptContent = document.getElementById(exampleId).textContent; // Use textContent
document.getElementById('rhai_script').value = scriptContent;
} else {
document.getElementById('rhai_script').value = '';
}
});
</script>
</div>
</body>
</html>

View File

@ -0,0 +1,8 @@
// Minimal Single Signature Flow
let flow_id = create_flow("Quick Sign");
let step1_id = add_step(flow_id, "Sign the message", 0);
add_requirement(step1_id, "any_signer_pk", "Please provide your signature.");
print("Minimal Flow (ID: " + flow_id + ") defined.");
()

View File

@ -0,0 +1,18 @@
// Flow with Multi-Requirement Step
// If create_flow, add_step, or add_requirement fail from Rust,
// the script will stop and the error will be reported by the server.
let flow_id = create_flow("Multi-Req Sign Off");
let step1_id = add_step(flow_id, "Initial Signatures (3 needed)", 0);
add_requirement(step1_id, "signer1_pk", "Signatory 1: Please sign terms.");
add_requirement(step1_id, "signer2_pk", "Signatory 2: Please sign terms.");
add_requirement(step1_id, "signer3_pk", "Signatory 3: Please sign terms.");
let step2_id = add_step(flow_id, "Final Confirmation", 1);
add_requirement(step2_id, "final_approver_pk", "Final approval for multi-req sign off.");
print("Multi-Requirement Flow (ID: " + flow_id + ") defined.");
()

View File

@ -0,0 +1,14 @@
// Simple Two-Step Flow
// If create_flow, add_step, or add_requirement fail from Rust,
// the script will stop and the error will be reported by the server.
let flow_id = create_flow("Simple Two-Stepper");
let step1_id = add_step(flow_id, "Collect Document", 0);
add_requirement(step1_id, "user_pubkey_document", "Please sign the document hash.");
let step2_id = add_step(flow_id, "Approval Signature", 1);
add_requirement(step2_id, "approver_pubkey", "Please approve the collected document.");
print("Simple Two-Step Flow (ID: " + flow_id + ") defined.");
()