initial commit

Timur Gordon
2025-09-01 16:19:02 +02:00
commit 1f7cd4ded8
6 changed files with 1684 additions and 0 deletions

.gitignore (new file, 1 line)

@@ -0,0 +1 @@
target

Cargo.lock (generated, 1041 lines; diff suppressed because it is too large)

Cargo.toml (new file, 18 lines)

@@ -0,0 +1,18 @@
[package]
name = "hero-job"
version = "0.1.0"
edition = "2021"
description = "Job types and utilities for the Hero ecosystem"
[dependencies]
chrono = { version = "0.4", features = ["serde"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
uuid = { version = "1.0", features = ["v4"] }
thiserror = "1.0"
redis = { version = "0.25", features = ["aio", "tokio-comp"] }
log = "0.4"
[lib]
name = "hero_job"
path = "src/lib.rs"

README.md (new file, 3 lines)

@@ -0,0 +1,3 @@
# Job
Job model and Redis-backed client for the Hero supervisor.
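A minimal usage sketch of the crate introduced below: build a job, connect, and store it. It assumes the consuming crate provides a Tokio runtime (for `#[tokio::main]`) and a Redis instance at localhost:6379; the builder values are placeholders.

use hero_job::{Client, JobBuilder, JobError};

#[tokio::main]
async fn main() -> Result<(), JobError> {
    // Describe the work; all five of these fields are required by build().
    let job = JobBuilder::new()
        .caller_id("caller-1")            // placeholder IDs
        .context_id("context-1")
        .payload("print(\"hello\");")     // script content for the runner
        .runner("example-runner")
        .executor("example-executor")
        .timeout(60)
        .build()?;

    // Connect to Redis and store the job under a namespace.
    let client = Client::builder()
        .redis_url("redis://localhost:6379")
        .namespace("hero")
        .build()
        .await?;
    client.store_job_in_redis(&job).await?;

    println!("stored job {} at {}", job.id, client.job_key(&job.id));
    Ok(())
}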

src/client.rs (new file, 333 lines)

@@ -0,0 +1,333 @@
//! Job client implementation for managing jobs in Redis
use chrono::Utc;
use redis::AsyncCommands;
use crate::{Job, JobStatus, JobError};
/// Client for managing jobs in Redis
#[derive(Debug, Clone)]
pub struct Client {
redis_client: redis::Client,
namespace: String,
}
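/// Builder for configuring and creating a Client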
pub struct ClientBuilder {
/// Redis URL for connection
redis_url: String,
/// Namespace for queue keys
namespace: String,
}
impl ClientBuilder {
/// Create a new client builder
pub fn new() -> Self {
Self {
redis_url: "redis://localhost:6379".to_string(),
namespace: "".to_string(),
}
}
/// Set the Redis URL
pub fn redis_url<S: Into<String>>(mut self, url: S) -> Self {
self.redis_url = url.into();
self
}
/// Set the namespace for queue keys
pub fn namespace<S: Into<String>>(mut self, namespace: S) -> Self {
self.namespace = namespace.into();
self
}
/// Build the client
pub async fn build(self) -> Result<Client, JobError> {
// Create Redis client
let redis_client = redis::Client::open(self.redis_url.as_str())
.map_err(|e| JobError::Redis(e))?;
Ok(Client {
redis_client,
namespace: self.namespace,
})
}
}
impl Default for Client {
fn default() -> Self {
// Note: the default client targets redis://localhost:6379 with no namespace
// Use Client::builder() for proper initialization
Self {
redis_client: redis::Client::open("redis://localhost:6379")
.expect("default Redis URL is valid"),
namespace: "".to_string(),
}
}
}
impl Client {
/// Create a new client builder
pub fn builder() -> ClientBuilder {
ClientBuilder::new()
}
/// List all job IDs from Redis
pub async fn list_jobs(&self) -> Result<Vec<String>, JobError> {
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| JobError::Redis(e))?;
let keys: Vec<String> = conn.keys(format!("{}:*", &self.jobs_key())).await
.map_err(|e| JobError::Redis(e))?;
let job_ids: Vec<String> = keys
.into_iter()
.filter_map(|key| {
if key.starts_with(&format!("{}:", self.jobs_key())) {
key.strip_prefix(&format!("{}:", self.jobs_key()))
.map(|s| s.to_string())
} else {
None
}
})
.collect();
Ok(job_ids)
}
fn jobs_key(&self) -> String {
if self.namespace.is_empty() {
format!("job")
} else {
format!("{}:job", self.namespace)
}
}
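/// Redis key for a job hash, honoring the configured namespace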
pub fn job_key(&self, job_id: &str) -> String {
if self.namespace.is_empty() {
format!("job:{}", job_id)
} else {
format!("{}:job:{}", self.namespace, job_id)
}
}
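/// Redis key for a job's reply queue, honoring the configured namespace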
pub fn job_reply_key(&self, job_id: &str) -> String {
if self.namespace.is_empty() {
format!("reply:{}", job_id)
} else {
format!("{}:reply:{}", self.namespace, job_id)
}
}
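/// Redis key for a runner's work queue, honoring the configured namespace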
pub fn runner_key(&self, runner_name: &str) -> String {
if self.namespace.is_empty() {
format!("runner:{}", runner_name)
} else {
format!("{}:runner:{}", self.namespace, runner_name)
}
}
/// Set job error in Redis
pub async fn set_error(&self,
job_id: &str,
error: &str,
) -> Result<(), JobError> {
let job_key = self.job_key(job_id);
let now = Utc::now();
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| JobError::Redis(e))?;
let _: () = conn.hset_multiple(&job_key, &[
("error", error),
("status", JobStatus::Error.as_str()),
("updated_at", &now.to_rfc3339()),
]).await
.map_err(|e| JobError::Redis(e))?;
Ok(())
}
/// Set job status in Redis
pub async fn set_job_status(&self,
job_id: &str,
status: JobStatus,
) -> Result<(), JobError> {
let job_key = self.job_key(job_id);
let now = Utc::now();
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| JobError::Redis(e))?;
let _: () = conn.hset_multiple(&job_key, &[
("status", status.as_str()),
("updated_at", &now.to_rfc3339()),
]).await
.map_err(|e| JobError::Redis(e))?;
Ok(())
}
/// Get job status from Redis
pub async fn get_status(
&self,
job_id: &str,
) -> Result<JobStatus, JobError> {
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| JobError::Redis(e))?;
let status_str: Option<String> = conn.hget(&self.job_key(job_id), "status").await
.map_err(|e| JobError::Redis(e))?;
match status_str {
Some(s) => JobStatus::from_str(&s).ok_or_else(|| JobError::InvalidStatus(s)),
None => Err(JobError::NotFound(job_id.to_string())),
}
}
/// Delete job from Redis
pub async fn delete_from_redis(
&self,
job_id: &str,
) -> Result<(), JobError> {
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| JobError::Redis(e))?;
let job_key = self.job_key(job_id);
let _: () = conn.del(&job_key).await
.map_err(|e| JobError::Redis(e))?;
Ok(())
}
/// Store this job in Redis
pub async fn store_job_in_redis(&self, job: &Job) -> Result<(), JobError> {
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| JobError::Redis(e))?;
let job_key = self.job_key(&job.id);
// Serialize the job data
let job_data = serde_json::to_string(job)
.map_err(|e| JobError::Serialization(e.to_string()))?;
// Store job data in Redis hash
let _: () = conn.hset_multiple(&job_key, &[
("data", job_data),
("status", JobStatus::Dispatched.as_str().to_string()),
("created_at", job.created_at.to_rfc3339()),
("updated_at", job.updated_at.to_rfc3339()),
]).await
.map_err(|e| JobError::Redis(e))?;
// Set TTL for the job (24 hours)
let _: () = conn.expire(&job_key, 86400).await
.map_err(|e| JobError::Redis(e))?;
Ok(())
}
/// Load a job from Redis by ID
pub async fn load_job_from_redis(
&self,
job_id: &str,
) -> Result<Job, JobError> {
let job_key = self.job_key(job_id);
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| JobError::Redis(e))?;
// Get job data from Redis
let job_data: Option<String> = conn.hget(&job_key, "data").await
.map_err(|e| JobError::Redis(e))?;
match job_data {
Some(data) => {
let job: Job = serde_json::from_str(&data)
.map_err(|e| JobError::Serialization(e.to_string()))?;
Ok(job)
}
None => Err(JobError::NotFound(job_id.to_string())),
}
}
/// Delete a job by ID
pub async fn delete_job(&mut self, job_id: &str) -> Result<(), JobError> {
let mut conn = self.redis_client.get_multiplexed_async_connection().await
.map_err(|e| JobError::Redis(e))?;
let job_key = self.job_key(job_id);
let deleted_count: i32 = conn.del(&job_key).await
.map_err(|e| JobError::Redis(e))?;
if deleted_count == 0 {
return Err(JobError::NotFound(job_id.to_string()));
}
Ok(())
}
/// Set job result in Redis
pub async fn set_result(
&self,
job_id: &str,
result: &str,
) -> Result<(), JobError> {
let job_key = self.job_key(job_id);
let now = Utc::now();
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| JobError::Redis(e))?;
let _: () = conn.hset_multiple(&job_key, &[
("result", result),
("status", JobStatus::Finished.as_str()),
("updated_at", &now.to_rfc3339()),
]).await
.map_err(|e| JobError::Redis(e))?;
Ok(())
}
/// Get job result from Redis
pub async fn get_result(
&self,
job_id: &str,
) -> Result<Option<String>, JobError> {
let job_key = self.job_key(job_id);
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| JobError::Redis(e))?;
let result: Option<String> = conn.hget(&job_key, "result").await
.map_err(|e| JobError::Redis(e))?;
Ok(result)
}
/// Get a job ID from the work queue (blocking pop)
pub async fn get_job_id(&self, queue_key: &str) -> Result<Option<String>, JobError> {
let mut conn = self.redis_client
.get_multiplexed_async_connection()
.await
.map_err(|e| JobError::Redis(e))?;
// Use BRPOP with a short timeout to avoid blocking indefinitely
let result: Option<(String, String)> = conn.brpop(queue_key, 1.0).await
.map_err(|e| JobError::Redis(e))?;
Ok(result.map(|(_, job_id)| job_id))
}
/// Get a job by ID (alias for load_job_from_redis)
pub async fn get_job(&self, job_id: &str) -> Result<Job, JobError> {
self.load_job_from_redis(job_id).await
}
}
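A sketch of how a runner loop might consume this client: block-pop job IDs from a work queue with get_job_id, load each job, and record status and result. The queue key and the "executed" result string are placeholders; only the Client methods defined above are used.

use hero_job::{Client, JobError, JobStatus};

// Hypothetical runner-side loop for illustration.
pub async fn run_worker(client: &Client, queue_key: &str) -> Result<(), JobError> {
    loop {
        // get_job_id wraps BRPOP with a 1-second timeout; None means the queue was empty.
        let Some(job_id) = client.get_job_id(queue_key).await? else {
            continue;
        };
        let job = client.get_job(&job_id).await?;
        client.set_job_status(&job.id, JobStatus::Started).await?;

        // ... hand job.payload to the configured executor here ...

        client.set_result(&job.id, "executed").await?;
    }
}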

src/lib.rs (new file, 288 lines)

@@ -0,0 +1,288 @@
use chrono::{DateTime, Utc};
use redis::AsyncCommands;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use thiserror::Error;
use uuid::Uuid;
use log::{debug, error};
pub mod client;
pub use client::Client;
/// Job status enumeration
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum JobStatus {
Dispatched,
WaitingForPrerequisites,
Started,
Error,
Stopping,
Finished,
}
impl JobStatus {
pub fn as_str(&self) -> &'static str {
match self {
JobStatus::Dispatched => "dispatched",
JobStatus::WaitingForPrerequisites => "waiting_for_prerequisites",
JobStatus::Started => "started",
JobStatus::Error => "error",
JobStatus::Stopping => "stopping",
JobStatus::Finished => "finished",
}
}
pub fn from_str(s: &str) -> Option<Self> {
match s {
"dispatched" => Some(JobStatus::Dispatched),
"waiting_for_prerequisites" => Some(JobStatus::WaitingForPrerequisites),
"started" => Some(JobStatus::Started),
"error" => Some(JobStatus::Error),
"stopping" => Some(JobStatus::Stopping),
"finished" => Some(JobStatus::Finished),
_ => None,
}
}
}
/// Representation of a script execution request.
///
/// This structure contains all the information needed to execute a script
/// on an actor service, including the script content, dependencies, and metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Job {
pub id: String,
pub caller_id: String,
pub context_id: String,
pub payload: String,
pub runner: String, // name of the runner to execute this job
pub executor: String, // name of the executor the runner will use to execute this job
pub timeout: u64, // timeout in seconds
pub env_vars: HashMap<String, String>, // environment variables for script execution
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
/// Error types for job operations
#[derive(Error, Debug)]
pub enum JobError {
#[error("Redis error: {0}")]
Redis(#[from] redis::RedisError),
#[error("Serialization error: {0}")]
Serialization(String),
#[error("Job not found: {0}")]
NotFound(String),
#[error("Invalid job status: {0}")]
InvalidStatus(String),
#[error("Timeout error: {0}")]
Timeout(String),
#[error("Invalid job data: {0}")]
InvalidData(String),
}
impl Job {
/// Create a new job with the given parameters
pub fn new(
caller_id: String,
context_id: String,
payload: String,
runner: String,
executor: String,
) -> Self {
let now = Utc::now();
Self {
id: Uuid::new_v4().to_string(),
caller_id,
context_id,
payload,
runner,
executor,
timeout: 300, // 5 minutes default
env_vars: HashMap::new(),
created_at: now,
updated_at: now,
}
}
/// Update job status in Redis over a raw connection (writes to the fixed hero:job:<id> key)
pub async fn update_status(
redis_conn: &mut redis::aio::MultiplexedConnection,
job_id: &str,
status: JobStatus,
) -> Result<(), JobError> {
let now = Utc::now();
let job_key = format!("hero:job:{}", job_id);
let _: () = redis_conn.hset_multiple(&job_key, &[
("status", status.as_str()),
("updated_at", &now.to_rfc3339()),
]).await
.map_err(|e| JobError::Redis(e))?;
Ok(())
}
/// Set job result in Redis over a raw connection (writes to the fixed hero:job:<id> key)
pub async fn set_result(
redis_conn: &mut redis::aio::MultiplexedConnection,
job_id: &str,
result: &str,
) -> Result<(), JobError> {
let job_key = format!("hero:job:{}", job_id);
let now = Utc::now();
let _: () = redis_conn.hset_multiple(&job_key, &[
("result", result),
("status", JobStatus::Finished.as_str()),
("updated_at", &now.to_rfc3339()),
]).await
.map_err(|e| JobError::Redis(e))?;
Ok(())
}
/// Set job error in Redis over a raw connection (writes to the fixed hero:job:<id> key)
pub async fn set_error(
redis_conn: &mut redis::aio::MultiplexedConnection,
job_id: &str,
error: &str,
) -> Result<(), JobError> {
let job_key = format!("hero:job:{}", job_id);
let now = Utc::now();
let _: () = redis_conn.hset_multiple(&job_key, &[
("error", error),
("status", JobStatus::Error.as_str()),
("updated_at", &now.to_rfc3339()),
]).await
.map_err(|e| JobError::Redis(e))?;
Ok(())
}
/// Delete a job from Redis over a raw connection (deletes the fixed hero:job:<id> key)
pub async fn delete(
redis_conn: &mut redis::aio::MultiplexedConnection,
job_id: &str,
) -> Result<(), JobError> {
let job_key = format!("hero:job:{}", job_id);
let _: () = redis_conn.del(&job_key).await
.map_err(|e| JobError::Redis(e))?;
Ok(())
}
}
/// Builder for constructing job execution requests.
pub struct JobBuilder {
caller_id: String,
context_id: String,
payload: String,
runner: String,
executor: String,
timeout: u64, // timeout in seconds
env_vars: HashMap<String, String>,
}
impl JobBuilder {
pub fn new() -> Self {
Self {
caller_id: "".to_string(),
context_id: "".to_string(),
payload: "".to_string(),
runner: "".to_string(),
executor: "".to_string(),
timeout: 300, // 5 minutes default
env_vars: HashMap::new(),
}
}
/// Set the caller ID for this job
pub fn caller_id(mut self, caller_id: &str) -> Self {
self.caller_id = caller_id.to_string();
self
}
/// Set the context ID for this job
pub fn context_id(mut self, context_id: &str) -> Self {
self.context_id = context_id.to_string();
self
}
/// Set the payload (script content) for this job
pub fn payload(mut self, payload: &str) -> Self {
self.payload = payload.to_string();
self
}
/// Set the runner name for this job
pub fn runner(mut self, runner: &str) -> Self {
self.runner = runner.to_string();
self
}
/// Set the executor for this job
pub fn executor(mut self, executor: &str) -> Self {
self.executor = executor.to_string();
self
}
/// Set the timeout for job execution (in seconds)
pub fn timeout(mut self, timeout: u64) -> Self {
self.timeout = timeout;
self
}
/// Set a single environment variable
pub fn env_var(mut self, key: &str, value: &str) -> Self {
self.env_vars.insert(key.to_string(), value.to_string());
self
}
/// Set multiple environment variables from a HashMap
pub fn env_vars(mut self, env_vars: HashMap<String, String>) -> Self {
self.env_vars = env_vars;
self
}
/// Clear all environment variables
pub fn clear_env_vars(mut self) -> Self {
self.env_vars.clear();
self
}
/// Build the job
pub fn build(self) -> Result<Job, JobError> {
if self.caller_id.is_empty() {
return Err(JobError::InvalidData("caller_id is required".to_string()));
}
if self.context_id.is_empty() {
return Err(JobError::InvalidData("context_id is required".to_string()));
}
if self.payload.is_empty() {
return Err(JobError::InvalidData("payload is required".to_string()));
}
if self.runner.is_empty() {
return Err(JobError::InvalidData("runner is required".to_string()));
}
if self.executor.is_empty() {
return Err(JobError::InvalidData("executor is required".to_string()));
}
let mut job = Job::new(
self.caller_id,
self.context_id,
self.payload,
self.runner,
self.executor,
);
job.timeout = self.timeout;
job.env_vars = self.env_vars;
Ok(job)
}
}
impl Default for JobBuilder {
fn default() -> Self {
Self::new()
}
}
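A sketch of the connection-level helpers on Job, assuming the caller's crate also depends on the redis crate with the "aio" and "tokio-comp" features and runs inside an async runtime. Note that these helpers write to the fixed hero:job:<id> keys rather than the namespaced keys used by Client; the URL and result string are placeholders.

use hero_job::{Job, JobError, JobStatus};

// Marks an existing job as started and records its result using the
// associated functions on Job, passing a raw multiplexed connection.
pub async fn finish_job(job_id: &str, output: &str) -> Result<(), JobError> {
    let redis_client = redis::Client::open("redis://localhost:6379")?;
    let mut conn = redis_client.get_multiplexed_async_connection().await?;

    Job::update_status(&mut conn, job_id, JobStatus::Started).await?;
    Job::set_result(&mut conn, job_id, output).await?;
    Ok(())
}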