Add validation for service methods
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
This commit is contained in:
		@@ -8,7 +8,7 @@ mod script_type;
 | 
			
		||||
 | 
			
		||||
pub use actor::Actor;
 | 
			
		||||
pub use context::Context;
 | 
			
		||||
pub use flow::Flow;
 | 
			
		||||
pub use flow::{Flow, FlowStatus};
 | 
			
		||||
pub use job::{Job, JobStatus};
 | 
			
		||||
pub use message::{Message, MessageFormatType, MessageStatus, MessageType};
 | 
			
		||||
pub use runner::Runner;
 | 
			
		||||
 
 | 
			
		||||
@@ -5,13 +5,13 @@ use crate::time::Timestamp;
 | 
			
		||||
#[derive(Serialize, Deserialize, Clone)]
 | 
			
		||||
pub struct Context {
 | 
			
		||||
    /// Redis DB to use
 | 
			
		||||
    id: u32,
 | 
			
		||||
    pub id: u32,
 | 
			
		||||
    /// Actor ids which have admin rights on this context
 | 
			
		||||
    admins: Vec<u32>,
 | 
			
		||||
    pub admins: Vec<u32>,
 | 
			
		||||
    /// Actor ids which can read the context info
 | 
			
		||||
    readers: Vec<u32>,
 | 
			
		||||
    pub readers: Vec<u32>,
 | 
			
		||||
    /// Actor ids which can execute jobs in this context
 | 
			
		||||
    executors: Vec<u32>,
 | 
			
		||||
    created_at: Timestamp,
 | 
			
		||||
    updated_at: Timestamp,
 | 
			
		||||
    pub executors: Vec<u32>,
 | 
			
		||||
    pub created_at: Timestamp,
 | 
			
		||||
    pub updated_at: Timestamp,
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -7,25 +7,26 @@ use crate::time::Timestamp;
 | 
			
		||||
#[derive(Serialize, Deserialize, Clone)]
 | 
			
		||||
pub struct Flow {
 | 
			
		||||
    /// Job Id set tby the actor which created it
 | 
			
		||||
    id: u32,
 | 
			
		||||
    pub id: u32,
 | 
			
		||||
    /// Actor Id who created this job
 | 
			
		||||
    caller_id: u32,
 | 
			
		||||
    pub caller_id: u32,
 | 
			
		||||
    /// The context in which this job is executed
 | 
			
		||||
    context_id: u32,
 | 
			
		||||
    pub context_id: u32,
 | 
			
		||||
    /// List of jobs which make up the flow
 | 
			
		||||
    jobs: Vec<u32>,
 | 
			
		||||
    pub jobs: Vec<u32>,
 | 
			
		||||
    /// Environment variables, passed to every job when executed
 | 
			
		||||
    env_vars: HashMap<String, String>,
 | 
			
		||||
    pub env_vars: HashMap<String, String>,
 | 
			
		||||
    /// The result of the flow
 | 
			
		||||
    result: HashMap<String, String>,
 | 
			
		||||
    created_at: Timestamp,
 | 
			
		||||
    updated_at: Timestamp,
 | 
			
		||||
    status: FlowStatus,
 | 
			
		||||
    pub result: HashMap<String, String>,
 | 
			
		||||
    pub created_at: Timestamp,
 | 
			
		||||
    pub updated_at: Timestamp,
 | 
			
		||||
    pub status: FlowStatus,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// The status of a flow
 | 
			
		||||
#[derive(Serialize, Deserialize, Clone)]
 | 
			
		||||
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
 | 
			
		||||
pub enum FlowStatus {
 | 
			
		||||
    Created,
 | 
			
		||||
    Dispatched,
 | 
			
		||||
    Started,
 | 
			
		||||
    Error,
 | 
			
		||||
 
 | 
			
		||||
@@ -7,25 +7,25 @@ use crate::{models::ScriptType, time::Timestamp};
 | 
			
		||||
#[derive(Clone, Serialize, Deserialize)]
 | 
			
		||||
pub struct Job {
 | 
			
		||||
    /// Job Id, this is given by the actor who created the job
 | 
			
		||||
    id: u32,
 | 
			
		||||
    pub id: u32,
 | 
			
		||||
    /// Actor ID which created this job
 | 
			
		||||
    caller_id: u32,
 | 
			
		||||
    pub caller_id: u32,
 | 
			
		||||
    /// Context in which the job is executed
 | 
			
		||||
    context_id: u32,
 | 
			
		||||
    script: String,
 | 
			
		||||
    script_type: ScriptType,
 | 
			
		||||
    pub context_id: u32,
 | 
			
		||||
    pub script: String,
 | 
			
		||||
    pub script_type: ScriptType,
 | 
			
		||||
    /// Timeout in seconds for this job
 | 
			
		||||
    timeout: u32,
 | 
			
		||||
    pub timeout: u32,
 | 
			
		||||
    /// Max amount of times to retry this job
 | 
			
		||||
    retries: u8,
 | 
			
		||||
    env_vars: HashMap<String, String>,
 | 
			
		||||
    result: HashMap<String, String>,
 | 
			
		||||
    prerequisites: Vec<String>,
 | 
			
		||||
    pub retries: u8,
 | 
			
		||||
    pub env_vars: HashMap<String, String>,
 | 
			
		||||
    pub result: HashMap<String, String>,
 | 
			
		||||
    pub prerequisites: Vec<String>,
 | 
			
		||||
    /// Ids of jobs this job depends on, i.e. this job can't start until those have finished
 | 
			
		||||
    depends: Vec<u32>,
 | 
			
		||||
    created_at: Timestamp,
 | 
			
		||||
    updated_at: Timestamp,
 | 
			
		||||
    status: JobStatus,
 | 
			
		||||
    pub depends: Vec<u32>,
 | 
			
		||||
    pub created_at: Timestamp,
 | 
			
		||||
    pub updated_at: Timestamp,
 | 
			
		||||
    pub status: JobStatus,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)]
 | 
			
		||||
 
 | 
			
		||||
@@ -8,25 +8,25 @@ use crate::{
 | 
			
		||||
#[derive(Clone, Serialize, Deserialize)]
 | 
			
		||||
pub struct Message {
 | 
			
		||||
    /// Unique ID for the message, set by the caller
 | 
			
		||||
    id: u32,
 | 
			
		||||
    pub id: u32,
 | 
			
		||||
    /// Id of the actor who sent this message
 | 
			
		||||
    caller_id: u32,
 | 
			
		||||
    pub caller_id: u32,
 | 
			
		||||
    /// Id of the context in which this message was sent
 | 
			
		||||
    context_id: u32,
 | 
			
		||||
    message: String,
 | 
			
		||||
    message_type: ScriptType,
 | 
			
		||||
    message_format_type: MessageFormatType,
 | 
			
		||||
    pub context_id: u32,
 | 
			
		||||
    pub message: String,
 | 
			
		||||
    pub message_type: ScriptType,
 | 
			
		||||
    pub message_format_type: MessageFormatType,
 | 
			
		||||
    /// Seconds for the message to arrive at the destination
 | 
			
		||||
    timeout: u32,
 | 
			
		||||
    pub timeout: u32,
 | 
			
		||||
    /// Seconds for the receiver to acknowledge receipt of the message
 | 
			
		||||
    timeout_ack: u32,
 | 
			
		||||
    pub timeout_ack: u32,
 | 
			
		||||
    /// Seconds for the receiver to send us a reply
 | 
			
		||||
    timeout_result: u32,
 | 
			
		||||
    job: Vec<Job>,
 | 
			
		||||
    logs: Vec<Log>,
 | 
			
		||||
    created_at: Timestamp,
 | 
			
		||||
    updated_at: Timestamp,
 | 
			
		||||
    status: MessageStatus,
 | 
			
		||||
    pub timeout_result: u32,
 | 
			
		||||
    pub job: Vec<Job>,
 | 
			
		||||
    pub logs: Vec<Log>,
 | 
			
		||||
    pub created_at: Timestamp,
 | 
			
		||||
    pub updated_at: Timestamp,
 | 
			
		||||
    pub status: MessageStatus,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug, Clone, Serialize, Deserialize)]
 | 
			
		||||
@@ -36,7 +36,7 @@ pub enum MessageType {
 | 
			
		||||
    Mail,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug, Clone, Serialize, Deserialize)]
 | 
			
		||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 | 
			
		||||
pub enum MessageStatus {
 | 
			
		||||
    Dispatched,
 | 
			
		||||
    Acknowledged,
 | 
			
		||||
 
 | 
			
		||||
@@ -6,17 +6,17 @@ use crate::time::Timestamp;
 | 
			
		||||
 | 
			
		||||
#[derive(Serialize, Deserialize, Clone)]
 | 
			
		||||
pub struct Runner {
 | 
			
		||||
    id: u32,
 | 
			
		||||
    pub id: u32,
 | 
			
		||||
    /// Mycelium public key
 | 
			
		||||
    pubkey: String,
 | 
			
		||||
    pub pubkey: String,
 | 
			
		||||
    /// Mycelium address
 | 
			
		||||
    address: IpAddr,
 | 
			
		||||
    pub address: IpAddr,
 | 
			
		||||
    /// Needs to be set by the runner, usually `runner<runnerid`
 | 
			
