Restructure job crate: separate rust and vlang folders, extract Job types from runner_rust

Timur Gordon
2025-11-04 13:37:36 +01:00
parent 3ba9e84aa0
commit e3d8147eaa
5 changed files with 563 additions and 307 deletions

View File

@@ -1,3 +1,8 @@
-# Job
-
-Job model and client for supervisor
+# Hero Job
+
+Shared job types and utilities for the Hero ecosystem.
+
+## Structure
+
+- `rust/` - Rust implementation of job types
+- `vlang/` - V language implementation (future)

File diff suppressed because it is too large

View File

@@ -11,7 +11,11 @@ serde_json = "1.0"
 uuid = { version = "1.0", features = ["v4"] }
 thiserror = "1.0"
 redis = { version = "0.25", features = ["aio", "tokio-comp"] }
+tokio = { version = "1.0", features = ["rt", "time"] }
 log = "0.4"
+secp256k1 = { version = "0.28", features = ["recovery"] }
+sha2 = "0.10"
+hex = "0.4"
 
 [lib]
 name = "hero_job"

View File

@@ -138,7 +138,7 @@ impl Client {
             .await
             .map_err(|e| JobError::Redis(e))?;
 
-        conn.hset_multiple(&job_key, &[
+        let _: () = conn.hset_multiple(&job_key, &[
             ("error", error),
             ("status", JobStatus::Error.as_str()),
             ("updated_at", &now.to_rfc3339()),
@@ -161,7 +161,7 @@ impl Client {
             .await
             .map_err(|e| JobError::Redis(e))?;
 
-        conn.hset_multiple(&job_key, &[
+        let _: () = conn.hset_multiple(&job_key, &[
             ("status", status.as_str()),
             ("updated_at", &now.to_rfc3339()),
         ]).await
@@ -204,8 +204,8 @@ impl Client {
         Ok(())
     }
 
-    /// Store this job in Redis
-    pub async fn store_job_in_redis(&self, job: &Job) -> Result<(), JobError> {
+    /// Store this job in Redis with the specified status
+    pub async fn store_job_in_redis_with_status(&self, job: &Job, status: JobStatus) -> Result<(), JobError> {
         let mut conn = self.redis_client
             .get_multiplexed_async_connection()
             .await
@@ -215,12 +215,12 @@ impl Client {
 
         // Serialize the job data
         let job_data = serde_json::to_string(job)
-            .map_err(|e| JobError::Serialization(e.to_string()))?;
+            .map_err(|e| JobError::Serialization(e))?;
 
         // Store job data in Redis hash
         let _: () = conn.hset_multiple(&job_key, &[
             ("data", job_data),
-            ("status", JobStatus::Dispatched.as_str().to_string()),
+            ("status", status.as_str().to_string()),
             ("created_at", job.created_at.to_rfc3339()),
             ("updated_at", job.updated_at.to_rfc3339()),
         ]).await
@@ -232,6 +232,11 @@ impl Client {
         Ok(())
     }
 
+    /// Store this job in Redis (defaults to Dispatched status for backwards compatibility)
+    pub async fn store_job_in_redis(&self, job: &Job) -> Result<(), JobError> {
+        self.store_job_in_redis_with_status(job, JobStatus::Dispatched).await
+    }
+
     /// Load a job from Redis by ID
     pub async fn load_job_from_redis(
@@ -252,7 +257,7 @@ impl Client {
         match job_data {
             Some(data) => {
                 let job: Job = serde_json::from_str(&data)
-                    .map_err(|e| JobError::Serialization(e.to_string()))?;
+                    .map_err(|e| JobError::Serialization(e))?;
                 Ok(job)
             }
             None => Err(JobError::NotFound(job_id.to_string())),
@@ -312,6 +317,21 @@ impl Client {
         Ok(result)
     }
 
+    /// Get job error from Redis
+    pub async fn get_error(
+        &self,
+        job_id: &str,
+    ) -> Result<Option<String>, JobError> {
+        let job_key = self.job_key(job_id);
+        let mut conn = self.redis_client
+            .get_multiplexed_async_connection()
+            .await
+            .map_err(|e| JobError::Redis(e))?;
+        let result: Option<String> = conn.hget(&job_key, "error").await
+            .map_err(|e| JobError::Redis(e))?;
+        Ok(result)
+    }
+
     /// Get a job ID from the work queue (blocking pop)
     pub async fn get_job_id(&self, queue_key: &str) -> Result<Option<String>, JobError> {
         let mut conn = self.redis_client
@@ -330,4 +350,113 @@ impl Client {
     pub async fn get_job(&self, job_id: &str) -> Result<Job, JobError> {
         self.load_job_from_redis(job_id).await
     }
+
+    /// Dispatch a job to a runner's queue
+    pub async fn dispatch_job(&self, job_id: &str, runner_name: &str) -> Result<(), JobError> {
+        let mut conn = self.redis_client
+            .get_multiplexed_async_connection()
+            .await
+            .map_err(|e| JobError::Redis(e))?;
+
+        let queue_key = self.runner_key(runner_name);
+
+        // Push job ID to the runner's queue (LPUSH for FIFO with BRPOP)
+        let _: () = conn.lpush(&queue_key, job_id).await
+            .map_err(|e| JobError::Redis(e))?;
+
+        Ok(())
+    }
+
+    /// Run a job: dispatch it, wait for completion, and return the result
+    ///
+    /// This is a convenience method that:
+    /// 1. Stores the job in Redis
+    /// 2. Dispatches it to the runner's queue
+    /// 3. Waits for the job to complete (polls status)
+    /// 4. Returns the result or error
+    ///
+    /// # Arguments
+    /// * `job` - The job to run
+    /// * `runner_name` - The name of the runner to dispatch to
+    /// * `timeout_secs` - Maximum time to wait for job completion (in seconds)
+    ///
+    /// # Returns
+    /// * `Ok(String)` - The job result if successful
+    /// * `Err(JobError)` - If the job fails, times out, or encounters an error
+    pub async fn run_job(
+        &self,
+        job: &Job,
+        runner_name: &str,
+        timeout_secs: u64,
+    ) -> Result<String, JobError> {
+        use tokio::time::{Duration, timeout};
+
+        // Store the job in Redis
+        self.store_job_in_redis(job).await?;
+
+        // Dispatch to runner queue
+        self.dispatch_job(&job.id, runner_name).await?;
+
+        // Wait for job to complete with timeout
+        let result = timeout(
+            Duration::from_secs(timeout_secs),
+            self.wait_for_job_completion(&job.id)
+        ).await;
+
+        match result {
+            Ok(Ok(job_result)) => Ok(job_result),
+            Ok(Err(e)) => Err(e),
+            Err(_) => Err(JobError::Timeout(format!(
+                "Job {} did not complete within {} seconds",
+                job.id, timeout_secs
+            ))),
+        }
+    }
+
+    /// Wait for a job to complete by polling its status
+    ///
+    /// This polls the job status every 500ms until it reaches a terminal state
+    /// (Finished or Error), then returns the result or error.
+    async fn wait_for_job_completion(&self, job_id: &str) -> Result<String, JobError> {
+        use tokio::time::{sleep, Duration};
+
+        loop {
+            // Check job status
+            let status = self.get_status(job_id).await?;
+
+            match status {
+                JobStatus::Finished => {
+                    // Job completed successfully, get the result
+                    let result = self.get_result(job_id).await?;
+                    return result.ok_or_else(|| {
+                        JobError::InvalidData(format!("Job {} finished but has no result", job_id))
+                    });
+                }
+                JobStatus::Error => {
+                    // Job failed, get the error message
+                    let mut conn = self.redis_client
+                        .get_multiplexed_async_connection()
+                        .await
+                        .map_err(|e| JobError::Redis(e))?;
+                    let error_msg: Option<String> = conn
+                        .hget(&self.job_key(job_id), "error")
+                        .await
+                        .map_err(|e| JobError::Redis(e))?;
+                    return Err(JobError::InvalidData(
+                        error_msg.unwrap_or_else(|| format!("Job {} failed with unknown error", job_id))
+                    ));
+                }
+                JobStatus::Stopping => {
+                    return Err(JobError::InvalidData(format!("Job {} was stopped", job_id)));
+                }
+                // Job is still running (Dispatched, WaitingForPrerequisites, Started)
+                _ => {
+                    // Wait before polling again
+                    sleep(Duration::from_millis(500)).await;
+                }
+            }
+        }
+    }
 }
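Taken together, the new client methods cover the whole store → dispatch → poll lifecycle. Below is a minimal, hedged usage sketch (not part of this commit): it assumes a Tokio runtime, a runner named "example_runner" consuming the queue, and that the `Client` and `Job` values are built elsewhere, e.g. via the exported `ClientBuilder`/`JobBuilder`, whose setter methods are not shown in this diff.

```rust
use hero_job::{Client, Job, JobError};

/// Dispatch `job` to the "example_runner" queue and wait up to 60 seconds for its result.
/// Sketch only: `client` and `job` are assumed to be constructed elsewhere.
async fn dispatch_and_wait(client: &Client, job: &Job) -> Result<String, JobError> {
    // run_job stores the job hash, LPUSHes its ID onto the runner's queue,
    // then polls the job's status every 500 ms until Finished/Error or the timeout elapses.
    client.run_job(job, "example_runner", 60).await
}
```

If the job fails, the error message written by the runner can also be read back later through the new `get_error` method.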

View File

@@ -1,17 +1,26 @@
-use chrono::{DateTime, Utc};
-use redis::AsyncCommands;
+use chrono::{Utc};
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use thiserror::Error;
 use uuid::Uuid;
-use log::{debug, error};
+use log::{error};
 
 pub mod client;
-pub use client::Client;
+pub use client::{Client, ClientBuilder};
 
+/// Signature for a job - contains the signatory's public key and their signature
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct JobSignature {
+    /// Public key of the signatory (hex-encoded secp256k1 public key)
+    pub public_key: String,
+    /// Signature (hex-encoded secp256k1 signature)
+    pub signature: String,
+}
+
 /// Job status enumeration
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 pub enum JobStatus {
+    Created,
     Dispatched,
     WaitingForPrerequisites,
     Started,
@@ -23,6 +32,7 @@ pub enum JobStatus {
 impl JobStatus {
     pub fn as_str(&self) -> &'static str {
         match self {
+            JobStatus::Created => "created",
             JobStatus::Dispatched => "dispatched",
             JobStatus::WaitingForPrerequisites => "waiting_for_prerequisites",
             JobStatus::Started => "started",
@@ -34,6 +44,7 @@ impl JobStatus {
     pub fn from_str(s: &str) -> Option<Self> {
         match s {
+            "created" => Some(JobStatus::Created),
             "dispatched" => Some(JobStatus::Dispatched),
             "waiting_for_prerequisites" => Some(JobStatus::WaitingForPrerequisites),
             "started" => Some(JobStatus::Started),
@@ -61,6 +72,9 @@ pub struct Job {
     pub env_vars: HashMap<String, String>, // environment variables for script execution
     pub created_at: chrono::DateTime<chrono::Utc>,
     pub updated_at: chrono::DateTime<chrono::Utc>,
+
+    /// Signatures from authorized signatories (public keys are included in each signature)
+    pub signatures: Vec<JobSignature>,
 }
 
 /// Error types for job operations
@@ -69,7 +83,7 @@ pub enum JobError {
     #[error("Redis error: {0}")]
     Redis(#[from] redis::RedisError),
     #[error("Serialization error: {0}")]
-    Serialization(String),
+    Serialization(#[from] serde_json::Error),
     #[error("Job not found: {0}")]
     NotFound(String),
     #[error("Invalid job status: {0}")]
@@ -78,6 +92,10 @@ pub enum JobError {
     Timeout(String),
     #[error("Invalid job data: {0}")]
     InvalidData(String),
+    #[error("Signature verification failed: {0}")]
+    SignatureVerificationFailed(String),
+    #[error("Unauthorized: {0}")]
+    Unauthorized(String),
 }
 
 impl Job {
@@ -101,73 +119,77 @@ impl Job {
             env_vars: HashMap::new(),
             created_at: now,
             updated_at: now,
+            signatures: Vec::new(),
         }
     }
 
-    /// Update job status in Redis using default client
-    pub async fn update_status(
-        redis_conn: &mut redis::aio::MultiplexedConnection,
-        job_id: &str,
-        status: JobStatus,
-    ) -> Result<(), JobError> {
-        let now = Utc::now();
-        let job_key = format!("hero:job:{}", job_id);
-
-        let _: () = redis_conn.hset_multiple(&job_key, &[
-            ("status", status.as_str()),
-            ("updated_at", &now.to_rfc3339()),
-        ]).await
-            .map_err(|e| JobError::Redis(e))?;
-
-        Ok(())
-    }
-
-    /// Set job result in Redis using default client
-    pub async fn set_result(
-        redis_conn: &mut redis::aio::MultiplexedConnection,
-        job_id: &str,
-        result: &str,
-    ) -> Result<(), JobError> {
-        let job_key = format!("hero:job:{}", job_id);
-        let now = Utc::now();
-
-        let _: () = redis_conn.hset_multiple(&job_key, &[
-            ("result", result),
-            ("status", JobStatus::Finished.as_str()),
-            ("updated_at", &now.to_rfc3339()),
-        ]).await
-            .map_err(|e| JobError::Redis(e))?;
-
-        Ok(())
-    }
-
-    /// Set job error in Redis using default client
-    pub async fn set_error(
-        redis_conn: &mut redis::aio::MultiplexedConnection,
-        job_id: &str,
-        error: &str,
-    ) -> Result<(), JobError> {
-        let job_key = format!("hero:job:{}", job_id);
-        let now = Utc::now();
-
-        let _: () = redis_conn.hset_multiple(&job_key, &[
-            ("error", error),
-            ("status", JobStatus::Error.as_str()),
-            ("updated_at", &now.to_rfc3339()),
-        ]).await
-            .map_err(|e| JobError::Redis(e))?;
-
-        Ok(())
-    }
-
-    /// Delete job from Redis using default client
-    pub async fn delete(
-        redis_conn: &mut redis::aio::MultiplexedConnection,
-        job_id: &str,
-    ) -> Result<(), JobError> {
-        let job_key = format!("hero:job:{}", job_id);
-
-        let _: () = redis_conn.del(&job_key).await
-            .map_err(|e| JobError::Redis(e))?;
-
-        Ok(())
-    }
+    /// Get the canonical representation of the job for signing
+    /// This creates a deterministic string representation that can be hashed and signed
+    /// Note: Signatures are excluded from the canonical representation
+    pub fn canonical_representation(&self) -> String {
+        // Create a deterministic representation excluding signatures
+        // Sort env_vars keys for deterministic ordering
+        let mut env_vars_sorted: Vec<_> = self.env_vars.iter().collect();
+        env_vars_sorted.sort_by_key(|&(k, _)| k);
+
+        format!(
+            "{}:{}:{}:{}:{}:{}:{}:{:?}",
+            self.id,
+            self.caller_id,
+            self.context_id,
+            self.payload,
+            self.runner,
+            self.executor,
+            self.timeout,
+            env_vars_sorted
+        )
+    }
+
+    /// Get list of signatory public keys from signatures
+    pub fn signatories(&self) -> Vec<String> {
+        self.signatures.iter()
+            .map(|sig| sig.public_key.clone())
+            .collect()
+    }
+
+    /// Verify that all signatures are valid
+    /// Returns Ok(()) if verification passes, Err otherwise
+    /// Empty signatures list is allowed - loop simply won't execute
+    pub fn verify_signatures(&self) -> Result<(), JobError> {
+        use secp256k1::{Message, PublicKey, Secp256k1, ecdsa::Signature};
+        use sha2::{Sha256, Digest};
+
+        // Get the canonical representation and hash it
+        let canonical = self.canonical_representation();
+        let mut hasher = Sha256::new();
+        hasher.update(canonical.as_bytes());
+        let hash = hasher.finalize();
+
+        let secp = Secp256k1::verification_only();
+        let message = Message::from_digest_slice(&hash)
+            .map_err(|e| JobError::SignatureVerificationFailed(format!("Invalid message: {}", e)))?;
+
+        // Verify each signature (if any)
+        for sig_data in &self.signatures {
+            // Decode public key
+            let pubkey_bytes = hex::decode(&sig_data.public_key)
+                .map_err(|e| JobError::SignatureVerificationFailed(format!("Invalid public key hex: {}", e)))?;
+            let pubkey = PublicKey::from_slice(&pubkey_bytes)
+                .map_err(|e| JobError::SignatureVerificationFailed(format!("Invalid public key: {}", e)))?;
+
+            // Decode signature
+            let sig_bytes = hex::decode(&sig_data.signature)
+                .map_err(|e| JobError::SignatureVerificationFailed(format!("Invalid signature hex: {}", e)))?;
+            let signature = Signature::from_compact(&sig_bytes)
+                .map_err(|e| JobError::SignatureVerificationFailed(format!("Invalid signature: {}", e)))?;
+
+            // Verify signature
+            secp.verify_ecdsa(&message, &signature, &pubkey)
+                .map_err(|e| JobError::SignatureVerificationFailed(format!("Signature verification failed: {}", e)))?;
+        }
+
+        Ok(())
+    }
 }
 /// Builder for constructing job execution requests.
@@ -179,6 +201,7 @@ pub struct JobBuilder {
     executor: String,
     timeout: u64, // timeout in seconds
     env_vars: HashMap<String, String>,
+    signatures: Vec<JobSignature>,
 }
 
 impl JobBuilder {
@@ -191,6 +214,7 @@ impl JobBuilder {
             executor: "".to_string(),
             timeout: 300, // 5 minutes default
             env_vars: HashMap::new(),
+            signatures: Vec::new(),
         }
     }
 
@@ -248,6 +272,27 @@ impl JobBuilder {
         self
     }
 
+    /// Add a signature (public key and signature)
+    pub fn signature(mut self, public_key: &str, signature: &str) -> Self {
+        self.signatures.push(JobSignature {
+            public_key: public_key.to_string(),
+            signature: signature.to_string(),
+        });
+        self
+    }
+
+    /// Set multiple signatures
+    pub fn signatures(mut self, signatures: Vec<JobSignature>) -> Self {
+        self.signatures = signatures;
+        self
+    }
+
+    /// Clear all signatures
+    pub fn clear_signatures(mut self) -> Self {
+        self.signatures.clear();
+        self
+    }
+
     /// Build the job
     pub fn build(self) -> Result<Job, JobError> {
         if self.caller_id.is_empty() {
@@ -276,6 +321,7 @@ impl JobBuilder {
         job.timeout = self.timeout;
         job.env_vars = self.env_vars;
+        job.signatures = self.signatures;
 
         Ok(job)
     }
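The signing side that `verify_signatures` expects is not part of this commit. The sketch below shows one way a signatory could produce a matching `JobSignature` under the same conventions: SHA-256 over `canonical_representation()`, ECDSA over the digest, with the compact signature and the compressed public key hex-encoded. The key handling here (a caller-supplied secret key) is illustrative only.

```rust
use hero_job::{Job, JobSignature};
use secp256k1::{Message, PublicKey, Secp256k1, SecretKey};
use sha2::{Digest, Sha256};

/// Sign `job` with `secret_key`, producing a signature that verify_signatures() accepts.
fn sign_job(job: &Job, secret_key: &SecretKey) -> JobSignature {
    let secp = Secp256k1::new();

    // Hash the canonical representation, exactly as verify_signatures() does.
    let mut hasher = Sha256::new();
    hasher.update(job.canonical_representation().as_bytes());
    let hash = hasher.finalize();

    let message = Message::from_digest_slice(&hash).expect("SHA-256 digest is 32 bytes");
    let signature = secp.sign_ecdsa(&message, secret_key);
    let public_key = PublicKey::from_secret_key(&secp, secret_key);

    JobSignature {
        // Compressed 33-byte public key, hex-encoded, matching PublicKey::from_slice on verify.
        public_key: hex::encode(public_key.serialize()),
        // Compact 64-byte ECDSA signature, hex-encoded, matching Signature::from_compact on verify.
        signature: hex::encode(signature.serialize_compact()),
    }
}
```

The resulting hex strings can then be pushed onto an existing job's `signatures` field or attached through the new `JobBuilder::signature(...)` setter.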