Rationale for Orchestrator
We may have scripts that run asynchronously, depend on human input, or depend on other scripts completing. We want to be able to implement high-level workflows composed of Rhai scripts.
Design
Directed Acyclic Graphs (DAGs) are a natural fit for representing workflows.
Requirements
- Uses Directed Acyclic Graphs (DAGs) to represent workflows.
- Each step in the workflow defines the script to execute, the inputs to pass to it, and the outputs to expect from it.
- Simplicity: step outcomes are binary (success or failure), and input/output parameters are simple key-value pairs.
- Multiple steps can depend on the same step.
- Scripts are executed using RhaiDispatcher.
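For example, a four-step workflow in which one fetch step fans out into two independent processing steps that both feed a final report step can be described purely through depends_on lists (a minimal illustrative sketch; step IDs and script names are made up):

fn main() {
    // (step id, script, ids of the steps it depends on)
    let steps: Vec<(u32, &str, Vec<u32>)> = vec![
        (1, "fetch.rhai",  vec![]),      // no dependencies: runs first
        (2, "clean.rhai",  vec![1]),     // depends on fetch
        (3, "enrich.rhai", vec![1]),     // also depends on fetch (shared dependency)
        (4, "report.rhai", vec![2, 3]),  // fan-in: waits for both clean and enrich
    ];
    for (id, script, deps) in &steps {
        println!("step {id} runs {script} after steps {deps:?}");
    }
}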
Architecture
The Orchestrator is a simple DAG-based workflow execution system that extends the heromodels flow structures to support workflows with dependencies and distributed script execution.
Core Component
graph TB
    subgraph "Orchestrator"
        O[Orchestrator] --> RE[RhaiExecutor Trait]
        O --> DB[(Database)]
    end
    
    subgraph "Executor Implementations"
        RE --> RD[RhaiDispatcher]
        RE --> WS[WebSocketClient]
        RE --> HTTP[HttpClient]
        RE --> LOCAL[LocalExecutor]
    end
    
    subgraph "Data Models (heromodels)"
        F[Flow] --> FS[FlowStep]
        FS --> SR[SignatureRequirement]
    end
    
    subgraph "Infrastructure"
        RD --> RQ[Redis Queues]
        RD --> W[Workers]
        WS --> WSS[WebSocket Server]
        HTTP --> API[REST API]
    end
Execution Abstraction
The orchestrator uses a trait-based approach for script execution, allowing different execution backends:
RhaiExecutor Trait
use async_trait::async_trait;
use rhai_dispatcher::{PlayRequestBuilder, RhaiTaskDetails, RhaiDispatcherError};

#[async_trait]
pub trait RhaiExecutor {
    async fn call(&self, request: PlayRequestBuilder<'_>) -> Result<RhaiTaskDetails, RhaiDispatcherError>;
}
Executor Implementations
RhaiDispatcher Implementation:
pub struct DispatcherExecutor {
    dispatcher: RhaiDispatcher,
}
#[async_trait]
impl RhaiExecutor for DispatcherExecutor {
    async fn call(&self, request: PlayRequestBuilder<'_>) -> Result<RhaiTaskDetails, RhaiDispatcherError> {
        // Use RhaiDispatcher to execute script via Redis queues
        request.await_response().await
    }
}
WebSocket Client Implementation:
pub struct WebSocketExecutor {
    ws_client: WebSocketClient,
    endpoint: String,
}
#[async_trait]
impl RhaiExecutor for WebSocketExecutor {
    async fn call(&self, request: PlayRequestBuilder<'_>) -> Result<RhaiTaskDetails, RhaiDispatcherError> {
        // Build the PlayRequest and send via WebSocket
        let play_request = request.build()?;
        
        // Send script execution request via WebSocket
        let ws_message = serde_json::to_string(&play_request)?;
        self.ws_client.send(ws_message).await?;
        
        // Wait for response and convert to RhaiTaskDetails
        let response = self.ws_client.receive().await?;
        serde_json::from_str(&response).map_err(RhaiDispatcherError::from)
    }
}
HTTP Client Implementation:
pub struct HttpExecutor {
    http_client: reqwest::Client,
    base_url: String,
}
#[async_trait]
impl RhaiExecutor for HttpExecutor {
    async fn call(&self, request: PlayRequestBuilder<'_>) -> Result<RhaiTaskDetails, RhaiDispatcherError> {
        // Build the PlayRequest and send via HTTP
        let play_request = request.build()?;
        
        // Send script execution request via HTTP API
        let response = self.http_client
            .post(&format!("{}/execute", self.base_url))
            .json(&play_request)
            .send()
            .await?;
            
        response.json().await.map_err(RhaiDispatcherError::from)
    }
}
Local Executor Implementation:
pub struct LocalExecutor {
    engine: Engine,
}
#[async_trait]
impl RhaiExecutor for LocalExecutor {
    async fn call(&self, request: PlayRequestBuilder<'_>) -> Result<RhaiTaskDetails, RhaiDispatcherError> {
        // Build the PlayRequest and execute locally
        let play_request = request.build()?;
        
        // Execute script directly in local Rhai engine
        let result = self.engine.eval::<String>(&play_request.script);
        
        // Convert to RhaiTaskDetails format (the result can only be consumed once,
        // so split it into output/error first)
        let (output, error) = match result {
            Ok(value) => (Some(value), None),
            Err(e) => (None, Some(e.to_string())),
        };
        let task_details = RhaiTaskDetails {
            task_id: play_request.id,
            script: play_request.script,
            status: if error.is_none() { "completed".to_string() } else { "error".to_string() },
            output,
            error,
            created_at: chrono::Utc::now(),
            updated_at: chrono::Utc::now(),
            caller_id: "local".to_string(),
            context_id: play_request.context_id,
            worker_id: "local".to_string(),
        };
        
        Ok(task_details)
    }
}
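Because execution sits behind the RhaiExecutor trait, a stub can stand in for a real backend in unit tests. A minimal sketch, reusing the RhaiTaskDetails fields shown in the LocalExecutor example above:

pub struct MockExecutor;

#[async_trait]
impl RhaiExecutor for MockExecutor {
    async fn call(&self, request: PlayRequestBuilder<'_>) -> Result<RhaiTaskDetails, RhaiDispatcherError> {
        // Build the request but skip real execution; return a canned "completed" result
        let play_request = request.build()?;
        Ok(RhaiTaskDetails {
            task_id: play_request.id,
            script: play_request.script,
            status: "completed".to_string(),
            output: Some("mock output".to_string()),
            error: None,
            created_at: chrono::Utc::now(),
            updated_at: chrono::Utc::now(),
            caller_id: "test".to_string(),
            context_id: play_request.context_id,
            worker_id: "mock".to_string(),
        })
    }
}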
Data Model Extensions
Simple extensions to the existing heromodels flow structures:
Enhanced FlowStep Model
// Extends heromodels::models::flow::FlowStep
pub struct FlowStep {
    // ... existing heromodels::models::flow::FlowStep fields
    pub script: String,                // Rhai script to execute
    pub depends_on: Vec<u32>,          // IDs of steps this step depends on
    pub context_id: String,            // Execution context (circle)
    pub worker_id: String,             // Worker that should execute this step
    pub inputs: HashMap<String, String>, // Input parameters
    pub outputs: HashMap<String, String>, // Output results
}
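As a usage illustration, the extension fields could be populated like this (a sketch; base_step is a hypothetical helper standing in for however the existing heromodels FlowStep is constructed):

use std::collections::HashMap;

// `base_step(id)` is hypothetical: it builds a FlowStep with its existing
// heromodels fields filled in, so only the extension fields are set here.
let mut clean = base_step(2);
clean.script = "let rows = clean(load(input_path)); rows.len()".to_string();
clean.depends_on = vec![1];                       // runs only after step 1 completes
clean.context_id = "circle-reports".to_string();  // execution context (circle)
clean.worker_id = "worker-etl".to_string();       // worker that should run the script
clean.inputs = HashMap::from([
    ("input_path".to_string(), "/data/raw.csv".to_string()),
]);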
Execution Flow
sequenceDiagram
    participant Client as Client
    participant O as Orchestrator
    participant RE as RhaiExecutor
    participant DB as Database
    
    Client->>O: Submit Flow
    O->>DB: Store flow and steps
    O->>O: Find steps with no dependencies
    
    loop Until all steps complete
        O->>RE: Execute ready steps
        RE-->>O: Return results
        O->>DB: Update step status
        O->>O: Find newly ready steps
    end
    
    O->>Client: Flow completed
Flexible Orchestrator Implementation
use rhai_dispatcher::{RhaiDispatcher, PlayRequestBuilder};
use std::collections::HashSet;
use std::sync::Arc;
pub struct Orchestrator<E: RhaiExecutor> {
    executor: E,
    database: Arc<Database>,
}
impl<E: RhaiExecutor> Orchestrator<E> {
    pub fn new(executor: E, database: Arc<Database>) -> Self {
        Self { executor, database }
    }
    
    pub async fn execute_flow(&self, flow: Flow) -> Result<(), OrchestratorError> {
        // 1. Store flow in database
        self.database.collection::<Flow>()?.set(&flow)?;
        
        // 2. Find steps with no dependencies (depends_on is empty)
        let mut pending_steps: Vec<FlowStep> = flow.steps.clone();
        let mut completed_steps: HashSet<u32> = HashSet::new();
        
        while !pending_steps.is_empty() {
            // Find ready steps (all dependencies completed)
            let ready_steps: Vec<FlowStep> = pending_steps
                .iter()
                .filter(|step| {
                    step.depends_on.iter().all(|dep_id| completed_steps.contains(dep_id))
                })
                .cloned()
                .collect();
            
            if ready_steps.is_empty() {
                return Err(OrchestratorError::NoReadySteps);
            }
            
            // Execute ready steps concurrently
            let mut tasks = Vec::new();
            for step in ready_steps {
                let executor = &self.executor;
                let task = async move {
                    // Create PlayRequestBuilder for this step
                    let request = RhaiDispatcher::new_play_request()
                        .script(&step.script)
                        .context_id(&step.context_id)
                        .worker_id(&step.worker_id);
                    
                    // Execute via the trait
                    let result = executor.call(request).await?;
                    Ok((step.base_data.id, result))
                };
                tasks.push(task);
            }
            
            // Wait for all ready steps to complete
            let results = futures::future::try_join_all(tasks).await?;
            
            // Update step status and mark as completed
            for (step_id, task_details) in results {
                if task_details.status == "completed" {
                    completed_steps.insert(step_id);
                    // Update step status in database
                    // self.update_step_status(step_id, "completed", task_details.output).await?;
                } else {
                    return Err(OrchestratorError::StepFailed(step_id, task_details.error));
                }
            }
            
            // Remove completed steps from pending
            pending_steps.retain(|step| !completed_steps.contains(&step.base_data.id));
        }
        
        Ok(())
    }
    
    pub async fn get_flow_status(&self, flow_id: u32) -> Result<FlowStatus, OrchestratorError> {
        // Return current status of flow and all its steps
        let flow = self.database.collection::<Flow>()?.get(flow_id)?;
        // Implementation would check step statuses and return overall flow status
        Ok(FlowStatus::Running) // Placeholder
    }
}
pub enum OrchestratorError {
    DatabaseError(String),
    ExecutorError(RhaiDispatcherError),
    NoReadySteps,
    StepFailed(u32, Option<String>),
}
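// The `?` operators in execute_flow need conversions into OrchestratorError.
// Minimal sketch for the executor error; an analogous From impl (or explicit
// map_err) would be needed for whatever error type the database layer returns.
impl From<RhaiDispatcherError> for OrchestratorError {
    fn from(err: RhaiDispatcherError) -> Self {
        OrchestratorError::ExecutorError(err)
    }
}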
pub enum FlowStatus {
    Pending,
    Running,
    Completed,
    Failed,
}
// Usage examples:
// let orchestrator = Orchestrator::new(DispatcherExecutor::new(dispatcher), db);
// let orchestrator = Orchestrator::new(WebSocketExecutor::new(ws_client), db);
// let orchestrator = Orchestrator::new(HttpExecutor::new(http_client), db);
// let orchestrator = Orchestrator::new(LocalExecutor::new(engine), db);
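// End-to-end wiring sketch: `dispatcher`, `db`, and `flow` are assumed to be
// constructed elsewhere with the existing rhai_dispatcher / heromodels APIs,
// and Flow is assumed to expose base_data.id just like FlowStep does.
async fn run_flow(dispatcher: RhaiDispatcher, db: Arc<Database>, flow: Flow) -> Result<(), OrchestratorError> {
    let flow_id = flow.base_data.id;                              // keep the id before the flow is moved
    let orchestrator = Orchestrator::new(DispatcherExecutor { dispatcher }, db);
    orchestrator.execute_flow(flow).await?;                       // runs steps in dependency order
    let _status = orchestrator.get_flow_status(flow_id).await?;
    Ok(())
}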
Key Features
- DAG Validation: Rejects workflows whose depends_on relationships contain a cycle; the implementation above detects this at runtime when no steps are ready while some remain pending (see the upfront validation sketch after this list)
- Parallel Execution: Executes independent steps concurrently via multiple workers
- Simple Dependencies: Each step lists the step IDs it depends on
- RhaiDispatcher Integration: Uses existing dispatcher for script execution
- Binary Outcomes: Steps either succeed or fail (keeping it simple as per requirements)
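The implementation above only detects cycles while executing (the NoReadySteps error). A dedicated validation pass could run before execution starts; a minimal sketch over the depends_on lists, assuming each step's dependency IDs are unique:

use std::collections::{HashMap, HashSet};

/// Returns true if the depends_on relationships form a DAG (no cycles, no
/// references to missing steps). `steps` maps a step id to the ids it depends on.
fn is_valid_dag(steps: &HashMap<u32, Vec<u32>>) -> bool {
    // Number of still-unresolved dependencies per step
    let mut remaining: HashMap<u32, usize> =
        steps.iter().map(|(id, deps)| (*id, deps.len())).collect();
    let mut resolved: HashSet<u32> = HashSet::new();

    loop {
        // Steps whose dependencies are all resolved and that are not yet resolved themselves
        let mut ready: Vec<u32> = Vec::new();
        for (id, count) in &remaining {
            if *count == 0 && !resolved.contains(id) {
                ready.push(*id);
            }
        }
        if ready.is_empty() {
            break;
        }
        for id in ready {
            resolved.insert(id);
            // Every step depending on `id` now has one fewer unresolved dependency
            for (other, deps) in steps {
                if deps.contains(&id) {
                    *remaining.get_mut(other).unwrap() -= 1;
                }
            }
        }
    }
    // If some step never became ready, a cycle (or missing dependency) exists
    resolved.len() == steps.len()
}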
This simple architecture provides DAG-based workflow execution while leveraging the existing rhailib infrastructure and keeping complexity minimal.