@@ -123,6 +123,7 @@ def main():
|
||||
job_i = 3009
|
||||
flow_id = 4001
|
||||
|
||||
runner_id = 2001
|
||||
print_header("actor.create")
|
||||
actor = client.call("actor.create", {
|
||||
"actor": {
|
||||
@@ -147,6 +148,20 @@ def main():
|
||||
}
|
||||
})
|
||||
pretty_print(context)
|
||||
print_header("runner.create")
|
||||
runner = client.call("runner.create", {
|
||||
"context_id": context_id,
|
||||
"runner": {
|
||||
"id": runner_id,
|
||||
"pubkey": "", # leave empty to route by IP
|
||||
"address": "127.0.0.1",
|
||||
"topic": f"runner{runner_id}",
|
||||
"script_type": "Python",
|
||||
"local": True,
|
||||
"secret": "demo-secret"
|
||||
}
|
||||
})
|
||||
pretty_print(runner)
|
||||
|
||||
print_header("job.create - A (root)")
|
||||
jobA = client.call("job.create", {
|
||||
|
@@ -13,8 +13,8 @@ This script:
|
||||
Transport: JSON-RPC over HTTP to the Coordinator (default COORDINATOR_URL=http://127.0.0.1:9652).
|
||||
|
||||
Example usage:
|
||||
COORDINATOR_URL=http://127.0.0.1:9652 python3 scripts/supervisor_flow_demo.py --dst-ip 2001:db8::1
|
||||
COORDINATOR_URL=http://127.0.0.1:9652 python3 scripts/supervisor_flow_demo.py --dst-pk bb39b4a3a4efd70f3e05e37887677e02efbda14681d0acd3882bc0f754792c32
|
||||
COORDINATOR_URL=http://127.0.0.1:9652 python3 scripts/supervisor_flow_demo.py --dst-ip 2001:db8::1 [--secret your-secret]
|
||||
COORDINATOR_URL=http://127.0.0.1:9652 python3 scripts/supervisor_flow_demo.py --dst-pk bb39b4a3a4efd70f3e05e37887677e02efbda14681d0acd3882bc0f754792c32 [--secret your-secret]
|
||||
|
||||
Notes:
|
||||
- Exactly one of --dst-ip or --dst-pk must be provided.
|
||||
@@ -123,6 +123,7 @@ def parse_args() -> argparse.Namespace:
|
||||
help="ScriptType for jobs/runner. Default: Python"
|
||||
)
|
||||
p.add_argument("--topic", default="supervisor.rpc", help="Supervisor topic. Default: supervisor.rpc")
|
||||
p.add_argument("--secret", help="Optional supervisor secret used for authenticated supervisor calls")
|
||||
p.add_argument("--poll-interval", type=float, default=2.0, help="Flow poll interval seconds. Default: 2.0")
|
||||
p.add_argument("--poll-timeout", type=int, default=600, help="Max seconds to wait for flow completion. Default: 600")
|
||||
return p.parse_args()
|
||||
@@ -191,16 +192,20 @@ def main():
|
||||
print_header("runner.create (or load)")
|
||||
# runner.load requires both context_id and id
|
||||
try:
|
||||
runner_payload = {
|
||||
"id": runner_id,
|
||||
"pubkey": runner_pubkey,
|
||||
"address": runner_address,
|
||||
"topic": topic,
|
||||
"script_type": script_type,
|
||||
"local": False,
|
||||
}
|
||||
# Optional supervisor secret used by router for authenticated supervisor calls
|
||||
if getattr(args, "secret", None):
|
||||
runner_payload["secret"] = args.secret
|
||||
runner = client.call("runner.create", {
|
||||
"context_id": context_id,
|
||||
"runner": {
|
||||
"id": runner_id,
|
||||
"pubkey": runner_pubkey,
|
||||
"address": runner_address,
|
||||
"topic": topic,
|
||||
"script_type": script_type,
|
||||
"local": False
|
||||
}
|
||||
"runner": runner_payload
|
||||
})
|
||||
except RuntimeError as e:
|
||||
msg = str(e)
|
||||
|
@@ -563,6 +563,9 @@
|
||||
"local": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"secret": {
|
||||
"type": "string"
|
||||
},
|
||||
"created_at": {
|
||||
"type": "integer",
|
||||
"format": "int64"
|
||||
@@ -1001,6 +1004,9 @@
|
||||
},
|
||||
"local": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"secret": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
},
|
||||
|
@@ -18,6 +18,8 @@ pub struct Runner {
|
||||
pub script_type: ScriptType,
|
||||
/// If this is true, the runner also listens on a local redis queue
|
||||
pub local: bool,
|
||||
/// Optional secret used for authenticated supervisor calls (if required)
|
||||
pub secret: Option<String>,
|
||||
pub created_at: Timestamp,
|
||||
pub updated_at: Timestamp,
|
||||
}
|
||||
|
@@ -140,11 +140,12 @@ async fn deliver_one(
|
||||
// Keep clones for poller usage
|
||||
let dest_for_poller = dest.clone();
|
||||
let topic_for_poller = cfg.topic.clone();
|
||||
let secret_for_poller = runner.secret.clone();
|
||||
let client = SupervisorClient::new_with_client(
|
||||
mycelium.clone(),
|
||||
dest.clone(),
|
||||
cfg.topic.clone(),
|
||||
None, // secret
|
||||
runner.secret.clone(),
|
||||
);
|
||||
|
||||
// Build supervisor method and params from Message
|
||||
@@ -234,7 +235,7 @@ async fn deliver_one(
|
||||
client.clone(),
|
||||
sup_dest.clone(),
|
||||
sup_topic.clone(),
|
||||
None,
|
||||
secret_for_poller.clone(),
|
||||
);
|
||||
match sup.job_status_sync(job_id.to_string(), 10).await {
|
||||
Ok(remote_status) => {
|
||||
@@ -256,7 +257,7 @@ async fn deliver_one(
|
||||
client.clone(),
|
||||
sup_dest.clone(),
|
||||
sup_topic.clone(),
|
||||
None,
|
||||
secret_for_poller.clone(),
|
||||
);
|
||||
match sup
|
||||
.job_result_sync(
|
||||
|
@@ -150,6 +150,8 @@ pub struct RunnerCreate {
|
||||
/// The script type this runner executes (used for routing)
|
||||
pub script_type: ScriptType,
|
||||
pub local: bool,
|
||||
/// Optional secret used for authenticated supervisor calls (if required)
|
||||
pub secret: Option<String>,
|
||||
}
|
||||
impl RunnerCreate {
|
||||
pub fn into_domain(self) -> Runner {
|
||||
@@ -162,6 +164,7 @@ impl RunnerCreate {
|
||||
topic,
|
||||
script_type,
|
||||
local,
|
||||
secret,
|
||||
} = self;
|
||||
|
||||
Runner {
|
||||
@@ -171,6 +174,7 @@ impl RunnerCreate {
|
||||
topic,
|
||||
script_type,
|
||||
local,
|
||||
secret,
|
||||
created_at: ts,
|
||||
updated_at: ts,
|
||||
}
|
||||
|
Reference in New Issue
Block a user