From 7ce19f8b6ddcaa248620b8db074c8eaca8b2c9ac Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Fri, 22 Aug 2025 14:08:41 +0200 Subject: [PATCH] Add validation for service methods Signed-off-by: Lee Smet --- src/models.rs | 2 +- src/models/context.rs | 12 +- src/models/flow.rs | 21 +- src/models/job.rs | 28 +- src/models/message.rs | 30 +- src/models/runner.rs | 14 +- src/rpc.rs | 228 ++++++++------ src/service.rs | 676 +++++++++++++++++++++++++++++++++++++++++- src/storage/redis.rs | 232 ++++++++++++++- 9 files changed, 1089 insertions(+), 154 deletions(-) diff --git a/src/models.rs b/src/models.rs index e4941cd..cc30447 100644 --- a/src/models.rs +++ b/src/models.rs @@ -8,7 +8,7 @@ mod script_type; pub use actor::Actor; pub use context::Context; -pub use flow::Flow; +pub use flow::{Flow, FlowStatus}; pub use job::{Job, JobStatus}; pub use message::{Message, MessageFormatType, MessageStatus, MessageType}; pub use runner::Runner; diff --git a/src/models/context.rs b/src/models/context.rs index a1de1e2..cb7e9d8 100644 --- a/src/models/context.rs +++ b/src/models/context.rs @@ -5,13 +5,13 @@ use crate::time::Timestamp; #[derive(Serialize, Deserialize, Clone)] pub struct Context { /// Redis DB to use - id: u32, + pub id: u32, /// Actor ids which have admin rights on this context - admins: Vec, + pub admins: Vec, /// Actor ids which can read the context info - readers: Vec, + pub readers: Vec, /// Actor ids which can execute jobs in this context - executors: Vec, - created_at: Timestamp, - updated_at: Timestamp, + pub executors: Vec, + pub created_at: Timestamp, + pub updated_at: Timestamp, } diff --git a/src/models/flow.rs b/src/models/flow.rs index 36b6922..c0d98d2 100644 --- a/src/models/flow.rs +++ b/src/models/flow.rs @@ -7,25 +7,26 @@ use crate::time::Timestamp; #[derive(Serialize, Deserialize, Clone)] pub struct Flow { /// Job Id set tby the actor which created it - id: u32, + pub id: u32, /// Actor Id who created this job - caller_id: u32, + pub caller_id: u32, /// The context in which this job is executed - context_id: u32, + pub context_id: u32, /// List of jobs which make up the flow - jobs: Vec, + pub jobs: Vec, /// Environment variables, passed to every job when executed - env_vars: HashMap, + pub env_vars: HashMap, /// The result of the flow - result: HashMap, - created_at: Timestamp, - updated_at: Timestamp, - status: FlowStatus, + pub result: HashMap, + pub created_at: Timestamp, + pub updated_at: Timestamp, + pub status: FlowStatus, } /// The status of a flow -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] pub enum FlowStatus { + Created, Dispatched, Started, Error, diff --git a/src/models/job.rs b/src/models/job.rs index 18f7855..a43659d 100644 --- a/src/models/job.rs +++ b/src/models/job.rs @@ -7,25 +7,25 @@ use crate::{models::ScriptType, time::Timestamp}; #[derive(Clone, Serialize, Deserialize)] pub struct Job { /// Job Id, this is given by the actor who created the job - id: u32, + pub id: u32, /// Actor ID which created this job - caller_id: u32, + pub caller_id: u32, /// Context in which the job is executed - context_id: u32, - script: String, - script_type: ScriptType, + pub context_id: u32, + pub script: String, + pub script_type: ScriptType, /// Timeout in seconds for this job - timeout: u32, + pub timeout: u32, /// Max amount of times to retry this job - retries: u8, - env_vars: HashMap, - result: HashMap, - prerequisites: Vec, + pub retries: u8, + pub env_vars: HashMap, + pub result: HashMap, + pub prerequisites: Vec, /// Ids of jobs this job depends on, i.e. this job can't start until those have finished - depends: Vec, - created_at: Timestamp, - updated_at: Timestamp, - status: JobStatus, + pub depends: Vec, + pub created_at: Timestamp, + pub updated_at: Timestamp, + pub status: JobStatus, } #[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)] diff --git a/src/models/message.rs b/src/models/message.rs index 98ffba5..60cefca 100644 --- a/src/models/message.rs +++ b/src/models/message.rs @@ -8,25 +8,25 @@ use crate::{ #[derive(Clone, Serialize, Deserialize)] pub struct Message { /// Unique ID for the message, set by the caller - id: u32, + pub id: u32, /// Id of the actor who sent this message - caller_id: u32, + pub caller_id: u32, /// Id of the context in which this message was sent - context_id: u32, - message: String, - message_type: ScriptType, - message_format_type: MessageFormatType, + pub context_id: u32, + pub message: String, + pub message_type: ScriptType, + pub message_format_type: MessageFormatType, /// Seconds for the message to arrive at the destination - timeout: u32, + pub timeout: u32, /// Seconds for the receiver to acknowledge receipt of the message - timeout_ack: u32, + pub timeout_ack: u32, /// Seconds for the receiver to send us a reply - timeout_result: u32, - job: Vec, - logs: Vec, - created_at: Timestamp, - updated_at: Timestamp, - status: MessageStatus, + pub timeout_result: u32, + pub job: Vec, + pub logs: Vec, + pub created_at: Timestamp, + pub updated_at: Timestamp, + pub status: MessageStatus, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -36,7 +36,7 @@ pub enum MessageType { Mail, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub enum MessageStatus { Dispatched, Acknowledged, diff --git a/src/models/runner.rs b/src/models/runner.rs index e63b514..68bb6a7 100644 --- a/src/models/runner.rs +++ b/src/models/runner.rs @@ -6,17 +6,17 @@ use crate::time::Timestamp; #[derive(Serialize, Deserialize, Clone)] pub struct Runner { - id: u32, + pub id: u32, /// Mycelium public key - pubkey: String, + pub pubkey: String, /// Mycelium address - address: IpAddr, + pub address: IpAddr, /// Needs to be set by the runner, usually `runner, } impl ContextCreate { - pub fn into_domain(self) -> Result { + pub fn into_domain(self) -> Context { let ts = current_timestamp(); - let mut v = serde_json::Map::new(); - v.insert("id".to_string(), Value::from(self.id)); - v.insert( - "admins".to_string(), - serde_json::to_value(self.admins).unwrap(), - ); - v.insert( - "readers".to_string(), - serde_json::to_value(self.readers).unwrap(), - ); - v.insert( - "executors".to_string(), - serde_json::to_value(self.executors).unwrap(), - ); - v.insert("created_at".to_string(), Value::from(ts)); - v.insert("updated_at".to_string(), Value::from(ts)); - serde_json::from_value(Value::Object(v)).map_err(|e| e.to_string()) + + let ContextCreate { + id, + admins, + readers, + executors, + } = self; + + Context { + id, + admins, + readers, + executors, + created_at: ts, + updated_at: ts, + } } } @@ -145,18 +147,26 @@ pub struct RunnerCreate { pub local: bool, } impl RunnerCreate { - pub fn into_domain(self) -> Result { + pub fn into_domain(self) -> Runner { let ts = current_timestamp(); - let v = json!({ - "id": self.id, - "pubkey": self.pubkey, - "address": self.address, - "topic": self.topic, - "local": self.local, - "created_at": ts, - "updated_at": ts, - }); - serde_json::from_value(v).map_err(|e| e.to_string()) + + let RunnerCreate { + id, + pubkey, + address, + topic, + local, + } = self; + + Runner { + id, + pubkey, + address, + topic, + local, + created_at: ts, + updated_at: ts, + } } } @@ -167,24 +177,31 @@ pub struct FlowCreate { pub context_id: u32, pub jobs: Vec, pub env_vars: HashMap, - #[serde(default)] - pub result: Option>, } + impl FlowCreate { - pub fn into_domain(self) -> Result { + pub fn into_domain(self) -> Flow { let ts = current_timestamp(); - let v = json!({ - "id": self.id, - "caller_id": self.caller_id, - "context_id": self.context_id, - "jobs": self.jobs, - "env_vars": self.env_vars, - "result": self.result.unwrap_or_default(), - "created_at": ts, - "updated_at": ts, - "status": "Dispatched", - }); - serde_json::from_value(v).map_err(|e| e.to_string()) + + let FlowCreate { + id, + caller_id, + context_id, + jobs, + env_vars, + } = self; + + Flow { + id, + caller_id, + context_id, + jobs, + env_vars, + result: HashMap::new(), + created_at: ts, + updated_at: ts, + status: FlowStatus::Created, + } } } @@ -198,31 +215,43 @@ pub struct JobCreate { pub timeout: u32, pub retries: u8, pub env_vars: HashMap, - #[serde(default)] - pub result: Option>, pub prerequisites: Vec, pub depends: Vec, } + impl JobCreate { - pub fn into_domain(self) -> Result { + pub fn into_domain(self) -> Job { let ts = current_timestamp(); - let v = json!({ - "id": self.id, - "caller_id": self.caller_id, - "context_id": self.context_id, - "script": self.script, - "script_type": self.script_type, - "timeout": self.timeout, - "retries": self.retries, - "env_vars": self.env_vars, - "result": self.result.unwrap_or_default(), - "prerequisites": self.prerequisites, - "depends": self.depends, - "created_at": ts, - "updated_at": ts, - "status": "Dispatched", - }); - serde_json::from_value(v).map_err(|e| e.to_string()) + + let JobCreate { + id, + caller_id, + context_id, + script, + script_type, + timeout, + retries, + env_vars, + prerequisites, + depends, + } = self; + + Job { + id, + caller_id, + context_id, + script, + script_type, + timeout, + retries, + env_vars, + result: HashMap::new(), + prerequisites, + depends, + created_at: ts, + updated_at: ts, + status: JobStatus::WaitingForPrerequisites, + } } } @@ -238,37 +267,40 @@ pub struct MessageCreate { pub timeout_ack: u32, pub timeout_result: u32, pub job: Vec, - #[serde(default)] - pub logs: Option>, } impl MessageCreate { - pub fn into_domain(self) -> Result { + pub fn into_domain(self) -> Message { let ts = current_timestamp(); - let jobs: Result, String> = self - .job - .into_iter() - .map(|j| { - let jd: Job = j.into_domain()?; - serde_json::to_value(jd).map_err(|e| e.to_string()) - }) - .collect(); - let v = json!({ - "id": self.id, - "caller_id": self.caller_id, - "context_id": self.context_id, - "message": self.message, - "message_type": self.message_type, - "message_format_type": self.message_format_type, // "Html" | "Text" | "Md" - "timeout": self.timeout, - "timeout_ack": self.timeout_ack, - "timeout_result": self.timeout_result, - "job": jobs?, - "logs": self.logs.unwrap_or_default(), - "created_at": ts, - "updated_at": ts, - "status": "Dispatched", - }); - serde_json::from_value(v).map_err(|e| e.to_string()) + + let MessageCreate { + id, + caller_id, + context_id, + message, + message_type, + message_format_type, + timeout, + timeout_ack, + timeout_result, + job, + } = self; + + Message { + id, + caller_id, + context_id, + message, + message_type, + message_format_type, + timeout, + timeout_ack, + timeout_result, + job: job.into_iter().map(JobCreate::into_domain).collect(), + logs: Vec::new(), + created_at: ts, + updated_at: ts, + status: MessageStatus::Dispatched, + } } } @@ -390,7 +422,7 @@ pub fn build_module(state: Arc) -> RpcModule<()> { let state = state.clone(); async move { let p: ContextCreateParams = params.parse().map_err(invalid_params_err)?; - let ctx = p.context.into_domain().map_err(invalid_params_err)?; + let ctx = p.context.into_domain(); let ctx = state .service .create_context(ctx) @@ -427,7 +459,7 @@ pub fn build_module(state: Arc) -> RpcModule<()> { let state = state.clone(); async move { let p: RunnerCreateParams = params.parse().map_err(invalid_params_err)?; - let runner = p.runner.into_domain().map_err(invalid_params_err)?; + let runner = p.runner.into_domain(); let runner = state .service .create_runner(p.context_id, runner) @@ -464,7 +496,7 @@ pub fn build_module(state: Arc) -> RpcModule<()> { let state = state.clone(); async move { let p: FlowCreateParams = params.parse().map_err(invalid_params_err)?; - let flow = p.flow.into_domain().map_err(invalid_params_err)?; + let flow = p.flow.into_domain(); let flow = state .service .create_flow(p.context_id, flow) @@ -518,7 +550,7 @@ pub fn build_module(state: Arc) -> RpcModule<()> { let state = state.clone(); async move { let p: JobCreateParams = params.parse().map_err(invalid_params_err)?; - let job = p.job.into_domain().map_err(invalid_params_err)?; + let job = p.job.into_domain(); let job = state .service .create_job(p.context_id, job) @@ -555,7 +587,7 @@ pub fn build_module(state: Arc) -> RpcModule<()> { let state = state.clone(); async move { let p: MessageCreateParams = params.parse().map_err(invalid_params_err)?; - let message = p.message.into_domain().map_err(invalid_params_err)?; + let message = p.message.into_domain(); let message = state .service .create_message(p.context_id, message) diff --git a/src/service.rs b/src/service.rs index c8cee0c..3047f44 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,7 +1,13 @@ use crate::dag::{DagResult, FlowDag, build_flow_dag}; -use crate::models::{Actor, Context, Flow, Job, JobStatus, Message, Runner}; +use crate::models::{ + Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageStatus, Runner, +}; use crate::storage::RedisDriver; +use serde::Serialize; +use serde_json::Value; +use std::collections::HashMap; + pub type BoxError = Box; #[derive(Debug)] @@ -19,9 +25,286 @@ impl std::fmt::Display for InvalidJobStatusTransition { ) } } - impl std::error::Error for InvalidJobStatusTransition {} +#[derive(Debug)] +struct ValidationError { + msg: String, +} +impl ValidationError { + fn new(msg: impl Into) -> Self { + Self { msg: msg.into() } + } +} +impl std::fmt::Display for ValidationError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Validation error: {}", self.msg) + } +} +impl std::error::Error for ValidationError {} + +#[derive(Debug)] +struct PermissionDeniedError { + actor_id: u32, + context_id: u32, + action: String, +} +impl std::fmt::Display for PermissionDeniedError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Permission denied: actor {} cannot {} in context {}", + self.actor_id, self.action, self.context_id + ) + } +} +impl std::error::Error for PermissionDeniedError {} + +#[derive(Debug)] +struct AlreadyExistsError { + key: String, +} +impl std::fmt::Display for AlreadyExistsError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Already exists: {}", self.key) + } +} +impl std::error::Error for AlreadyExistsError {} + +// ----------------------------- +// Internal helpers +// ----------------------------- + +fn as_json(model: &impl Serialize) -> Result { + Ok(serde_json::to_value(model)?) +} + +fn json_get_u32(v: &Value, key: &str) -> Result { + v.get(key) + .and_then(|v| v.as_u64()) + .map(|x| x as u32) + .ok_or_else(|| { + ValidationError::new(format!("missing or invalid u32 field '{}'", key)).into() + }) +} + +fn json_get_str(v: &Value, key: &str) -> Result { + v.get(key) + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + .ok_or_else(|| { + ValidationError::new(format!("missing or invalid string field '{}'", key)).into() + }) +} + +fn json_get_array(v: &Value, key: &str) -> Result, BoxError> { + let arr = v + .get(key) + .and_then(|v| v.as_array()) + .ok_or_else(|| ValidationError::new(format!("missing or invalid array field '{}'", key)))?; + Ok(arr.clone()) +} + +fn contains_key_not_found(e: &BoxError) -> bool { + e.to_string().contains("Key not found") +} + +fn has_duplicate_u32s(list: &Vec) -> bool { + let mut seen = std::collections::HashSet::new(); + for it in list { + if let Some(x) = it.as_u64() + && !seen.insert(x) + { + return true; + } + } + false +} + +fn vec_u32_contains(list: &[Value], val: u32) -> bool { + list.iter().any(|v| v.as_u64() == Some(val as u64)) +} + +// role = "admins" | "executors" | "readers" +fn context_has_role(ctx: &Context, role: &str, actor_id: u32) -> Result { + let v = as_json(ctx)?; + let arr = v + .get(role) + .and_then(|r| r.as_array()) + .ok_or_else(|| ValidationError::new(format!("Context.{} missing or invalid", role)))?; + Ok(arr.iter().any(|x| x.as_u64() == Some(actor_id as u64))) +} + +// ----------------------------- +// Validation helpers (minimal, spec-aligned) +// ----------------------------- + +fn validate_context(ctx: &Context) -> Result<(), BoxError> { + let v = as_json(ctx)?; + let id = json_get_u32(&v, "id")?; + if id == 0 { + return Err(ValidationError::new("Context.id must be > 0").into()); + } + // admins required + let admins = json_get_array(&v, "admins")?; + if admins.is_empty() { + return Err(ValidationError::new("Context.admins must not be empty").into()); + } + Ok(()) +} + +fn validate_actor(_context_id: u32, actor: &Actor) -> Result<(), BoxError> { + let v = as_json(actor)?; + let id = json_get_u32(&v, "id")?; + if id == 0 { + return Err(ValidationError::new("Actor.id must be > 0").into()); + } + let pubkey = json_get_str(&v, "pubkey")?; + if pubkey.trim().is_empty() { + return Err(ValidationError::new("Actor.pubkey must not be empty").into()); + } + let addr = json_get_array(&v, "address")?; + if addr.is_empty() { + return Err(ValidationError::new("Actor.address must not be empty").into()); + } + Ok(()) +} + +fn validate_runner(_context_id: u32, runner: &Runner) -> Result<(), BoxError> { + let v = as_json(runner)?; + let id = json_get_u32(&v, "id")?; + if id == 0 { + return Err(ValidationError::new("Runner.id must be > 0").into()); + } + let pubkey = json_get_str(&v, "pubkey")?; + if pubkey.trim().is_empty() { + return Err(ValidationError::new("Runner.pubkey must not be empty").into()); + } + let topic = json_get_str(&v, "topic")?; + if topic.trim().is_empty() { + return Err(ValidationError::new("Runner.topic must not be empty").into()); + } + // address presence is ensured by serde typing; no additional validation here + Ok(()) +} + +fn validate_flow(context_id: u32, flow: &Flow) -> Result<(), BoxError> { + let v = as_json(flow)?; + let id = json_get_u32(&v, "id")?; + if id == 0 { + return Err(ValidationError::new("Flow.id must be > 0").into()); + } + let ctx = json_get_u32(&v, "context_id")?; + if ctx != context_id { + return Err(ValidationError::new(format!( + "Flow.context_id ({}) does not match path context_id ({})", + ctx, context_id + )) + .into()); + } + let jobs = json_get_array(&v, "jobs")?; + if has_duplicate_u32s(&jobs) { + return Err(ValidationError::new("Flow.jobs must not contain duplicates").into()); + } + Ok(()) +} + +fn validate_job(context_id: u32, job: &Job) -> Result<(), BoxError> { + let v = as_json(job)?; + let id = json_get_u32(&v, "id")?; + if id == 0 { + return Err(ValidationError::new("Job.id must be > 0").into()); + } + let ctx = json_get_u32(&v, "context_id")?; + if ctx != context_id { + return Err(ValidationError::new(format!( + "Job.context_id ({}) does not match path context_id ({})", + ctx, context_id + )) + .into()); + } + let script = json_get_str(&v, "script")?; + if script.trim().is_empty() { + return Err(ValidationError::new("Job.script must not be empty").into()); + } + let timeout = json_get_u32(&v, "timeout")?; + if timeout == 0 { + return Err(ValidationError::new("Job.timeout must be > 0").into()); + } + let depends = json_get_array(&v, "depends")?; + if has_duplicate_u32s(&depends) { + return Err(ValidationError::new("Job.depends must not contain duplicates").into()); + } + if vec_u32_contains(&depends, id) { + return Err(ValidationError::new("Job.depends must not include the job's own id").into()); + } + Ok(()) +} + +fn validate_message(context_id: u32, msg: &Message) -> Result<(), BoxError> { + let v = as_json(msg)?; + let id = json_get_u32(&v, "id")?; + if id == 0 { + return Err(ValidationError::new("Message.id must be > 0").into()); + } + let ctx = json_get_u32(&v, "context_id")?; + if ctx != context_id { + return Err(ValidationError::new(format!( + "Message.context_id ({}) does not match path context_id ({})", + ctx, context_id + )) + .into()); + } + let body = json_get_str(&v, "message")?; + if body.trim().is_empty() { + return Err(ValidationError::new("Message.message must not be empty").into()); + } + let t = json_get_u32(&v, "timeout")?; + let ta = json_get_u32(&v, "timeout_ack")?; + let tr = json_get_u32(&v, "timeout_result")?; + if t == 0 || ta == 0 || tr == 0 { + return Err(ValidationError::new( + "Message timeouts (timeout|timeout_ack|timeout_result) must be > 0", + ) + .into()); + } + // Validate embedded jobs minimal consistency (caller/context match) + let jobs = json_get_array(&v, "job")?; + let msg_caller = json_get_u32(&v, "caller_id")?; + for jv in jobs { + if let Some(obj) = jv.as_object() { + let mut jid = 0u32; + if let Some(u) = obj.get("id").and_then(|x| x.as_u64()) { + jid = u as u32; + } + if let (Some(jctx), Some(jcaller)) = ( + obj.get("context_id").and_then(|x| x.as_u64()), + obj.get("caller_id").and_then(|x| x.as_u64()), + ) { + if jctx as u32 != context_id { + return Err(ValidationError::new(format!( + "Embedded Job {} context_id mismatch ({} != {})", + jid, jctx as u32, context_id + )) + .into()); + } + if jcaller as u32 != msg_caller { + return Err(ValidationError::new(format!( + "Embedded Job {} caller_id mismatch ({} != {})", + jid, jcaller as u32, msg_caller + )) + .into()); + } + } + } + } + Ok(()) +} + +// ----------------------------- +// Service API +// ----------------------------- + pub struct AppService { redis: RedisDriver, } @@ -35,6 +318,11 @@ impl AppService { // Context // ----------------------------- pub async fn create_context(&self, ctx: Context) -> Result { + validate_context(&ctx)?; + // id + let v = as_json(&ctx)?; + let context_id = json_get_u32(&v, "id")?; + self.ensure_context_not_exists(context_id).await?; self.redis.save_context(&ctx).await?; Ok(ctx) } @@ -48,6 +336,10 @@ impl AppService { // Actor // ----------------------------- pub async fn create_actor(&self, context_id: u32, actor: Actor) -> Result { + validate_actor(context_id, &actor)?; + let v = as_json(&actor)?; + let id = json_get_u32(&v, "id")?; + self.ensure_actor_not_exists(context_id, id).await?; self.redis.save_actor(context_id, &actor).await?; Ok(actor) } @@ -61,6 +353,10 @@ impl AppService { // Runner // ----------------------------- pub async fn create_runner(&self, context_id: u32, runner: Runner) -> Result { + validate_runner(context_id, &runner)?; + let v = as_json(&runner)?; + let id = json_get_u32(&v, "id")?; + self.ensure_runner_not_exists(context_id, id).await?; self.redis.save_runner(context_id, &runner).await?; Ok(runner) } @@ -74,6 +370,18 @@ impl AppService { // Flow // ----------------------------- pub async fn create_flow(&self, context_id: u32, flow: Flow) -> Result { + validate_flow(context_id, &flow)?; + + // Permission: require that flow.caller_id is admin in the context + let v = as_json(&flow)?; + let fid = json_get_u32(&v, "id")?; + let caller_id = json_get_u32(&v, "caller_id")?; + self.require_admin(context_id, caller_id, "create flow") + .await?; + + self.ensure_flow_not_exists(context_id, fid).await?; + // Require that the context exists + let _ = self.redis.load_context(context_id).await?; self.redis.save_flow(context_id, &flow).await?; Ok(flow) } @@ -91,6 +399,12 @@ impl AppService { // Job // ----------------------------- pub async fn create_job(&self, context_id: u32, job: Job) -> Result { + validate_job(context_id, &job)?; + let v = as_json(&job)?; + let id = json_get_u32(&v, "id")?; + let caller_id = json_get_u32(&v, "caller_id")?; + self.ensure_job_not_exists(context_id, caller_id, id) + .await?; self.redis.save_job(context_id, &job).await?; Ok(job) } @@ -117,10 +431,13 @@ impl AppService { pub async fn update_job_status( &self, context_id: u32, + executor_id: u32, caller_id: u32, id: u32, new_status: JobStatus, ) -> Result<(), BoxError> { + self.require_executor(context_id, executor_id, "update job status") + .await?; let job = self.redis.load_job(context_id, caller_id, id).await?; let current = job.status(); @@ -151,8 +468,10 @@ impl AppService { self.redis .update_job_status(context_id, caller_id, id, new_status) .await?; + Ok(()) } + // ----------------------------- // Message // ----------------------------- @@ -161,6 +480,12 @@ impl AppService { context_id: u32, message: Message, ) -> Result { + validate_message(context_id, &message)?; + let v = as_json(&message)?; + let id = json_get_u32(&v, "id")?; + let caller_id = json_get_u32(&v, "caller_id")?; + self.ensure_message_not_exists(context_id, caller_id, id) + .await?; self.redis.save_message(context_id, &message).await?; Ok(message) } @@ -174,5 +499,352 @@ impl AppService { let msg = self.redis.load_message(context_id, caller_id, id).await?; Ok(msg) } + + pub async fn update_flow_status( + &self, + context_id: u32, + requestor_id: u32, + id: u32, + new_status: FlowStatus, + ) -> Result<(), BoxError> { + self.require_admin(context_id, requestor_id, "update flow status") + .await?; + let flow = self.redis.load_flow(context_id, id).await?; + let v = as_json(&flow)?; + let cur_raw = v + .get("status") + .cloned() + .unwrap_or(Value::String("Dispatched".to_string())); + + let cur = match cur_raw { + Value::String(s) if s == "Dispatched" => FlowStatus::Dispatched, + Value::String(s) if s == "Started" => FlowStatus::Started, + Value::String(s) if s == "Error" => FlowStatus::Error, + Value::String(s) if s == "Finished" => FlowStatus::Finished, + _ => FlowStatus::Dispatched, + }; + + if cur == new_status { + return Ok(()); + } + + let allowed = match cur { + FlowStatus::Created => matches!(new_status, FlowStatus::Dispatched | FlowStatus::Error), + FlowStatus::Dispatched => matches!(new_status, FlowStatus::Started | FlowStatus::Error), + FlowStatus::Started => matches!(new_status, FlowStatus::Finished | FlowStatus::Error), + FlowStatus::Finished | FlowStatus::Error => false, + }; + if !allowed { + return Err(ValidationError::new(format!( + "Invalid flow status transition: {:?} -> {:?}", + cur, new_status + )) + .into()); + } + + self.redis + .update_flow_status(context_id, id, new_status) + .await + } + + pub async fn update_message_status( + &self, + context_id: u32, + caller_id: u32, + id: u32, + new_status: MessageStatus, + ) -> Result<(), BoxError> { + let msg = self.redis.load_message(context_id, caller_id, id).await?; + let v = as_json(&msg)?; + let cur_raw = v + .get("status") + .cloned() + .unwrap_or(Value::String("Dispatched".to_string())); + + let cur = match cur_raw { + Value::String(s) if s == "Dispatched" => MessageStatus::Dispatched, + Value::String(s) if s == "Acknowledged" => MessageStatus::Acknowledged, + Value::String(s) if s == "Error" => MessageStatus::Error, + Value::String(s) if s == "Processed" => MessageStatus::Processed, + _ => MessageStatus::Dispatched, + }; + + if cur == new_status { + return Ok(()); + } + + let allowed = match cur { + MessageStatus::Dispatched => { + matches!( + new_status, + MessageStatus::Acknowledged | MessageStatus::Error + ) + } + MessageStatus::Acknowledged => { + matches!(new_status, MessageStatus::Processed | MessageStatus::Error) + } + MessageStatus::Processed | MessageStatus::Error => false, + }; + if !allowed { + return Err(ValidationError::new(format!( + "Invalid message status transition: {:?} -> {:?}", + cur, new_status + )) + .into()); + } + + self.redis + .update_message_status(context_id, caller_id, id, new_status) + .await + } + + pub async fn update_flow_env_vars_merge( + &self, + context_id: u32, + requestor_id: u32, + flow_id: u32, + patch: HashMap, + ) -> Result<(), BoxError> { + self.require_admin(context_id, requestor_id, "update flow env_vars") + .await?; + // Ensure flow exists + let _ = self.redis.load_flow(context_id, flow_id).await?; + self.redis + .update_flow_env_vars_merge(context_id, flow_id, patch) + .await + } + + pub async fn update_flow_result_merge( + &self, + context_id: u32, + requestor_id: u32, + flow_id: u32, + patch: HashMap, + ) -> Result<(), BoxError> { + self.require_admin(context_id, requestor_id, "update flow result") + .await?; + let _ = self.redis.load_flow(context_id, flow_id).await?; + self.redis + .update_flow_result_merge(context_id, flow_id, patch) + .await + } + + pub async fn update_flow_jobs_add_remove( + &self, + context_id: u32, + requestor_id: u32, + flow_id: u32, + add: Vec, + remove: Vec, + ) -> Result<(), BoxError> { + self.require_admin(context_id, requestor_id, "update flow jobs") + .await?; + let flow = self.redis.load_flow(context_id, flow_id).await?; + let mut set: std::collections::BTreeSet = flow.jobs().iter().copied().collect(); + for a in add { + set.insert(a); + } + for r in remove { + set.remove(&r); + } + let new_jobs: Vec = set.into_iter().collect(); + self.redis + .update_flow_jobs_set(context_id, flow_id, new_jobs) + .await + } + + pub async fn update_job_env_vars_merge( + &self, + context_id: u32, + requestor_id: u32, + caller_id: u32, + job_id: u32, + patch: HashMap, + ) -> Result<(), BoxError> { + self.require_admin(context_id, requestor_id, "update job env_vars") + .await?; + let _ = self.redis.load_job(context_id, caller_id, job_id).await?; + self.redis + .update_job_env_vars_merge(context_id, caller_id, job_id, patch) + .await + } + + pub async fn update_job_result_merge( + &self, + context_id: u32, + requestor_id: u32, + caller_id: u32, + job_id: u32, + patch: HashMap, + ) -> Result<(), BoxError> { + // Allow if admin OR executor + let ctx = self.redis.load_context(context_id).await?; + let is_admin = context_has_role(&ctx, "admins", requestor_id)?; + let is_exec = context_has_role(&ctx, "executors", requestor_id)?; + if !(is_admin || is_exec) { + return Err(Box::new(PermissionDeniedError { + actor_id: requestor_id, + context_id, + action: "update job result".to_string(), + })); + } + let _ = self.redis.load_job(context_id, caller_id, job_id).await?; + self.redis + .update_job_result_merge(context_id, caller_id, job_id, patch) + .await + } + + pub async fn append_message_logs( + &self, + context_id: u32, + caller_id: u32, + id: u32, + new_logs: Vec, + ) -> Result<(), BoxError> { + let _ = self.redis.load_message(context_id, caller_id, id).await?; + self.redis + .append_message_logs(context_id, caller_id, id, new_logs) + .await + } } +// ----------------------------- +// Existence checks (strict create) and permissions +// ----------------------------- +impl AppService { + async fn ensure_context_not_exists(&self, id: u32) -> Result<(), BoxError> { + match self.redis.load_context(id).await { + Ok(_) => Err(Box::new(AlreadyExistsError { + key: format!("context:{}", id), + })), + Err(e) => { + if contains_key_not_found(&e) { + Ok(()) + } else { + Err(e) + } + } + } + } + + async fn ensure_actor_not_exists(&self, db: u32, id: u32) -> Result<(), BoxError> { + match self.redis.load_actor(db, id).await { + Ok(_) => Err(Box::new(AlreadyExistsError { + key: format!("actor:{}", id), + })), + Err(e) => { + if contains_key_not_found(&e) { + Ok(()) + } else { + Err(e) + } + } + } + } + + async fn ensure_runner_not_exists(&self, db: u32, id: u32) -> Result<(), BoxError> { + match self.redis.load_runner(db, id).await { + Ok(_) => Err(Box::new(AlreadyExistsError { + key: format!("runner:{}", id), + })), + Err(e) => { + if contains_key_not_found(&e) { + Ok(()) + } else { + Err(e) + } + } + } + } + + async fn ensure_flow_not_exists(&self, db: u32, id: u32) -> Result<(), BoxError> { + match self.redis.load_flow(db, id).await { + Ok(_) => Err(Box::new(AlreadyExistsError { + key: format!("flow:{}", id), + })), + Err(e) => { + if contains_key_not_found(&e) { + Ok(()) + } else { + Err(e) + } + } + } + } + + async fn ensure_job_not_exists( + &self, + db: u32, + caller_id: u32, + id: u32, + ) -> Result<(), BoxError> { + match self.redis.load_job(db, caller_id, id).await { + Ok(_) => Err(Box::new(AlreadyExistsError { + key: format!("job:{}:{}", caller_id, id), + })), + Err(e) => { + if contains_key_not_found(&e) { + Ok(()) + } else { + Err(e) + } + } + } + } + + async fn ensure_message_not_exists( + &self, + db: u32, + caller_id: u32, + id: u32, + ) -> Result<(), BoxError> { + match self.redis.load_message(db, caller_id, id).await { + Ok(_) => Err(Box::new(AlreadyExistsError { + key: format!("message:{}:{}", caller_id, id), + })), + Err(e) => { + if contains_key_not_found(&e) { + Ok(()) + } else { + Err(e) + } + } + } + } + + async fn require_admin( + &self, + context_id: u32, + actor_id: u32, + action: &str, + ) -> Result<(), BoxError> { + let ctx = self.redis.load_context(context_id).await?; + let ok = context_has_role(&ctx, "admins", actor_id)?; + if !ok { + return Err(Box::new(PermissionDeniedError { + actor_id, + context_id, + action: action.to_string(), + })); + } + Ok(()) + } + + async fn require_executor( + &self, + context_id: u32, + actor_id: u32, + action: &str, + ) -> Result<(), BoxError> { + let ctx = self.redis.load_context(context_id).await?; + let ok = context_has_role(&ctx, "executors", actor_id)?; + if !ok { + return Err(Box::new(PermissionDeniedError { + actor_id, + context_id, + action: action.to_string(), + })); + } + Ok(()) + } +} diff --git a/src/storage/redis.rs b/src/storage/redis.rs index 7dbe60e..c0b7e11 100644 --- a/src/storage/redis.rs +++ b/src/storage/redis.rs @@ -6,7 +6,9 @@ use serde::de::DeserializeOwned; use serde_json::{Map as JsonMap, Value}; use tokio::sync::Mutex; -use crate::models::{Actor, Context, Flow, Job, JobStatus, Message, Runner}; +use crate::models::{ + Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageStatus, Runner, +}; type Result = std::result::Result>; @@ -303,4 +305,232 @@ impl RedisDriver { let key = Self::message_key(caller_id, id); self.hget_model(db, &key).await } + + // ----------------------------- + // Partial update helpers + // ----------------------------- + + /// Flow: update only status and updated_at + pub async fn update_flow_status(&self, db: u32, id: u32, status: FlowStatus) -> Result<()> { + let mut cm = self.manager_for_db(db).await?; + let key = Self::flow_key(id); + + let status_str = match serde_json::to_value(&status)? { + Value::String(s) => s, + v => v.to_string(), + }; + let ts = crate::time::current_timestamp(); + + let pairs = vec![ + ("status".to_string(), status_str), + ("updated_at".to_string(), ts.to_string()), + ]; + let _: usize = cm.hset_multiple(key, &pairs).await?; + Ok(()) + } + + /// Message: update only status and updated_at + pub async fn update_message_status( + &self, + db: u32, + caller_id: u32, + id: u32, + status: MessageStatus, + ) -> Result<()> { + let mut cm = self.manager_for_db(db).await?; + let key = Self::message_key(caller_id, id); + + let status_str = match serde_json::to_value(&status)? { + Value::String(s) => s, + v => v.to_string(), + }; + let ts = crate::time::current_timestamp(); + + let pairs = vec![ + ("status".to_string(), status_str), + ("updated_at".to_string(), ts.to_string()), + ]; + let _: usize = cm.hset_multiple(key, &pairs).await?; + Ok(()) + } + + /// Flow: merge env_vars map and bump updated_at + pub async fn update_flow_env_vars_merge( + &self, + db: u32, + id: u32, + patch: StdHashMap, + ) -> Result<()> { + let mut cm = self.manager_for_db(db).await?; + let key = Self::flow_key(id); + + let current: Option = cm.hget(&key, "env_vars").await.ok(); + let mut obj = match current + .and_then(|s| serde_json::from_str::(&s).ok()) + .and_then(|v| v.as_object().cloned()) + { + Some(m) => m, + None => JsonMap::new(), + }; + + for (k, v) in patch { + obj.insert(k, Value::String(v)); + } + + let env_vars_str = Value::Object(obj).to_string(); + let ts = crate::time::current_timestamp(); + let pairs = vec![ + ("env_vars".to_string(), env_vars_str), + ("updated_at".to_string(), ts.to_string()), + ]; + let _: usize = cm.hset_multiple(key, &pairs).await?; + Ok(()) + } + + /// Flow: merge result map and bump updated_at + pub async fn update_flow_result_merge( + &self, + db: u32, + id: u32, + patch: StdHashMap, + ) -> Result<()> { + let mut cm = self.manager_for_db(db).await?; + let key = Self::flow_key(id); + + let current: Option = cm.hget(&key, "result").await.ok(); + let mut obj = match current + .and_then(|s| serde_json::from_str::(&s).ok()) + .and_then(|v| v.as_object().cloned()) + { + Some(m) => m, + None => JsonMap::new(), + }; + + for (k, v) in patch { + obj.insert(k, Value::String(v)); + } + + let result_str = Value::Object(obj).to_string(); + let ts = crate::time::current_timestamp(); + let pairs = vec![ + ("result".to_string(), result_str), + ("updated_at".to_string(), ts.to_string()), + ]; + let _: usize = cm.hset_multiple(key, &pairs).await?; + Ok(()) + } + + /// Job: merge env_vars map and bump updated_at + pub async fn update_job_env_vars_merge( + &self, + db: u32, + caller_id: u32, + id: u32, + patch: StdHashMap, + ) -> Result<()> { + let mut cm = self.manager_for_db(db).await?; + let key = Self::job_key(caller_id, id); + + let current: Option = cm.hget(&key, "env_vars").await.ok(); + let mut obj = match current + .and_then(|s| serde_json::from_str::(&s).ok()) + .and_then(|v| v.as_object().cloned()) + { + Some(m) => m, + None => JsonMap::new(), + }; + + for (k, v) in patch { + obj.insert(k, Value::String(v)); + } + + let env_vars_str = Value::Object(obj).to_string(); + let ts = crate::time::current_timestamp(); + let pairs = vec![ + ("env_vars".to_string(), env_vars_str), + ("updated_at".to_string(), ts.to_string()), + ]; + let _: usize = cm.hset_multiple(key, &pairs).await?; + Ok(()) + } + + /// Job: merge result map and bump updated_at + pub async fn update_job_result_merge( + &self, + db: u32, + caller_id: u32, + id: u32, + patch: StdHashMap, + ) -> Result<()> { + let mut cm = self.manager_for_db(db).await?; + let key = Self::job_key(caller_id, id); + + let current: Option = cm.hget(&key, "result").await.ok(); + let mut obj = match current + .and_then(|s| serde_json::from_str::(&s).ok()) + .and_then(|v| v.as_object().cloned()) + { + Some(m) => m, + None => JsonMap::new(), + }; + + for (k, v) in patch { + obj.insert(k, Value::String(v)); + } + + let result_str = Value::Object(obj).to_string(); + let ts = crate::time::current_timestamp(); + let pairs = vec![ + ("result".to_string(), result_str), + ("updated_at".to_string(), ts.to_string()), + ]; + let _: usize = cm.hset_multiple(key, &pairs).await?; + Ok(()) + } + + /// Flow: set jobs list and bump updated_at + pub async fn update_flow_jobs_set(&self, db: u32, id: u32, new_jobs: Vec) -> Result<()> { + let mut cm = self.manager_for_db(db).await?; + let key = Self::flow_key(id); + + let jobs_str = serde_json::to_string(&new_jobs)?; + let ts = crate::time::current_timestamp(); + let pairs = vec![ + ("jobs".to_string(), jobs_str), + ("updated_at".to_string(), ts.to_string()), + ]; + let _: usize = cm.hset_multiple(key, &pairs).await?; + Ok(()) + } + + /// Message: append logs (no dedup) and bump updated_at + pub async fn append_message_logs( + &self, + db: u32, + caller_id: u32, + id: u32, + new_logs: Vec, + ) -> Result<()> { + let mut cm = self.manager_for_db(db).await?; + let key = Self::message_key(caller_id, id); + + let current: Option = cm.hget(&key, "logs").await.ok(); + let mut arr: Vec = current + .and_then(|s| serde_json::from_str::(&s).ok()) + .and_then(|v| v.as_array().cloned()) + .unwrap_or_default(); + + for l in new_logs { + arr.push(Value::String(l)); + } + + let logs_str = Value::Array(arr).to_string(); + let ts = crate::time::current_timestamp(); + let pairs = vec![ + ("logs".to_string(), logs_str), + ("updated_at".to_string(), ts.to_string()), + ]; + let _: usize = cm.hset_multiple(key, &pairs).await?; + Ok(()) + } }