use std::{ collections::HashMap, net::{IpAddr, SocketAddr}, sync::Arc, }; use jsonrpsee::{ RpcModule, server::{ServerBuilder, ServerHandle}, types::error::ErrorObjectOwned, }; use serde::{Deserialize, Serialize}; use serde_json::{Value, json}; use crate::{ models::{Actor, Context, Flow, Job, Message, Runner, ScriptType}, storage::RedisDriver, time::current_timestamp, }; pub struct AppState { pub redis: RedisDriver, } impl AppState { pub fn new(redis: RedisDriver) -> Self { Self { redis } } } // ----------------------------- // Error helpers // ----------------------------- fn invalid_params_err(e: E) -> ErrorObjectOwned { ErrorObjectOwned::owned(-32602, "Invalid params", Some(Value::String(e.to_string()))) } fn storage_err(e: Box) -> ErrorObjectOwned { let msg = e.to_string(); if msg.contains("Key not found") { ErrorObjectOwned::owned(-32001, "Not Found", Some(Value::String(msg))) } else { ErrorObjectOwned::owned(-32010, "Storage Error", Some(Value::String(msg))) } } // ----------------------------- // Local enums for DTOs (to keep quirks isolated) // ----------------------------- #[derive(Debug, Deserialize, Serialize, Clone, Copy)] #[serde(rename_all = "PascalCase")] pub enum MessageFormatTypeDto { Html, Text, Md, } // ----------------------------- // Create DTOs and Param wrappers // ----------------------------- #[derive(Debug, Deserialize)] pub struct ActorCreate { pub id: u32, pub pubkey: String, pub address: Vec, } impl ActorCreate { pub fn into_domain(self) -> Result { let ts = current_timestamp(); let v = json!({ "id": self.id, "pubkey": self.pubkey, "address": self.address, "created_at": ts, "updated_at": ts, }); serde_json::from_value(v).map_err(|e| e.to_string()) } } #[derive(Debug, Deserialize)] pub struct ContextCreate { pub id: u32, pub admins: Vec, pub readers: Vec, pub executors: Vec, } impl ContextCreate { pub fn into_domain(self) -> Result { let ts = current_timestamp(); // Note: keep current code quirk: "upddated_at" let mut v = serde_json::Map::new(); v.insert("id".to_string(), Value::from(self.id)); v.insert( "admins".to_string(), serde_json::to_value(self.admins).unwrap(), ); v.insert( "readers".to_string(), serde_json::to_value(self.readers).unwrap(), ); v.insert( "executors".to_string(), serde_json::to_value(self.executors).unwrap(), ); v.insert("created_at".to_string(), Value::from(ts)); v.insert("upddated_at".to_string(), Value::from(ts)); serde_json::from_value(Value::Object(v)).map_err(|e| e.to_string()) } } #[derive(Debug, Deserialize)] pub struct RunnerCreate { pub id: u32, pub pubkey: String, pub address: IpAddr, pub topic: String, pub local: bool, } impl RunnerCreate { pub fn into_domain(self) -> Result { let ts = current_timestamp(); // Note: keep current code quirk: "crated_at" let v = json!({ "id": self.id, "pubkey": self.pubkey, "address": self.address, "topic": self.topic, "local": self.local, "crated_at": ts, "updated_at": ts, }); serde_json::from_value(v).map_err(|e| e.to_string()) } } #[derive(Debug, Deserialize)] pub struct FlowCreate { pub id: u32, pub caller_id: u32, pub context_id: u32, pub jobs: Vec, pub env_vars: HashMap, #[serde(default)] pub result: Option>, } impl FlowCreate { pub fn into_domain(self) -> Result { let ts = current_timestamp(); let v = json!({ "id": self.id, "caller_id": self.caller_id, "context_id": self.context_id, "jobs": self.jobs, "env_vars": self.env_vars, "result": self.result.unwrap_or_default(), "created_at": ts, "updated_at": ts, "status": "Dispatched", }); serde_json::from_value(v).map_err(|e| e.to_string()) } } #[derive(Debug, Deserialize)] pub struct JobCreate { pub id: u32, pub caller_id: u32, pub context_id: u32, pub script: String, pub script_type: ScriptType, pub timeout: u32, pub retries: u8, pub env_vars: HashMap, #[serde(default)] pub result: Option>, pub prerequisites: Vec, pub depends: Vec, } impl JobCreate { pub fn into_domain(self) -> Result { let ts = current_timestamp(); let v = json!({ "id": self.id, "caller_id": self.caller_id, "context_id": self.context_id, "script": self.script, "script_type": self.script_type, "timeout": self.timeout, "retries": self.retries, "env_vars": self.env_vars, "result": self.result.unwrap_or_default(), "prerequisites": self.prerequisites, "depends": self.depends, "created_at": ts, "updated_at": ts, "status": "Dispatched", }); serde_json::from_value(v).map_err(|e| e.to_string()) } } #[derive(Debug, Deserialize)] pub struct MessageCreate { pub id: u32, pub caller_id: u32, pub context_id: u32, pub message: String, // Note: model uses ScriptType for message_type (keep as-is) pub message_type: ScriptType, pub message_format_type: MessageFormatTypeDto, pub timeout: u32, pub timeout_ack: u32, pub timeout_result: u32, pub job: Vec, #[serde(default)] pub logs: Option>, } impl MessageCreate { pub fn into_domain(self) -> Result { let ts = current_timestamp(); let jobs: Result, String> = self .job .into_iter() .map(|j| { let jd: Job = j.into_domain()?; serde_json::to_value(jd).map_err(|e| e.to_string()) }) .collect(); let v = json!({ "id": self.id, "caller_id": self.caller_id, "context_id": self.context_id, "message": self.message, "message_type": self.message_type, "message_format_type": self.message_format_type, // "Html" | "Text" | "Md" "timeout": self.timeout, "timeout_ack": self.timeout_ack, "timeout_result": self.timeout_result, "job": jobs?, "logs": self.logs.unwrap_or_default(), "created_at": ts, "updated_at": ts, "status": "Dispatched", }); serde_json::from_value(v).map_err(|e| e.to_string()) } } #[derive(Debug, Deserialize)] pub struct ActorCreateParams { pub context_id: u32, pub actor: ActorCreate, } #[derive(Debug, Deserialize)] pub struct ActorLoadParams { pub context_id: u32, pub id: u32, } #[derive(Debug, Deserialize)] pub struct ContextCreateParams { pub context: ContextCreate, } #[derive(Debug, Deserialize)] pub struct ContextLoadParams { pub id: u32, } #[derive(Debug, Deserialize)] pub struct RunnerCreateParams { pub context_id: u32, pub runner: RunnerCreate, } #[derive(Debug, Deserialize)] pub struct RunnerLoadParams { pub context_id: u32, pub id: u32, } #[derive(Debug, Deserialize)] pub struct FlowCreateParams { pub context_id: u32, pub flow: FlowCreate, } #[derive(Debug, Deserialize)] pub struct FlowLoadParams { pub context_id: u32, pub id: u32, } #[derive(Debug, Deserialize)] pub struct JobCreateParams { pub context_id: u32, pub job: JobCreate, } #[derive(Debug, Deserialize)] pub struct JobLoadParams { pub context_id: u32, pub caller_id: u32, pub id: u32, } #[derive(Debug, Deserialize)] pub struct MessageCreateParams { pub context_id: u32, pub message: MessageCreate, } #[derive(Debug, Deserialize)] pub struct MessageLoadParams { pub context_id: u32, pub caller_id: u32, pub id: u32, } // ----------------------------- // Rpc module builder (manual registration) // ----------------------------- pub fn build_module(state: Arc) -> RpcModule<()> { let mut module: RpcModule<()> = RpcModule::new(()); // Actor { let state = state.clone(); module .register_async_method("actor.create", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: ActorCreateParams = params.parse().map_err(invalid_params_err)?; let actor = p.actor.into_domain().map_err(invalid_params_err)?; state .redis .save_actor(p.context_id, &actor) .await .map_err(storage_err)?; { let out: serde_json::Value = serde_json::to_value(actor).map_err(invalid_params_err)?; Ok::(out) } } }) .expect("register actor.create"); } { let state = state.clone(); module .register_async_method("actor.load", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: ActorLoadParams = params.parse().map_err(invalid_params_err)?; let actor = state .redis .load_actor(p.context_id, p.id) .await .map_err(storage_err)?; { let out: serde_json::Value = serde_json::to_value(actor).map_err(invalid_params_err)?; Ok::(out) } } }) .expect("register actor.load"); } // Context { let state = state.clone(); module .register_async_method("context.create", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: ContextCreateParams = params.parse().map_err(invalid_params_err)?; let ctx = p.context.into_domain().map_err(invalid_params_err)?; state.redis.save_context(&ctx).await.map_err(storage_err)?; { let out: serde_json::Value = serde_json::to_value(ctx).map_err(invalid_params_err)?; Ok::(out) } } }) .expect("register context.create"); } { let state = state.clone(); module .register_async_method("context.load", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: ContextLoadParams = params.parse().map_err(invalid_params_err)?; let ctx = state.redis.load_context(p.id).await.map_err(storage_err)?; { let out: serde_json::Value = serde_json::to_value(ctx).map_err(invalid_params_err)?; Ok::(out) } } }) .expect("register context.load"); } // Runner { let state = state.clone(); module .register_async_method("runner.create", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: RunnerCreateParams = params.parse().map_err(invalid_params_err)?; let runner = p.runner.into_domain().map_err(invalid_params_err)?; state .redis .save_runner(p.context_id, &runner) .await .map_err(storage_err)?; { let out: serde_json::Value = serde_json::to_value(runner).map_err(invalid_params_err)?; Ok::(out) } } }) .expect("register runner.create"); } { let state = state.clone(); module .register_async_method("runner.load", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: RunnerLoadParams = params.parse().map_err(invalid_params_err)?; let runner = state .redis .load_runner(p.context_id, p.id) .await .map_err(storage_err)?; { let out: serde_json::Value = serde_json::to_value(runner).map_err(invalid_params_err)?; Ok::(out) } } }) .expect("register runner.load"); } // Flow { let state = state.clone(); module .register_async_method("flow.create", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: FlowCreateParams = params.parse().map_err(invalid_params_err)?; let flow = p.flow.into_domain().map_err(invalid_params_err)?; state .redis .save_flow(p.context_id, &flow) .await .map_err(storage_err)?; { let out: serde_json::Value = serde_json::to_value(flow).map_err(invalid_params_err)?; Ok::(out) } } }) .expect("register flow.create"); } { let state = state.clone(); module .register_async_method("flow.load", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: FlowLoadParams = params.parse().map_err(invalid_params_err)?; let flow = state .redis .load_flow(p.context_id, p.id) .await .map_err(storage_err)?; { let out: serde_json::Value = serde_json::to_value(flow).map_err(invalid_params_err)?; Ok::(out) } } }) .expect("register flow.load"); } // Job { let state = state.clone(); module .register_async_method("job.create", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: JobCreateParams = params.parse().map_err(invalid_params_err)?; let job = p.job.into_domain().map_err(invalid_params_err)?; state .redis .save_job(p.context_id, &job) .await .map_err(storage_err)?; { let out: serde_json::Value = serde_json::to_value(job).map_err(invalid_params_err)?; Ok::(out) } } }) .expect("register job.create"); } { let state = state.clone(); module .register_async_method("job.load", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: JobLoadParams = params.parse().map_err(invalid_params_err)?; let job = state .redis .load_job(p.context_id, p.caller_id, p.id) .await .map_err(storage_err)?; { let out: serde_json::Value = serde_json::to_value(job).map_err(invalid_params_err)?; Ok::(out) } } }) .expect("register job.load"); } // Message { let state = state.clone(); module .register_async_method("message.create", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: MessageCreateParams = params.parse().map_err(invalid_params_err)?; let message = p.message.into_domain().map_err(invalid_params_err)?; state .redis .save_message(p.context_id, &message) .await .map_err(storage_err)?; { let out: serde_json::Value = serde_json::to_value(message).map_err(invalid_params_err)?; Ok::(out) } } }) .expect("register message.create"); } { let state = state; module .register_async_method("message.load", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: MessageLoadParams = params.parse().map_err(invalid_params_err)?; let msg = state .redis .load_message(p.context_id, p.caller_id, p.id) .await .map_err(storage_err)?; { let out: serde_json::Value = serde_json::to_value(msg).map_err(invalid_params_err)?; Ok::(out) } } }) .expect("register message.load"); } module } // ----------------------------- // Server runners (HTTP/WS on separate listeners) // ----------------------------- pub async fn start_http( addr: SocketAddr, module: RpcModule, ) -> Result> { let server = ServerBuilder::default().build(addr).await?; let handle = server.start(module); Ok(handle) } pub async fn start_ws( addr: SocketAddr, module: RpcModule, ) -> Result> { // jsonrpsee server supports both HTTP and WS; using a second listener gives us a dedicated WS port. let server = ServerBuilder::default().build(addr).await?; let handle = server.start(module); Ok(handle) }