This repository has been archived on 2025-08-27. You can view files and clone it, but cannot push or open issues or pull requests.
Files
rhailib/src/flow/src/executor.rs
Timur Gordon 6d271068fc rhailib fixes
2025-08-05 16:06:48 +02:00

243 lines
8.2 KiB
Rust

//! Simple flow executor with non-blocking step execution
use crate::types::{Flow, Step, FlowStatus, StepStatus};
use crate::state::{FlowState, StepState, StateManager};
use crate::mock_api::MockAPI;
use rhai_dispatcher::RhaiDispatcherBuilder;
use std::sync::Arc;
use tokio::time::{timeout, Duration};
/// Simple flow executor.
///
/// Bundles the handles needed to run a flow: the state store, the mock API
/// and the Redis URL. The state manager and mock API sit behind `Arc` so they
/// can be cloned into the background task that drives a flow.
pub struct FlowExecutor {
/// Store used to save and load flow and step state.
state_manager: Arc<StateManager>,
/// Mock API called for steps whose script contains `mock_api_call`.
mock_api: Arc<MockAPI>,
/// Redis URL passed to `new`; handed to the Rhai dispatcher for non-mock scripts.
redis_url: String,
}
impl FlowExecutor {
/// Creates an executor backed by a [`StateManager`] built from `redis_url`.
///
/// The mock API is created with `MockAPI::default()` and the URL is kept so
/// it can later be forwarded to the Rhai dispatcher.
///
/// # Errors
/// Returns whatever error `StateManager::new` reports for `redis_url`.
pub async fn new(redis_url: &str) -> Result<Self, Box<dyn std::error::Error>> {
    // The state manager is the only fallible piece; build it first.
    let manager = StateManager::new(redis_url).await?;

    let executor = Self {
        state_manager: Arc::new(manager),
        mock_api: Arc::new(MockAPI::default()),
        redis_url: redis_url.to_owned(),
    };
    Ok(executor)
}
/// Execute a flow non-blocking
pub async fn execute_flow(&self, flow: Flow) -> Result<String, Box<dyn std::error::Error>> {
// Initialize flow state
let mut flow_state = FlowState::new(flow.id.clone());
flow_state.status = FlowStatus::Running;
self.state_manager.save_flow_state(&flow_state).await?;
// Initialize step states
for step in &flow.steps {
let step_state = StepState::new(step.id.clone());
self.state_manager.save_step_state(&flow.id, &step_state).await?;
}
// Spawn flow execution in background
let flow_id = flow.id.clone();
let state_manager = self.state_manager.clone();
let mock_api = self.mock_api.clone();
let redis_url = self.redis_url.clone();
tokio::spawn(async move {
if let Err(e) = Self::execute_flow_steps(flow, state_manager, mock_api, redis_url).await {
eprintln!("Flow execution error: {}", e);
}
});
Ok(format!("flow_execution_started:{}", flow_id))
}
/// Runs every step of `flow` in order, persisting progress along the way.
///
/// Before each step the flow's `current_step` is saved. When a step succeeds
/// its id is appended to `completed_steps`. The first failing step is logged to
/// stderr, the flow is saved as [`FlowStatus::Failed`] (with `current_step`
/// still naming that step) and the step's error is returned. When all steps
/// succeed the flow is saved as [`FlowStatus::Completed`] with no current step.
///
/// # Errors
/// - `"Flow state not found"` if no state was persisted for `flow.id`.
/// - Any error from loading or saving flow state.
/// - The error of the first step that fails after its retries.
async fn execute_flow_steps(
    flow: Flow,
    state_manager: Arc<StateManager>,
    mock_api: Arc<MockAPI>,
    redis_url: String,
) -> Result<(), Box<dyn std::error::Error>> {
    let loaded = state_manager.load_flow_state(&flow.id).await?;
    let mut progress = loaded.ok_or("Flow state not found")?;

    // Steps run strictly one after another.
    for step in &flow.steps {
        // Publish which step is about to run.
        progress.current_step = Some(step.id.clone());
        state_manager.save_flow_state(&progress).await?;

        let outcome = Self::execute_step_with_retries(
            step,
            &flow.id,
            state_manager.clone(),
            mock_api.clone(),
            redis_url.clone(),
        )
        .await;

        if let Err(e) = outcome {
            eprintln!("Step {} failed: {}", step.name, e);
            progress.status = FlowStatus::Failed;
            state_manager.save_flow_state(&progress).await?;
            return Err(e);
        }

        progress.completed_steps.push(step.id.clone());
    }

    // Every step succeeded: close the flow out.
    progress.status = FlowStatus::Completed;
    progress.current_step = None;
    state_manager.save_flow_state(&progress).await?;
    Ok(())
}
/// Runs `step` until it succeeds or its retry budget is used up.
///
/// The step is attempted at most `step.max_retries + 1` times (the first try
/// plus the retries). Before each attempt the step state is saved as
/// [`StepStatus::Running`] with a 1-based `attempt_count`. On success the state
/// is saved as [`StepStatus::Completed`] together with the step's output. After
/// a failed attempt that is not the last one, the function sleeps for one
/// second and tries again; after the last one the state is saved as
/// [`StepStatus::Failed`] and that attempt's error is returned.
///
/// # Errors
/// - `"Step state not found"` if no state was persisted for this step.
/// - Any error from loading or saving step state.
/// - The error of the final failed attempt.
/// - `"Max retries exceeded"` if the attempt budget is empty (no attempt ran).
async fn execute_step_with_retries(
    step: &Step,
    flow_id: &str,
    state_manager: Arc<StateManager>,
    mock_api: Arc<MockAPI>,
    redis_url: String,
) -> Result<(), Box<dyn std::error::Error>> {
    // Fixed pause between a failed attempt and the next one.
    const RETRY_DELAY: Duration = Duration::from_millis(1000);

    let mut step_state = state_manager
        .load_step_state(flow_id, &step.id)
        .await?
        .ok_or("Step state not found")?;

    // Saturate instead of overflowing: with `max_retries` at the type's maximum,
    // `+ 1` would panic in debug builds and wrap to zero attempts in release,
    // which would skip the step entirely.
    let max_attempts = step.max_retries.saturating_add(1);

    // `attempt` is 1-based so it can be stored directly as the attempt count.
    for attempt in 1..=max_attempts {
        step_state.attempt_count = attempt;
        step_state.status = StepStatus::Running;
        state_manager.save_step_state(flow_id, &step_state).await?;

        match Self::execute_single_step(step, &mock_api, &redis_url).await {
            Ok(output) => {
                step_state.status = StepStatus::Completed;
                step_state.output = Some(output);
                state_manager.save_step_state(flow_id, &step_state).await?;
                return Ok(());
            }
            Err(e) => {
                // Last attempt: record the failure and surface its error.
                if attempt >= max_attempts {
                    step_state.status = StepStatus::Failed;
                    state_manager.save_step_state(flow_id, &step_state).await?;
                    return Err(e);
                }
                // Otherwise wait before retrying.
                tokio::time::sleep(RETRY_DELAY).await;
            }
        }
    }

    // Only reached when the loop ran zero times.
    Err("Max retries exceeded".into())
}
/// Performs one attempt of `step`, bounded by `step.timeout()`.
///
/// Scripts containing `mock_api_call` are served by the mock API; the endpoint
/// is picked by substring match, checking `stripe_config` first, then
/// `create_product`, and falling back to `default_endpoint`. Any other script
/// is sent to the Rhai dispatcher via `execute_rhai_script`.
///
/// # Errors
/// Returns the error of the mock call or script execution, or
/// `"Step execution timed out"` when the timeout elapses first.
async fn execute_single_step(
    step: &Step,
    mock_api: &MockAPI,
    redis_url: &str,
) -> Result<String, Box<dyn std::error::Error>> {
    let script = &step.script;

    // The actual work; nothing runs until it is polled under the timeout below.
    let run = async {
        // Non-mock scripts go through the dispatcher.
        if !script.contains("mock_api_call") {
            return Self::execute_rhai_script(script, redis_url).await;
        }

        // Demo-grade parsing: the first known endpoint name found in the script wins.
        let endpoint = ["stripe_config", "create_product"]
            .iter()
            .copied()
            .find(|&name| script.contains(name))
            .unwrap_or("default_endpoint");
        mock_api.call(endpoint).await
    };

    match timeout(step.timeout(), run).await {
        // Finished in time: pass the inner success or failure through.
        Ok(finished) => finished.map_err(Into::into),
        Err(_) => Err("Step execution timed out".into()),
    }
}
/// Sends `script` to the Rhai dispatcher and waits for the worker's reply.
///
/// A dispatcher is built for every call with caller id `flow_executor`; the
/// request targets worker `flow_worker` with a 30 second request timeout.
///
/// # Returns
/// The task output (an empty string when the task reports none) if the task
/// status is `"completed"`.
///
/// # Errors
/// - Any error from building the dispatcher.
/// - `"Dispatcher error: …"` when awaiting the response fails.
/// - `"Script execution failed: …"` when the task status is not `"completed"`.
async fn execute_rhai_script(
    script: &str,
    redis_url: &str,
) -> Result<String, Box<dyn std::error::Error>> {
    let dispatcher = RhaiDispatcherBuilder::new()
        .caller_id("flow_executor")
        .redis_url(redis_url)
        .build()?;

    let response = dispatcher
        .new_play_request()
        .worker_id("flow_worker")
        .script(script)
        .timeout(Duration::from_secs(30))
        .await_response()
        .await;

    // Transport-level failure: the dispatcher never produced task details.
    let task_details = match response {
        Ok(details) => details,
        Err(e) => return Err(format!("Dispatcher error: {}", e).into()),
    };

    // The worker answered, but the task did not complete.
    if task_details.status != "completed" {
        return Err(format!("Script execution failed: {:?}", task_details.error).into());
    }

    Ok(task_details.output.unwrap_or_default())
}
/// Loads the persisted state of the flow identified by `flow_id`.
///
/// Delegates directly to the state manager's `load_flow_state` and returns its
/// result unchanged. Presumably `Ok(None)` means no state has been saved for
/// that id — confirm against `StateManager`.
///
/// # Errors
/// Returns whatever error `load_flow_state` reports.
pub async fn get_flow_status(&self, flow_id: &str) -> Result<Option<FlowState>, Box<dyn std::error::Error>> {
self.state_manager.load_flow_state(flow_id).await
}
/// Loads the persisted state of step `step_id` within flow `flow_id`.
///
/// Delegates directly to the state manager's `load_step_state` and returns its
/// result unchanged. Presumably `Ok(None)` means no state has been saved for
/// that step — confirm against `StateManager`.
///
/// # Errors
/// Returns whatever error `load_step_state` reports.
pub async fn get_step_status(&self, flow_id: &str, step_id: &str) -> Result<Option<StepState>, Box<dyn std::error::Error>> {
self.state_manager.load_step_state(flow_id, step_id).await
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::builder::{new_step, new_flow};

    /// Smoke test: starting a one-step mock flow yields the start acknowledgement.
    ///
    /// Needs a Redis server reachable at `redis://127.0.0.1/`. Redis is not
    /// probed; the test only turns into a no-op when the `SKIP_REDIS_TESTS`
    /// environment variable is set. The background task is not awaited, so
    /// only the acknowledgement string is checked.
    #[tokio::test]
    async fn test_flow_execution() {
        // Opt-out switch for environments without Redis.
        if std::env::var("SKIP_REDIS_TESTS").is_ok() {
            return;
        }

        let executor = FlowExecutor::new("redis://127.0.0.1/").await.unwrap();

        // A single step that is routed to the mock API.
        let only_step = new_step("test_step")
            .script("mock_api_call stripe_config")
            .timeout(5)
            .retries(1)
            .build();
        let flow = new_flow("test_flow").add_step(only_step).build();

        let outcome = executor.execute_flow(flow).await;
        assert!(outcome.is_ok());

        let acknowledgement = outcome.unwrap();
        assert!(acknowledgement.starts_with("flow_execution_started:"));
    }
}