Files
horus/lib/clients/coordinator
Timur Gordon f66edba1d3 Add coordinator client library, installation scripts, and new test runners
- Add coordinator client library to workspace
- Add installation documentation and heroscript
- Add new test runners for Osiris and Sal
- Update hero runner test to handle invalid heroscript errors
- Update README with installation instructions
2025-11-17 10:56:13 +01:00
..

Hero Coordinator Client

Rust client library for interacting with the Hero Coordinator JSON-RPC API.

Features

  • Actor Management: Create and load actors
  • Context Management: Create contexts with admin/reader/executor permissions
  • Runner Management: Register runners in contexts
  • Job Management: Create jobs with dependencies
  • Flow Management: Create and manage job flows (DAGs)
  • Flow Polling: Poll flows until completion with timeout support
  • Helper Methods: *_create_or_load methods for idempotent operations

Installation

Add to your Cargo.toml:

[dependencies]
hero-coordinator-client = { path = "path/to/lib/clients/coordinator" }

Usage

Basic Example

use hero_coordinator_client::{CoordinatorClient, models::*};
use std::collections::HashMap;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create client
    let client = CoordinatorClient::new("http://127.0.0.1:9652")?;
    
    // Create actor
    let actor = client.actor_create(ActorCreate {
        id: 11001,
        pubkey: "demo-pubkey".to_string(),
        address: vec!["127.0.0.1".parse()?],
    }).await?;
    
    // Create context
    let context = client.context_create(ContextCreate {
        id: 2,
        admins: vec![11001],
        readers: vec![11001],
        executors: vec![11001],
    }).await?;
    
    // Create runner
    let runner = client.runner_create(2, RunnerCreate {
        id: 12001,
        pubkey: "".to_string(),
        address: "127.0.0.1".parse()?,
        topic: "supervisor.rpc".to_string(),
        script_type: ScriptType::Python,
        local: false,
        secret: None,
    }).await?;
    
    // Create job
    let job = client.job_create(2, JobCreate {
        id: 20000,
        caller_id: 11001,
        context_id: 2,
        script: "print('Hello from job')".to_string(),
        script_type: ScriptType::Python,
        timeout: 60,
        retries: 0,
        env_vars: HashMap::new(),
        prerequisites: vec![],
        depends: vec![],
    }).await?;
    
    // Create flow
    let flow = client.flow_create(2, FlowCreate {
        id: 13001,
        caller_id: 11001,
        context_id: 2,
        jobs: vec![20000],
        env_vars: HashMap::new(),
    }).await?;
    
    // Start flow
    client.flow_start(2, 13001).await?;
    
    // Poll until completion
    let final_flow = client.flow_poll_until_complete(
        2,
        13001,
        std::time::Duration::from_secs(2),
        std::time::Duration::from_secs(600),
    ).await?;
    
    println!("Flow status: {:?}", final_flow.status);
    Ok(())
}

Idempotent Operations

Use *_create_or_load methods to handle existing resources:

// Will create if doesn't exist, or load if it does
let actor = client.actor_create_or_load(ActorCreate {
    id: 11001,
    pubkey: "demo-pubkey".to_string(),
    address: vec!["127.0.0.1".parse()?],
}).await?;

Flow with Dependencies

Create a chain of dependent jobs:

// Job 0 (root)
let job0 = client.job_create(2, JobCreate {
    id: 20000,
    caller_id: 11001,
    context_id: 2,
    script: "print('Job 0')".to_string(),
    script_type: ScriptType::Python,
    timeout: 60,
    retries: 0,
    env_vars: HashMap::new(),
    prerequisites: vec![],
    depends: vec![],
}).await?;

// Job 1 (depends on Job 0)
let job1 = client.job_create(2, JobCreate {
    id: 20001,
    caller_id: 11001,
    context_id: 2,
    script: "print('Job 1')".to_string(),
    script_type: ScriptType::Python,
    timeout: 60,
    retries: 0,
    env_vars: HashMap::new(),
    prerequisites: vec![],
    depends: vec![20000], // Depends on job0
}).await?;

// Create flow with both jobs
let flow = client.flow_create(2, FlowCreate {
    id: 13001,
    caller_id: 11001,
    context_id: 2,
    jobs: vec![20000, 20001],
    env_vars: HashMap::new(),
}).await?;

Examples

See the examples/ directory for complete examples:

# Run the flow demo
COORDINATOR_URL=http://127.0.0.1:9652 cargo run --example flow_demo -- \
  --dst-ip 127.0.0.1 \
  --context-id 2 \
  --actor-id 11001 \
  --runner-id 12001 \
  --flow-id 13001 \
  --jobs 3

API Methods

Actor

  • actor_create(actor: ActorCreate) -> Actor
  • actor_load(id: u32) -> Actor
  • actor_create_or_load(actor: ActorCreate) -> Actor

Context

  • context_create(context: ContextCreate) -> Context
  • context_load(id: u32) -> Context
  • context_create_or_load(context: ContextCreate) -> Context

Runner

  • runner_create(context_id: u32, runner: RunnerCreate) -> Runner
  • runner_load(context_id: u32, id: u32) -> Runner
  • runner_create_or_load(context_id: u32, runner: RunnerCreate) -> Runner

Job

  • job_create(context_id: u32, job: JobCreate) -> Job
  • job_load(context_id: u32, caller_id: u32, id: u32) -> Job
  • job_create_or_load(context_id: u32, job: JobCreate) -> Job

Flow

  • flow_create(context_id: u32, flow: FlowCreate) -> Flow
  • flow_load(context_id: u32, id: u32) -> Flow
  • flow_create_or_load(context_id: u32, flow: FlowCreate) -> Flow
  • flow_dag(context_id: u32, id: u32) -> FlowDag
  • flow_start(context_id: u32, id: u32) -> bool
  • flow_poll_until_complete(context_id, flow_id, poll_interval, timeout) -> Flow

Message

  • message_create(context_id: u32, message: MessageCreate) -> Message
  • message_load(context_id: u32, id: u32) -> Message

Error Handling

The client uses a custom CoordinatorError type:

use hero_coordinator_client::error::CoordinatorError;

match client.actor_create(actor).await {
    Ok(actor) => println!("Created: {:?}", actor),
    Err(CoordinatorError::AlreadyExists) => {
        // Resource already exists, load it instead
        let actor = client.actor_load(actor.id).await?;
    }
    Err(e) => return Err(e.into()),
}

License

MIT OR Apache-2.0