		||||
    topic: String,
 | 
			
		||||
    pub topic: String,
 | 
			
		||||
    /// If this is true, the runner also listens on a local redis queue
 | 
			
		||||
    local: bool,
 | 
			
		||||
    created_at: Timestamp,
 | 
			
		||||
    updated_at: Timestamp,
 | 
			
		||||
    pub local: bool,
 | 
			
		||||
    pub created_at: Timestamp,
 | 
			
		||||
    pub updated_at: Timestamp,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Serialize, Deserialize, Clone)]
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										228
									
								
								src/rpc.rs
									
									
									
									
									
								
							
							
						
						
									
										228
									
								
								src/rpc.rs
									
									
									
									
									
								
							@@ -14,7 +14,10 @@ use serde_json::{Value, json};
 | 
			
		||||
 | 
			
		||||
use crate::{
 | 
			
		||||
    dag::{DagError, FlowDag},
 | 
			
		||||
    models::{Actor, Context, Flow, Job, Message, MessageFormatType, Runner, ScriptType},
 | 
			
		||||
    models::{
 | 
			
		||||
        Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageFormatType,
 | 
			
		||||
        MessageStatus, Runner, ScriptType,
 | 
			
		||||
    },
 | 
			
		||||
    service::AppService,
 | 
			
		||||
    time::current_timestamp,
 | 
			
		||||
};
 | 
			
		||||
@@ -114,25 +117,24 @@ pub struct ContextCreate {
 | 
			
		||||
    pub executors: Vec<u32>,
 | 
			
		||||
}
 | 
			
		||||
impl ContextCreate {
 | 
			
