From 62c200b5bd1ebc31f06ce5677aedd8db7fcfc4b3 Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Thu, 21 Aug 2025 13:29:39 +0200 Subject: [PATCH] Add openrpc API Signed-off-by: Lee Smet --- Cargo.lock | 435 +++++++++++++ Cargo.toml | 2 + specs/openrpc.json | 1227 +++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + src/main.rs | 69 ++- src/models/actor.rs | 2 +- src/models/context.rs | 2 +- src/models/flow.rs | 4 +- src/models/runner.rs | 2 +- src/models/script_type.rs | 2 +- src/rpc.rs | 613 ++++++++++++++++++ 11 files changed, 2350 insertions(+), 9 deletions(-) create mode 100644 specs/openrpc.json create mode 100644 src/rpc.rs diff --git a/Cargo.lock b/Cargo.lock index dd9ebdf..2d0505b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -84,6 +84,12 @@ dependencies = [ "syn", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "autocfg" version = "1.5.0" @@ -105,12 +111,27 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "bitflags" version = "2.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a65b545ab31d687cff52899d4890855fec459eb6afe0da6417b8a18da87aa29" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "bytes" version = "1.10.1" @@ -183,6 +204,35 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -194,6 +244,18 @@ dependencies = [ "syn", ] +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "form_urlencoded" version = "1.2.2" @@ -292,6 +354,16 @@ dependencies = [ "slab", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "getrandom" version = "0.2.16" @@ -309,6 +381,31 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" +[[package]] +name = "h2" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3c0b69cfcb4e1b9f1bf2f53f95f766e4661169728ec61cd3fe5a0166f2d1386" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "hashbrown" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" + [[package]] name = "heck" version = "0.5.0" @@ -319,7 +416,9 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" name = "herocoordinator" version = "0.1.0" dependencies = [ + "async-trait", "clap", + "jsonrpsee", "redis", "serde", "serde_json", @@ -327,6 +426,90 @@ dependencies = [ "tracing", ] +[[package]] +name = "http" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4a85d31aea989eead29a3aaf9e1115a180df8282431156e533de47660892565" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http", +] + +[[package]] +name = "http-body-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" + +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + +[[package]] +name = "hyper" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb3aa54a13a0dfe7fbe3a59e0c76093041720fdc77b110cc0fc260fafb4dc51e" +dependencies = [ + "atomic-waker", + "bytes", + "futures-channel", + "futures-core", + "h2", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "pin-utils", + "smallvec", + "tokio", +] + +[[package]] +name = "hyper-util" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d9b05277c7e8da2c93a568989bb6207bef0112e8d17df7a6eda4a3cf143bc5e" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "hyper", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "icu_collections" version = "2.0.0" @@ -434,6 +617,16 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "indexmap" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661" +dependencies = [ + "equivalent", + "hashbrown", +] + [[package]] name = "io-uring" version = "0.7.9" @@ -457,6 +650,95 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "jsonrpsee" +version = "0.24.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b26c20e2178756451cfeb0661fb74c47dd5988cb7e3939de7e9241fd604d42" +dependencies = [ + "jsonrpsee-core", + "jsonrpsee-proc-macros", + "jsonrpsee-server", + "jsonrpsee-types", + "tokio", + "tracing", +] + +[[package]] +name = "jsonrpsee-core" +version = "0.24.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "456196007ca3a14db478346f58c7238028d55ee15c1df15115596e411ff27925" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "jsonrpsee-types", + "parking_lot", + "rand", + "rustc-hash", + "serde", + "serde_json", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "jsonrpsee-proc-macros" +version = "0.24.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e65763c942dfc9358146571911b0cd1c361c2d63e2d2305622d40d36376ca80" +dependencies = [ + "heck", + "proc-macro-crate", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "jsonrpsee-server" +version = "0.24.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55e363146da18e50ad2b51a0a7925fc423137a0b1371af8235b1c231a0647328" +dependencies = [ + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "jsonrpsee-core", + "jsonrpsee-types", + "pin-project", + "route-recognizer", + "serde", + "serde_json", + "soketto", + "thiserror", + "tokio", + "tokio-stream", + "tokio-util", + "tower", + "tracing", +] + +[[package]] +name = "jsonrpsee-types" +version = "0.24.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08a8e70baf945b6b5752fc8eb38c918a48f1234daf11355e07106d963f860089" +dependencies = [ + "http", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "libc" version = "0.2.175" @@ -479,6 +761,12 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "log" +version = "0.4.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" + [[package]] name = "memchr" version = "2.7.5" @@ -605,6 +893,15 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "proc-macro-crate" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edce586971a4dfaa28950c6f18ed55e0406c1ab88bbce2c6f6293a7aaba73d35" +dependencies = [ + "toml_edit", +] + [[package]] name = "proc-macro2" version = "1.0.101" @@ -686,12 +983,24 @@ dependencies = [ "bitflags", ] +[[package]] +name = "route-recognizer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afab94fb28594581f62d981211a9a4d53cc8130bbcbbb89a0440d9b8e81a7746" + [[package]] name = "rustc-demangle" version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" +[[package]] +name = "rustc-hash" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" + [[package]] name = "ryu" version = "1.0.20" @@ -736,6 +1045,17 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha1_smol" version = "1.0.1" @@ -783,6 +1103,22 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "soketto" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e859df029d160cb88608f5d7df7fb4753fd20fdfb4de5644f3d8b8440841721" +dependencies = [ + "base64", + "bytes", + "futures", + "http", + "httparse", + "log", + "rand", + "sha1", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -817,6 +1153,26 @@ dependencies = [ "syn", ] +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tinystr" version = "0.8.1" @@ -869,6 +1225,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "tokio-util" version = "0.7.16" @@ -877,17 +1245,63 @@ checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", ] +[[package]] +name = "toml_datetime" +version = "0.6.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22cddaf88f4fbc13c51aebbf5f8eceb5c7c5a9da2ac40a13519eb5b0a0e8f11c" + +[[package]] +name = "toml_edit" +version = "0.22.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" +dependencies = [ + "indexmap", + "toml_datetime", + "winnow", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + +[[package]] +name = "tower-service" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" + [[package]] name = "tracing" version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -913,6 +1327,12 @@ dependencies = [ "once_cell", ] +[[package]] +name = "typenum" +version = "1.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f" + [[package]] name = "unicode-ident" version = "1.0.18" @@ -942,6 +1362,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" @@ -1110,6 +1536,15 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "271414315aff87387382ec3d271b52d7ae78726f5d44ac98b4f4030c91880486" +[[package]] +name = "winnow" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3edebf492c8125044983378ecb5766203ad3b4c2f7a922bd7dd207f6d443e95" +dependencies = [ + "memchr", +] + [[package]] name = "writeable" version = "0.6.1" diff --git a/Cargo.toml b/Cargo.toml index 0781739..52f074d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,3 +10,5 @@ serde_json = "1.0.143" tokio = { version = "1.47.1", features = ["full"] } tracing = "0.1.41" redis = { version = "0.25.4", features = ["tokio-comp", "connection-manager", "aio"] } +jsonrpsee = { version = "0.24.7", features = ["server", "macros"] } +async-trait = "0.1.83" diff --git a/specs/openrpc.json b/specs/openrpc.json new file mode 100644 index 0000000..367dffd --- /dev/null +++ b/specs/openrpc.json @@ -0,0 +1,1227 @@ +{ + "openrpc": "1.2.6", + "info": { + "title": "HeroCoordinator JSON-RPC API", + "version": "0.1.0", + "description": "JSON-RPC API over HTTP and WebSocket for creating and loading domain models. Timestamps created_at/updated_at are set by the system. Known model quirks are kept as-is and documented here: Context uses 'upddated_at' (typo) instead of 'updated_at'; Runner uses 'crated_at' instead of 'created_at'; Message uses ScriptType as 'message_type'. Host and ports are configurable via CLI flags." + }, + "servers": [ + { + "name": "http", + "url": "http://127.0.0.1:9652", + "summary": "Default HTTP server (configurable via --api-http-ip/--api-http-port)" + }, + { + "name": "ws", + "url": "ws://127.0.0.1:9653", + "summary": "Default WS server (configurable via --api-ws-ip/--api-ws-port)" + } + ], + "methods": [ + { + "name": "actor.create", + "summary": "Create/Upsert Actor in a context", + "params": [ + { + "name": "params", + "schema": { + "$ref": "#/components/schemas/ActorCreateParams" + } + } + ], + "result": { + "name": "result", + "schema": { + "$ref": "#/components/schemas/Actor" + } + }, + "errors": [ + { + "$ref": "#/components/errors/InvalidParams" + }, + { + "$ref": "#/components/errors/NotFound" + }, + { + "$ref": "#/components/errors/StorageError" + } + ] + }, + { + "name": "actor.load", + "summary": "Load an Actor by id from a context", + "params": [ + { + "name": "params", + "schema": { + "$ref": "#/components/schemas/ActorLoadParams" + } + } + ], + "result": { + "name": "result", + "schema": { + "$ref": "#/components/schemas/Actor" + } + }, + "errors": [ + { + "$ref": "#/components/errors/InvalidParams" + }, + { + "$ref": "#/components/errors/NotFound" + }, + { + "$ref": "#/components/errors/StorageError" + } + ] + }, + { + "name": "context.create", + "summary": "Create/Upsert Context (stored in its own DB index)", + "params": [ + { + "name": "params", + "schema": { + "$ref": "#/components/schemas/ContextCreateParams" + } + } + ], + "result": { + "name": "result", + "schema": { + "$ref": "#/components/schemas/Context" + } + }, + "errors": [ + { + "$ref": "#/components/errors/InvalidParams" + }, + { + "$ref": "#/components/errors/StorageError" + } + ] + }, + { + "name": "context.load", + "summary": "Load a Context by id", + "params": [ + { + "name": "params", + "schema": { + "$ref": "#/components/schemas/ContextLoadParams" + } + } + ], + "result": { + "name": "result", + "schema": { + "$ref": "#/components/schemas/Context" + } + }, + "errors": [ + { + "$ref": "#/components/errors/InvalidParams" + }, + { + "$ref": "#/components/errors/NotFound" + }, + { + "$ref": "#/components/errors/StorageError" + } + ] + }, + { + "name": "runner.create", + "summary": "Create/Upsert Runner in a context", + "params": [ + { + "name": "params", + "schema": { + "$ref": "#/components/schemas/RunnerCreateParams" + } + } + ], + "result": { + "name": "result", + "schema": { + "$ref": "#/components/schemas/Runner" + } + }, + "errors": [ + { + "$ref": "#/components/errors/InvalidParams" + }, + { + "$ref": "#/components/errors/StorageError" + } + ] + }, + { + "name": "runner.load", + "summary": "Load Runner by id from a context", + "params": [ + { + "name": "params", + "schema": { + "$ref": "#/components/schemas/RunnerLoadParams" + } + } + ], + "result": { + "name": "result", + "schema": { + "$ref": "#/components/schemas/Runner" + } + }, + "errors": [ + { + "$ref": "#/components/errors/InvalidParams" + }, + { + "$ref": "#/components/errors/NotFound" + }, + { + "$ref": "#/components/errors/StorageError" + } + ] + }, + { + "name": "flow.create", + "summary": "Create/Upsert Flow in a context", + "params": [ + { + "name": "params", + "schema": { + "$ref": "#/components/schemas/FlowCreateParams" + } + } + ], + "result": { + "name": "result", + "schema": { + "$ref": "#/components/schemas/Flow" + } + }, + "errors": [ + { + "$ref": "#/components/errors/InvalidParams" + }, + { + "$ref": "#/components/errors/StorageError" + } + ] + }, + { + "name": "flow.load", + "summary": "Load Flow by id from a context", + "params": [ + { + "name": "params", + "schema": { + "$ref": "#/components/schemas/FlowLoadParams" + } + } + ], + "result": { + "name": "result", + "schema": { + "$ref": "#/components/schemas/Flow" + } + }, + "errors": [ + { + "$ref": "#/components/errors/InvalidParams" + }, + { + "$ref": "#/components/errors/NotFound" + }, + { + "$ref": "#/components/errors/StorageError" + } + ] + }, + { + "name": "job.create", + "summary": "Create/Upsert Job in a context", + "params": [ + { + "name": "params", + "schema": { + "$ref": "#/components/schemas/JobCreateParams" + } + } + ], + "result": { + "name": "result", + "schema": { + "$ref": "#/components/schemas/Job" + } + }, + "errors": [ + { + "$ref": "#/components/errors/InvalidParams" + }, + { + "$ref": "#/components/errors/StorageError" + } + ] + }, + { + "name": "job.load", + "summary": "Load Job by ids from a context", + "params": [ + { + "name": "params", + "schema": { + "$ref": "#/components/schemas/JobLoadParams" + } + } + ], + "result": { + "name": "result", + "schema": { + "$ref": "#/components/schemas/Job" + } + }, + "errors": [ + { + "$ref": "#/components/errors/InvalidParams" + }, + { + "$ref": "#/components/errors/NotFound" + }, + { + "$ref": "#/components/errors/StorageError" + } + ] + }, + { + "name": "message.create", + "summary": "Create/Upsert Message in a context", + "params": [ + { + "name": "params", + "schema": { + "$ref": "#/components/schemas/MessageCreateParams" + } + } + ], + "result": { + "name": "result", + "schema": { + "$ref": "#/components/schemas/Message" + } + }, + "errors": [ + { + "$ref": "#/components/errors/InvalidParams" + }, + { + "$ref": "#/components/errors/StorageError" + } + ] + }, + { + "name": "message.load", + "summary": "Load Message by ids from a context", + "params": [ + { + "name": "params", + "schema": { + "$ref": "#/components/schemas/MessageLoadParams" + } + } + ], + "result": { + "name": "result", + "schema": { + "$ref": "#/components/schemas/Message" + } + }, + "errors": [ + { + "$ref": "#/components/errors/InvalidParams" + }, + { + "$ref": "#/components/errors/NotFound" + }, + { + "$ref": "#/components/errors/StorageError" + } + ] + } + ], + "components": { + "schemas": { + "IpAddr": { + "type": "string", + "description": "IPv4 or IPv6 textual address" + }, + "ScriptType": { + "type": "string", + "enum": [ + "Osis", + "Sal", + "V", + "Python" + ] + }, + "FlowStatus": { + "type": "string", + "enum": [ + "Dispatched", + "Started", + "Error", + "Finished" + ] + }, + "JobStatus": { + "type": "string", + "enum": [ + "Dispatched", + "WaitingForPrerequisites", + "Started", + "Error", + "Finished" + ] + }, + "MessageFormatType": { + "type": "string", + "enum": [ + "Html", + "Text", + "Md" + ] + }, + "MessageStatus": { + "type": "string", + "enum": [ + "Dispatched", + "Acknowledged", + "Error", + "Processed" + ] + }, + "MessageType": { + "type": "string", + "enum": [ + "Job", + "Chat", + "Mail" + ] + }, + "Actor": { + "type": "object", + "required": [ + "id", + "pubkey", + "address", + "created_at", + "updated_at" + ], + "properties": { + "id": { + "type": "integer", + "format": "uint32" + }, + "pubkey": { + "type": "string" + }, + "address": { + "type": "array", + "items": { + "$ref": "#/components/schemas/IpAddr" + } + }, + "created_at": { + "type": "integer", + "format": "int64" + }, + "updated_at": { + "type": "integer", + "format": "int64" + } + } + }, + "Context": { + "type": "object", + "required": [ + "id", + "admins", + "readers", + "executors", + "created_at", + "upddated_at" + ], + "properties": { + "id": { + "type": "integer", + "format": "uint32" + }, + "admins": { + "type": "array", + "items": { + "type": "integer", + "format": "uint32" + } + }, + "readers": { + "type": "array", + "items": { + "type": "integer", + "format": "uint32" + } + }, + "executors": { + "type": "array", + "items": { + "type": "integer", + "format": "uint32" + } + }, + "created_at": { + "type": "integer", + "format": "int64" + }, + "upddated_at": { + "type": "integer", + "format": "int64", + "description": "Typo retained intentionally" + } + } + }, + "Runner": { + "type": "object", + "required": [ + "id", + "pubkey", + "address", + "topic", + "local", + "crated_at", + "updated_at" + ], + "properties": { + "id": { + "type": "integer", + "format": "uint32" + }, + "pubkey": { + "type": "string" + }, + "address": { + "$ref": "#/components/schemas/IpAddr" + }, + "topic": { + "type": "string" + }, + "local": { + "type": "boolean" + }, + "crated_at": { + "type": "integer", + "format": "int64", + "description": "Typo retained intentionally" + }, + "updated_at": { + "type": "integer", + "format": "int64" + } + } + }, + "Flow": { + "type": "object", + "required": [ + "id", + "caller_id", + "context_id", + "jobs", + "env_vars", + "result", + "created_at", + "updated_at", + "status" + ], + "properties": { + "id": { + "type": "integer", + "format": "uint32" + }, + "caller_id": { + "type": "integer", + "format": "uint32" + }, + "context_id": { + "type": "integer", + "format": "uint32" + }, + "jobs": { + "type": "array", + "items": { + "type": "integer", + "format": "uint32" + } + }, + "env_vars": { + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "result": { + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "created_at": { + "type": "integer", + "format": "int64" + }, + "updated_at": { + "type": "integer", + "format": "int64" + }, + "status": { + "$ref": "#/components/schemas/FlowStatus" + } + } + }, + "Job": { + "type": "object", + "required": [ + "id", + "caller_id", + "context_id", + "script", + "script_type", + "timeout", + "retries", + "env_vars", + "result", + "prerequisites", + "depends", + "created_at", + "updated_at", + "status" + ], + "properties": { + "id": { + "type": "integer", + "format": "uint32" + }, + "caller_id": { + "type": "integer", + "format": "uint32" + }, + "context_id": { + "type": "integer", + "format": "uint32" + }, + "script": { + "type": "string" + }, + "script_type": { + "$ref": "#/components/schemas/ScriptType" + }, + "timeout": { + "type": "integer", + "format": "uint32" + }, + "retries": { + "type": "integer", + "minimum": 0, + "maximum": 255 + }, + "env_vars": { + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "result": { + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "prerequisites": { + "type": "array", + "items": { + "type": "string" + } + }, + "depends": { + "type": "array", + "items": { + "type": "integer", + "format": "uint32" + } + }, + "created_at": { + "type": "integer", + "format": "int64" + }, + "updated_at": { + "type": "integer", + "format": "int64" + }, + "status": { + "$ref": "#/components/schemas/JobStatus" + } + } + }, + "Message": { + "type": "object", + "required": [ + "id", + "caller_id", + "context_id", + "message", + "message_type", + "message_format_type", + "timeout", + "timeout_ack", + "timeout_result", + "job", + "logs", + "created_at", + "updated_at", + "status" + ], + "properties": { + "id": { + "type": "integer", + "format": "uint32" + }, + "caller_id": { + "type": "integer", + "format": "uint32" + }, + "context_id": { + "type": "integer", + "format": "uint32" + }, + "message": { + "type": "string" + }, + "message_type": { + "$ref": "#/components/schemas/ScriptType" + }, + "message_format_type": { + "$ref": "#/components/schemas/MessageFormatType" + }, + "timeout": { + "type": "integer", + "format": "uint32" + }, + "timeout_ack": { + "type": "integer", + "format": "uint32" + }, + "timeout_result": { + "type": "integer", + "format": "uint32" + }, + "job": { + "type": "array", + "items": { + "$ref": "#/components/schemas/Job" + } + }, + "logs": { + "type": "array", + "items": { + "type": "string" + } + }, + "created_at": { + "type": "integer", + "format": "int64" + }, + "updated_at": { + "type": "integer", + "format": "int64" + }, + "status": { + "$ref": "#/components/schemas/MessageStatus" + } + } + }, + "ActorCreate": { + "type": "object", + "required": [ + "id", + "pubkey", + "address" + ], + "properties": { + "id": { + "type": "integer", + "format": "uint32" + }, + "pubkey": { + "type": "string" + }, + "address": { + "type": "array", + "items": { + "$ref": "#/components/schemas/IpAddr" + } + } + } + }, + "ContextCreate": { + "type": "object", + "required": [ + "id", + "admins", + "readers", + "executors" + ], + "properties": { + "id": { + "type": "integer", + "format": "uint32" + }, + "admins": { + "type": "array", + "items": { + "type": "integer", + "format": "uint32" + } + }, + "readers": { + "type": "array", + "items": { + "type": "integer", + "format": "uint32" + } + }, + "executors": { + "type": "array", + "items": { + "type": "integer", + "format": "uint32" + } + } + } + }, + "RunnerCreate": { + "type": "object", + "required": [ + "id", + "pubkey", + "address", + "topic", + "local" + ], + "properties": { + "id": { + "type": "integer", + "format": "uint32" + }, + "pubkey": { + "type": "string" + }, + "address": { + "$ref": "#/components/schemas/IpAddr" + }, + "topic": { + "type": "string" + }, + "local": { + "type": "boolean" + } + } + }, + "FlowCreate": { + "type": "object", + "required": [ + "id", + "caller_id", + "context_id", + "jobs", + "env_vars" + ], + "properties": { + "id": { + "type": "integer", + "format": "uint32" + }, + "caller_id": { + "type": "integer", + "format": "uint32" + }, + "context_id": { + "type": "integer", + "format": "uint32" + }, + "jobs": { + "type": "array", + "items": { + "type": "integer", + "format": "uint32" + } + }, + "env_vars": { + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "result": { + "type": "object", + "additionalProperties": { + "type": "string" + } + } + } + }, + "JobCreate": { + "type": "object", + "required": [ + "id", + "caller_id", + "context_id", + "script", + "script_type", + "timeout", + "retries", + "env_vars", + "prerequisites", + "depends" + ], + "properties": { + "id": { + "type": "integer", + "format": "uint32" + }, + "caller_id": { + "type": "integer", + "format": "uint32" + }, + "context_id": { + "type": "integer", + "format": "uint32" + }, + "script": { + "type": "string" + }, + "script_type": { + "$ref": "#/components/schemas/ScriptType" + }, + "timeout": { + "type": "integer", + "format": "uint32" + }, + "retries": { + "type": "integer", + "minimum": 0, + "maximum": 255 + }, + "env_vars": { + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "result": { + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "prerequisites": { + "type": "array", + "items": { + "type": "string" + } + }, + "depends": { + "type": "array", + "items": { + "type": "integer", + "format": "uint32" + } + } + } + }, + "MessageCreate": { + "type": "object", + "required": [ + "id", + "caller_id", + "context_id", + "message", + "message_type", + "message_format_type", + "timeout", + "timeout_ack", + "timeout_result", + "job" + ], + "properties": { + "id": { + "type": "integer", + "format": "uint32" + }, + "caller_id": { + "type": "integer", + "format": "uint32" + }, + "context_id": { + "type": "integer", + "format": "uint32" + }, + "message": { + "type": "string" + }, + "message_type": { + "$ref": "#/components/schemas/ScriptType" + }, + "message_format_type": { + "$ref": "#/components/schemas/MessageFormatType" + }, + "timeout": { + "type": "integer", + "format": "uint32" + }, + "timeout_ack": { + "type": "integer", + "format": "uint32" + }, + "timeout_result": { + "type": "integer", + "format": "uint32" + }, + "job": { + "type": "array", + "items": { + "$ref": "#/components/schemas/JobCreate" + } + }, + "logs": { + "type": "array", + "items": { + "type": "string" + } + } + } + }, + "ActorCreateParams": { + "type": "object", + "required": [ + "context_id", + "actor" + ], + "properties": { + "context_id": { + "type": "integer", + "format": "uint32" + }, + "actor": { + "$ref": "#/components/schemas/ActorCreate" + } + } + }, + "ActorLoadParams": { + "type": "object", + "required": [ + "context_id", + "id" + ], + "properties": { + "context_id": { + "type": "integer", + "format": "uint32" + }, + "id": { + "type": "integer", + "format": "uint32" + } + } + }, + "ContextCreateParams": { + "type": "object", + "required": [ + "context" + ], + "properties": { + "context": { + "$ref": "#/components/schemas/ContextCreate" + } + } + }, + "ContextLoadParams": { + "type": "object", + "required": [ + "id" + ], + "properties": { + "id": { + "type": "integer", + "format": "uint32" + } + } + }, + "RunnerCreateParams": { + "type": "object", + "required": [ + "context_id", + "runner" + ], + "properties": { + "context_id": { + "type": "integer", + "format": "uint32" + }, + "runner": { + "$ref": "#/components/schemas/RunnerCreate" + } + } + }, + "RunnerLoadParams": { + "type": "object", + "required": [ + "context_id", + "id" + ], + "properties": { + "context_id": { + "type": "integer", + "format": "uint32" + }, + "id": { + "type": "integer", + "format": "uint32" + } + } + }, + "FlowCreateParams": { + "type": "object", + "required": [ + "context_id", + "flow" + ], + "properties": { + "context_id": { + "type": "integer", + "format": "uint32" + }, + "flow": { + "$ref": "#/components/schemas/FlowCreate" + } + } + }, + "FlowLoadParams": { + "type": "object", + "required": [ + "context_id", + "id" + ], + "properties": { + "context_id": { + "type": "integer", + "format": "uint32" + }, + "id": { + "type": "integer", + "format": "uint32" + } + } + }, + "JobCreateParams": { + "type": "object", + "required": [ + "context_id", + "job" + ], + "properties": { + "context_id": { + "type": "integer", + "format": "uint32" + }, + "job": { + "$ref": "#/components/schemas/JobCreate" + } + } + }, + "JobLoadParams": { + "type": "object", + "required": [ + "context_id", + "caller_id", + "id" + ], + "properties": { + "context_id": { + "type": "integer", + "format": "uint32" + }, + "caller_id": { + "type": "integer", + "format": "uint32" + }, + "id": { + "type": "integer", + "format": "uint32" + } + } + }, + "MessageCreateParams": { + "type": "object", + "required": [ + "context_id", + "message" + ], + "properties": { + "context_id": { + "type": "integer", + "format": "uint32" + }, + "message": { + "$ref": "#/components/schemas/MessageCreate" + } + } + }, + "MessageLoadParams": { + "type": "object", + "required": [ + "context_id", + "caller_id", + "id" + ], + "properties": { + "context_id": { + "type": "integer", + "format": "uint32" + }, + "caller_id": { + "type": "integer", + "format": "uint32" + }, + "id": { + "type": "integer", + "format": "uint32" + } + } + } + }, + "errors": { + "InvalidParams": { + "code": -32602, + "message": "Invalid params" + }, + "NotFound": { + "code": -32001, + "message": "Not Found" + }, + "StorageError": { + "code": -32010, + "message": "Storage Error" + } + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 875e6a7..f9fb8a0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ pub mod models; pub mod storage; mod time; +pub mod rpc; diff --git a/src/main.rs b/src/main.rs index 2f6ad84..e6ec894 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ use clap::Parser; use std::net::{IpAddr, SocketAddr}; +use std::sync::Arc; #[derive(Debug, Clone, Parser)] #[command( @@ -35,13 +36,75 @@ struct Cli { help = "Socket address of Redis instance (default: 127.0.0.1:6379)" )] redis_addr: SocketAddr, + + #[arg( + long = "api-http-ip", + env = "API_HTTP_IP", + default_value = "127.0.0.1", + help = "Bind IP for HTTP JSON-RPC server (default: 127.0.0.1)" + )] + api_http_ip: IpAddr, + + #[arg( + long = "api-http-port", + env = "API_HTTP_PORT", + default_value_t = 9652u16, + help = "Bind port for HTTP JSON-RPC server (default: 9652)" + )] + api_http_port: u16, + + #[arg( + long = "api-ws-ip", + env = "API_WS_IP", + default_value = "127.0.0.1", + help = "Bind IP for WebSocket JSON-RPC server (default: 127.0.0.1)" + )] + api_ws_ip: IpAddr, + + #[arg( + long = "api-ws-port", + env = "API_WS_PORT", + default_value_t = 9653u16, + help = "Bind port for WebSocket JSON-RPC server (default: 9653)" + )] + api_ws_port: u16, } -fn main() { +#[tokio::main] +async fn main() { let cli = Cli::parse(); + let http_addr = SocketAddr::new(cli.api_http_ip, cli.api_http_port); + let ws_addr = SocketAddr::new(cli.api_ws_ip, cli.api_ws_port); + + // Initialize Redis driver + let redis = herocoordinator::storage::RedisDriver::new(cli.redis_addr.to_string()) + .await + .expect("Failed to connect to Redis"); + + // Shared application state + let state = Arc::new(herocoordinator::rpc::AppState::new(redis)); + + // Build RPC modules for both servers + let http_module = herocoordinator::rpc::build_module(state.clone()); + let ws_module = herocoordinator::rpc::build_module(state.clone()); + println!( - "mycelium_ip={}, mycelium_port={}, redis_addr={}", - cli.mycelium_ip, cli.mycelium_port, cli.redis_addr + "Starting JSON-RPC servers: HTTP http://{} | WS ws://{} | redis_addr={}", + http_addr, ws_addr, cli.redis_addr ); + + // Start servers + let _http_handle = herocoordinator::rpc::start_http(http_addr, http_module) + .await + .expect("Failed to start HTTP server"); + let _ws_handle = herocoordinator::rpc::start_ws(ws_addr, ws_module) + .await + .expect("Failed to start WS server"); + + // Wait for Ctrl+C to terminate + if let Err(e) = tokio::signal::ctrl_c().await { + eprintln!("Failed to listen for shutdown signal: {e}"); + } + println!("Shutdown signal received, exiting."); } diff --git a/src/models/actor.rs b/src/models/actor.rs index 6d5ab09..9237ee2 100644 --- a/src/models/actor.rs +++ b/src/models/actor.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use crate::time::Timestamp; -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub struct Actor { id: u32, pubkey: String, diff --git a/src/models/context.rs b/src/models/context.rs index ae1b236..9de663c 100644 --- a/src/models/context.rs +++ b/src/models/context.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use crate::time::Timestamp; -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub struct Context { /// Redis DB to use id: u32, diff --git a/src/models/flow.rs b/src/models/flow.rs index b5ad9f2..542a8c5 100644 --- a/src/models/flow.rs +++ b/src/models/flow.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use crate::time::Timestamp; -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub struct Flow { /// Job Id set tby the actor which created it id: u32, @@ -24,7 +24,7 @@ pub struct Flow { } /// The status of a flow -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub enum FlowStatus { Dispatched, Started, diff --git a/src/models/runner.rs b/src/models/runner.rs index 47a069c..8fc0375 100644 --- a/src/models/runner.rs +++ b/src/models/runner.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use crate::time::Timestamp; -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub struct Runner { id: u32, /// Mycelium public key diff --git a/src/models/script_type.rs b/src/models/script_type.rs index 774d3db..5d01754 100644 --- a/src/models/script_type.rs +++ b/src/models/script_type.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -#[derive(Deserialize, Serialize)] +#[derive(Debug, Deserialize, Serialize)] pub enum ScriptType { Osis, Sal, diff --git a/src/rpc.rs b/src/rpc.rs new file mode 100644 index 0000000..21e8a8f --- /dev/null +++ b/src/rpc.rs @@ -0,0 +1,613 @@ +use std::{ + collections::HashMap, + net::{IpAddr, SocketAddr}, + sync::Arc, +}; + +use jsonrpsee::{ + RpcModule, + server::{ServerBuilder, ServerHandle}, + types::error::ErrorObjectOwned, +}; +use serde::{Deserialize, Serialize}; +use serde_json::{Value, json}; + +use crate::{ + models::{Actor, Context, Flow, Job, Message, Runner, ScriptType}, + storage::RedisDriver, + time::current_timestamp, +}; + +pub struct AppState { + pub redis: RedisDriver, +} + +impl AppState { + pub fn new(redis: RedisDriver) -> Self { + Self { redis } + } +} + +// ----------------------------- +// Error helpers +// ----------------------------- + +fn invalid_params_err(e: E) -> ErrorObjectOwned { + ErrorObjectOwned::owned(-32602, "Invalid params", Some(Value::String(e.to_string()))) +} + +fn storage_err(e: Box) -> ErrorObjectOwned { + let msg = e.to_string(); + if msg.contains("Key not found") { + ErrorObjectOwned::owned(-32001, "Not Found", Some(Value::String(msg))) + } else { + ErrorObjectOwned::owned(-32010, "Storage Error", Some(Value::String(msg))) + } +} + +// ----------------------------- +// Local enums for DTOs (to keep quirks isolated) +// ----------------------------- + +#[derive(Debug, Deserialize, Serialize, Clone, Copy)] +#[serde(rename_all = "PascalCase")] +pub enum MessageFormatTypeDto { + Html, + Text, + Md, +} + +// ----------------------------- +// Create DTOs and Param wrappers +// ----------------------------- + +#[derive(Debug, Deserialize)] +pub struct ActorCreate { + pub id: u32, + pub pubkey: String, + pub address: Vec, +} +impl ActorCreate { + pub fn into_domain(self) -> Result { + let ts = current_timestamp(); + let v = json!({ + "id": self.id, + "pubkey": self.pubkey, + "address": self.address, + "created_at": ts, + "updated_at": ts, + }); + serde_json::from_value(v).map_err(|e| e.to_string()) + } +} + +#[derive(Debug, Deserialize)] +pub struct ContextCreate { + pub id: u32, + pub admins: Vec, + pub readers: Vec, + pub executors: Vec, +} +impl ContextCreate { + pub fn into_domain(self) -> Result { + let ts = current_timestamp(); + // Note: keep current code quirk: "upddated_at" + let mut v = serde_json::Map::new(); + v.insert("id".to_string(), Value::from(self.id)); + v.insert( + "admins".to_string(), + serde_json::to_value(self.admins).unwrap(), + ); + v.insert( + "readers".to_string(), + serde_json::to_value(self.readers).unwrap(), + ); + v.insert( + "executors".to_string(), + serde_json::to_value(self.executors).unwrap(), + ); + v.insert("created_at".to_string(), Value::from(ts)); + v.insert("upddated_at".to_string(), Value::from(ts)); + serde_json::from_value(Value::Object(v)).map_err(|e| e.to_string()) + } +} + +#[derive(Debug, Deserialize)] +pub struct RunnerCreate { + pub id: u32, + pub pubkey: String, + pub address: IpAddr, + pub topic: String, + pub local: bool, +} +impl RunnerCreate { + pub fn into_domain(self) -> Result { + let ts = current_timestamp(); + // Note: keep current code quirk: "crated_at" + let v = json!({ + "id": self.id, + "pubkey": self.pubkey, + "address": self.address, + "topic": self.topic, + "local": self.local, + "crated_at": ts, + "updated_at": ts, + }); + serde_json::from_value(v).map_err(|e| e.to_string()) + } +} + +#[derive(Debug, Deserialize)] +pub struct FlowCreate { + pub id: u32, + pub caller_id: u32, + pub context_id: u32, + pub jobs: Vec, + pub env_vars: HashMap, + #[serde(default)] + pub result: Option>, +} +impl FlowCreate { + pub fn into_domain(self) -> Result { + let ts = current_timestamp(); + let v = json!({ + "id": self.id, + "caller_id": self.caller_id, + "context_id": self.context_id, + "jobs": self.jobs, + "env_vars": self.env_vars, + "result": self.result.unwrap_or_default(), + "created_at": ts, + "updated_at": ts, + "status": "Dispatched", + }); + serde_json::from_value(v).map_err(|e| e.to_string()) + } +} + +#[derive(Debug, Deserialize)] +pub struct JobCreate { + pub id: u32, + pub caller_id: u32, + pub context_id: u32, + pub script: String, + pub script_type: ScriptType, + pub timeout: u32, + pub retries: u8, + pub env_vars: HashMap, + #[serde(default)] + pub result: Option>, + pub prerequisites: Vec, + pub depends: Vec, +} +impl JobCreate { + pub fn into_domain(self) -> Result { + let ts = current_timestamp(); + let v = json!({ + "id": self.id, + "caller_id": self.caller_id, + "context_id": self.context_id, + "script": self.script, + "script_type": self.script_type, + "timeout": self.timeout, + "retries": self.retries, + "env_vars": self.env_vars, + "result": self.result.unwrap_or_default(), + "prerequisites": self.prerequisites, + "depends": self.depends, + "created_at": ts, + "updated_at": ts, + "status": "Dispatched", + }); + serde_json::from_value(v).map_err(|e| e.to_string()) + } +} + +#[derive(Debug, Deserialize)] +pub struct MessageCreate { + pub id: u32, + pub caller_id: u32, + pub context_id: u32, + pub message: String, + // Note: model uses ScriptType for message_type (keep as-is) + pub message_type: ScriptType, + pub message_format_type: MessageFormatTypeDto, + pub timeout: u32, + pub timeout_ack: u32, + pub timeout_result: u32, + pub job: Vec, + #[serde(default)] + pub logs: Option>, +} +impl MessageCreate { + pub fn into_domain(self) -> Result { + let ts = current_timestamp(); + let jobs: Result, String> = self + .job + .into_iter() + .map(|j| { + let jd: Job = j.into_domain()?; + serde_json::to_value(jd).map_err(|e| e.to_string()) + }) + .collect(); + let v = json!({ + "id": self.id, + "caller_id": self.caller_id, + "context_id": self.context_id, + "message": self.message, + "message_type": self.message_type, + "message_format_type": self.message_format_type, // "Html" | "Text" | "Md" + "timeout": self.timeout, + "timeout_ack": self.timeout_ack, + "timeout_result": self.timeout_result, + "job": jobs?, + "logs": self.logs.unwrap_or_default(), + "created_at": ts, + "updated_at": ts, + "status": "Dispatched", + }); + serde_json::from_value(v).map_err(|e| e.to_string()) + } +} + +#[derive(Debug, Deserialize)] +pub struct ActorCreateParams { + pub context_id: u32, + pub actor: ActorCreate, +} +#[derive(Debug, Deserialize)] +pub struct ActorLoadParams { + pub context_id: u32, + pub id: u32, +} + +#[derive(Debug, Deserialize)] +pub struct ContextCreateParams { + pub context: ContextCreate, +} +#[derive(Debug, Deserialize)] +pub struct ContextLoadParams { + pub id: u32, +} + +#[derive(Debug, Deserialize)] +pub struct RunnerCreateParams { + pub context_id: u32, + pub runner: RunnerCreate, +} +#[derive(Debug, Deserialize)] +pub struct RunnerLoadParams { + pub context_id: u32, + pub id: u32, +} + +#[derive(Debug, Deserialize)] +pub struct FlowCreateParams { + pub context_id: u32, + pub flow: FlowCreate, +} +#[derive(Debug, Deserialize)] +pub struct FlowLoadParams { + pub context_id: u32, + pub id: u32, +} + +#[derive(Debug, Deserialize)] +pub struct JobCreateParams { + pub context_id: u32, + pub job: JobCreate, +} +#[derive(Debug, Deserialize)] +pub struct JobLoadParams { + pub context_id: u32, + pub caller_id: u32, + pub id: u32, +} + +#[derive(Debug, Deserialize)] +pub struct MessageCreateParams { + pub context_id: u32, + pub message: MessageCreate, +} +#[derive(Debug, Deserialize)] +pub struct MessageLoadParams { + pub context_id: u32, + pub caller_id: u32, + pub id: u32, +} + +// ----------------------------- +// Rpc module builder (manual registration) +// ----------------------------- + +pub fn build_module(state: Arc) -> RpcModule<()> { + let mut module: RpcModule<()> = RpcModule::new(()); + + // Actor + { + let state = state.clone(); + module + .register_async_method("actor.create", move |params, _caller, _ctx| { + let state = state.clone(); + async move { + let p: ActorCreateParams = params.parse().map_err(invalid_params_err)?; + let actor = p.actor.into_domain().map_err(invalid_params_err)?; + state + .redis + .save_actor(p.context_id, &actor) + .await + .map_err(storage_err)?; + { + let out: serde_json::Value = + serde_json::to_value(actor).map_err(invalid_params_err)?; + Ok::(out) + } + } + }) + .expect("register actor.create"); + } + { + let state = state.clone(); + module + .register_async_method("actor.load", move |params, _caller, _ctx| { + let state = state.clone(); + async move { + let p: ActorLoadParams = params.parse().map_err(invalid_params_err)?; + let actor = state + .redis + .load_actor(p.context_id, p.id) + .await + .map_err(storage_err)?; + { + let out: serde_json::Value = + serde_json::to_value(actor).map_err(invalid_params_err)?; + Ok::(out) + } + } + }) + .expect("register actor.load"); + } + + // Context + { + let state = state.clone(); + module + .register_async_method("context.create", move |params, _caller, _ctx| { + let state = state.clone(); + async move { + let p: ContextCreateParams = params.parse().map_err(invalid_params_err)?; + let ctx = p.context.into_domain().map_err(invalid_params_err)?; + state.redis.save_context(&ctx).await.map_err(storage_err)?; + { + let out: serde_json::Value = + serde_json::to_value(ctx).map_err(invalid_params_err)?; + Ok::(out) + } + } + }) + .expect("register context.create"); + } + { + let state = state.clone(); + module + .register_async_method("context.load", move |params, _caller, _ctx| { + let state = state.clone(); + async move { + let p: ContextLoadParams = params.parse().map_err(invalid_params_err)?; + let ctx = state.redis.load_context(p.id).await.map_err(storage_err)?; + { + let out: serde_json::Value = + serde_json::to_value(ctx).map_err(invalid_params_err)?; + Ok::(out) + } + } + }) + .expect("register context.load"); + } + + // Runner + { + let state = state.clone(); + module + .register_async_method("runner.create", move |params, _caller, _ctx| { + let state = state.clone(); + async move { + let p: RunnerCreateParams = params.parse().map_err(invalid_params_err)?; + let runner = p.runner.into_domain().map_err(invalid_params_err)?; + state + .redis + .save_runner(p.context_id, &runner) + .await + .map_err(storage_err)?; + { + let out: serde_json::Value = + serde_json::to_value(runner).map_err(invalid_params_err)?; + Ok::(out) + } + } + }) + .expect("register runner.create"); + } + { + let state = state.clone(); + module + .register_async_method("runner.load", move |params, _caller, _ctx| { + let state = state.clone(); + async move { + let p: RunnerLoadParams = params.parse().map_err(invalid_params_err)?; + let runner = state + .redis + .load_runner(p.context_id, p.id) + .await + .map_err(storage_err)?; + { + let out: serde_json::Value = + serde_json::to_value(runner).map_err(invalid_params_err)?; + Ok::(out) + } + } + }) + .expect("register runner.load"); + } + + // Flow + { + let state = state.clone(); + module + .register_async_method("flow.create", move |params, _caller, _ctx| { + let state = state.clone(); + async move { + let p: FlowCreateParams = params.parse().map_err(invalid_params_err)?; + let flow = p.flow.into_domain().map_err(invalid_params_err)?; + state + .redis + .save_flow(p.context_id, &flow) + .await + .map_err(storage_err)?; + { + let out: serde_json::Value = + serde_json::to_value(flow).map_err(invalid_params_err)?; + Ok::(out) + } + } + }) + .expect("register flow.create"); + } + { + let state = state.clone(); + module + .register_async_method("flow.load", move |params, _caller, _ctx| { + let state = state.clone(); + async move { + let p: FlowLoadParams = params.parse().map_err(invalid_params_err)?; + let flow = state + .redis + .load_flow(p.context_id, p.id) + .await + .map_err(storage_err)?; + { + let out: serde_json::Value = + serde_json::to_value(flow).map_err(invalid_params_err)?; + Ok::(out) + } + } + }) + .expect("register flow.load"); + } + + // Job + { + let state = state.clone(); + module + .register_async_method("job.create", move |params, _caller, _ctx| { + let state = state.clone(); + async move { + let p: JobCreateParams = params.parse().map_err(invalid_params_err)?; + let job = p.job.into_domain().map_err(invalid_params_err)?; + state + .redis + .save_job(p.context_id, &job) + .await + .map_err(storage_err)?; + { + let out: serde_json::Value = + serde_json::to_value(job).map_err(invalid_params_err)?; + Ok::(out) + } + } + }) + .expect("register job.create"); + } + { + let state = state.clone(); + module + .register_async_method("job.load", move |params, _caller, _ctx| { + let state = state.clone(); + async move { + let p: JobLoadParams = params.parse().map_err(invalid_params_err)?; + let job = state + .redis + .load_job(p.context_id, p.caller_id, p.id) + .await + .map_err(storage_err)?; + { + let out: serde_json::Value = + serde_json::to_value(job).map_err(invalid_params_err)?; + Ok::(out) + } + } + }) + .expect("register job.load"); + } + + // Message + { + let state = state.clone(); + module + .register_async_method("message.create", move |params, _caller, _ctx| { + let state = state.clone(); + async move { + let p: MessageCreateParams = params.parse().map_err(invalid_params_err)?; + let message = p.message.into_domain().map_err(invalid_params_err)?; + state + .redis + .save_message(p.context_id, &message) + .await + .map_err(storage_err)?; + { + let out: serde_json::Value = + serde_json::to_value(message).map_err(invalid_params_err)?; + Ok::(out) + } + } + }) + .expect("register message.create"); + } + { + let state = state; + module + .register_async_method("message.load", move |params, _caller, _ctx| { + let state = state.clone(); + async move { + let p: MessageLoadParams = params.parse().map_err(invalid_params_err)?; + let msg = state + .redis + .load_message(p.context_id, p.caller_id, p.id) + .await + .map_err(storage_err)?; + { + let out: serde_json::Value = + serde_json::to_value(msg).map_err(invalid_params_err)?; + Ok::(out) + } + } + }) + .expect("register message.load"); + } + + module +} + +// ----------------------------- +// Server runners (HTTP/WS on separate listeners) +// ----------------------------- + +pub async fn start_http( + addr: SocketAddr, + module: RpcModule, +) -> Result> { + let server = ServerBuilder::default().build(addr).await?; + let handle = server.start(module); + Ok(handle) +} + +pub async fn start_ws( + addr: SocketAddr, + module: RpcModule, +) -> Result> { + // jsonrpsee server supports both HTTP and WS; using a second listener gives us a dedicated WS port. + let server = ServerBuilder::default().build(addr).await?; + let handle = server.start(module); + Ok(handle) +} +