From c4971aa7949e26881ce92320d2c4a1383d832ae2 Mon Sep 17 00:00:00 2001
From: Lee Smet
Date: Wed, 3 Sep 2025 20:17:12 +0200
Subject: [PATCH] Add full flow script example

Signed-off-by: Lee Smet
---
 scripts/supervisor_flow_demo.py | 349 ++++++++++++++++++++++++++++++++
 1 file changed, 349 insertions(+)
 create mode 100644 scripts/supervisor_flow_demo.py

diff --git a/scripts/supervisor_flow_demo.py b/scripts/supervisor_flow_demo.py
new file mode 100644
index 0000000..fc63a9d
--- /dev/null
+++ b/scripts/supervisor_flow_demo.py
@@ -0,0 +1,349 @@
#!/usr/bin/env python3
"""
Supervisor flow demo for HeroCoordinator.

This script:
- Creates an actor
- Creates a context granting the actor admin/reader/executor privileges
- Registers a Runner in the context targeting a Supervisor reachable via Mycelium (by public key or IP)
- Creates simple Python jobs (text jobs) with a small dependency chain
- Creates a flow referencing those jobs
- Starts the flow and polls until it finishes (or errors)

Transport: JSON-RPC over HTTP to the Coordinator (default COORDINATOR_URL=http://127.0.0.1:9652).

Example usage:
  COORDINATOR_URL=http://127.0.0.1:9652 python3 scripts/supervisor_flow_demo.py --dst-ip 2001:db8::1
  COORDINATOR_URL=http://127.0.0.1:9652 python3 scripts/supervisor_flow_demo.py --dst-pk bb39b4a3a4efd70f3e05e37887677e02efbda14681d0acd3882bc0f754792c32

Notes:
- Exactly one of --dst-ip or --dst-pk must be provided.
- Runner.topic defaults to "supervisor.rpc" (see main.rs).
- The router auto-discovers contexts and will deliver job.run messages to the supervisor.
"""

import argparse
import json
import os
import sys
import time
from typing import Any, Dict, List, Optional, Tuple
from urllib import request, error

JSONRPC_VERSION = "2.0"


def env_url() -> str:
    return os.getenv("COORDINATOR_URL", "http://127.0.0.1:9652").rstrip("/")


class JsonRpcClient:
    def __init__(self, url: str):
        self.url = url
        self._id = 0

    def call(self, method: str, params: Dict[str, Any]) -> Any:
        self._id += 1
        payload = {
            "jsonrpc": JSONRPC_VERSION,
            "id": self._id,
            "method": method,
            "params": params,
        }
        data = json.dumps(payload).encode("utf-8")
        req = request.Request(self.url, data=data, headers={"Content-Type": "application/json"})
        try:
            with request.urlopen(req) as resp:
                body = resp.read()
        except error.HTTPError as e:
            try:
                details = e.read().decode("utf-8", "ignore")
            except Exception:
                details = ""
            raise RuntimeError(f"HTTP error {e.code}: {details}") from e
        except error.URLError as e:
            raise RuntimeError(f"URL error: {e.reason}") from e

        try:
            obj = json.loads(body.decode("utf-8"))
        except Exception as e:
            raise RuntimeError(f"Invalid JSON response: {body!r}") from e

        if isinstance(obj, list):
            raise RuntimeError("Batch responses are not supported")

        if obj.get("error"):
            raise RuntimeError(f"RPC error: {json.dumps(obj['error'])}")

        return obj.get("result")


def print_header(title: str):
    print("\n" + "=" * 80)
    print(title)
    print("=" * 80)


def pretty(obj: Any):
    print(json.dumps(obj, indent=2, sort_keys=True))


def try_create_or_load(client: JsonRpcClient, create_method: str, create_params: Dict[str, Any],
                       load_method: str, load_params: Dict[str, Any]) -> Any:
    """Attempt a create; if it fails because the record already exists, fall back to a load."""
    try:
        return client.call(create_method, create_params)
    except RuntimeError as e:
        msg = str(e)
        # The server maps AlreadyExists to a storage error, and we have no
        # structured error code here, so match on the message text.
        if "Already exists" in msg or "Storage Error" in msg or "Invalid params" in msg:
            # Fall back to load
            return client.call(load_method, load_params)
        raise
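

# For reference, each JsonRpcClient.call() puts a single JSON-RPC 2.0 request
# on the wire. With the demo defaults, the first call made below would look
# roughly like this (illustrative; the id increments per call):
#
#   {"jsonrpc": "2.0", "id": 1, "method": "actor.create",
#    "params": {"actor": {"id": 11001, "pubkey": "demo-pubkey",
#                         "address": ["127.0.0.1"]}}}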


def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(description="Create actor/context/runner/jobs/flow; start and wait until completion.")
    group = p.add_mutually_exclusive_group(required=True)
    group.add_argument("--dst-ip", help="Supervisor Mycelium IP address (IPv4 or IPv6)")
    group.add_argument("--dst-pk", help="Supervisor public key (64-hex)")

    p.add_argument("--context-id", type=int, default=2, help="Context id (Redis DB index; 0-15). Default: 2")
    p.add_argument("--actor-id", type=int, default=11001, help="Actor id. Default: 11001")
    p.add_argument("--runner-id", type=int, default=12001, help="Runner id. Default: 12001")
    p.add_argument("--flow-id", type=int, default=13001, help="Flow id. Default: 13001")
    p.add_argument("--base-job-id", type=int, default=20000, help="Base job id for the first job; subsequent jobs increment. Default: 20000")
    p.add_argument("--jobs", type=int, default=3, help="Number of jobs to create (>=1). Forms a simple chain. Default: 3")
    p.add_argument("--timeout-secs", type=int, default=60, help="Per-job timeout in seconds. Default: 60")
    p.add_argument("--retries", type=int, default=0, help="Per-job retries (0-255). Default: 0")
    p.add_argument(
        "--script-type",
        choices=["Python", "V", "Osis", "Sal"],
        default="Python",
        help="ScriptType for jobs/runner. Default: Python"
    )
    p.add_argument("--topic", default="supervisor.rpc", help="Supervisor topic. Default: supervisor.rpc")
    p.add_argument("--poll-interval", type=float, default=2.0, help="Flow poll interval in seconds. Default: 2.0")
    p.add_argument("--poll-timeout", type=int, default=600, help="Max seconds to wait for flow completion. Default: 600")
    return p.parse_args()


def main():
    args = parse_args()
    if args.jobs < 1:
        print("ERROR: --jobs must be >= 1", file=sys.stderr)
        sys.exit(2)

    url = env_url()
    client = JsonRpcClient(url)

    actor_id = int(args.actor_id)
    context_id = int(args.context_id)
    runner_id = int(args.runner_id)
    flow_id = int(args.flow_id)
    base_job_id = int(args.base_job_id)
    script_type = args.script_type
    timeout = int(args.timeout_secs)
    retries = int(args.retries)
    topic = args.topic

    # 1) Actor
    print_header("actor.create (or load)")
    actor = try_create_or_load(
        client,
        "actor.create",
        {
            "actor": {
                "id": actor_id,
                "pubkey": "demo-pubkey",
                "address": ["127.0.0.1"],
            }
        },
        "actor.load",
        {"id": actor_id},
    )
    pretty(actor)

    # 2) Context
    print_header("context.create (or load)")
    context = try_create_or_load(
        client,
        "context.create",
        {
            "context": {
                "id": context_id,
                "admins": [actor_id],
                "readers": [actor_id],
                "executors": [actor_id],
            }
        },
        "context.load",
        {"id": context_id},
    )
    pretty(context)

    # 3) Runner in this context
    # The router routes by pubkey when it is non-empty and falls back to the
    # IP address otherwise. RunnerCreate requires both fields, so we fill both
    # and steer routing by leaving pubkey empty or not.
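    # With --dst-pk, the params sent to runner.create below look roughly like
    # this (illustrative values; JSON as serialized on the wire):
    #   {"context_id": 2, "runner": {"id": 12001, "pubkey": "<64-hex>",
    #    "address": "127.0.0.1", "topic": "supervisor.rpc",
    #    "script_type": "Python", "local": false}}
    # With --dst-ip, pubkey is "" and address carries the supervisor's
    # Mycelium IP instead.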
    runner_pubkey = args.dst_pk if args.dst_pk else ""
    runner_address = args.dst_ip if args.dst_ip else "127.0.0.1"

    print_header("runner.create (or load)")
    # runner.load requires both context_id and id
    try:
        runner = client.call("runner.create", {
            "context_id": context_id,
            "runner": {
                "id": runner_id,
                "pubkey": runner_pubkey,
                "address": runner_address,
                "topic": topic,
                "script_type": script_type,
                "local": False
            }
        })
    except RuntimeError as e:
        msg = str(e)
        if "Already exists" in msg or "Storage Error" in msg or "Invalid params" in msg:
            runner = client.call("runner.load", {"context_id": context_id, "id": runner_id})
        else:
            raise
    pretty(runner)

    # 4) Jobs
    # Build a simple chain: J0 (root), J1 depends on J0, J2 depends on J1, ... up to N-1
    job_ids: List[int] = []
    for i in range(args.jobs):
        jid = base_job_id + i
        depends = [] if i == 0 else [base_job_id + (i - 1)]
        job_payload = {
            "id": jid,
            "caller_id": actor_id,
            "context_id": context_id,
            "script": f"print('Job {i} running')",
            "script_type": script_type,
            "timeout": timeout,
            "retries": retries,
            "env_vars": {},
            "prerequisites": [],
            "depends": depends,
        }
        print_header(f"job.create - {jid} {'(root)' if not depends else f'(depends on {depends})'}")
        try:
            job = client.call("job.create", {
                "context_id": context_id,
                "job": job_payload
            })
        except RuntimeError as e:
            msg = str(e)
            if "Already exists" in msg or "Storage Error" in msg or "Invalid params" in msg:
                job = client.call("job.load", {
                    "context_id": context_id,
                    "caller_id": actor_id,
                    "id": jid
                })
            else:
                raise
        pretty(job)
        job_ids.append(jid)

    # 5) Flow
    print_header("flow.create (or load)")
    try:
        flow = client.call("flow.create", {
            "context_id": context_id,
            "flow": {
                "id": flow_id,
                "caller_id": actor_id,
                "context_id": context_id,
                "jobs": job_ids,
                "env_vars": {}
            }
        })
    except RuntimeError as e:
        msg = str(e)
        if "Already exists" in msg or "Storage Error" in msg or "Invalid params" in msg:
            flow = client.call("flow.load", {"context_id": context_id, "id": flow_id})
        else:
            raise
    pretty(flow)

    # Optional: show the DAG
    try:
        print_header("flow.dag")
        dag = client.call("flow.dag", {"context_id": context_id, "id": flow_id})
        pretty(dag)
    except Exception as e:
        print(f"WARN: flow.dag failed: {e}", file=sys.stderr)

    # 6) Start the flow (idempotent; returns a bool indicating whether the scheduler was started)
    print_header("flow.start")
    started = client.call("flow.start", {"context_id": context_id, "id": flow_id})
    print(f"flow.start -> {started}")
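
    # The polling below relies only on flow.load exposing a "status" field,
    # treating "Finished"/"Error" as terminal. A trimmed response might look
    # like this (illustrative; other fields elided):
    #   {"id": 13001, "caller_id": 11001, "jobs": [20000, 20001, 20002],
    #    "status": "Running"}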

    # 7) Poll until Finished or Error (or timeout)
    print_header("Polling flow.load until completion")
    t0 = time.time()
    status = None
    last_status_print = 0.0
    poll_count = 0
    while True:
        poll_count += 1
        flow = client.call("flow.load", {"context_id": context_id, "id": flow_id})
        status = flow.get("status")
        now = time.time()
        if now - last_status_print >= max(1.0, float(args.poll_interval)):
            print(f"[{int(now - t0)}s] flow.status = {status}")
            last_status_print = now

        # Every 5th poll, print the current flow DAG
        if (poll_count % 5) == 0:
            try:
                print_header("flow.dag (periodic)")
                dag = client.call("flow.dag", {"context_id": context_id, "id": flow_id})
                pretty(dag)
            except Exception as e:
                print(f"WARN: periodic flow.dag failed: {e}", file=sys.stderr)

        if status in ("Finished", "Error"):
            break
        if (now - t0) > args.poll_timeout:
            print(f"ERROR: Flow did not complete within {args.poll_timeout}s (status={status})", file=sys.stderr)
            break

        time.sleep(float(args.poll_interval))

    # 8) Final summary: job statuses
    print_header("Final job statuses")
    for jid in job_ids:
        try:
            j = client.call("job.load", {
                "context_id": context_id,
                "caller_id": actor_id,
                "id": jid
            })
            print(f"Job {jid}: status={j.get('status')} result={j.get('result')}")
        except Exception as e:
            print(f"Job {jid}: load failed: {e}", file=sys.stderr)

    # Exit code
    print_header("Result")
    if status == "Finished":
        print("Flow finished successfully.")
        sys.exit(0)
    print(f"Flow ended with status={status}")
    sys.exit(1)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\nInterrupted.")
        sys.exit(130)
    except Exception as e:
        print_header("Error")
        print(str(e))
        sys.exit(1)
\ No newline at end of file