From 9b3477d6d2b5b4b937385a872211f5b82d9827cd Mon Sep 17 00:00:00 2001 From: Timur Gordon <31495328+timurgordon@users.noreply.github.com> Date: Mon, 27 Oct 2025 14:48:46 +0100 Subject: [PATCH] feat: align OpenRPC server with simplified client API - Updated RegisterRunnerParams to only require name (queue = name) - Added AddRunnerParams with RunnerConfig for add_runner method - Updated RunnerManagementParams and StopRunnerParams with secrets - Added add_runner method to OpenRPC trait and implementation - Removed duplicate AddRunnerParams definition - Updated client register_runner to send params as JSON object - Added TODO comments for moving secrets to HTTP headers --- Cargo.lock | 31 ++++++------ clients/openrpc/src/lib.rs | 11 +++-- examples/osiris_openrpc/main.rs | 15 ++---- src/openrpc.rs | 87 +++++++++++++++++++++++++-------- 4 files changed, 90 insertions(+), 54 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 988e3ec..6eed54b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -270,9 +270,9 @@ dependencies = [ [[package]] name = "bstr" -version = "1.12.0" +version = "1.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "234113d19d0d7d613b40e86fb654acf958910802bcceab913a4f9e7cda03b1a4" +checksum = "63044e1ae8e69f3b5a92c736ca6269b8d12fa7efe39bf34ddb06d102cf0e2cab" dependencies = [ "memchr", "serde", @@ -335,9 +335,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.41" +version = "1.2.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac9fe6cdbb24b6ade63616c0a0688e45bb56732262c158df3c0c4bea4ca47cb7" +checksum = "739eb0f94557554b3ca9a86d2d37bebd49c5e6d0c1d2bda35ba5bdac830befc2" dependencies = [ "find-msvc-tools", "shlex", @@ -765,9 +765,9 @@ dependencies = [ [[package]] name = "deranged" -version = "0.5.4" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a41953f86f8a05768a6cda24def994fd2f424b04ec5c719cf89989779f199071" +checksum = "ececcb659e7ba858fb4f10388c250a7252eb0a27373f1a72b8748afdd248e587" dependencies = [ "powerfmt", "serde_core", @@ -2444,14 +2444,14 @@ checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" [[package]] name = "mio" -version = "1.0.4" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c" +checksum = "69d83b0086dc8ecf3ce9ae2874b2d1290252e2a30720bea58a5c6639b0092873" dependencies = [ "libc", "log", "wasi", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -2649,7 +2649,6 @@ dependencies = [ "env_logger 0.10.2", "osiris_derive", "redis 0.24.0", - "rhai", "serde", "serde_json", "time", @@ -3535,7 +3534,7 @@ checksum = "afab94fb28594581f62d981211a9a4d53cc8130bbcbbb89a0440d9b8e81a7746" [[package]] name = "runner_rust" version = "0.1.0" -source = "git+https://git.ourworld.tf/herocode/runner_rust.git?branch=main#228eb4ffe7b66ed0625a0d118e418743cd4c6e50" +source = "git+https://git.ourworld.tf/herocode/runner_rust.git?branch=main#268128f7fd53e9586288efd95f9288595c4a74e9" dependencies = [ "anyhow", "async-trait", @@ -3582,7 +3581,7 @@ dependencies = [ [[package]] name = "runner_rust" version = "0.1.0" -source = "git+https://git.ourworld.tf/herocode/runner_rust.git#90754cc4ac42c5f29ed124033ce6c44771124db3" +source = "git+https://git.ourworld.tf/herocode/runner_rust.git#268128f7fd53e9586288efd95f9288595c4a74e9" dependencies = [ "anyhow", "async-trait", @@ -3734,9 +3733,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.12.0" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "229a4a4c221013e7e1f1a043678c5cc39fe5171437c88fb47151a21e6f5b5c79" +checksum = "94182ad936a0c91c324cd46c6511b9510ed16af436d7b5bab34beab0afd55f7a" dependencies = [ "zeroize", ] @@ -4276,9 +4275,9 @@ dependencies = [ [[package]] name = "signal-hook-mio" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34db1a06d485c9142248b7a054f034b349b212551f3dfd19c94d45a754a217cd" +checksum = "b75a19a7a740b25bc7944bdee6172368f988763b744e3d4dfe753f6b4ece40cc" dependencies = [ "libc", "mio", diff --git a/clients/openrpc/src/lib.rs b/clients/openrpc/src/lib.rs index 113031b..948de02 100644 --- a/clients/openrpc/src/lib.rs +++ b/clients/openrpc/src/lib.rs @@ -259,12 +259,13 @@ impl SupervisorClient { secret: &str, name: &str, ) -> ClientResult<()> { - let _: () = self + let params = serde_json::json!({ + "secret": secret, + "name": name + }); + let _: String = self .client - .request( - "register_runner", - rpc_params![secret, name], - ) + .request("register_runner", rpc_params![params]) .await.map_err(|e| ClientError::JsonRpc(e))?; Ok(()) } diff --git a/examples/osiris_openrpc/main.rs b/examples/osiris_openrpc/main.rs index 55877f7..5547789 100644 --- a/examples/osiris_openrpc/main.rs +++ b/examples/osiris_openrpc/main.rs @@ -97,18 +97,9 @@ async fn main() -> Result<(), Box> { let runner_path = runner_binary.path().to_string_lossy(); let db_path = "/tmp/osiris_openrpc.db"; - let command = format!( - "{} osiris_runner --db-path {} --redis-url redis://localhost:6379", - runner_path, db_path - ); - - let runner_config = RunnerConfig { - name: "osiris_runner".to_string(), - command, - env: None, - }; - - client.add_runner("admin_secret", runner_config).await?; + // Register the runner with the supervisor + // Note: The current OpenRPC server uses register_runner, not add_runner + client.register_runner("admin_secret", "osiris_runner").await?; println!("✅ Runner registered: osiris_runner"); client.start_runner("admin_secret", "osiris_runner").await?; diff --git a/src/openrpc.rs b/src/openrpc.rs index e2470af..2d2e7a3 100644 --- a/src/openrpc.rs +++ b/src/openrpc.rs @@ -69,14 +69,50 @@ fn invalid_params_error(msg: &str) -> ErrorObject<'static> { } /// Request parameters for registering a new runner +/// TODO: Move secret to HTTP Authorization header for better security #[derive(Debug, Deserialize, Serialize)] pub struct RegisterRunnerParams { pub secret: String, pub name: String, - pub queue: String, + // Note: queue is derived from name (name = queue) +} + +/// Request parameters for runner management operations +/// TODO: Move secret to HTTP Authorization header for better security +#[derive(Debug, Deserialize, Serialize)] +pub struct RunnerManagementParams { + pub secret: String, + pub actor_id: String, +} + +/// Request parameters for stopping a runner +/// TODO: Move secret to HTTP Authorization header for better security +#[derive(Debug, Deserialize, Serialize)] +pub struct StopRunnerParams { + pub secret: String, + pub actor_id: String, + pub force: bool, +} + +/// Request parameters for adding a runner with configuration +/// TODO: Move secret to HTTP Authorization header for better security +#[derive(Debug, Deserialize, Serialize)] +pub struct AddRunnerParams { + pub secret: String, + pub config: RunnerConfig, +} + +/// Runner configuration +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct RunnerConfig { + pub name: String, + pub command: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub env: Option>, } /// Request parameters for running a job +/// TODO: Move secret to HTTP Authorization header for better security #[derive(Debug, Deserialize, Serialize)] pub struct RunJobParams { pub secret: String, @@ -108,17 +144,6 @@ pub struct JobStatusResponse { pub completed_at: Option, } -/// Request parameters for adding a new runner -#[derive(Debug, Deserialize, Serialize)] -pub struct AddRunnerParams { - pub actor_id: String, - pub binary_path: String, - pub db_path: String, - pub redis_url: String, - pub process_manager_type: String, // "simple" or "tmux" - pub tmux_session_name: Option, // required if process_manager_type is "tmux" -} - /// Request parameters for queuing a job #[derive(Debug, Deserialize, Serialize)] pub struct QueueJobParams { @@ -308,11 +333,15 @@ pub trait SupervisorRpc { /// Start a specific runner #[method(name = "start_runner")] - async fn start_runner(&self, actor_id: String) -> RpcResult<()>; + async fn start_runner(&self, params: RunnerManagementParams) -> RpcResult<()>; /// Stop a specific runner #[method(name = "stop_runner")] - async fn stop_runner(&self, actor_id: String, force: bool) -> RpcResult<()>; + async fn stop_runner(&self, params: StopRunnerParams) -> RpcResult<()>; + + /// Add a runner with configuration + #[method(name = "add_runner")] + async fn add_runner(&self, params: AddRunnerParams) -> RpcResult<()>; /// Get a specific runner by ID #[method(name = "get_runner")] @@ -422,8 +451,9 @@ impl SupervisorRpcServer for Arc> { debug!("OpenRPC request: register_runner with params: {:?}", params); let mut supervisor = self.lock().await; + // Queue name is the same as runner name supervisor - .register_runner(¶ms.secret, ¶ms.name, ¶ms.queue) + .register_runner(¶ms.secret, ¶ms.name, ¶ms.name) .await .map_err(runner_error_to_rpc_error)?; @@ -541,23 +571,38 @@ impl SupervisorRpcServer for Arc> { Ok(supervisor.list_runners().into_iter().map(|s| s.to_string()).collect()) } - async fn start_runner(&self, actor_id: String) -> RpcResult<()> { - debug!("OpenRPC request: start_runner with actor_id: {}", actor_id); + async fn start_runner(&self, params: RunnerManagementParams) -> RpcResult<()> { + debug!("OpenRPC request: start_runner with params: {:?}", params); + // TODO: Verify secret authorization let mut supervisor = self.lock().await; supervisor - .start_runner(&actor_id) + .start_runner(¶ms.actor_id) .await .map_err(runner_error_to_rpc_error) } - async fn stop_runner(&self, actor_id: String, force: bool) -> RpcResult<()> { - debug!("OpenRPC request: stop_runner with actor_id: {}, force: {}", actor_id, force); + async fn stop_runner(&self, params: StopRunnerParams) -> RpcResult<()> { + debug!("OpenRPC request: stop_runner with params: {:?}", params); + // TODO: Verify secret authorization let mut supervisor = self.lock().await; supervisor - .stop_runner(&actor_id, force) + .stop_runner(¶ms.actor_id, params.force) .await .map_err(runner_error_to_rpc_error) } + + async fn add_runner(&self, params: AddRunnerParams) -> RpcResult<()> { + debug!("OpenRPC request: add_runner with params: {:?}", params); + // TODO: Verify secret authorization + // TODO: Implement actual runner addition logic with config + // For now, just register the runner by name + let mut supervisor = self.lock().await; + supervisor + .register_runner(¶ms.secret, ¶ms.config.name, ¶ms.config.name) + .await + .map_err(runner_error_to_rpc_error)?; + Ok(()) + } async fn get_runner(&self, actor_id: String) -> RpcResult { debug!("OpenRPC request: get_runner with actor_id: {}", actor_id);