#!/usr/bin/env python3
"""
Demo script for HeroCoordinator JSON-RPC API.

- Creates an actor
- Verifies by loading the actor
- Creates a context with the actor as admin/reader/executor
- Creates three jobs with dependencies
- Creates a flow referencing those jobs
- Fetches and prints the flow DAG

Usage:
  COORDINATOR_URL=http://127.0.0.1:9652 python3 scripts/jsonrpc_demo.py
Defaults to http://127.0.0.1:9652 if COORDINATOR_URL is not set.
"""
import os
import json
import sys
from urllib import request, error
from typing import Any, Dict, List, Tuple

JSONRPC_VERSION = "2.0"


class JsonRpcClient:
    """Minimal JSON-RPC 2.0 client over HTTP, stdlib-only (urllib)."""

    def __init__(self, url: str):
        # Normalize away a trailing slash so the endpoint URL is stable.
        self.url = url.rstrip("/")
        self._id = 0  # monotonically increasing request id

    def call(self, method: str, params: Dict[str, Any]) -> Any:
        """POST one JSON-RPC request and return its ``result`` field.

        Raises:
            RuntimeError: on HTTP/transport errors, unparseable bodies,
                batch responses, or a JSON-RPC ``error`` object.
        """
        self._id += 1
        payload = {
            "jsonrpc": JSONRPC_VERSION,
            "id": self._id,
            "method": method,
            "params": params,
        }
        data = json.dumps(payload).encode("utf-8")
        req = request.Request(
            self.url, data=data, headers={"Content-Type": "application/json"}
        )
        try:
            with request.urlopen(req) as resp:
                body = resp.read()
        except error.HTTPError as e:
            # Include the server's error body (best effort) for diagnosis.
            try:
                details = e.read().decode("utf-8", "ignore")
            except Exception:
                details = ""
            raise RuntimeError(f"HTTP error {e.code}: {details}") from e
        except error.URLError as e:
            raise RuntimeError(f"URL error: {e.reason}") from e
        try:
            obj = json.loads(body.decode("utf-8"))
        except Exception as e:
            raise RuntimeError(f"Invalid JSON response: {body!r}") from e
        # JSON-RPC single response expected
        if isinstance(obj, list):
            raise RuntimeError("Batch responses are not supported in this demo")
        if obj.get("error"):
            raise RuntimeError(f"RPC error: {json.dumps(obj['error'])}")
        return obj.get("result")


def print_header(title: str):
    """Print *title* framed by '=' rules to separate demo steps."""
    print("\n" + "=" * 80)
    print(title)
    print("=" * 80)


def pretty_print(obj: Any):
    """Dump *obj* as indented, key-sorted JSON."""
    print(json.dumps(obj, indent=2, sort_keys=True))


def summarize_dag(dag: Dict[str, Any]):
    """Print a human-readable summary of a flow DAG response.

    Shows ids, edges, roots, leaves, topological levels, and per-node
    dependency info taken from each JobSummary.
    """
    print_header("Flow DAG Summary")
    flow_id = dag.get("flow_id")
    caller_id = dag.get("caller_id")
    context_id = dag.get("context_id")
    print(f"flow_id={flow_id} caller_id={caller_id} context_id={context_id}")
    edges: List[Tuple[int, int]] = dag.get("edges", [])
    roots: List[int] = dag.get("roots", [])
    leaves: List[int] = dag.get("leaves", [])
    levels: List[List[int]] = dag.get("levels", [])
    nodes: Dict[str, Any] = dag.get("nodes", {})
    print("Edges:")
    for a, b in edges:
        print(f"  {a} -> {b}")
    print(f"Roots: {roots}")
    print(f"Leaves: {leaves}")
    print("Levels:")
    for i, lvl in enumerate(levels):
        print(f"  L{i}: {lvl}")
    # Show nodes and their dependencies (from JobSummary)
    print("Nodes:")
    for k, v in nodes.items():
        depends = v.get("depends", [])
        prerequisites = v.get("prerequisites", [])
        stype = v.get("script_type")
        print(
            f"  Job {k}: depends={depends} prerequisites={prerequisites} "
            f"script_type={stype}"
        )


def assert_edges(edges: List[Tuple[int, int]], required: List[Tuple[int, int]]):
    """Raise AssertionError unless every pair in *required* appears in *edges*.

    Edge endpoints are coerced to int so string ids from JSON compare equal.
    """
    edge_set = {(int(a), int(b)) for a, b in edges}
    missing = [e for e in required if e not in edge_set]
    if missing:
        raise AssertionError(
            f"Missing expected edges in DAG: {missing}; got={sorted(edge_set)}"
        )


