use std::{ collections::HashMap, net::{IpAddr, SocketAddr}, sync::Arc, }; use jsonrpsee::{ RpcModule, server::{ServerBuilder, ServerHandle}, types::error::ErrorObjectOwned, }; use serde::Deserialize; use serde_json::{Value, json}; use crate::{ dag::{DagError, FlowDag}, models::{ Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageFormatType, MessageStatus, Runner, ScriptType, }, service::AppService, time::current_timestamp, }; /// The OpenRPC specification for the HeroCoordinator JSON-RPC API const OPENRPC_SPEC: &str = include_str!("../specs/openrpc.json"); pub struct AppState { pub service: AppService, } impl AppState { pub fn new(service: AppService) -> Self { Self { service } } } // ----------------------------- // Error helpers // ----------------------------- fn invalid_params_err(e: E) -> ErrorObjectOwned { ErrorObjectOwned::owned(-32602, "Invalid params", Some(Value::String(e.to_string()))) } fn storage_err(e: Box) -> ErrorObjectOwned { let msg = e.to_string(); if msg.contains("Key not found") { ErrorObjectOwned::owned(-32001, "Not Found", Some(Value::String(msg))) } else { ErrorObjectOwned::owned(-32010, "Storage Error", Some(Value::String(msg))) } } fn dag_err(e: DagError) -> ErrorObjectOwned { match e { DagError::Storage(inner) => storage_err(inner), DagError::MissingDependency { .. } => ErrorObjectOwned::owned( -32020, "DAG Missing Dependency", Some(Value::String(e.to_string())), ), DagError::CycleDetected { .. } => ErrorObjectOwned::owned( -32021, "DAG Cycle Detected", Some(Value::String(e.to_string())), ), DagError::UnknownJob { .. } => ErrorObjectOwned::owned( -32022, "DAG Unknown Job", Some(Value::String(e.to_string())), ), DagError::DependenciesIncomplete { .. } => ErrorObjectOwned::owned( -32023, "DAG Dependencies Incomplete", Some(Value::String(e.to_string())), ), DagError::FlowFailed { .. } => ErrorObjectOwned::owned( -32024, "DAG Flow Failed", Some(Value::String(e.to_string())), ), DagError::JobNotStarted { .. } => ErrorObjectOwned::owned( -32025, "DAG Job Not Started", Some(Value::String(e.to_string())), ), } } // ----------------------------- // Create DTOs and Param wrappers // ----------------------------- #[derive(Debug, Deserialize)] pub struct ActorCreate { pub id: u32, pub pubkey: String, pub address: Vec, } impl ActorCreate { pub fn into_domain(self) -> Result { let ts = current_timestamp(); let v = json!({ "id": self.id, "pubkey": self.pubkey, "address": self.address, "created_at": ts, "updated_at": ts, }); serde_json::from_value(v).map_err(|e| e.to_string()) } } #[derive(Debug, Deserialize)] pub struct ContextCreate { pub id: u32, pub admins: Vec, pub readers: Vec, pub executors: Vec, } impl ContextCreate { pub fn into_domain(self) -> Context { let ts = current_timestamp(); let ContextCreate { id, admins, readers, executors, } = self; Context { id, admins, readers, executors, created_at: ts, updated_at: ts, } } } #[derive(Debug, Deserialize)] pub struct RunnerCreate { pub id: u32, pub pubkey: String, pub address: IpAddr, pub topic: String, /// The script type this runner executes (used for routing) pub script_type: ScriptType, pub local: bool, } impl RunnerCreate { pub fn into_domain(self) -> Runner { let ts = current_timestamp(); let RunnerCreate { id, pubkey, address, topic, script_type, local, } = self; Runner { id, pubkey, address, topic, script_type, local, created_at: ts, updated_at: ts, } } } #[derive(Debug, Deserialize)] pub struct FlowCreate { pub id: u32, pub caller_id: u32, pub context_id: u32, pub jobs: Vec, pub env_vars: HashMap, } impl FlowCreate { pub fn into_domain(self) -> Flow { let ts = current_timestamp(); let FlowCreate { id, caller_id, context_id, jobs, env_vars, } = self; Flow { id, caller_id, context_id, jobs, env_vars, result: HashMap::new(), created_at: ts, updated_at: ts, status: FlowStatus::Created, } } } #[derive(Debug, Deserialize)] pub struct JobCreate { pub id: u32, pub caller_id: u32, pub context_id: u32, pub script: String, pub script_type: ScriptType, pub timeout: u32, pub retries: u8, pub env_vars: HashMap, pub prerequisites: Vec, pub depends: Vec, } impl JobCreate { pub fn into_domain(self) -> Job { let ts = current_timestamp(); let JobCreate { id, caller_id, context_id, script, script_type, timeout, retries, env_vars, prerequisites, depends, } = self; Job { id, caller_id, context_id, script, script_type, timeout, retries, env_vars, result: HashMap::new(), prerequisites, depends, created_at: ts, updated_at: ts, status: JobStatus::WaitingForPrerequisites, } } } #[derive(Debug, Deserialize)] pub struct MessageCreate { pub id: u32, pub caller_id: u32, pub context_id: u32, pub message: String, pub message_type: ScriptType, pub message_format_type: MessageFormatType, pub timeout: u32, pub timeout_ack: u32, pub timeout_result: u32, pub job: Vec, } impl MessageCreate { pub fn into_domain(self) -> Message { let ts = current_timestamp(); let MessageCreate { id, caller_id, context_id, message, message_type, message_format_type, timeout, timeout_ack, timeout_result, job, } = self; Message { id, caller_id, context_id, message, message_type, message_format_type, timeout, timeout_ack, timeout_result, transport_id: None, transport_status: None, job: job.into_iter().map(JobCreate::into_domain).collect(), logs: Vec::new(), created_at: ts, updated_at: ts, status: MessageStatus::Dispatched, } } } #[derive(Debug, Deserialize)] pub struct ActorCreateParams { pub actor: ActorCreate, } #[derive(Debug, Deserialize)] pub struct ActorLoadParams { pub id: u32, } #[derive(Debug, Deserialize)] pub struct ContextCreateParams { pub context: ContextCreate, } #[derive(Debug, Deserialize)] pub struct ContextLoadParams { pub id: u32, } #[derive(Debug, Deserialize)] pub struct RunnerCreateParams { pub context_id: u32, pub runner: RunnerCreate, } #[derive(Debug, Deserialize)] pub struct RunnerLoadParams { pub context_id: u32, pub id: u32, } #[derive(Debug, Deserialize)] pub struct FlowCreateParams { pub context_id: u32, pub flow: FlowCreate, } #[derive(Debug, Deserialize)] pub struct FlowLoadParams { pub context_id: u32, pub id: u32, } #[derive(Debug, Deserialize)] pub struct JobCreateParams { pub context_id: u32, pub job: JobCreate, } #[derive(Debug, Deserialize)] pub struct JobLoadParams { pub context_id: u32, pub caller_id: u32, pub id: u32, } #[derive(Debug, Deserialize)] pub struct MessageCreateParams { pub context_id: u32, pub message: MessageCreate, } #[derive(Debug, Deserialize)] pub struct MessageLoadParams { pub context_id: u32, pub caller_id: u32, pub id: u32, } // ----------------------------- // Rpc module builder (manual registration) // ----------------------------- pub fn build_module(state: Arc) -> RpcModule<()> { let mut module: RpcModule<()> = RpcModule::new(()); // Actor { let state = state.clone(); module .register_async_method("actor.create", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: ActorCreateParams = params.parse().map_err(invalid_params_err)?; let actor = p.actor.into_domain().map_err(invalid_params_err)?; let actor = state .service .create_actor(actor) .await .map_err(storage_err)?; Ok::<_, ErrorObjectOwned>(actor) } }) .expect("register actor.create"); } { let state = state.clone(); module .register_async_method("actor.load", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: ActorLoadParams = params.parse().map_err(invalid_params_err)?; let actor = state.service.load_actor(p.id).await.map_err(storage_err)?; Ok::<_, ErrorObjectOwned>(actor) } }) .expect("register actor.load"); } // Context { let state = state.clone(); module .register_async_method("context.create", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: ContextCreateParams = params.parse().map_err(invalid_params_err)?; let ctx = p.context.into_domain(); let ctx = state .service .create_context(ctx) .await .map_err(storage_err)?; Ok::<_, ErrorObjectOwned>(ctx) } }) .expect("register context.create"); } { let state = state.clone(); module .register_async_method("context.load", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: ContextLoadParams = params.parse().map_err(invalid_params_err)?; let ctx = state .service .load_context(p.id) .await .map_err(storage_err)?; Ok::<_, ErrorObjectOwned>(ctx) } }) .expect("register context.load"); } // Runner { let state = state.clone(); module .register_async_method("runner.create", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: RunnerCreateParams = params.parse().map_err(invalid_params_err)?; let runner = p.runner.into_domain(); let runner = state .service .create_runner(p.context_id, runner) .await .map_err(storage_err)?; Ok::<_, ErrorObjectOwned>(runner) } }) .expect("register runner.create"); } { let state = state.clone(); module .register_async_method("runner.load", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: RunnerLoadParams = params.parse().map_err(invalid_params_err)?; let runner = state .service .load_runner(p.context_id, p.id) .await .map_err(storage_err)?; Ok::<_, ErrorObjectOwned>(runner) } }) .expect("register runner.load"); } // Flow { let state = state.clone(); module .register_async_method("flow.create", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: FlowCreateParams = params.parse().map_err(invalid_params_err)?; let flow = p.flow.into_domain(); let flow = state .service .create_flow(p.context_id, flow) .await .map_err(storage_err)?; Ok::<_, ErrorObjectOwned>(flow) } }) .expect("register flow.create"); } { let state = state.clone(); module .register_async_method("flow.load", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: FlowLoadParams = params.parse().map_err(invalid_params_err)?; let flow = state .service .load_flow(p.context_id, p.id) .await .map_err(storage_err)?; Ok::<_, ErrorObjectOwned>(flow) } }) .expect("register flow.load"); } { let state = state.clone(); module .register_async_method("flow.dag", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: FlowLoadParams = params.parse().map_err(invalid_params_err)?; let dag: FlowDag = state .service .flow_dag(p.context_id, p.id) .await .map_err(dag_err)?; Ok::<_, ErrorObjectOwned>(dag) } }) .expect("register flow.dag"); } { let state = state.clone(); module .register_async_method("flow.start", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: FlowLoadParams = params.parse().map_err(invalid_params_err)?; let started: bool = state .service .flow_start(p.context_id, p.id) .await .map_err(storage_err)?; Ok::<_, ErrorObjectOwned>(started) } }) .expect("register flow.start"); } // Job { let state = state.clone(); module .register_async_method("job.create", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: JobCreateParams = params.parse().map_err(invalid_params_err)?; let job = p.job.into_domain(); let job = state .service .create_job(p.context_id, job) .await .map_err(storage_err)?; Ok::<_, ErrorObjectOwned>(job) } }) .expect("register job.create"); } { let state = state.clone(); module .register_async_method("job.load", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: JobLoadParams = params.parse().map_err(invalid_params_err)?; let job = state .service .load_job(p.context_id, p.caller_id, p.id) .await .map_err(storage_err)?; Ok::<_, ErrorObjectOwned>(job) } }) .expect("register job.load"); } // Message { let state = state.clone(); module .register_async_method("message.create", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: MessageCreateParams = params.parse().map_err(invalid_params_err)?; let message = p.message.into_domain(); let message = state .service .create_message(p.context_id, message) .await .map_err(storage_err)?; Ok::<_, ErrorObjectOwned>(message) } }) .expect("register message.create"); } { let state = state; module .register_async_method("message.load", move |params, _caller, _ctx| { let state = state.clone(); async move { let p: MessageLoadParams = params.parse().map_err(invalid_params_err)?; let msg = state .service .load_message(p.context_id, p.caller_id, p.id) .await .map_err(storage_err)?; Ok::<_, ErrorObjectOwned>(msg) } }) .expect("register message.load"); } { module .register_async_method("rpc.discover", move |_params, _caller, _ctx| async move { let spec = serde_json::from_str::(OPENRPC_SPEC) .expect("Failed to parse OpenRPC spec"); Ok::<_, ErrorObjectOwned>(spec) }) .expect("register rpc.discover"); } module } // ----------------------------- // Server runners (HTTP/WS on separate listeners) // ----------------------------- pub async fn start_http( addr: SocketAddr, module: RpcModule, ) -> Result> { let server = ServerBuilder::default().build(addr).await?; let handle = server.start(module); Ok(handle) } pub async fn start_ws( addr: SocketAddr, module: RpcModule, ) -> Result> { // jsonrpsee server supports both HTTP and WS; using a second listener gives us a dedicated WS port. let server = ServerBuilder::default().build(addr).await?; let handle = server.start(module); Ok(handle) }