From 38709e06f3fad00500b2862f6907da75e4721f0f Mon Sep 17 00:00:00 2001 From: Lee Smet Date: Fri, 29 Aug 2025 15:43:32 +0200 Subject: [PATCH] Add script to test actor/context/job/flow create and flow dag Signed-off-by: Lee Smet --- scripts/jsonrpc_demo.py | 361 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 361 insertions(+) create mode 100644 scripts/jsonrpc_demo.py diff --git a/scripts/jsonrpc_demo.py b/scripts/jsonrpc_demo.py new file mode 100644 index 0000000..bcf753f --- /dev/null +++ b/scripts/jsonrpc_demo.py @@ -0,0 +1,361 @@ +#!/usr/bin/env python3 +""" +Demo script for HeroCoordinator JSON-RPC API. +- Creates an actor +- Verifies by loading the actor +- Creates a context with the actor as admin/reader/executor +- Creates three jobs with dependencies +- Creates a flow referencing those jobs +- Fetches and prints the flow DAG + +Usage: + COORDINATOR_URL=http://127.0.0.1:9652 python3 scripts/jsonrpc_demo.py +Defaults to http://127.0.0.1:9652 if COORDINATOR_URL is not set. +""" +import os +import json +import sys +from urllib import request, error +from typing import Any, Dict, List, Tuple + +JSONRPC_VERSION = "2.0" + +class JsonRpcClient: + def __init__(self, url: str): + self.url = url.rstrip("/") + self._id = 0 + + def call(self, method: str, params: Dict[str, Any]) -> Any: + self._id += 1 + payload = { + "jsonrpc": JSONRPC_VERSION, + "id": self._id, + "method": method, + "params": params, + } + data = json.dumps(payload).encode("utf-8") + req = request.Request(self.url, data=data, headers={"Content-Type": "application/json"}) + try: + with request.urlopen(req) as resp: + body = resp.read() + except error.HTTPError as e: + try: + details = e.read().decode("utf-8", "ignore") + except Exception: + details = "" + raise RuntimeError(f"HTTP error {e.code}: {details}") from e + except error.URLError as e: + raise RuntimeError(f"URL error: {e.reason}") from e + + try: + obj = json.loads(body.decode("utf-8")) + except Exception as e: + raise RuntimeError(f"Invalid JSON response: {body!r}") from e + + # JSON-RPC single response expected + if isinstance(obj, list): + raise RuntimeError("Batch responses are not supported in this demo") + + if obj.get("error"): + raise RuntimeError(f"RPC error: {json.dumps(obj['error'])}") + + return obj.get("result") + +def print_header(title: str): + print("\n" + "=" * 80) + print(title) + print("=" * 80) + +def pretty_print(obj: Any): + print(json.dumps(obj, indent=2, sort_keys=True)) + +def summarize_dag(dag: Dict[str, Any]): + print_header("Flow DAG Summary") + flow_id = dag.get("flow_id") + caller_id = dag.get("caller_id") + context_id = dag.get("context_id") + print(f"flow_id={flow_id} caller_id={caller_id} context_id={context_id}") + edges: List[Tuple[int, int]] = dag.get("edges", []) + roots: List[int] = dag.get("roots", []) + leaves: List[int] = dag.get("leaves", []) + levels: List[List[int]] = dag.get("levels", []) + nodes: Dict[str, Any] = dag.get("nodes", {}) + + print("Edges:") + for a, b in edges: + print(f" {a} -> {b}") + + print(f"Roots: {roots}") + print(f"Leaves: {leaves}") + print("Levels:") + for i, lvl in enumerate(levels): + print(f" L{i}: {lvl}") + + # Show nodes and their dependencies (from JobSummary) + print("Nodes:") + for k, v in nodes.items(): + depends = v.get("depends", []) + prerequisites = v.get("prerequisites", []) + stype = v.get("script_type") + print(f" Job {k}: depends={depends} prerequisites={prerequisites} script_type={stype}") + +def assert_edges(edges: List[Tuple[int, int]], required: List[Tuple[int, int]]): + edge_set = {(int(a), int(b)) for a, b in edges} + missing = [e for e in required if e not in edge_set] + if missing: + raise AssertionError(f"Missing expected edges in DAG: {missing}; got={sorted(edge_set)}") + +def main(): + url = os.getenv("COORDINATOR_URL", "http://127.0.0.1:9652") + client = JsonRpcClient(url) + + # Deterministic demo IDs; change if collisions happen + actor_id = 1001 + context_id = 1 # Redis DB indices are 0-15; keep <= 15 + job_a = 3001 + job_b = 3002 + job_c = 3003 + job_d = 3004 + job_e = 3005 + job_f = 3006 + job_g = 3007 + job_h = 3008 + job_i = 3009 + flow_id = 4001 + + print_header("actor.create") + actor = client.call("actor.create", { + "actor": { + "id": actor_id, + "pubkey": "demo-pubkey", + "address": ["127.0.0.1"] + } + }) + pretty_print(actor) + + print_header("actor.load") + actor_loaded = client.call("actor.load", {"id": actor_id}) + pretty_print(actor_loaded) + + print_header("context.create") + context = client.call("context.create", { + "context": { + "id": context_id, + "admins": [actor_id], + "readers": [actor_id], + "executors": [actor_id] + } + }) + pretty_print(context) + + print_header("job.create - A (root)") + jobA = client.call("job.create", { + "context_id": context_id, + "job": { + "id": job_a, + "caller_id": actor_id, + "context_id": context_id, + "script": "print('A')", + "script_type": "Python", + "timeout": 30, + "retries": 0, + "env_vars": {}, + "prerequisites": [], + "depends": [] + } + }) + pretty_print(jobA) + + print_header("job.create - B (root)") + jobB = client.call("job.create", { + "context_id": context_id, + "job": { + "id": job_b, + "caller_id": actor_id, + "context_id": context_id, + "script": "print('B')", + "script_type": "Python", + "timeout": 30, + "retries": 0, + "env_vars": {}, + "prerequisites": [], + "depends": [] + } + }) + pretty_print(jobB) + + print_header("job.create - C (depends on A and B)") + jobC = client.call("job.create", { + "context_id": context_id, + "job": { + "id": job_c, + "caller_id": actor_id, + "context_id": context_id, + "script": "print('C')", + "script_type": "Python", + "timeout": 30, + "retries": 0, + "env_vars": {}, + "prerequisites": [], + "depends": [job_a, job_b] + } + }) + pretty_print(jobC) + + print_header("job.create - D (depends on A)") + jobD = client.call("job.create", { + "context_id": context_id, + "job": { + "id": job_d, + "caller_id": actor_id, + "context_id": context_id, + "script": "print('D')", + "script_type": "Python", + "timeout": 30, + "retries": 0, + "env_vars": {}, + "prerequisites": [], + "depends": [job_a] + } + }) + pretty_print(jobD) + + print_header("job.create - E (depends on B)") + jobE = client.call("job.create", { + "context_id": context_id, + "job": { + "id": job_e, + "caller_id": actor_id, + "context_id": context_id, + "script": "print('E')", + "script_type": "Python", + "timeout": 30, + "retries": 0, + "env_vars": {}, + "prerequisites": [], + "depends": [job_b] + } + }) + pretty_print(jobE) + + print_header("job.create - F (depends on C and D)") + jobF = client.call("job.create", { + "context_id": context_id, + "job": { + "id": job_f, + "caller_id": actor_id, + "context_id": context_id, + "script": "print('F')", + "script_type": "Python", + "timeout": 30, + "retries": 0, + "env_vars": {}, + "prerequisites": [], + "depends": [job_c, job_d] + } + }) + pretty_print(jobF) + + print_header("job.create - G (depends on C and E)") + jobG = client.call("job.create", { + "context_id": context_id, + "job": { + "id": job_g, + "caller_id": actor_id, + "context_id": context_id, + "script": "print('G')", + "script_type": "Python", + "timeout": 30, + "retries": 0, + "env_vars": {}, + "prerequisites": [], + "depends": [job_c, job_e] + } + }) + pretty_print(jobG) + + print_header("job.create - H (leaf; depends on F and G)") + jobH = client.call("job.create", { + "context_id": context_id, + "job": { + "id": job_h, + "caller_id": actor_id, + "context_id": context_id, + "script": "print('H')", + "script_type": "Python", + "timeout": 30, + "retries": 0, + "env_vars": {}, + "prerequisites": [], + "depends": [job_f, job_g] + } + }) + pretty_print(jobH) + + print_header("job.create - I (leaf; depends on F and G)") + jobI = client.call("job.create", { + "context_id": context_id, + "job": { + "id": job_i, + "caller_id": actor_id, + "context_id": context_id, + "script": "print('I')", + "script_type": "Python", + "timeout": 30, + "retries": 0, + "env_vars": {}, + "prerequisites": [], + "depends": [job_f, job_g] + } + }) + pretty_print(jobI) + + print_header("flow.create") + flow = client.call("flow.create", { + "context_id": context_id, + "flow": { + "id": flow_id, + "caller_id": actor_id, + "context_id": context_id, + "jobs": [job_a, job_b, job_c, job_d, job_e, job_f, job_g, job_h, job_i], + "env_vars": {} + } + }) + pretty_print(flow) + + print_header("flow.dag") + dag = client.call("flow.dag", {"context_id": context_id, "id": flow_id}) + summarize_dag(dag) + + # Validate roots and leaves + got_roots = list(map(int, dag.get("roots", []))) + if got_roots != sorted([job_a, job_b]): + print("WARNING: Unexpected roots:", got_roots, file=sys.stderr) + + got_leaves = {int(x) for x in dag.get("leaves", [])} + expected_leaves = {job_h, job_i} + if got_leaves != expected_leaves: + print("WARNING: Unexpected leaves:", got_leaves, "expected:", expected_leaves, file=sys.stderr) + + # Check edges reflect the expanded DAG + expected_edges = [ + (job_a, job_c), (job_b, job_c), + (job_a, job_d), (job_b, job_e), + (job_c, job_f), (job_d, job_f), + (job_c, job_g), (job_e, job_g), + (job_f, job_h), (job_g, job_h), + (job_f, job_i), (job_g, job_i), + ] + try: + assert_edges(dag.get("edges", []), expected_edges) + print("DAG edges contain expected dependencies:", expected_edges) + except AssertionError as e: + print("WARNING:", e, file=sys.stderr) + +if __name__ == "__main__": + try: + main() + except Exception as e: + print_header("Error") + print(str(e)) + sys.exit(1) \ No newline at end of file