diff --git a/scripts/jsonrpc_demo.py b/scripts/jsonrpc_demo.py index bcf753f..08a1fe1 100644 --- a/scripts/jsonrpc_demo.py +++ b/scripts/jsonrpc_demo.py @@ -123,6 +123,7 @@ def main(): job_i = 3009 flow_id = 4001 +runner_id = 2001 print_header("actor.create") actor = client.call("actor.create", { "actor": { @@ -147,6 +148,20 @@ def main(): } }) pretty_print(context) +print_header("runner.create") + runner = client.call("runner.create", { + "context_id": context_id, + "runner": { + "id": runner_id, + "pubkey": "", # leave empty to route by IP + "address": "127.0.0.1", + "topic": f"runner{runner_id}", + "script_type": "Python", + "local": True, + "secret": "demo-secret" + } + }) + pretty_print(runner) print_header("job.create - A (root)") jobA = client.call("job.create", { diff --git a/scripts/supervisor_flow_demo.py b/scripts/supervisor_flow_demo.py index fc63a9d..0c1174c 100644 --- a/scripts/supervisor_flow_demo.py +++ b/scripts/supervisor_flow_demo.py @@ -13,8 +13,8 @@ This script: Transport: JSON-RPC over HTTP to the Coordinator (default COORDINATOR_URL=http://127.0.0.1:9652). Example usage: - COORDINATOR_URL=http://127.0.0.1:9652 python3 scripts/supervisor_flow_demo.py --dst-ip 2001:db8::1 - COORDINATOR_URL=http://127.0.0.1:9652 python3 scripts/supervisor_flow_demo.py --dst-pk bb39b4a3a4efd70f3e05e37887677e02efbda14681d0acd3882bc0f754792c32 + COORDINATOR_URL=http://127.0.0.1:9652 python3 scripts/supervisor_flow_demo.py --dst-ip 2001:db8::1 [--secret your-secret] + COORDINATOR_URL=http://127.0.0.1:9652 python3 scripts/supervisor_flow_demo.py --dst-pk bb39b4a3a4efd70f3e05e37887677e02efbda14681d0acd3882bc0f754792c32 [--secret your-secret] Notes: - Exactly one of --dst-ip or --dst-pk must be provided. @@ -123,6 +123,7 @@ def parse_args() -> argparse.Namespace: help="ScriptType for jobs/runner. Default: Python" ) p.add_argument("--topic", default="supervisor.rpc", help="Supervisor topic. Default: supervisor.rpc") + p.add_argument("--secret", help="Optional supervisor secret used for authenticated supervisor calls") p.add_argument("--poll-interval", type=float, default=2.0, help="Flow poll interval seconds. Default: 2.0") p.add_argument("--poll-timeout", type=int, default=600, help="Max seconds to wait for flow completion. Default: 600") return p.parse_args() @@ -191,16 +192,20 @@ def main(): print_header("runner.create (or load)") # runner.load requires both context_id and id try: + runner_payload = { + "id": runner_id, + "pubkey": runner_pubkey, + "address": runner_address, + "topic": topic, + "script_type": script_type, + "local": False, + } + # Optional supervisor secret used by router for authenticated supervisor calls + if getattr(args, "secret", None): + runner_payload["secret"] = args.secret runner = client.call("runner.create", { "context_id": context_id, - "runner": { - "id": runner_id, - "pubkey": runner_pubkey, - "address": runner_address, - "topic": topic, - "script_type": script_type, - "local": False - } + "runner": runner_payload }) except RuntimeError as e: msg = str(e) diff --git a/specs/openrpc.json b/specs/openrpc.json index a566b15..67425bc 100644 --- a/specs/openrpc.json +++ b/specs/openrpc.json @@ -563,6 +563,9 @@ "local": { "type": "boolean" }, + "secret": { + "type": "string" + }, "created_at": { "type": "integer", "format": "int64" @@ -1001,6 +1004,9 @@ }, "local": { "type": "boolean" + }, + "secret": { + "type": "string" } } }, diff --git a/src/models/runner.rs b/src/models/runner.rs index 15515e0..d6d6ae0 100644 --- a/src/models/runner.rs +++ b/src/models/runner.rs @@ -18,6 +18,8 @@ pub struct Runner { pub script_type: ScriptType, /// If this is true, the runner also listens on a local redis queue pub local: bool, + /// Optional secret used for authenticated supervisor calls (if required) + pub secret: Option, pub created_at: Timestamp, pub updated_at: Timestamp, } diff --git a/src/router.rs b/src/router.rs index 360325e..660c906 100644 --- a/src/router.rs +++ b/src/router.rs @@ -140,11 +140,12 @@ async fn deliver_one( // Keep clones for poller usage let dest_for_poller = dest.clone(); let topic_for_poller = cfg.topic.clone(); + let secret_for_poller = runner.secret.clone(); let client = SupervisorClient::new_with_client( mycelium.clone(), dest.clone(), cfg.topic.clone(), - None, // secret + runner.secret.clone(), ); // Build supervisor method and params from Message @@ -234,7 +235,7 @@ async fn deliver_one( client.clone(), sup_dest.clone(), sup_topic.clone(), - None, + secret_for_poller.clone(), ); match sup.job_status_sync(job_id.to_string(), 10).await { Ok(remote_status) => { @@ -256,7 +257,7 @@ async fn deliver_one( client.clone(), sup_dest.clone(), sup_topic.clone(), - None, + secret_for_poller.clone(), ); match sup .job_result_sync( diff --git a/src/rpc.rs b/src/rpc.rs index 002655b..9ea4dfe 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -150,6 +150,8 @@ pub struct RunnerCreate { /// The script type this runner executes (used for routing) pub script_type: ScriptType, pub local: bool, + /// Optional secret used for authenticated supervisor calls (if required) + pub secret: Option, } impl RunnerCreate { pub fn into_domain(self) -> Runner { @@ -162,6 +164,7 @@ impl RunnerCreate { topic, script_type, local, + secret, } = self; Runner { @@ -171,6 +174,7 @@ impl RunnerCreate { topic, script_type, local, + secret, created_at: ts, updated_at: ts, }