3.1 KiB
3.1 KiB
Flow Manager
A simple, generic flow manager for Rhai scripts with builder pattern API and non-blocking execution.
Features
- Builder Pattern API: Fluent interface for creating steps and flows
- Non-blocking Execution: Uses
tokio::spawn
for async step execution - Simple State Management: Redis-based state tracking
- Retry Logic: Configurable timeouts and retry attempts
- Mock API Support: Built-in mock API for testing different scenarios
- RhaiDispatcher Integration: Seamless integration with existing Rhai execution system
Quick Start
use flow::{new_step, new_flow, FlowExecutor};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create executor
let executor = FlowExecutor::new("redis://127.0.0.1/").await?;
// Build steps using fluent API
let step1 = new_step("stripe_config")
.script("stripe_config_script")
.timeout(5)
.retries(2)
.build();
let step2 = new_step("stripe_config_confirm")
.script("script that looks up stripe config confirmation in db")
.timeout(5)
.build();
let step3 = new_step("create_product")
.script("create_product_script")
.timeout(10)
.retries(1)
.build();
// Build flow using fluent API
let flow = new_flow("stripe_payment_request")
.add_step(step1)
.add_step(step2)
.add_step(step3)
.build();
// Execute flow (non-blocking)
let result = executor.execute_flow(flow).await?;
println!("Flow started: {}", result);
Ok(())
}
Architecture
Core Components
- Types (
types.rs
): Core data structures (Flow, Step, Status enums) - Builder (
builder.rs
): Fluent API for constructing flows and steps - State (
state.rs
): Simple Redis-based state management - Executor (
executor.rs
): Non-blocking flow execution engine - Mock API (
mock_api.rs
): Testing utilities for different response scenarios
State Management
The system tracks minimal state:
Flow State:
flow_id: String
- unique identifierstatus: FlowStatus
(Created, Running, Completed, Failed)current_step: Option<String>
- currently executing stepcompleted_steps: Vec<String>
- list of finished steps
Step State:
step_id: String
- unique identifierstatus: StepStatus
(Pending, Running, Completed, Failed)attempt_count: u32
- for retry logicoutput: Option<String>
- result from script execution
Storage:
- Redis key-value pairs:
flow:{flow_id}
andstep:{flow_id}:{step_id}
Examples
Run the example:
cd ../rhailib/src/flow
cargo run --example stripe_flow_example
Testing
cargo test
Note: Some tests require Redis to be running. Set SKIP_REDIS_TESTS=1
to skip Redis-dependent tests.
Integration
The flow manager integrates with:
- RhaiDispatcher: For executing Rhai scripts
- Redis: For state persistence
- tokio: For non-blocking async execution
This provides a simple, reliable foundation for orchestrating complex workflows while maintaining the non-blocking execution pattern established in the payment system.