From 2c24b120dec843fe0745968a2e61903d841c459e Mon Sep 17 00:00:00 2001 From: Timur Gordon <31495328+timurgordon@users.noreply.github.com> Date: Fri, 14 Nov 2025 11:01:43 +0100 Subject: [PATCH] fix: rename overview.md files to avoid conflicts and add collection name --- bin/coordinator/src/models/runner.rs | 2 - bin/coordinator/src/rpc.rs | 6 -- bin/coordinator/src/service.rs | 4 +- bin/runners/hero/README.md | 25 ++++++-- bin/runners/hero/src/executor.rs | 24 +++++--- bin/supervisor/src/store.rs | 3 +- bin/supervisor/src/supervisor.rs | 1 - docs/.collection | 1 + docs/README.md | 6 +- docs/architecture.md | 6 +- .../{overview.md => coordinator.md} | 0 docs/runner/{overview.md => runners.md} | 0 .../supervisor/{overview.md => supervisor.md} | 0 lib/clients/osiris/src/lib.rs | 1 - lib/clients/supervisor/src/wasm.rs | 15 +---- lib/models/job/lib.rs | 17 +----- lib/runner/runner_trait.rs | 2 +- lib/runner/script_mode.rs | 1 - tests/runner_hero.rs | 59 +++++++++++-------- tests/supervisor.rs | 3 +- 20 files changed, 85 insertions(+), 91 deletions(-) rename docs/coordinator/{overview.md => coordinator.md} (100%) rename docs/runner/{overview.md => runners.md} (100%) rename docs/supervisor/{overview.md => supervisor.md} (100%) diff --git a/bin/coordinator/src/models/runner.rs b/bin/coordinator/src/models/runner.rs index 82b3fe6..10343b3 100644 --- a/bin/coordinator/src/models/runner.rs +++ b/bin/coordinator/src/models/runner.rs @@ -13,8 +13,6 @@ pub struct Runner { pub address: IpAddr, /// Needs to be set by the runner, usually `runner, @@ -150,7 +148,6 @@ impl RunnerCreate { pubkey, address, topic, - executor, local, secret, } = self; @@ -160,7 +157,6 @@ impl RunnerCreate { pubkey, address, topic, - executor, local, secret, created_at: ts, @@ -211,7 +207,6 @@ pub struct JobCreate { pub context_id: u32, pub script: String, pub runner: Option, - pub executor: Option, pub timeout: u32, pub retries: u8, pub env_vars: HashMap, @@ -231,7 +226,6 @@ impl JobCreate { context_id: self.context_id.to_string(), payload: self.script, runner: self.runner.unwrap_or_else(|| "default-runner".to_string()), - executor: self.executor.unwrap_or_else(|| "python".to_string()), timeout: self.timeout as u64, env_vars: self.env_vars, created_at: Utc::now(), diff --git a/bin/coordinator/src/service.rs b/bin/coordinator/src/service.rs index 516414a..017c821 100644 --- a/bin/coordinator/src/service.rs +++ b/bin/coordinator/src/service.rs @@ -425,7 +425,7 @@ impl AppService { context_id, flow_id, message: "job.run".to_string(), - message_type: job.executor.clone(), + message_type: job.runner.clone(), message_format_type: MessageFormatType::Text, timeout: job.timeout as u32, timeout_ack: 10, @@ -503,7 +503,7 @@ impl AppService { context_id, flow_id, // Add flow_id for DAG tracking message: "job.run".to_string(), - message_type: job.executor.clone(), + message_type: job.runner.clone(), message_format_type: MessageFormatType::Text, timeout: job.timeout as u32, timeout_ack: 10, diff --git a/bin/runners/hero/README.md b/bin/runners/hero/README.md index 7505fbc..7f4e6a1 100644 --- a/bin/runners/hero/README.md +++ b/bin/runners/hero/README.md @@ -4,16 +4,16 @@ A specialized runner for the Hero ecosystem that executes heroscripts using the ## Overview -The Hero runner executes heroscripts by calling `hero run -h ` for each job. This makes it ideal for: +The Hero runner executes heroscripts by piping the payload to `hero run -s` via stdin for each job. This makes it ideal for: - Running heroscripts from job payloads -- Executing Hero automation tasks +- Executing Hero automation tasks (e.g., `!!git.list`, `!!docker.start`) - Integrating with the Hero CLI ecosystem - Running scripted workflows ## Features -- **Heroscript Execution**: Executes `hero run -h ` for each job +- **Heroscript Execution**: Pipes payload to `hero run -s` via stdin (no temp files) - **Environment Variables**: Passes job environment variables to the hero command - **Timeout Support**: Respects job timeout settings - **Signature Verification**: Verifies job signatures before execution @@ -38,15 +38,28 @@ herorunner my-hero-runner --redis-url redis://localhost:6379 ## Job Payload Format -The job payload should contain the heroscript content that will be passed to `hero run -h`. +The job payload should contain the heroscript content. The runner will pipe it directly to `hero run -s` via stdin. -### Example Payload +### Example Payloads +**Simple print:** ``` print("Hello from heroscript!") ``` -The runner will execute: `hero run -h 'print("Hello from heroscript!")'` +**Hero actions:** +``` +!!git.list +``` + +**Multi-line script:** +``` +!!git.list +print("Repositories listed") +!!docker.ps +``` + +The runner executes: `echo "" | hero run -s` ## Examples diff --git a/bin/runners/hero/src/executor.rs b/bin/runners/hero/src/executor.rs index fcbda95..85ef13b 100644 --- a/bin/runners/hero/src/executor.rs +++ b/bin/runners/hero/src/executor.rs @@ -25,21 +25,22 @@ impl HeroExecutor { /// Execute a command from the job payload fn execute_command(&self, job: &Job) -> Result> { - info!("Runner '{}': Executing hero run -h for job {}", self.runner_id, job.id); + info!("Runner '{}': Executing hero run for job {}", self.runner_id, job.id); - // Always execute: hero run -h + // Execute: hero run -s (reads from stdin) let mut cmd = Command::new("hero"); - cmd.args(&["run", "-h", &job.payload]); + cmd.args(&["run", "-s"]); - debug!("Runner '{}': Executing: hero run -h {}", self.runner_id, job.payload); + debug!("Runner '{}': Executing: hero run -s with stdin", self.runner_id); // Set environment variables from job for (key, value) in &job.env_vars { cmd.env(key, value); } - // Configure stdio - cmd.stdout(Stdio::piped()) + // Configure stdio - pipe stdin to send heroscript content + cmd.stdin(Stdio::piped()) + .stdout(Stdio::piped()) .stderr(Stdio::piped()); // Execute command with timeout @@ -49,7 +50,16 @@ impl HeroExecutor { info!("Runner '{}': Starting command execution for job {}", self.runner_id, job.id); let mut child = cmd.spawn() - .map_err(|e| format!("Failed to spawn 'hero run -h': {}", e))?; + .map_err(|e| format!("Failed to spawn 'hero run -s': {}", e))?; + + // Write heroscript payload to stdin + if let Some(mut stdin) = child.stdin.take() { + use std::io::Write; + stdin.write_all(job.payload.as_bytes()) + .map_err(|e| format!("Failed to write to stdin: {}", e))?; + // Close stdin to signal EOF + drop(stdin); + } // Wait for command with timeout let output = loop { diff --git a/bin/supervisor/src/store.rs b/bin/supervisor/src/store.rs index 1d66135..5f99d1c 100644 --- a/bin/supervisor/src/store.rs +++ b/bin/supervisor/src/store.rs @@ -152,11 +152,10 @@ mod tests { } fn create_test_job(id: &str, runner: &str) -> Job { - let mut job = JobBuilder::new() + let job = JobBuilder::new() .caller_id("test_caller") .context_id("test_context") .runner(runner) - .executor("test") .payload("test payload") .build() .unwrap(); diff --git a/bin/supervisor/src/supervisor.rs b/bin/supervisor/src/supervisor.rs index 7f84461..ad903ec 100644 --- a/bin/supervisor/src/supervisor.rs +++ b/bin/supervisor/src/supervisor.rs @@ -81,7 +81,6 @@ impl Supervisor { .context_id("ping_context") .payload("ping") .runner(runner_id) - .executor("ping") .timeout(10) .build() .map_err(|e| SupervisorError::QueueError { diff --git a/docs/.collection b/docs/.collection index e69de29..b87040f 100644 --- a/docs/.collection +++ b/docs/.collection @@ -0,0 +1 @@ +horus \ No newline at end of file diff --git a/docs/README.md b/docs/README.md index 91965cd..c610318 100644 --- a/docs/README.md +++ b/docs/README.md @@ -15,19 +15,19 @@ Horus is a distributed job execution system with three layers: Coordinator, Supe ### Coordinator Workflow orchestration engine for DAG-based execution. -- [Overview](./coordinator/overview.md) +- [Overview](./coordinator/coordinator.md) ### Supervisor Job dispatcher with authentication and routing. -- [Overview](./supervisor/overview.md) +- [Overview](./supervisor/supervisor.md) - [Authentication](./supervisor/auth.md) - [OpenRPC API](./supervisor/openrpc.json) ### Runners Job executors for different workload types. -- [Runner Overview](./runner/overview.md) +- [Runner Overview](./runner/runners.md) - [Hero Runner](./runner/hero.md) - Heroscript execution - [SAL Runner](./runner/sal.md) - System operations - [Osiris Runner](./runner/osiris.md) - Database operations diff --git a/docs/architecture.md b/docs/architecture.md index c3bbf99..cf5e527 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -56,7 +56,7 @@ Horus is a hierarchical orchestration runtime with three layers: Coordinator, Su - Parallel execution is required - Complex data pipelines -[→ Coordinator Documentation](./coordinator/overview.md) +[→ Coordinator Documentation](./coordinator/coordinator.md) ### 2. Supervisor (Required) **Purpose:** Job admission, authentication, and routing @@ -74,7 +74,7 @@ Horus is a hierarchical orchestration runtime with three layers: Coordinator, Su - Signature-based authentication - Runner health monitoring -[→ Supervisor Documentation](./supervisor/overview.md) +[→ Supervisor Documentation](./supervisor/supervisor.md) ### 3. Runners (Required) **Purpose:** Execute actual job workloads @@ -90,7 +90,7 @@ Horus is a hierarchical orchestration runtime with three layers: Coordinator, Su - Timeout support - Environment variable handling -[→ Runner Documentation](./runner/overview.md) +[→ Runner Documentation](./runner/runners.md) ## Communication Protocols diff --git a/docs/coordinator/overview.md b/docs/coordinator/coordinator.md similarity index 100% rename from docs/coordinator/overview.md rename to docs/coordinator/coordinator.md diff --git a/docs/runner/overview.md b/docs/runner/runners.md similarity index 100% rename from docs/runner/overview.md rename to docs/runner/runners.md diff --git a/docs/supervisor/overview.md b/docs/supervisor/supervisor.md similarity index 100% rename from docs/supervisor/overview.md rename to docs/supervisor/supervisor.md diff --git a/lib/clients/osiris/src/lib.rs b/lib/clients/osiris/src/lib.rs index 0ce00ab..4051b8f 100644 --- a/lib/clients/osiris/src/lib.rs +++ b/lib/clients/osiris/src/lib.rs @@ -229,7 +229,6 @@ impl OsirisClient { .context_id("command-execution") .runner(&self.runner_name) .payload(script) - .executor("rhai") .timeout(self.timeout) .build() .map_err(|e| OsirisClientError::CommandFailed(format!("Failed to build job: {}", e)))?; diff --git a/lib/clients/supervisor/src/wasm.rs b/lib/clients/supervisor/src/wasm.rs index 95c497a..8be6349 100644 --- a/lib/clients/supervisor/src/wasm.rs +++ b/lib/clients/supervisor/src/wasm.rs @@ -254,14 +254,12 @@ impl WasmSupervisorClient { pub async fn create_job_with_secret(&self, secret: String, job: hero_job::Job) -> Result { // Backend expects RunJobParams struct with secret and job fields - wrap in array like register_runner let params = serde_json::json!([{ - "secret": secret, "job": { "id": job.id, "caller_id": job.caller_id, "context_id": job.context_id, "payload": job.payload, "runner": job.runner, - "executor": job.executor, "timeout": job.timeout, "env_vars": serde_json::from_str::(&serde_json::to_string(&job.env_vars).unwrap_or_else(|_| "{}".to_string())).unwrap_or(serde_json::json!({})), "created_at": job.created_at, @@ -286,14 +284,12 @@ impl WasmSupervisorClient { pub async fn run_job(&self, secret: String, job: hero_job::Job) -> Result { // Backend expects RunJobParams struct with secret and job fields - wrap in array like register_runner let params = serde_json::json!([{ - "secret": secret, "job": { "id": job.id, "caller_id": job.caller_id, "context_id": job.context_id, "payload": job.payload, "runner": job.runner, - "executor": job.executor, "timeout": job.timeout, "env_vars": serde_json::from_str::(&serde_json::to_string(&job.env_vars).unwrap_or_else(|_| "{}".to_string())).unwrap_or(serde_json::json!({})), "created_at": job.created_at, @@ -369,7 +365,6 @@ impl WasmSupervisorClient { caller_id: String, context_id: String, payload: String, - executor: String, ) -> Result { // Generate a unique job ID let job_id = format!("job-{}", uuid::Uuid::new_v4()); @@ -380,7 +375,6 @@ impl WasmSupervisorClient { "caller_id": caller_id, "context_id": context_id, "payload": payload, - "executor": executor, "timeout": 30, "env": {} }); @@ -416,7 +410,8 @@ impl WasmSupervisorClient { /// Get a job by job ID pub async fn get_job(&self, job_id: &str) -> Result { let params = serde_json::json!([job_id]); - match self.call_method("get_job", params).await { + + match self.call_method("job.run", params).await { Ok(result) => { // Convert the Job result to hero_job::Job if let Ok(job_value) = serde_json::from_value::(result) { @@ -426,7 +421,6 @@ impl WasmSupervisorClient { let context_id = job_value.get("context_id").and_then(|v| v.as_str()).unwrap_or("").to_string(); let payload = job_value.get("payload").and_then(|v| v.as_str()).unwrap_or("").to_string(); let runner = job_value.get("runner").and_then(|v| v.as_str()).unwrap_or("").to_string(); - let executor = job_value.get("executor").and_then(|v| v.as_str()).unwrap_or("").to_string(); let timeout_secs = job_value.get("timeout").and_then(|v| v.get("secs")).and_then(|v| v.as_u64()).unwrap_or(30); let env_vars = job_value.get("env_vars").map(|v| v.to_string()).unwrap_or_else(|| "{}".to_string()); let created_at = job_value.get("created_at").and_then(|v| v.as_str()).unwrap_or("").to_string(); @@ -438,7 +432,6 @@ impl WasmSupervisorClient { context_id, payload, runner, - executor, timeout: timeout_secs, env_vars: serde_json::from_str(&env_vars).unwrap_or_default(), created_at: chrono::DateTime::parse_from_rfc3339(&created_at) @@ -830,7 +823,6 @@ pub fn create_job_canonical_repr( context_id: String, payload: String, runner: String, - executor: String, timeout: u64, env_vars_json: String, ) -> Result { @@ -844,13 +836,12 @@ pub fn create_job_canonical_repr( // Create canonical representation (matches Job::canonical_representation in runner_rust) let canonical = format!( - "{}:{}:{}:{}:{}:{}:{}:{:?}", + "{}:{}:{}:{}:{}:{}:{:?}", id, caller_id, context_id, payload, runner, - executor, timeout, env_vars_sorted ); diff --git a/lib/models/job/lib.rs b/lib/models/job/lib.rs index ba522b6..76bee7f 100644 --- a/lib/models/job/lib.rs +++ b/lib/models/job/lib.rs @@ -73,7 +73,6 @@ pub struct Job { pub context_id: String, pub payload: String, pub runner: String, // name of the runner to execute this job - pub executor: String, // name of the executor the runner will use to execute this job pub timeout: u64, // timeout in seconds #[cfg_attr(target_arch = "wasm32", wasm_bindgen(skip))] pub env_vars: HashMap, // environment variables for script execution @@ -109,7 +108,6 @@ impl Job { context_id: String, payload: String, runner: String, - executor: String, ) -> Self { let now = Utc::now(); Self { @@ -118,7 +116,6 @@ impl Job { context_id, payload, runner, - executor, timeout: 300, // 5 minutes default env_vars: HashMap::new(), created_at: now, @@ -137,13 +134,12 @@ impl Job { env_vars_sorted.sort_by_key(|&(k, _)| k); format!( - "{}:{}:{}:{}:{}:{}:{}:{:?}", + "{}:{}:{}:{}:{}:{}:{:?}", self.id, self.caller_id, self.context_id, self.payload, self.runner, - self.executor, self.timeout, env_vars_sorted ) @@ -202,7 +198,6 @@ pub struct JobBuilder { context_id: String, payload: String, runner: String, - executor: String, timeout: u64, // timeout in seconds env_vars: HashMap, signatures: Vec, @@ -215,7 +210,6 @@ impl JobBuilder { context_id: "".to_string(), payload: "".to_string(), runner: "".to_string(), - executor: "".to_string(), timeout: 300, // 5 minutes default env_vars: HashMap::new(), signatures: Vec::new(), @@ -246,11 +240,6 @@ impl JobBuilder { self } - /// Set the executor for this job - pub fn executor(mut self, executor: &str) -> Self { - self.executor = executor.to_string(); - self - } /// Set the timeout for job execution (in seconds) pub fn timeout(mut self, timeout: u64) -> Self { @@ -311,16 +300,12 @@ impl JobBuilder { if self.runner.is_empty() { return Err(JobError::InvalidData("runner is required".to_string())); } - if self.executor.is_empty() { - return Err(JobError::InvalidData("executor is required".to_string())); - } let mut job = Job::new( self.caller_id, self.context_id, self.payload, self.runner, - self.executor, ); job.timeout = self.timeout; diff --git a/lib/runner/runner_trait.rs b/lib/runner/runner_trait.rs index c6f00de..1096e67 100644 --- a/lib/runner/runner_trait.rs +++ b/lib/runner/runner_trait.rs @@ -1,7 +1,7 @@ //! Runner trait abstraction for job processing use crate::{Job, JobStatus, Client}; -use log::{debug, error, info}; +use log::{error, info}; use redis::AsyncCommands; use std::sync::Arc; diff --git a/lib/runner/script_mode.rs b/lib/runner/script_mode.rs index fb10903..5690c38 100644 --- a/lib/runner/script_mode.rs +++ b/lib/runner/script_mode.rs @@ -32,7 +32,6 @@ where .caller_id("script_mode") .payload(script_content) .runner(runner_id) - .executor("rhai") .timeout(job_timeout.as_secs()) .build()?; diff --git a/tests/runner_hero.rs b/tests/runner_hero.rs index 84d13fe..4f1fdfd 100644 --- a/tests/runner_hero.rs +++ b/tests/runner_hero.rs @@ -139,22 +139,14 @@ async fn test_02_simple_heroscript() { let job_id = job.id.clone(); // Save and queue job - client.store_job_in_redis(&job).await.expect("Failed to save job"); - client.job_run(&job_id, RUNNER_ID).await.expect("Failed to queue job"); - - // Wait for job to complete - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - - // Check job status - let status = client.get_status(&job_id).await.expect("Failed to get job status"); - println!("Job status: {:?}", status); - - // Get result or error - if let Some(result) = client.get_result(&job_id).await.expect("Failed to get result") { - println!("Job result: {}", result); - } - if let Some(error) = client.get_error(&job_id).await.expect("Failed to get error") { - println!("Job error: {}", error); + match client.job_run_wait(&job, RUNNER_ID, 5).await { + Ok(result) => { + println!("✅ Job succeeded with result:\n{}", result); + } + Err(e) => { + println!("❌ Job failed with error: {:?}", e); + panic!("Job execution failed"); + } } println!("✅ Heroscript job completed"); @@ -180,11 +172,19 @@ async fn test_03_job_with_env_vars() { // Check job status let status = client.get_status(&job_id).await.expect("Failed to get job status"); - println!("Job status: {:?}", status); + println!("📊 Job status: {:?}", status); - // Get result - if let Some(result) = client.get_result(&job_id).await.expect("Failed to get result") { - println!("Job result: {}", result); + // Get result or error + match (client.get_result(&job_id).await, client.get_error(&job_id).await) { + (Ok(Some(result)), _) => { + println!("✅ Job succeeded with result:\n{}", result); + } + (_, Ok(Some(error))) => { + println!("❌ Job failed with error:\n{}", error); + } + _ => { + println!("⚠️ No result or error available"); + } } println!("✅ Job with env vars completed"); @@ -196,8 +196,13 @@ async fn test_04_job_timeout() { let client = create_client().await; - // Create job with short timeout - let mut job = create_test_job("sleep 10"); + // Create job with short timeout - use a heroscript that loops forever + let mut job = create_test_job(r#" +for i in 1..1000 { + print("Loop iteration: ${i}") + sleep(100) +} +"#); job.timeout = 2; // 2 second timeout let job_id = job.id.clone(); @@ -210,15 +215,17 @@ async fn test_04_job_timeout() { // Check job status - should be error due to timeout let status = client.get_status(&job_id).await.expect("Failed to get job status"); - println!("Job status: {:?}", status); + println!("📊 Job status: {:?}", status); // Should have error if let Some(error) = client.get_error(&job_id).await.expect("Failed to get error") { - println!("Job error (expected timeout): {}", error); + println!("❌ Job error (expected timeout):\n{}", error); assert!(error.contains("timeout") || error.contains("timed out"), "Error should mention timeout"); + println!("✅ Job timeout handled correctly"); + } else { + println!("⚠️ Expected timeout error but got none"); + panic!("Job should have timed out"); } - - println!("✅ Job timeout handled correctly"); } /// Final test that ensures cleanup happens diff --git a/tests/supervisor.rs b/tests/supervisor.rs index b880c7e..c48dfb6 100644 --- a/tests/supervisor.rs +++ b/tests/supervisor.rs @@ -111,11 +111,10 @@ async fn create_client() -> SupervisorClient { /// Helper to create a test job (always uses TEST_RUNNER_NAME) fn create_test_job(payload: &str) -> Job { JobBuilder::new() - .caller_id("e2e-test") + .caller_id("test-caller") .context_id("test-context") .runner(TEST_RUNNER_NAME) .payload(payload) - .executor("rhai") .timeout(30) .build() .expect("Failed to build test job")