Dispatcher-Based Event-Driven Flow Architecture

Overview

This document describes a non-blocking, event-driven flow architecture for Rhai payment functions, built on the existing RhaiDispatcher. Blocking API calls become fire-and-continue calls: each payment function spawns a background thread that performs the HTTP request and then dispatches a new Rhai script based on the API response.

Architecture Principles

1. Non-Blocking API Calls

  • All payment functions (e.g., create_payment_intent()) return immediately
  • HTTP requests happen in background threads
  • No blocking of the main Rhai engine thread
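
The fire-and-continue idea in the bullets above can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the payment module's code: `fire_and_continue` is a hypothetical name, the `slow_call` closure stands in for the HTTP request, and the channel stands in for the dispatcher.

```rust
use std::sync::mpsc::Sender;
use std::thread;

/// Hypothetical stand-in for a payment function: it starts the slow work on a
/// background thread and returns a confirmation string straight away.
/// `slow_call` plays the role of the HTTP request; `events` plays the role of
/// the dispatcher that would later run the response script.
pub fn fire_and_continue<F>(events: Sender<String>, slow_call: F) -> String
where
    F: FnOnce() -> String + Send + 'static,
{
    thread::spawn(move || {
        let response = slow_call();
        // If the receiver is gone there is nobody left to notify, so the
        // send error is deliberately ignored.
        let _ = events.send(response);
    });

    // The caller gets this value before the background work has finished.
    "request_dispatched".to_string()
}
```

The caller only ever sees the confirmation string; the response arrives later on the receiving end of the channel, which is where a dispatcher would pick it up.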

2. Self-Dispatching Pattern

  • Worker dispatches scripts to itself
  • Same worker_id and context_id maintained
  • caller_id changes to reflect the API response source
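
One way to picture the identity rules above is a small routing header that is copied for every follow-up script. The `Routing` type and `for_response` method are hypothetical and exist only for this sketch; the actual dispatcher may model these IDs differently.

```rust
/// Hypothetical routing header attached to a dispatched script.
#[derive(Debug, Clone, PartialEq)]
pub struct Routing {
    pub caller_id: String,
    pub worker_id: String,
    pub context_id: String,
}

impl Routing {
    /// Builds the header for a follow-up script: worker and context are
    /// carried over unchanged, only the caller is replaced by the API source.
    pub fn for_response(&self, source: &str) -> Routing {
        Routing {
            caller_id: source.to_string(),
            worker_id: self.worker_id.clone(),
            context_id: self.context_id.clone(),
        }
    }
}
```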

3. Generic Request/Response Flow

  • Request functions: new_..._request pattern
  • Response scripts: new_..._response pattern on success, new_..._error pattern on failure
  • Consistent naming across all API operations
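
Deriving the script names from the operation name in one place keeps the convention consistent. The helpers below are a sketch with hypothetical function names; the `flows/<name>.rhai` layout is taken from the generated scripts shown later in this document.

```rust
/// Script name dispatched when the API call for `operation` succeeds.
pub fn response_script_name(operation: &str) -> String {
    format!("new_{}_response", operation)
}

/// Script name dispatched when the API call for `operation` fails.
pub fn error_script_name(operation: &str) -> String {
    format!("new_{}_error", operation)
}

/// Path of the flow script file for a given script name.
pub fn flow_script_path(script_name: &str) -> String {
    format!("flows/{}.rhai", script_name)
}
```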

Flow Architecture

graph TD
    A[main.rhai] --> B[create_payment_intent]
    B --> C[HTTP Thread Spawned]
    B --> D[Return Immediately]
    C --> E[Stripe API Call]
    E --> F{API Response}
    F -->|Success| G[Dispatch: new_create_payment_intent_response]
    F -->|Error| H[Dispatch: new_create_payment_intent_error]
    G --> I[Response Script Execution]
    H --> J[Error Script Execution]
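
The Success and Error branches of the graph reduce to a single match on the outcome of the API call. The sketch below isolates that decision as a pure function; `Dispatch` and `route` are hypothetical names, and the real code would hand the result to the dispatcher instead of returning it.

```rust
/// Which flow script to dispatch, plus the payload handed to it.
#[derive(Debug, PartialEq)]
pub struct Dispatch {
    pub script_name: String,
    pub payload: String,
}

/// Maps the outcome of an API call to the script that should run next,
/// mirroring the Success / Error branches of the graph.
pub fn route(operation: &str, outcome: Result<String, String>) -> Dispatch {
    match outcome {
        Ok(body) => Dispatch {
            script_name: format!("new_{}_response", operation),
            payload: body,
        },
        Err(body) => Dispatch {
            script_name: format!("new_{}_error", operation),
            payload: body,
        },
    }
}
```

Keeping the branch free of I/O makes it easy to test without a network connection or a Redis instance.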

Implementation Components

1. FlowManager

use rhai_dispatcher::{RhaiDispatcher, RhaiDispatcherBuilder, RhaiDispatcherError};
use std::sync::Mutex;

pub struct FlowManager {
    dispatcher: RhaiDispatcher,
    worker_id: String,
    context_id: String,
}

#[derive(Debug)]
pub enum FlowError {
    DispatcherError(RhaiDispatcherError),
    ConfigurationError(String),
}

impl From<RhaiDispatcherError> for FlowError {
    fn from(err: RhaiDispatcherError) -> Self {
        FlowError::DispatcherError(err)
    }
}

impl FlowManager {
    pub fn new(worker_id: String, context_id: String) -> Result<Self, FlowError> {
        let dispatcher = RhaiDispatcherBuilder::new()
            .caller_id("stripe") // API responses come from Stripe
            .worker_id(&worker_id)
            .context_id(&context_id)
            .redis_url("redis://127.0.0.1/")
            .build()?;
            
        Ok(Self {
            dispatcher,
            worker_id,
            context_id,
        })
    }
    
    pub async fn dispatch_response_script(&self, script_name: &str, data: &str) -> Result<(), FlowError> {
        let script_content = format!(
            r#"
            // Auto-generated response script for {}
            let response_data = `{}`;
            let parsed_data = parse_json(response_data);
            
            // Include the response script
            eval_file("flows/{}.rhai");
            "#,
            script_name,
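            // Rhai back-tick strings accept no backslash escapes: a literal back-tick is
            // written as two back-ticks. Note that `${` in the payload is still interpolated.
            data.replace('`', "``"),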
            script_name
        );
        
        self.dispatcher
            .new_play_request()
            .worker_id(&self.worker_id)
            .context_id(&self.context_id)
            .script(&script_content)
            .submit()
            .await?;
            
        Ok(())
    }
    
    pub async fn dispatch_error_script(&self, script_name: &str, error: &str) -> Result<(), FlowError> {
        let script_content = format!(
            r#"
            // Auto-generated error script for {}
            let error_data = `{}`;
            let parsed_error = parse_json(error_data);
            
            // Include the error script
            eval_file("flows/{}.rhai");
            "#,
            script_name,
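            // Rhai back-tick strings accept no backslash escapes: a literal back-tick is
            // written as two back-ticks. Note that `${` in the payload is still interpolated.
            error.replace('`', "``"),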
            script_name
        );
        
        self.dispatcher
            .new_play_request()
            .worker_id(&self.worker_id)
            .context_id(&self.context_id)
            .script(&script_content)
            .submit()
            .await?;
            
        Ok(())
    }
}

// Global flow manager instance
static FLOW_MANAGER: Mutex<Option<FlowManager>> = Mutex::new(None);

pub fn initialize_flow_manager(worker_id: String, context_id: String) -> Result<(), FlowError> {
    let manager = FlowManager::new(worker_id, context_id)?;
    let mut global_manager = FLOW_MANAGER.lock().unwrap();
    *global_manager = Some(manager);
    Ok(())
}

pub fn get_flow_manager() -> Result<FlowManager, FlowError> {
    let global_manager = FLOW_MANAGER.lock().unwrap();
    global_manager.as_ref()
        .ok_or_else(|| FlowError::ConfigurationError("Flow manager not initialized".to_string()))
        .map(|manager| FlowManager {
            dispatcher: manager.dispatcher.clone(), // Assuming Clone is implemented
            worker_id: manager.worker_id.clone(),
            context_id: manager.context_id.clone(),
        })
}
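
Because the payload is pasted into generated Rhai source, how it is quoted matters. One possible alternative to a back-tick literal is a double-quoted Rhai string, which supports backslash escapes and is not subject to `${...}` interpolation. The helpers below are a sketch under that assumption; `to_rhai_string_literal` and `build_response_script` are hypothetical names, and `eval_file` is the same host function the generated scripts above rely on.

```rust
/// Renders `data` as a double-quoted Rhai string literal, escaping the
/// characters that would otherwise end or alter the literal.
pub fn to_rhai_string_literal(data: &str) -> String {
    let mut out = String::with_capacity(data.len() + 2);
    out.push('"');
    for ch in data.chars() {
        match ch {
            '\\' => out.push_str("\\\\"),
            '"' => out.push_str("\\\""),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            other => out.push(other),
        }
    }
    out.push('"');
    out
}

/// Builds a wrapper script with the same three steps as the generated
/// response script: bind the payload, parse it, run the flow file.
pub fn build_response_script(script_name: &str, data: &str) -> String {
    format!(
        "let response_data = {};\nlet parsed_data = parse_json(response_data);\neval_file(\"flows/{}.rhai\");\n",
        to_rhai_string_literal(data),
        script_name
    )
}
```

Building the script in a pure function also means the quoting can be unit-tested without a running dispatcher.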

2. Non-Blocking Payment Functions

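// Transform blocking function into non-blocking
use std::collections::HashMap;
use std::thread;
use tokio::runtime::Runtime; // assumed: the Runtime used below is Tokio's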
#[rhai_fn(name = "create", return_raw)]
pub fn create_payment_intent(intent: &mut RhaiPaymentIntent) -> Result<String, Box<EvalAltResult>> {
    let form_data = prepare_payment_intent_data(intent);
    
    // Get flow manager
    let flow_manager = get_flow_manager()
        .map_err(|e| format!("Flow manager error: {:?}", e))?;
    
    // Spawn background thread for HTTP request
    let stripe_config = get_stripe_config()?;
    thread::spawn(move || {
        let rt = Runtime::new().expect("Failed to create runtime");
        rt.block_on(async {
            match make_stripe_request(&stripe_config, "payment_intents", &form_data).await {
                Ok(response) => {
                    if let Err(e) = flow_manager.dispatch_response_script(
                        "new_create_payment_intent_response",
                        &response
                    ).await {
                        eprintln!("Failed to dispatch response: {:?}", e);
                    }
                }
                Err(error) => {
                    if let Err(e) = flow_manager.dispatch_error_script(
                        "new_create_payment_intent_error", 
                        &error
                    ).await {
                        eprintln!("Failed to dispatch error: {:?}", e);
                    }
                }
            }
        });
    });
    
    // Return immediately with confirmation
    Ok("payment_intent_request_dispatched".to_string())
}

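// Generic async HTTP request function.
// Ok and Err both carry a JSON string, because the flow scripts call parse_json on the payload.
async fn make_stripe_request(
    config: &StripeConfig, 
    endpoint: &str, 
    form_data: &HashMap<String, String>
) -> Result<String, String> {
    // Wrap local failures in the {"error": {...}} shape that Stripe error bodies use.
    // "local_error" is a label chosen here, not a Stripe error type.
    let local_error = |message: String| {
        serde_json::json!({ "error": { "type": "local_error", "message": message } }).to_string()
    };
    
    let url = format!("{}/{}", STRIPE_API_BASE, endpoint);
    
    let response = config.client
        .post(&url)
        .basic_auth(&config.secret_key, None::<&str>)
        .form(form_data)
        .send()
        .await
        .map_err(|e| local_error(format!("HTTP request failed: {}", e)))?;
    
    let response_text = response.text().await
        .map_err(|e| local_error(format!("Failed to read response: {}", e)))?;
    
    let json: serde_json::Value = serde_json::from_str(&response_text)
        .map_err(|e| local_error(format!("Failed to parse JSON: {}", e)))?;
    
    // Stripe reports failures as a top-level "error" object
    if json.get("error").is_some() {
        Err(response_text)
    } else {
        Ok(response_text)
    }
}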

3. Flow Script Templates

Success Response Script

// flows/new_create_payment_intent_response.rhai
let payment_intent_id = parsed_data.id;
let status = parsed_data.status;

print(`✅ Payment Intent Created: ${payment_intent_id}`);
print(`Status: ${status}`);

// Continue the flow based on status
if status == "requires_payment_method" {
    print("Payment method required - ready for frontend");
    // Could dispatch another flow here
} else if status == "succeeded" {
    print("Payment completed successfully!");
    // Dispatch success notification flow
}

// Store the payment intent ID for later use
set_context("payment_intent_id", payment_intent_id);
set_context("payment_status", status);

Error Response Script

// flows/new_create_payment_intent_error.rhai
let error_type = parsed_error.error.type;
let error_message = parsed_error.error.message;

print(`❌ Payment Intent Error: ${error_type}`);
print(`Message: ${error_message}`);

// Handle different error types
if error_type == "card_error" {
    print("Card was declined - notify user");
    // Dispatch user notification flow
} else if error_type == "rate_limit_error" {
    print("Rate limited - retry later");
    // Dispatch retry flow
} else {
    print("Unknown error - log for investigation");
    // Dispatch error logging flow
}

// Store error details for debugging
set_context("last_error_type", error_type);
set_context("last_error_message", error_message);
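
If the same error-type branching is also needed on the Rust side (for example to decide on a retry before any script is dispatched), it can be expressed as a small lookup. This is a sketch only: `ErrorAction` and `classify_error` are hypothetical, and the three branches simply mirror the error script above.

```rust
/// Follow-up action for a failed API call, mirroring the error script.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ErrorAction {
    NotifyUser,
    RetryLater,
    LogForInvestigation,
}

/// Maps the `error.type` field of an error payload to a follow-up action.
/// Any type not listed here falls through to logging.
pub fn classify_error(error_type: &str) -> ErrorAction {
    match error_type {
        "card_error" => ErrorAction::NotifyUser,
        "rate_limit_error" => ErrorAction::RetryLater,
        _ => ErrorAction::LogForInvestigation,
    }
}
```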

4. Configuration and Initialization

// Add to payment module initialization
#[rhai_fn(name = "init_flows", return_raw)]
pub fn init_flows(worker_id: String, context_id: String) -> Result<String, Box<EvalAltResult>> {
    initialize_flow_manager(worker_id, context_id)
        .map_err(|e| format!("Failed to initialize flow manager: {:?}", e))?;
    
    Ok("Flow manager initialized successfully".to_string())
}
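
An alternative way to hold the global state is `std::sync::OnceLock`, which makes initialization a one-time event. The sketch below uses a reduced, hypothetical `FlowConfig` in place of the full manager. Unlike `initialize_flow_manager` above, a second call is rejected instead of replacing the stored value; whether that is desirable depends on whether re-initialization is a supported use case.

```rust
use std::sync::OnceLock;

/// Hypothetical, reduced configuration; the real manager also owns a dispatcher.
#[derive(Debug, Clone, PartialEq)]
pub struct FlowConfig {
    pub worker_id: String,
    pub context_id: String,
}

static FLOW_CONFIG: OnceLock<FlowConfig> = OnceLock::new();

/// Stores the configuration on the first call; later calls return an error
/// and leave the stored value untouched.
pub fn init_flow_config(worker_id: &str, context_id: &str) -> Result<(), String> {
    FLOW_CONFIG
        .set(FlowConfig {
            worker_id: worker_id.to_string(),
            context_id: context_id.to_string(),
        })
        .map_err(|_| "flow config already initialized".to_string())
}

/// Returns the stored configuration, or an error if `init_flow_config`
/// has not been called yet.
pub fn flow_config() -> Result<&'static FlowConfig, String> {
    FLOW_CONFIG
        .get()
        .ok_or_else(|| "flow config not initialized".to_string())
}
```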

Usage Examples

1. Basic Payment Flow

// main.rhai
init_flows("worker-1", "context-123");
configure_stripe("sk_test_...");

let payment_intent = new_payment_intent()
    .amount(2000)
    .currency("usd")
    .customer("cus_customer123");

// Returns immediately; the HTTP request runs in the background
let result = payment_intent.create();
print(`Request dispatched: ${result}`);

// Script ends here, but flow continues in background

2. Chained Flow Example

// flows/new_create_payment_intent_response.rhai
let payment_intent_id = parsed_data.id;

if parsed_data.status == "requires_payment_method" {
    // Chain to next operation
    let subscription = new_subscription()
        .customer(get_context("customer_id"))
        .add_price("price_monthly");
    
    // This will trigger new_create_subscription_response flow
    subscription.create();
}
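
Chaining can be modeled as a queue in which running one script may enqueue the next. The sketch below replaces the dispatcher and the API calls with a hypothetical lookup table, `next_script`, so that the order of execution becomes visible; it is an illustration of the control flow, not of the Redis-based dispatch.

```rust
use std::collections::VecDeque;

/// Hypothetical handler table: given a script that just ran, returns the
/// script that a later API response would trigger, if any.
fn next_script(script: &str) -> Option<&'static str> {
    match script {
        "new_create_payment_intent_response" => Some("new_create_subscription_response"),
        _ => None,
    }
}

/// Runs a chain to completion and returns the scripts in execution order.
pub fn run_chain(first: &str) -> Vec<String> {
    let mut queue: VecDeque<String> = VecDeque::new();
    queue.push_back(first.to_string());

    let mut executed = Vec::new();
    while let Some(script) = queue.pop_front() {
        // A script that chains further work enqueues its successor.
        if let Some(next) = next_script(&script) {
            queue.push_back(next.to_string());
        }
        executed.push(script);
    }
    executed
}
```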

Benefits

1. Non-Blocking Execution

  • Main Rhai script never blocks on HTTP requests
  • Multiple API calls can happen concurrently
  • Engine remains responsive for other scripts

2. Event-Driven Architecture

  • Clear separation between request and response handling
  • Easy to add new flow steps
  • Composable and chainable operations

3. Error Handling

  • Dedicated error flows for each operation
  • Contextual error information preserved
  • Retry and recovery patterns possible

4. Scalability

  • Each HTTP request runs in its own thread
  • No mutable state shared between in-flight requests: each thread works on its own FlowManager clone
  • Redis-based dispatch scales horizontally

Implementation Checklist

  • Implement FlowManager with RhaiDispatcher integration
  • Convert all payment functions to non-blocking pattern
  • Create flow script templates for all operations
  • Add flow initialization functions
  • Test with example payment flows
  • Update documentation and examples

Migration Path

  1. Phase 1: Implement FlowManager and basic infrastructure
  2. Phase 2: Convert payment_intent functions to non-blocking
  3. Phase 3: Convert remaining payment functions (products, prices, subscriptions, coupons)
  4. Phase 4: Create comprehensive flow script library
  5. Phase 5: Add advanced features (retries, timeouts, monitoring)