def _create_job(
    client: JsonRpcClient,
    context_id: int,
    actor_id: int,
    job_id: int,
    name: str,
    label: str,
    depends: List[int],
) -> Any:
    """Create one demo job via job.create and pretty-print the response.

    *name* is the letter printed by the job's script; *label* is the
    human-readable header shown for this step; *depends* lists job ids
    this job depends on.
    """
    print_header(f"job.create - {label}")
    job = client.call("job.create", {
        "context_id": context_id,
        "job": {
            "id": job_id,
            "caller_id": actor_id,
            "context_id": context_id,
            "script": f"print('{name}')",
            "script_type": "Python",
            "timeout": 30,
            "retries": 0,
            "env_vars": {},
            "prerequisites": [],
            "depends": depends,
        },
    })
    pretty_print(job)
    return job


def main():
    """Run the end-to-end demo against COORDINATOR_URL (or the default)."""
    url = os.getenv("COORDINATOR_URL", "http://127.0.0.1:9652")
    client = JsonRpcClient(url)

    # Deterministic demo IDs; change if collisions happen
    actor_id = 1001
    context_id = 1  # Redis DB indices are 0-15; keep <= 15
    job_a = 3001
    job_b = 3002
    job_c = 3003
    job_d = 3004
    job_e = 3005
    job_f = 3006
    job_g = 3007
    job_h = 3008
    job_i = 3009
    flow_id = 4001

    print_header("actor.create")
    actor = client.call("actor.create", {
        "actor": {
            "id": actor_id,
            "pubkey": "demo-pubkey",
            "address": ["127.0.0.1"],
        }
    })
    pretty_print(actor)

    print_header("actor.load")
    actor_loaded = client.call("actor.load", {"id": actor_id})
    pretty_print(actor_loaded)

    print_header("context.create")
    context = client.call("context.create", {
        "context": {
            "id": context_id,
            "admins": [actor_id],
            "readers": [actor_id],
            "executors": [actor_id],
        }
    })
    pretty_print(context)

    # (job_id, script letter, step label, dependencies) — a diamond-ish DAG:
    # A,B are roots; C,D,E mid; F,G join; H,I are leaves.
    job_specs = [
        (job_a, "A", "A (root)", []),
        (job_b, "B", "B (root)", []),
        (job_c, "C", "C (depends on A and B)", [job_a, job_b]),
        (job_d, "D", "D (depends on A)", [job_a]),
        (job_e, "E", "E (depends on B)", [job_b]),
        (job_f, "F", "F (depends on C and D)", [job_c, job_d]),
        (job_g, "G", "G (depends on C and E)", [job_c, job_e]),
        (job_h, "H", "H (leaf; depends on F and G)", [job_f, job_g]),
        (job_i, "I", "I (leaf; depends on F and G)", [job_f, job_g]),
    ]
    for jid, name, label, deps in job_specs:
        _create_job(client, context_id, actor_id, jid, name, label, deps)

    print_header("flow.create")
    flow = client.call("flow.create", {
        "context_id": context_id,
        "flow": {
            "id": flow_id,
            "caller_id": actor_id,
            "context_id": context_id,
            "jobs": [
                job_a, job_b, job_c, job_d, job_e,
                job_f, job_g, job_h, job_i,
            ],
            "env_vars": {},
        }
    })
    pretty_print(flow)

    print_header("flow.dag")
    dag = client.call("flow.dag", {"context_id": context_id, "id": flow_id})
    summarize_dag(dag)

    # Validate roots and leaves
    got_roots = list(map(int, dag.get("roots", [])))
    if got_roots != sorted([job_a, job_b]):
        print("WARNING: Unexpected roots:", got_roots, file=sys.stderr)
    got_leaves = {int(x) for x in dag.get("leaves", [])}
    expected_leaves = {job_h, job_i}
    if got_leaves != expected_leaves:
        print(
            "WARNING: Unexpected leaves:", got_leaves,
            "expected:", expected_leaves, file=sys.stderr,
        )

    # Check edges reflect the expanded DAG
    expected_edges = [
        (job_a, job_c),
        (job_b, job_c),
        (job_a, job_d),
        (job_b, job_e),
        (job_c, job_f),
        (job_d, job_f),
        (job_c, job_g),
        (job_e, job_g),
        (job_f, job_h),
        (job_g, job_h),
        (job_f, job_i),
        (job_g, job_i),
    ]
    try:
        assert_edges(dag.get("edges", []), expected_edges)
        print("DAG edges contain expected dependencies:", expected_edges)
    except AssertionError as e:
        print("WARNING:", e, file=sys.stderr)


if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print_header("Error")
        print(str(e))
        sys.exit(1)