		||||
    pub fn into_domain(self) -> Result<Context, String> {
 | 
			
		||||
    pub fn into_domain(self) -> Context {
 | 
			
		||||
        let ts = current_timestamp();
 | 
			
		||||
        let mut v = serde_json::Map::new();
 | 
			
		||||
        v.insert("id".to_string(), Value::from(self.id));
 | 
			
		||||
        v.insert(
 | 
			
		||||
            "admins".to_string(),
 | 
			
		||||
            serde_json::to_value(self.admins).unwrap(),
 | 
			
		||||
        );
 | 
			
		||||
        v.insert(
 | 
			
		||||
            "readers".to_string(),
 | 
			
		||||
            serde_json::to_value(self.readers).unwrap(),
 | 
			
		||||
        );
 | 
			
		||||
        v.insert(
 | 
			
		||||
            "executors".to_string(),
 | 
			
		||||
            serde_json::to_value(self.executors).unwrap(),
 | 
			
		||||
        );
 | 
			
		||||
        v.insert("created_at".to_string(), Value::from(ts));
 | 
			
		||||
        v.insert("updated_at".to_string(), Value::from(ts));
 | 
			
		||||
        serde_json::from_value(Value::Object(v)).map_err(|e| e.to_string())
 | 
			
		||||
 | 
			
		||||
        let ContextCreate {
 | 
			
		||||
            id,
 | 
			
		||||
            admins,
 | 
			
		||||
            readers,
 | 
			
		||||
            executors,
 | 
			
		||||
        } = self;
 | 
			
		||||
 | 
			
		||||
        Context {
 | 
			
		||||
            id,
 | 
			
		||||
            admins,
 | 
			
		||||
            readers,
 | 
			
		||||
            executors,
 | 
			
		||||
            created_at: ts,
 | 
			
		||||
            updated_at: ts,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -145,18 +147,26 @@ pub struct RunnerCreate {
 | 
			
		||||
    pub local: bool,
 | 
			
		||||
}
 | 
			
		||||
impl RunnerCreate {
 | 
			
		||||
    pub fn into_domain(self) -> Result<Runner, String> {
 | 
			
		||||
    pub fn into_domain(self) -> Runner {
 | 
			
		||||
        let ts = current_timestamp();
 | 
			
		||||
        let v = json!({
 | 
			
		||||
            "id": self.id,
 | 
			
		||||
            "pubkey": self.pubkey,
 | 
			
		||||
            "address": self.address,
 | 
			
		||||
            "topic": self.topic,
 | 
			
		||||
            "local": self.local,
 | 
			
		||||
            "created_at": ts,
 | 
			
		||||
            "updated_at": ts,
 | 
			
		||||
        });
 | 
			
		||||
        serde_json::from_value(v).map_err(|e| e.to_string())
 | 
			
		||||
 | 
			
		||||
        let RunnerCreate {
 | 
			
		||||
            id,
 | 
			
		||||
            pubkey,
 | 
			
		||||
            address,
 | 
			
		||||
            topic,
 | 
			
		||||
            local,
 | 
			
		||||
        } = self;
 | 
			
		||||
 | 
			
		||||
        Runner {
 | 
			
		||||
            id,
 | 
			
		||||
            pubkey,
 | 
			
		||||
            address,
 | 
			
		||||
            topic,
 | 
			
		||||
            local,
 | 
			
		||||
            created_at: ts,
 | 
			
		||||
            updated_at: ts,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -167,24 +177,31 @@ pub struct FlowCreate {
 | 
			
		||||
    pub context_id: u32,
 | 
			
		||||
    pub jobs: Vec<u32>,
 | 
			
		||||
    pub env_vars: HashMap<String, String>,
 | 
			
		||||
    #[serde(default)]
 | 
			
		||||
    pub result: Option<HashMap<String, String>>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl FlowCreate {
 | 
			
		||||
    pub fn into_domain(self) -> Result<Flow, String> {
 | 
			
		||||
    pub fn into_domain(self) -> Flow {
 | 
			
		||||
        let ts = current_timestamp();
 | 
			
		||||
        let v = json!({
 | 
			
		||||
            "id": self.id,
 | 
			
		||||
            "caller_id": self.caller_id,
 | 
			
		||||
            "context_id": self.context_id,
 | 
			
		||||
            "jobs": self.jobs,
 | 
			
		||||
            "env_vars": self.env_vars,
 | 
			
		||||
            "result": self.result.unwrap_or_default(),
 | 
			
		||||
            "created_at": ts,
 | 
			
		||||
            "updated_at": ts,
 | 
			
		||||
            "status": "Dispatched",
 | 
			
		||||
        });
 | 
			
		||||
        serde_json::from_value(v).map_err(|e| e.to_string())
 | 
			
		||||
 | 
			
		||||
        let FlowCreate {
 | 
			
		||||
            id,
 | 
			
		||||
            caller_id,
 | 
			
		||||
            context_id,
 | 
			
		||||
            jobs,
 | 
			
		||||
            env_vars,
 | 
			
		||||
        } = self;
 | 
			
		||||
 | 
			
		||||
        Flow {
 | 
			
		||||
            id,
 | 
			
		||||
            caller_id,
 | 
			
		||||
            context_id,
 | 
			
		||||
            jobs,
 | 
			
		||||
            env_vars,
 | 
			
		||||
            result: HashMap::new(),
 | 
			
		||||
            created_at: ts,
 | 
			
		||||
            updated_at: ts,
 | 
			
		||||
            status: FlowStatus::Created,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -198,31 +215,43 @@ pub struct JobCreate {
 | 
			
		||||
    pub timeout: u32,
 | 
			
		||||
    pub retries: u8,
 | 
			
		||||
    pub env_vars: HashMap<String, String>,
 | 
			
		||||
    #[serde(default)]
 | 
			
		||||
    pub result: Option<HashMap<String, String>>,
 | 
			
		||||
    pub prerequisites: Vec<String>,
 | 
			
		||||
    pub depends: Vec<u32>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl JobCreate {
 | 
			
		||||
    pub fn into_domain(self) -> Result<Job, String> {
 | 
			
		||||
    pub fn into_domain(self) -> Job {
 | 
			
		||||
        let ts = current_timestamp();
 | 
			
		||||
        let v = json!({
 | 
			
		||||
            "id": self.id,
 | 
			
		||||
            "caller_id": self.caller_id,
 | 
			
		||||
            "context_id": self.context_id,
 | 
			
		||||
            "script": self.script,
 | 
			
		||||
            "script_type": self.script_type,
 | 
			
		||||
            "timeout": self.timeout,
 | 
			
		||||
            "retries": self.retries,
 | 
			
		||||
            "env_vars": self.env_vars,
 | 
			
		||||
            "result": self.result.unwrap_or_default(),
 | 
			
		||||
            "prerequisites": self.prerequisites,
 | 
			
		||||
            "depends": self.depends,
 | 
			
		||||
            "created_at": ts,
 | 
			
		||||
            "updated_at": ts,
 | 
			
		||||
            "status": "Dispatched",
 | 
			
		||||
        });
 | 
			
		||||
        serde_json::from_value(v).map_err(|e| e.to_string())
 | 
			
		||||
 | 
			
		||||
        let JobCreate {
 | 
			
		||||
            id,
 | 
			
		||||
            caller_id,
 | 
			
		||||
            context_id,
 | 
			
		||||
            script,
 | 
			
		||||
            script_type,
 | 
			
		||||
            timeout,
 | 
			
		||||
            retries,
 | 
			
		||||
            env_vars,
 | 
			
		||||
            prerequisites,
 | 
			
		||||
            depends,
 | 
			
		||||
        } = self;
 | 
			
		||||
 | 
			
		||||
        Job {
 | 
			
		||||
            id,
 | 
			
		||||
            caller_id,
 | 
			
		||||
            context_id,
 | 
			
		||||
            script,
 | 
			
		||||
            script_type,
 | 
			
		||||
            timeout,
 | 
			
		||||
            retries,
 | 
			
		||||
            env_vars,
 | 
			
		||||
            result: HashMap::new(),
 | 
			
		||||
            prerequisites,
 | 
			
		||||
            depends,
 | 
			
		||||
            created_at: ts,
 | 
			
		||||
            updated_at: ts,
 | 
			
		||||
            status: JobStatus::WaitingForPrerequisites,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -238,37 +267,40 @@ pub struct MessageCreate {
 | 
			
		||||
    pub timeout_ack: u32,
 | 
			
		||||
    pub timeout_result: u32,
 | 
			
		||||
    pub job: Vec<JobCreate>,
 | 
			
		||||
    #[serde(default)]
 | 
			
		||||
    pub logs: Option<Vec<String>>,
 | 
			
		||||
}
 | 
			
		||||
impl MessageCreate {
 | 
			
		||||
    pub fn into_domain(self) -> Result<Message, String> {
 | 
			
		||||
    pub fn into_domain(self) -> Message {
 | 
			
		||||
        let ts = current_timestamp();
 | 
			
		||||
        let jobs: Result<Vec<Value>, String> = self
 | 
			
		||||
            .job
 | 
			
		||||
            .into_iter()
 | 
			
		||||
            .map(|j| {
 | 
			
		||||
                let jd: Job = j.into_domain()?;
 | 
			
		||||
                serde_json::to_value(jd).map_err(|e| e.to_string())
 | 
			
		||||
            })
 | 
			
		||||
            .collect();
 | 
			
		||||
        let v = json!({
 | 
			
		||||
            "id": self.id,
 | 
			
		||||
            "caller_id": self.caller_id,
 | 
			
		||||
            "context_id": self.context_id,
 | 
			
		||||
            "message": self.message,
 | 
			
		||||
            "message_type": self.message_type,
 | 
			
		||||
            "message_format_type": self.message_format_type, // "Html" | "Text" | "Md"
 | 
			
		||||
            "timeout": self.timeout,
 | 
			
		||||
            "timeout_ack": self.timeout_ack,
 | 
			
		||||
            "timeout_result": self.timeout_result,
 | 
			
		||||
            "job": jobs?,
 | 
			
		||||
            "logs": self.logs.unwrap_or_default(),
 | 
			
		||||
            "created_at": ts,
 | 
			
		||||
            "updated_at": ts,
 | 
			
		||||
            "status": "Dispatched",
 | 
			
		||||
        });
 | 
			
		||||
        serde_json::from_value(v).map_err(|e| e.to_string())
 | 
			
		||||
 | 
			
		||||
        let MessageCreate {
 | 
			
		||||
            id,
 | 
			
		||||
            caller_id,
 | 
			
		||||
            context_id,
 | 
			
		||||
            message,
 | 
			
		||||
            message_type,
 | 
			
		||||
            message_format_type,
 | 
			
		||||
            timeout,
 | 
			
		||||
            timeout_ack,
 | 
			
		||||
            timeout_result,
 | 
			
		||||
            job,
 | 
			
		||||
        } = self;
 | 
			
		||||
 | 
			
		||||
        Message {
 | 
			
		||||
            id,
 | 
			
		||||
            caller_id,
 | 
			
		||||
            context_id,
 | 
			
		||||
            message,
 | 
			
		||||
            message_type,
 | 
			
		||||
            message_format_type,
 | 
			
		||||
            timeout,
 | 
			
		||||
            timeout_ack,
 | 
			
		||||
            timeout_result,
 | 
			
		||||
            job: job.into_iter().map(JobCreate::into_domain).collect(),
 | 
			
		||||
            logs: Vec::new(),
 | 
			
		||||
            created_at: ts,
 | 
			
		||||
            updated_at: ts,
 | 
			
		||||
            status: MessageStatus::Dispatched,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -390,7 +422,7 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
 | 
			
		||||
                let state = state.clone();
 | 
			
		||||
                async move {
 | 
			
		||||
                    let p: ContextCreateParams = params.parse().map_err(invalid_params_err)?;
 | 
			
		||||
                    let ctx = p.context.into_domain().map_err(invalid_params_err)?;
 | 
			
		||||
                    let ctx = p.context.into_domain();
 | 
			
		||||
                    let ctx = state
 | 
			
		||||
                        .service
 | 
			
		||||
                        .create_context(ctx)
 | 
			
		||||
@@ -427,7 +459,7 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
 | 
			
		||||
                let state = state.clone();
 | 
			
		||||
                async move {
 | 
			
		||||
                    let p: RunnerCreateParams = params.parse().map_err(invalid_params_err)?;
 | 
			
		||||
                    let runner = p.runner.into_domain().map_err(invalid_params_err)?;
 | 
			
		||||
                    let runner = p.runner.into_domain();
 | 
			
		||||
                    let runner = state
 | 
			
		||||
                        .service
 | 
			
		||||
                        .create_runner(p.context_id, runner)
 | 
			
		||||
@@ -464,7 +496,7 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
 | 
			
		||||
                let state = state.clone();
 | 
			
		||||
                async move {
 | 
			
		||||
                    let p: FlowCreateParams = params.parse().map_err(invalid_params_err)?;
 | 
			
		||||
                    let flow = p.flow.into_domain().map_err(invalid_params_err)?;
 | 
			
		||||
                    let flow = p.flow.into_domain();
 | 
			
		||||
                    let flow = state
 | 
			
		||||
                        .service
 | 
			
		||||
                        .create_flow(p.context_id, flow)
 | 
			
		||||
@@ -518,7 +550,7 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
 | 
			
		||||
                let state = state.clone();
 | 
			
		||||
                async move {
 | 
			
		||||
                    let p: JobCreateParams = params.parse().map_err(invalid_params_err)?;
 | 
			
		||||
                    let job = p.job.into_domain().map_err(invalid_params_err)?;
 | 
			
		||||
                    let job = p.job.into_domain();
 | 
			
		||||
                    let job = state
 | 
			
		||||
                        .service
 | 
			
		||||
                        .create_job(p.context_id, job)
 | 
			
		||||
@@ -555,7 +587,7 @@ pub fn build_module(state: Arc<AppState>) -> RpcModule<()> {
 | 
			
		||||
                let state = state.clone();
 | 
			
		||||
                async move {
 | 
			
		||||
                    let p: MessageCreateParams = params.parse().map_err(invalid_params_err)?;
 | 
			
		||||
                    let message = p.message.into_domain().map_err(invalid_params_err)?;
 | 
			
		||||
                    let message = p.message.into_domain();
 | 
			
		||||
                    let message = state
 | 
			
		||||
                        .service
 | 
			
		||||
                        .create_message(p.context_id, message)
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										676
									
								
								src/service.rs
									
									
									
									
									
								
							
							
						
						
									
										676
									
								
								src/service.rs
									
									
									
									
									
								
							@@ -1,7 +1,13 @@
 | 
			
		||||
use crate::dag::{DagResult, FlowDag, build_flow_dag};
 | 
			
		||||
use crate::models::{Actor, Context, Flow, Job, JobStatus, Message, Runner};
 | 
			
		||||
use crate::models::{
 | 
			
		||||
    Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageStatus, Runner,
 | 
			
		||||
};
 | 
			
		||||
use crate::storage::RedisDriver;
 | 
			
		||||
 | 
			
		||||
use serde::Serialize;
 | 
			
		||||
use serde_json::Value;
 | 
			
		||||
use std::collections::HashMap;
 | 
			
		||||
 | 
			
		||||
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
@@ -19,9 +25,286 @@ impl std::fmt::Display for InvalidJobStatusTransition {
 | 
			
		||||
        )
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl std::error::Error for InvalidJobStatusTransition {}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
struct ValidationError {
 | 
			
		||||
    msg: String,
 | 
			
		||||
}
 | 
			
		||||
impl ValidationError {
 | 
			
		||||
    fn new(msg: impl Into<String>) -> Self {
 | 
			
		||||
        Self { msg: msg.into() }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
impl std::fmt::Display for ValidationError {
 | 
			
		||||
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 | 
			
		||||
        write!(f, "Validation error: {}", self.msg)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
impl std::error::Error for ValidationError {}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
struct PermissionDeniedError {
 | 
			
		||||
    actor_id: u32,
 | 
			
		||||
    context_id: u32,
 | 
			
		||||
    action: String,
 | 
			
		||||
}
 | 
			
		||||
impl std::fmt::Display for PermissionDeniedError {
 | 
			
		||||
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 | 
			
		||||
        write!(
 | 
			
		||||
            f,
 | 
			
		||||
            "Permission denied: actor {} cannot {} in context {}",
 | 
			
		||||
            self.actor_id, self.action, self.context_id
 | 
			
		||||
        )
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
impl std::error::Error for PermissionDeniedError {}
 | 
			
		||||
 | 
			
		||||
#[derive(Debug)]
 | 
			
		||||
struct AlreadyExistsError {
 | 
			
		||||
    key: String,
 | 
			
		||||
}
 | 
			
		||||
impl std::fmt::Display for AlreadyExistsError {
 | 
			
		||||
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
 | 
			
		||||
        write!(f, "Already exists: {}", self.key)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
impl std::error::Error for AlreadyExistsError {}
 | 
			
		||||
 | 
			
		||||
// -----------------------------
 | 
			
		||||
// Internal helpers
 | 
			
		||||
// -----------------------------
 | 
			
		||||
 | 
			
		||||
fn as_json(model: &impl Serialize) -> Result<Value, BoxError> {
 | 
			
		||||
    Ok(serde_json::to_value(model)?)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn json_get_u32(v: &Value, key: &str) -> Result<u32, BoxError> {
 | 
			
		||||
    v.get(key)
 | 
			
		||||
        .and_then(|v| v.as_u64())
 | 
			
		||||
        .map(|x| x as u32)
 | 
			
		||||
        .ok_or_else(|| {
 | 
			
		||||
            ValidationError::new(format!("missing or invalid u32 field '{}'", key)).into()
 | 
			
		||||
        })
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn json_get_str(v: &Value, key: &str) -> Result<String, BoxError> {
 | 
			
		||||
    v.get(key)
 | 
			
		||||
        .and_then(|v| v.as_str())
 | 
			
		||||
        .map(|s| s.to_string())
 | 
			
		||||
        .ok_or_else(|| {
 | 
			
		||||
            ValidationError::new(format!("missing or invalid string field '{}'", key)).into()
 | 
			
		||||
        })
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn json_get_array(v: &Value, key: &str) -> Result<Vec<Value>, BoxError> {
 | 
			
		||||
    let arr = v
 | 
			
		||||
        .get(key)
 | 
			
		||||
        .and_then(|v| v.as_array())
 | 
			
		||||
        .ok_or_else(|| ValidationError::new(format!("missing or invalid array field '{}'", key)))?;
 | 
			
		||||
    Ok(arr.clone())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn contains_key_not_found(e: &BoxError) -> bool {
 | 
			
		||||
    e.to_string().contains("Key not found")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn has_duplicate_u32s(list: &Vec<Value>) -> bool {
 | 
			
		||||
    let mut seen = std::collections::HashSet::new();
 | 
			
		||||
    for it in list {
 | 
			
		||||
        if let Some(x) = it.as_u64()
 | 
			
		||||
            && !seen.insert(x)
 | 
			
		||||
        {
 | 
			
		||||
            return true;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    false
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn vec_u32_contains(list: &[Value], val: u32) -> bool {
 | 
			
		||||
    list.iter().any(|v| v.as_u64() == Some(val as u64))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// role = "admins" | "executors" | "readers"
 | 
			
		||||
fn context_has_role(ctx: &Context, role: &str, actor_id: u32) -> Result<bool, BoxError> {
 | 
			
		||||
    let v = as_json(ctx)?;
 | 
			
		||||
    let arr = v
 | 
			
		||||
        .get(role)
 | 
			
		||||
        .and_then(|r| r.as_array())
 | 
			
		||||
        .ok_or_else(|| ValidationError::new(format!("Context.{} missing or invalid", role)))?;
 | 
			
		||||
    Ok(arr.iter().any(|x| x.as_u64() == Some(actor_id as u64)))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// -----------------------------
 | 
			
		||||
// Validation helpers (minimal, spec-aligned)
 | 
			
		||||
// -----------------------------
 | 
			
		||||
 | 
			
		||||
fn validate_context(ctx: &Context) -> Result<(), BoxError> {
 | 
			
		||||
    let v = as_json(ctx)?;
 | 
			
		||||
    let id = json_get_u32(&v, "id")?;
 | 
			
		||||
    if id == 0 {
 | 
			
		||||
        return Err(ValidationError::new("Context.id must be > 0").into());
 | 
			
		||||
    }
 | 
			
		||||
    // admins required
 | 
			
		||||
    let admins = json_get_array(&v, "admins")?;
 | 
			
		||||
    if admins.is_empty() {
 | 
			
		||||
        return Err(ValidationError::new("Context.admins must not be empty").into());
 | 
			
		||||
    }
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn validate_actor(_context_id: u32, actor: &Actor) -> Result<(), BoxError> {
 | 
			
		||||
    let v = as_json(actor)?;
 | 
			
		||||
    let id = json_get_u32(&v, "id")?;
 | 
			
		||||
    if id == 0 {
 | 
			
		||||
        return Err(ValidationError::new("Actor.id must be > 0").into());
 | 
			
		||||
    }
 | 
			
		||||
    let pubkey = json_get_str(&v, "pubkey")?;
 | 
			
		||||
    if pubkey.trim().is_empty() {
 | 
			
		||||
        return Err(ValidationError::new("Actor.pubkey must not be empty").into());
 | 
			
		||||
    }
 | 
			
		||||
    let addr = json_get_array(&v, "address")?;
 | 
			
		||||
    if addr.is_empty() {
 | 
			
		||||
        return Err(ValidationError::new("Actor.address must not be empty").into());
 | 
			
		||||
    }
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn validate_runner(_context_id: u32, runner: &Runner) -> Result<(), BoxError> {
 | 
			
		||||
    let v = as_json(runner)?;
 | 
			
		||||
    let id = json_get_u32(&v, "id")?;
 | 
			
		||||
    if id == 0 {
 | 
			
		||||
        return Err(ValidationError::new("Runner.id must be > 0").into());
 | 
			
		||||
    }
 | 
			
		||||
    let pubkey = json_get_str(&v, "pubkey")?;
 | 
			
		||||
    if pubkey.trim().is_empty() {
 | 
			
		||||
        return Err(ValidationError::new("Runner.pubkey must not be empty").into());
 | 
			
		||||
    }
 | 
			
		||||
    let topic = json_get_str(&v, "topic")?;
 | 
			
		||||
    if topic.trim().is_empty() {
 | 
			
		||||
        return Err(ValidationError::new("Runner.topic must not be empty").into());
 | 
			
		||||
    }
 | 
			
		||||
    // address presence is ensured by serde typing; no additional validation here
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn validate_flow(context_id: u32, flow: &Flow) -> Result<(), BoxError> {
 | 
			
		||||
    let v = as_json(flow)?;
 | 
			
		||||
    let id = json_get_u32(&v, "id")?;
 | 
			
		||||
    if id == 0 {
 | 
			
		||||
        return Err(ValidationError::new("Flow.id must be > 0").into());
 | 
			
		||||
    }
 | 
			
		||||
    let ctx = json_get_u32(&v, "context_id")?;
 | 
			
		||||
    if ctx != context_id {
 | 
			
		||||
        return Err(ValidationError::new(format!(
 | 
			
		||||
            "Flow.context_id ({}) does not match path context_id ({})",
 | 
			
		||||
            ctx, context_id
 | 
			
		||||
        ))
 | 
			
		||||
        .into());
 | 
			
		||||
    }
 | 
			
		||||
    let jobs = json_get_array(&v, "jobs")?;
 | 
			
		||||
    if has_duplicate_u32s(&jobs) {
 | 
			
		||||
        return Err(ValidationError::new("Flow.jobs must not contain duplicates").into());
 | 
			
		||||
    }
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn validate_job(context_id: u32, job: &Job) -> Result<(), BoxError> {
 | 
			
		||||
    let v = as_json(job)?;
 | 
			
		||||
    let id = json_get_u32(&v, "id")?;
 | 
			
		||||
    if id == 0 {
 | 
			
		||||
        return Err(ValidationError::new("Job.id must be > 0").into());
 | 
			
		||||
    }
 | 
			
		||||
    let ctx = json_get_u32(&v, "context_id")?;
 | 
			
		||||
    if ctx != context_id {
 | 
			
		||||
        return Err(ValidationError::new(format!(
 | 
			
		||||
            "Job.context_id ({}) does not match path context_id ({})",
 | 
			
		||||
            ctx, context_id
 | 
			
		||||
        ))
 | 
			
		||||
        .into());
 | 
			
		||||
    }
 | 
			
		||||
    let script = json_get_str(&v, "script")?;
 | 
			
		||||
    if script.trim().is_empty() {
 | 
			
		||||
        return Err(ValidationError::new("Job.script must not be empty").into());
 | 
			
		||||
    }
 | 
			
		||||
    let timeout = json_get_u32(&v, "timeout")?;
 | 
			
		||||
    if timeout == 0 {
 | 
			
		||||
        return Err(ValidationError::new("Job.timeout must be > 0").into());
 | 
			
		||||
    }
 | 
			
		||||
    let depends = json_get_array(&v, "depends")?;
 | 
			
		||||
    if has_duplicate_u32s(&depends) {
 | 
			
		||||
        return Err(ValidationError::new("Job.depends must not contain duplicates").into());
 | 
			
		||||
    }
 | 
			
		||||
    if vec_u32_contains(&depends, id) {
 | 
			
		||||
        return Err(ValidationError::new("Job.depends must not include the job's own id").into());
 | 
			
		||||
    }
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn validate_message(context_id: u32, msg: &Message) -> Result<(), BoxError> {
 | 
			
		||||
    let v = as_json(msg)?;
 | 
			
		||||
    let id = json_get_u32(&v, "id")?;
 | 
			
		||||
    if id == 0 {
 | 
			
		||||
        return Err(ValidationError::new("Message.id must be > 0").into());
 | 
			
		||||
    }
 | 
			
		||||
    let ctx = json_get_u32(&v, "context_id")?;
 | 
			
		||||
    if ctx != context_id {
 | 
			
		||||
        return Err(ValidationError::new(format!(
 | 
			
		||||
            "Message.context_id ({}) does not match path context_id ({})",
 | 
			
		||||
            ctx, context_id
 | 
			
		||||
        ))
 | 
			
		||||
        .into());
 | 
			
		||||
    }
 | 
			
		||||
    let body = json_get_str(&v, "message")?;
 | 
			
		||||
    if body.trim().is_empty() {
 | 
			
		||||
        return Err(ValidationError::new("Message.message must not be empty").into());
 | 
			
		||||
    }
 | 
			
		||||
    let t = json_get_u32(&v, "timeout")?;
 | 
			
		||||
    let ta = json_get_u32(&v, "timeout_ack")?;
 | 
			
		||||
    let tr = json_get_u32(&v, "timeout_result")?;
 | 
			
		||||
    if t == 0 || ta == 0 || tr == 0 {
 | 
			
		||||
        return Err(ValidationError::new(
 | 
			
		||||
            "Message timeouts (timeout|timeout_ack|timeout_result) must be > 0",
 | 
			
		||||
        )
 | 
			
		||||
        .into());
 | 
			
		||||
    }
 | 
			
		||||
    // Validate embedded jobs minimal consistency (caller/context match)
 | 
			
		||||
    let jobs = json_get_array(&v, "job")?;
 | 
			
		||||
    let msg_caller = json_get_u32(&v, "caller_id")?;
 | 
			
		||||
    for jv in jobs {
 | 
			
		||||
        if let Some(obj) = jv.as_object() {
 | 
			
		||||
            let mut jid = 0u32;
 | 
			
		||||
            if let Some(u) = obj.get("id").and_then(|x| x.as_u64()) {
 | 
			
		||||
                jid = u as u32;
 | 
			
		||||
            }
 | 
			
		||||
            if let (Some(jctx), Some(jcaller)) = (
 | 
			
		||||
                obj.get("context_id").and_then(|x| x.as_u64()),
 | 
			
		||||
                obj.get("caller_id").and_then(|x| x.as_u64()),
 | 
			
		||||
            ) {
 | 
			
		||||
                if jctx as u32 != context_id {
 | 
			
		||||
                    return Err(ValidationError::new(format!(
 | 
			
		||||
                        "Embedded Job {} context_id mismatch ({} != {})",
 | 
			
		||||
                        jid, jctx as u32, context_id
 | 
			
		||||
                    ))
 | 
			
		||||
                    .into());
 | 
			
		||||
                }
 | 
			
		||||
                if jcaller as u32 != msg_caller {
 | 
			
		||||
                    return Err(ValidationError::new(format!(
 | 
			
		||||
                        "Embedded Job {} caller_id mismatch ({} != {})",
 | 
			
		||||
                        jid, jcaller as u32, msg_caller
 | 
			
		||||
                    ))
 | 
			
		||||
                    .into());
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// -----------------------------
 | 
			
		||||
// Service API
 | 
			
		||||
// -----------------------------
 | 
			
		||||
 | 
			
		||||
pub struct AppService {
 | 
			
		||||
    redis: RedisDriver,
 | 
			
		||||
}
 | 
			
		||||
@@ -35,6 +318,11 @@ impl AppService {
 | 
			
		||||
    // Context
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
    pub async fn create_context(&self, ctx: Context) -> Result<Context, BoxError> {
 | 
			
		||||
        validate_context(&ctx)?;
 | 
			
		||||
        // id
 | 
			
		||||
        let v = as_json(&ctx)?;
 | 
			
		||||
        let context_id = json_get_u32(&v, "id")?;
 | 
			
		||||
        self.ensure_context_not_exists(context_id).await?;
 | 
			
		||||
        self.redis.save_context(&ctx).await?;
 | 
			
		||||
        Ok(ctx)
 | 
			
		||||
    }
 | 
			
		||||
@@ -48,6 +336,10 @@ impl AppService {
 | 
			
		||||
    // Actor
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
    pub async fn create_actor(&self, context_id: u32, actor: Actor) -> Result<Actor, BoxError> {
 | 
			
		||||
        validate_actor(context_id, &actor)?;
 | 
			
		||||
        let v = as_json(&actor)?;
 | 
			
		||||
        let id = json_get_u32(&v, "id")?;
 | 
			
		||||
        self.ensure_actor_not_exists(context_id, id).await?;
 | 
			
		||||
        self.redis.save_actor(context_id, &actor).await?;
 | 
			
		||||
        Ok(actor)
 | 
			
		||||
    }
 | 
			
		||||
@@ -61,6 +353,10 @@ impl AppService {
 | 
			
		||||
    // Runner
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
    pub async fn create_runner(&self, context_id: u32, runner: Runner) -> Result<Runner, BoxError> {
 | 
			
		||||
        validate_runner(context_id, &runner)?;
 | 
			
		||||
        let v = as_json(&runner)?;
 | 
			
		||||
        let id = json_get_u32(&v, "id")?;
 | 
			
		||||
        self.ensure_runner_not_exists(context_id, id).await?;
 | 
			
		||||
        self.redis.save_runner(context_id, &runner).await?;
 | 
			
		||||
        Ok(runner)
 | 
			
		||||
    }
 | 
			
		||||
@@ -74,6 +370,18 @@ impl AppService {
 | 
			
		||||
    // Flow
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
    pub async fn create_flow(&self, context_id: u32, flow: Flow) -> Result<Flow, BoxError> {
 | 
			
		||||
        validate_flow(context_id, &flow)?;
 | 
			
		||||
 | 
			
		||||
        // Permission: require that flow.caller_id is admin in the context
 | 
			
		||||
        let v = as_json(&flow)?;
 | 
			
		||||
        let fid = json_get_u32(&v, "id")?;
 | 
			
		||||
        let caller_id = json_get_u32(&v, "caller_id")?;
 | 
			
		||||
        self.require_admin(context_id, caller_id, "create flow")
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        self.ensure_flow_not_exists(context_id, fid).await?;
 | 
			
		||||
        // Require that the context exists
 | 
			
		||||
        let _ = self.redis.load_context(context_id).await?;
 | 
			
		||||
        self.redis.save_flow(context_id, &flow).await?;
 | 
			
		||||
        Ok(flow)
 | 
			
		||||
    }
 | 
			
		||||
@@ -91,6 +399,12 @@ impl AppService {
 | 
			
		||||
    // Job
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
    pub async fn create_job(&self, context_id: u32, job: Job) -> Result<Job, BoxError> {
 | 
			
		||||
        validate_job(context_id, &job)?;
 | 
			
		||||
        let v = as_json(&job)?;
 | 
			
		||||
        let id = json_get_u32(&v, "id")?;
 | 
			
		||||
        let caller_id = json_get_u32(&v, "caller_id")?;
 | 
			
		||||
        self.ensure_job_not_exists(context_id, caller_id, id)
 | 
			
		||||
            .await?;
 | 
			
		||||
        self.redis.save_job(context_id, &job).await?;
 | 
			
		||||
        Ok(job)
 | 
			
		||||
    }
 | 
			
		||||
@@ -117,10 +431,13 @@ impl AppService {
 | 
			
		||||
    pub async fn update_job_status(
 | 
			
		||||
        &self,
 | 
			
		||||
        context_id: u32,
 | 
			
		||||
        executor_id: u32,
 | 
			
		||||
        caller_id: u32,
 | 
			
		||||
        id: u32,
 | 
			
		||||
        new_status: JobStatus,
 | 
			
		||||
    ) -> Result<(), BoxError> {
 | 
			
		||||
        self.require_executor(context_id, executor_id, "update job status")
 | 
			
		||||
            .await?;
 | 
			
		||||
        let job = self.redis.load_job(context_id, caller_id, id).await?;
 | 
			
		||||
        let current = job.status();
 | 
			
		||||
 | 
			
		||||
@@ -151,8 +468,10 @@ impl AppService {
 | 
			
		||||
        self.redis
 | 
			
		||||
            .update_job_status(context_id, caller_id, id, new_status)
 | 
			
		||||
            .await?;
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
    // Message
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
@@ -161,6 +480,12 @@ impl AppService {
 | 
			
		||||
        context_id: u32,
 | 
			
		||||
        message: Message,
 | 
			
		||||
    ) -> Result<Message, BoxError> {
 | 
			
		||||
        validate_message(context_id, &message)?;
 | 
			
		||||
        let v = as_json(&message)?;
 | 
			
		||||
        let id = json_get_u32(&v, "id")?;
 | 
			
		||||
        let caller_id = json_get_u32(&v, "caller_id")?;
 | 
			
		||||
        self.ensure_message_not_exists(context_id, caller_id, id)
 | 
			
		||||
            .await?;
 | 
			
		||||
        self.redis.save_message(context_id, &message).await?;
 | 
			
		||||
        Ok(message)
 | 
			
		||||
    }
 | 
			
		||||
@@ -174,5 +499,352 @@ impl AppService {
 | 
			
		||||
        let msg = self.redis.load_message(context_id, caller_id, id).await?;
 | 
			
		||||
        Ok(msg)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn update_flow_status(
 | 
			
		||||
        &self,
 | 
			
		||||
        context_id: u32,
 | 
			
		||||
        requestor_id: u32,
 | 
			
		||||
        id: u32,
 | 
			
		||||
        new_status: FlowStatus,
 | 
			
		||||
    ) -> Result<(), BoxError> {
 | 
			
		||||
        self.require_admin(context_id, requestor_id, "update flow status")
 | 
			
		||||
            .await?;
 | 
			
		||||
        let flow = self.redis.load_flow(context_id, id).await?;
 | 
			
		||||
        let v = as_json(&flow)?;
 | 
			
		||||
        let cur_raw = v
 | 
			
		||||
            .get("status")
 | 
			
		||||
            .cloned()
 | 
			
		||||
            .unwrap_or(Value::String("Dispatched".to_string()));
 | 
			
		||||
 | 
			
		||||
        let cur = match cur_raw {
 | 
			
		||||
            Value::String(s) if s == "Dispatched" => FlowStatus::Dispatched,
 | 
			
		||||
            Value::String(s) if s == "Started" => FlowStatus::Started,
 | 
			
		||||
            Value::String(s) if s == "Error" => FlowStatus::Error,
 | 
			
		||||
            Value::String(s) if s == "Finished" => FlowStatus::Finished,
 | 
			
		||||
            _ => FlowStatus::Dispatched,
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        if cur == new_status {
 | 
			
		||||
            return Ok(());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let allowed = match cur {
 | 
			
		||||
            FlowStatus::Created => matches!(new_status, FlowStatus::Dispatched | FlowStatus::Error),
 | 
			
		||||
            FlowStatus::Dispatched => matches!(new_status, FlowStatus::Started | FlowStatus::Error),
 | 
			
		||||
            FlowStatus::Started => matches!(new_status, FlowStatus::Finished | FlowStatus::Error),
 | 
			
		||||
            FlowStatus::Finished | FlowStatus::Error => false,
 | 
			
		||||
        };
 | 
			
		||||
        if !allowed {
 | 
			
		||||
            return Err(ValidationError::new(format!(
 | 
			
		||||
                "Invalid flow status transition: {:?} -> {:?}",
 | 
			
		||||
                cur, new_status
 | 
			
		||||
            ))
 | 
			
		||||
            .into());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        self.redis
 | 
			
		||||
            .update_flow_status(context_id, id, new_status)
 | 
			
		||||
            .await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn update_message_status(
 | 
			
		||||
        &self,
 | 
			
		||||
        context_id: u32,
 | 
			
		||||
        caller_id: u32,
 | 
			
		||||
        id: u32,
 | 
			
		||||
        new_status: MessageStatus,
 | 
			
		||||
    ) -> Result<(), BoxError> {
 | 
			
		||||
        let msg = self.redis.load_message(context_id, caller_id, id).await?;
 | 
			
		||||
        let v = as_json(&msg)?;
 | 
			
		||||
        let cur_raw = v
 | 
			
		||||
            .get("status")
 | 
			
		||||
            .cloned()
 | 
			
		||||
            .unwrap_or(Value::String("Dispatched".to_string()));
 | 
			
		||||
 | 
			
		||||
        let cur = match cur_raw {
 | 
			
		||||
            Value::String(s) if s == "Dispatched" => MessageStatus::Dispatched,
 | 
			
		||||
            Value::String(s) if s == "Acknowledged" => MessageStatus::Acknowledged,
 | 
			
		||||
            Value::String(s) if s == "Error" => MessageStatus::Error,
 | 
			
		||||
            Value::String(s) if s == "Processed" => MessageStatus::Processed,
 | 
			
		||||
            _ => MessageStatus::Dispatched,
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        if cur == new_status {
 | 
			
		||||
            return Ok(());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let allowed = match cur {
 | 
			
		||||
            MessageStatus::Dispatched => {
 | 
			
		||||
                matches!(
 | 
			
		||||
                    new_status,
 | 
			
		||||
                    MessageStatus::Acknowledged | MessageStatus::Error
 | 
			
		||||
                )
 | 
			
		||||
            }
 | 
			
		||||
            MessageStatus::Acknowledged => {
 | 
			
		||||
                matches!(new_status, MessageStatus::Processed | MessageStatus::Error)
 | 
			
		||||
            }
 | 
			
		||||
            MessageStatus::Processed | MessageStatus::Error => false,
 | 
			
		||||
        };
 | 
			
		||||
        if !allowed {
 | 
			
		||||
            return Err(ValidationError::new(format!(
 | 
			
		||||
                "Invalid message status transition: {:?} -> {:?}",
 | 
			
		||||
                cur, new_status
 | 
			
		||||
            ))
 | 
			
		||||
            .into());
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        self.redis
 | 
			
		||||
            .update_message_status(context_id, caller_id, id, new_status)
 | 
			
		||||
            .await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn update_flow_env_vars_merge(
 | 
			
		||||
        &self,
 | 
			
		||||
        context_id: u32,
 | 
			
		||||
        requestor_id: u32,
 | 
			
		||||
        flow_id: u32,
 | 
			
		||||
        patch: HashMap<String, String>,
 | 
			
		||||
    ) -> Result<(), BoxError> {
 | 
			
		||||
        self.require_admin(context_id, requestor_id, "update flow env_vars")
 | 
			
		||||
            .await?;
 | 
			
		||||
        // Ensure flow exists
 | 
			
		||||
        let _ = self.redis.load_flow(context_id, flow_id).await?;
 | 
			
		||||
        self.redis
 | 
			
		||||
            .update_flow_env_vars_merge(context_id, flow_id, patch)
 | 
			
		||||
            .await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn update_flow_result_merge(
 | 
			
		||||
        &self,
 | 
			
		||||
        context_id: u32,
 | 
			
		||||
        requestor_id: u32,
 | 
			
		||||
        flow_id: u32,
 | 
			
		||||
        patch: HashMap<String, String>,
 | 
			
		||||
    ) -> Result<(), BoxError> {
 | 
			
		||||
        self.require_admin(context_id, requestor_id, "update flow result")
 | 
			
		||||
            .await?;
 | 
			
		||||
        let _ = self.redis.load_flow(context_id, flow_id).await?;
 | 
			
		||||
        self.redis
 | 
			
		||||
            .update_flow_result_merge(context_id, flow_id, patch)
 | 
			
		||||
            .await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn update_flow_jobs_add_remove(
 | 
			
		||||
        &self,
 | 
			
		||||
        context_id: u32,
 | 
			
		||||
        requestor_id: u32,
 | 
			
		||||
        flow_id: u32,
 | 
			
		||||
        add: Vec<u32>,
 | 
			
		||||
        remove: Vec<u32>,
 | 
			
		||||
    ) -> Result<(), BoxError> {
 | 
			
		||||
        self.require_admin(context_id, requestor_id, "update flow jobs")
 | 
			
		||||
            .await?;
 | 
			
		||||
        let flow = self.redis.load_flow(context_id, flow_id).await?;
 | 
			
		||||
        let mut set: std::collections::BTreeSet<u32> = flow.jobs().iter().copied().collect();
 | 
			
		||||
        for a in add {
 | 
			
		||||
            set.insert(a);
 | 
			
		||||
        }
 | 
			
		||||
        for r in remove {
 | 
			
		||||
            set.remove(&r);
 | 
			
		||||
        }
 | 
			
		||||
        let new_jobs: Vec<u32> = set.into_iter().collect();
 | 
			
		||||
        self.redis
 | 
			
		||||
            .update_flow_jobs_set(context_id, flow_id, new_jobs)
 | 
			
		||||
            .await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn update_job_env_vars_merge(
 | 
			
		||||
        &self,
 | 
			
		||||
        context_id: u32,
 | 
			
		||||
        requestor_id: u32,
 | 
			
		||||
        caller_id: u32,
 | 
			
		||||
        job_id: u32,
 | 
			
		||||
        patch: HashMap<String, String>,
 | 
			
		||||
    ) -> Result<(), BoxError> {
 | 
			
		||||
        self.require_admin(context_id, requestor_id, "update job env_vars")
 | 
			
		||||
            .await?;
 | 
			
		||||
        let _ = self.redis.load_job(context_id, caller_id, job_id).await?;
 | 
			
		||||
        self.redis
 | 
			
		||||
            .update_job_env_vars_merge(context_id, caller_id, job_id, patch)
 | 
			
		||||
            .await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn update_job_result_merge(
 | 
			
		||||
        &self,
 | 
			
		||||
        context_id: u32,
 | 
			
		||||
        requestor_id: u32,
 | 
			
		||||
        caller_id: u32,
 | 
			
		||||
        job_id: u32,
 | 
			
		||||
        patch: HashMap<String, String>,
 | 
			
		||||
    ) -> Result<(), BoxError> {
 | 
			
		||||
        // Allow if admin OR executor
 | 
			
		||||
        let ctx = self.redis.load_context(context_id).await?;
 | 
			
		||||
        let is_admin = context_has_role(&ctx, "admins", requestor_id)?;
 | 
			
		||||
        let is_exec = context_has_role(&ctx, "executors", requestor_id)?;
 | 
			
		||||
        if !(is_admin || is_exec) {
 | 
			
		||||
            return Err(Box::new(PermissionDeniedError {
 | 
			
		||||
                actor_id: requestor_id,
 | 
			
		||||
                context_id,
 | 
			
		||||
                action: "update job result".to_string(),
 | 
			
		||||
            }));
 | 
			
		||||
        }
 | 
			
		||||
        let _ = self.redis.load_job(context_id, caller_id, job_id).await?;
 | 
			
		||||
        self.redis
 | 
			
		||||
            .update_job_result_merge(context_id, caller_id, job_id, patch)
 | 
			
		||||
            .await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn append_message_logs(
 | 
			
		||||
        &self,
 | 
			
		||||
        context_id: u32,
 | 
			
		||||
        caller_id: u32,
 | 
			
		||||
        id: u32,
 | 
			
		||||
        new_logs: Vec<String>,
 | 
			
		||||
    ) -> Result<(), BoxError> {
 | 
			
		||||
        let _ = self.redis.load_message(context_id, caller_id, id).await?;
 | 
			
		||||
        self.redis
 | 
			
		||||
            .append_message_logs(context_id, caller_id, id, new_logs)
 | 
			
		||||
            .await
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// -----------------------------
 | 
			
		||||
// Existence checks (strict create) and permissions
 | 
			
		||||
// -----------------------------
 | 
			
		||||
impl AppService {
 | 
			
		||||
    async fn ensure_context_not_exists(&self, id: u32) -> Result<(), BoxError> {
 | 
			
		||||
        match self.redis.load_context(id).await {
 | 
			
		||||
            Ok(_) => Err(Box::new(AlreadyExistsError {
 | 
			
		||||
                key: format!("context:{}", id),
 | 
			
		||||
            })),
 | 
			
		||||
            Err(e) => {
 | 
			
		||||
                if contains_key_not_found(&e) {
 | 
			
		||||
                    Ok(())
 | 
			
		||||
                } else {
 | 
			
		||||
                    Err(e)
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn ensure_actor_not_exists(&self, db: u32, id: u32) -> Result<(), BoxError> {
 | 
			
		||||
        match self.redis.load_actor(db, id).await {
 | 
			
		||||
            Ok(_) => Err(Box::new(AlreadyExistsError {
 | 
			
		||||
                key: format!("actor:{}", id),
 | 
			
		||||
            })),
 | 
			
		||||
            Err(e) => {
 | 
			
		||||
                if contains_key_not_found(&e) {
 | 
			
		||||
                    Ok(())
 | 
			
		||||
                } else {
 | 
			
		||||
                    Err(e)
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn ensure_runner_not_exists(&self, db: u32, id: u32) -> Result<(), BoxError> {
 | 
			
		||||
        match self.redis.load_runner(db, id).await {
 | 
			
		||||
            Ok(_) => Err(Box::new(AlreadyExistsError {
 | 
			
		||||
                key: format!("runner:{}", id),
 | 
			
		||||
            })),
 | 
			
		||||
            Err(e) => {
 | 
			
		||||
                if contains_key_not_found(&e) {
 | 
			
		||||
                    Ok(())
 | 
			
		||||
                } else {
 | 
			
		||||
                    Err(e)
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn ensure_flow_not_exists(&self, db: u32, id: u32) -> Result<(), BoxError> {
 | 
			
		||||
        match self.redis.load_flow(db, id).await {
 | 
			
		||||
            Ok(_) => Err(Box::new(AlreadyExistsError {
 | 
			
		||||
                key: format!("flow:{}", id),
 | 
			
		||||
            })),
 | 
			
		||||
            Err(e) => {
 | 
			
		||||
                if contains_key_not_found(&e) {
 | 
			
		||||
                    Ok(())
 | 
			
		||||
                } else {
 | 
			
		||||
                    Err(e)
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn ensure_job_not_exists(
 | 
			
		||||
        &self,
 | 
			
		||||
        db: u32,
 | 
			
		||||
        caller_id: u32,
 | 
			
		||||
        id: u32,
 | 
			
		||||
    ) -> Result<(), BoxError> {
 | 
			
		||||
        match self.redis.load_job(db, caller_id, id).await {
 | 
			
		||||
            Ok(_) => Err(Box::new(AlreadyExistsError {
 | 
			
		||||
                key: format!("job:{}:{}", caller_id, id),
 | 
			
		||||
            })),
 | 
			
		||||
            Err(e) => {
 | 
			
		||||
                if contains_key_not_found(&e) {
 | 
			
		||||
                    Ok(())
 | 
			
		||||
                } else {
 | 
			
		||||
                    Err(e)
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn ensure_message_not_exists(
 | 
			
		||||
        &self,
 | 
			
		||||
        db: u32,
 | 
			
		||||
        caller_id: u32,
 | 
			
		||||
        id: u32,
 | 
			
		||||
    ) -> Result<(), BoxError> {
 | 
			
		||||
        match self.redis.load_message(db, caller_id, id).await {
 | 
			
		||||
            Ok(_) => Err(Box::new(AlreadyExistsError {
 | 
			
		||||
                key: format!("message:{}:{}", caller_id, id),
 | 
			
		||||
            })),
 | 
			
		||||
            Err(e) => {
 | 
			
		||||
                if contains_key_not_found(&e) {
 | 
			
		||||
                    Ok(())
 | 
			
		||||
                } else {
 | 
			
		||||
                    Err(e)
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn require_admin(
 | 
			
		||||
        &self,
 | 
			
		||||
        context_id: u32,
 | 
			
		||||
        actor_id: u32,
 | 
			
		||||
        action: &str,
 | 
			
		||||
    ) -> Result<(), BoxError> {
 | 
			
		||||
        let ctx = self.redis.load_context(context_id).await?;
 | 
			
		||||
        let ok = context_has_role(&ctx, "admins", actor_id)?;
 | 
			
		||||
        if !ok {
 | 
			
		||||
            return Err(Box::new(PermissionDeniedError {
 | 
			
		||||
                actor_id,
 | 
			
		||||
                context_id,
 | 
			
		||||
                action: action.to_string(),
 | 
			
		||||
            }));
 | 
			
		||||
        }
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn require_executor(
 | 
			
		||||
        &self,
 | 
			
		||||
        context_id: u32,
 | 
			
		||||
        actor_id: u32,
 | 
			
		||||
        action: &str,
 | 
			
		||||
    ) -> Result<(), BoxError> {
 | 
			
		||||
        let ctx = self.redis.load_context(context_id).await?;
 | 
			
		||||
        let ok = context_has_role(&ctx, "executors", actor_id)?;
 | 
			
		||||
        if !ok {
 | 
			
		||||
            return Err(Box::new(PermissionDeniedError {
 | 
			
		||||
                actor_id,
 | 
			
		||||
                context_id,
 | 
			
		||||
                action: action.to_string(),
 | 
			
		||||
            }));
 | 
			
		||||
        }
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -6,7 +6,9 @@ use serde::de::DeserializeOwned;
 | 
			
		||||
use serde_json::{Map as JsonMap, Value};
 | 
			
		||||
use tokio::sync::Mutex;
 | 
			
		||||
 | 
			
		||||
use crate::models::{Actor, Context, Flow, Job, JobStatus, Message, Runner};
 | 
			
		||||
use crate::models::{
 | 
			
		||||
    Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageStatus, Runner,
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
 | 
			
		||||
 | 
			
		||||
@@ -303,4 +305,232 @@ impl RedisDriver {
 | 
			
		||||
        let key = Self::message_key(caller_id, id);
 | 
			
		||||
        self.hget_model(db, &key).await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
    // Partial update helpers
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
 | 
			
		||||
    /// Flow: update only status and updated_at
 | 
			
		||||
    pub async fn update_flow_status(&self, db: u32, id: u32, status: FlowStatus) -> Result<()> {
 | 
			
		||||
        let mut cm = self.manager_for_db(db).await?;
 | 
			
		||||
        let key = Self::flow_key(id);
 | 
			
		||||
 | 
			
		||||
        let status_str = match serde_json::to_value(&status)? {
 | 
			
		||||
            Value::String(s) => s,
 | 
			
		||||
            v => v.to_string(),
 | 
			
		||||
        };
 | 
			
		||||
        let ts = crate::time::current_timestamp();
 | 
			
		||||
 | 
			
		||||
        let pairs = vec![
 | 
			
		||||
            ("status".to_string(), status_str),
 | 
			
		||||
            ("updated_at".to_string(), ts.to_string()),
 | 
			
		||||
        ];
 | 
			
		||||
        let _: usize = cm.hset_multiple(key, &pairs).await?;
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Message: update only status and updated_at
 | 
			
		||||
    pub async fn update_message_status(
 | 
			
		||||
        &self,
 | 
			
		||||
        db: u32,
 | 
			
		||||
        caller_id: u32,
 | 
			
		||||
        id: u32,
 | 
			
		||||
        status: MessageStatus,
 | 
			
		||||
    ) -> Result<()> {
 | 
			
		||||
        let mut cm = self.manager_for_db(db).await?;
 | 
			
		||||
        let key = Self::message_key(caller_id, id);
 | 
			
		||||
 | 
			
		||||
        let status_str = match serde_json::to_value(&status)? {
 | 
			
		||||
            Value::String(s) => s,
 | 
			
		||||
            v => v.to_string(),
 | 
			
		||||
        };
 | 
			
		||||
        let ts = crate::time::current_timestamp();
 | 
			
		||||
 | 
			
		||||
        let pairs = vec![
 | 
			
		||||
            ("status".to_string(), status_str),
 | 
			
		||||
            ("updated_at".to_string(), ts.to_string()),
 | 
			
		||||
        ];
 | 
			
		||||
        let _: usize = cm.hset_multiple(key, &pairs).await?;
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Flow: merge env_vars map and bump updated_at
 | 
			
		||||
    pub async fn update_flow_env_vars_merge(
 | 
			
		||||
        &self,
 | 
			
		||||
        db: u32,
 | 
			
		||||
        id: u32,
 | 
			
		||||
        patch: StdHashMap<String, String>,
 | 
			
		||||
    ) -> Result<()> {
 | 
			
		||||
        let mut cm = self.manager_for_db(db).await?;
 | 
			
		||||
        let key = Self::flow_key(id);
 | 
			
		||||
 | 
			
		||||
        let current: Option<String> = cm.hget(&key, "env_vars").await.ok();
 | 
			
		||||
        let mut obj = match current
 | 
			
		||||
            .and_then(|s| serde_json::from_str::<Value>(&s).ok())
 | 
			
		||||
            .and_then(|v| v.as_object().cloned())
 | 
			
		||||
        {
 | 
			
		||||
            Some(m) => m,
 | 
			
		||||
            None => JsonMap::new(),
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        for (k, v) in patch {
 | 
			
		||||
            obj.insert(k, Value::String(v));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let env_vars_str = Value::Object(obj).to_string();
 | 
			
		||||
        let ts = crate::time::current_timestamp();
 | 
			
		||||
        let pairs = vec![
 | 
			
		||||
            ("env_vars".to_string(), env_vars_str),
 | 
			
		||||
            ("updated_at".to_string(), ts.to_string()),
 | 
			
		||||
        ];
 | 
			
		||||
        let _: usize = cm.hset_multiple(key, &pairs).await?;
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Flow: merge result map and bump updated_at
 | 
			
		||||
    pub async fn update_flow_result_merge(
 | 
			
		||||
        &self,
 | 
			
		||||
        db: u32,
 | 
			
		||||
        id: u32,
 | 
			
		||||
        patch: StdHashMap<String, String>,
 | 
			
		||||
    ) -> Result<()> {
 | 
			
		||||
        let mut cm = self.manager_for_db(db).await?;
 | 
			
		||||
        let key = Self::flow_key(id);
 | 
			
		||||
 | 
			
		||||
        let current: Option<String> = cm.hget(&key, "result").await.ok();
 | 
			
		||||
        let mut obj = match current
 | 
			
		||||
            .and_then(|s| serde_json::from_str::<Value>(&s).ok())
 | 
			
		||||
            .and_then(|v| v.as_object().cloned())
 | 
			
		||||
        {
 | 
			
		||||
            Some(m) => m,
 | 
			
		||||
            None => JsonMap::new(),
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        for (k, v) in patch {
 | 
			
		||||
            obj.insert(k, Value::String(v));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let result_str = Value::Object(obj).to_string();
 | 
			
		||||
        let ts = crate::time::current_timestamp();
 | 
			
		||||
        let pairs = vec![
 | 
			
		||||
            ("result".to_string(), result_str),
 | 
			
		||||
            ("updated_at".to_string(), ts.to_string()),
 | 
			
		||||
        ];
 | 
			
		||||
        let _: usize = cm.hset_multiple(key, &pairs).await?;
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Job: merge env_vars map and bump updated_at
 | 
			
		||||
    pub async fn update_job_env_vars_merge(
 | 
			
		||||
        &self,
 | 
			
		||||
        db: u32,
 | 
			
		||||
        caller_id: u32,
 | 
			
		||||
        id: u32,
 | 
			
		||||
        patch: StdHashMap<String, String>,
 | 
			
		||||
    ) -> Result<()> {
 | 
			
		||||
        let mut cm = self.manager_for_db(db).await?;
 | 
			
		||||
        let key = Self::job_key(caller_id, id);
 | 
			
		||||
 | 
			
		||||
        let current: Option<String> = cm.hget(&key, "env_vars").await.ok();
 | 
			
		||||
        let mut obj = match current
 | 
			
		||||
            .and_then(|s| serde_json::from_str::<Value>(&s).ok())
 | 
			
		||||
            .and_then(|v| v.as_object().cloned())
 | 
			
		||||
        {
 | 
			
		||||
            Some(m) => m,
 | 
			
		||||
            None => JsonMap::new(),
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        for (k, v) in patch {
 | 
			
		||||
            obj.insert(k, Value::String(v));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let env_vars_str = Value::Object(obj).to_string();
 | 
			
		||||
        let ts = crate::time::current_timestamp();
 | 
			
		||||
        let pairs = vec![
 | 
			
		||||
            ("env_vars".to_string(), env_vars_str),
 | 
			
		||||
            ("updated_at".to_string(), ts.to_string()),
 | 
			
		||||
        ];
 | 
			
		||||
        let _: usize = cm.hset_multiple(key, &pairs).await?;
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Job: merge result map and bump updated_at
 | 
			
		||||
    pub async fn update_job_result_merge(
 | 
			
		||||
        &self,
 | 
			
		||||
        db: u32,
 | 
			
		||||
        caller_id: u32,
 | 
			
		||||
        id: u32,
 | 
			
		||||
        patch: StdHashMap<String, String>,
 | 
			
		||||
    ) -> Result<()> {
 | 
			
		||||
        let mut cm = self.manager_for_db(db).await?;
 | 
			
		||||
        let key = Self::job_key(caller_id, id);
 | 
			
		||||
 | 
			
		||||
        let current: Option<String> = cm.hget(&key, "result").await.ok();
 | 
			
		||||
        let mut obj = match current
 | 
			
		||||
            .and_then(|s| serde_json::from_str::<Value>(&s).ok())
 | 
			
		||||
            .and_then(|v| v.as_object().cloned())
 | 
			
		||||
        {
 | 
			
		||||
            Some(m) => m,
 | 
			
		||||
            None => JsonMap::new(),
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        for (k, v) in patch {
 | 
			
		||||
            obj.insert(k, Value::String(v));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let result_str = Value::Object(obj).to_string();
 | 
			
		||||
        let ts = crate::time::current_timestamp();
 | 
			
		||||
        let pairs = vec![
 | 
			
		||||
            ("result".to_string(), result_str),
 | 
			
		||||
            ("updated_at".to_string(), ts.to_string()),
 | 
			
		||||
        ];
 | 
			
		||||
        let _: usize = cm.hset_multiple(key, &pairs).await?;
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Flow: set jobs list and bump updated_at
 | 
			
		||||
    pub async fn update_flow_jobs_set(&self, db: u32, id: u32, new_jobs: Vec<u32>) -> Result<()> {
 | 
			
		||||
        let mut cm = self.manager_for_db(db).await?;
 | 
			
		||||
        let key = Self::flow_key(id);
 | 
			
		||||
 | 
			
		||||
        let jobs_str = serde_json::to_string(&new_jobs)?;
 | 
			
		||||
        let ts = crate::time::current_timestamp();
 | 
			
		||||
        let pairs = vec![
 | 
			
		||||
            ("jobs".to_string(), jobs_str),
 | 
			
		||||
            ("updated_at".to_string(), ts.to_string()),
 | 
			
		||||
        ];
 | 
			
		||||
        let _: usize = cm.hset_multiple(key, &pairs).await?;
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Message: append logs (no dedup) and bump updated_at
 | 
			
		||||
    pub async fn append_message_logs(
 | 
			
		||||
        &self,
 | 
			
		||||
        db: u32,
 | 
			
		||||
        caller_id: u32,
 | 
			
		||||
        id: u32,
 | 
			
		||||
        new_logs: Vec<String>,
 | 
			
		||||
    ) -> Result<()> {
 | 
			
		||||
        let mut cm = self.manager_for_db(db).await?;
 | 
			
		||||
        let key = Self::message_key(caller_id, id);
 | 
			
		||||
 | 
			
		||||
        let current: Option<String> = cm.hget(&key, "logs").await.ok();
 | 
			
		||||
        let mut arr: Vec<Value> = current
 | 
			
		||||
            .and_then(|s| serde_json::from_str::<Value>(&s).ok())
 | 
			
		||||
            .and_then(|v| v.as_array().cloned())
 | 
			
		||||
            .unwrap_or_default();
 | 
			
		||||
 | 
			
		||||
        for l in new_logs {
 | 
			
		||||
            arr.push(Value::String(l));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let logs_str = Value::Array(arr).to_string();
 | 
			
		||||
        let ts = crate::time::current_timestamp();
 | 
			
		||||
        let pairs = vec![
 | 
			
		||||
            ("logs".to_string(), logs_str),
 | 
			
		||||
            ("updated_at".to_string(), ts.to_string()),
 | 
			
		||||
        ];
 | 
			
		||||
        let _: usize = cm.hset_multiple(key, &pairs).await?;
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user