#!/usr/bin/env python3
"""
Non-destructive JSON-RPC smoke tests against the OpenRPC server.

Installs:
  python3 -m pip install -r tools/requirements.txt

Usage:
  # Default URL http://127.0.0.1:9944
  python tools/rpc_smoke_test.py

  # Specify a different URL
  python tools/rpc_smoke_test.py --url http://127.0.0.1:9944

  # Provide a specific pubkey for fetch_nonce (compressed 33-byte hex)
  python tools/rpc_smoke_test.py --pubkey 02deadbeef...

  # Lookup details for first N jobs returned by list_jobs
  python tools/rpc_smoke_test.py --limit 5

What it tests (non-destructive):
  - fetch_nonce(pubkey) -> returns a nonce string from the server auth manager
  - whoami() -> returns a JSON string with basic server info
  - list_jobs() -> returns job IDs only (no mutation)
  - get_job_status(id) -> reads status (for up to --limit items)
  - get_job_output(id) -> reads output (for up to --limit items)
  - get_job_logs(id) -> reads logs (for up to --limit items)

Notes:
  - If you don't pass --pubkey, this script will generate a random secp256k1
    keypair and derive a compressed public key (no persistence, just for
    testing fetch_nonce).
"""

import argparse
import json
import os
import random
import sys
import time
from typing import Any, Dict, List, Optional

# Shown whenever an HTTP call is attempted without 'requests' being installed.
_MISSING_REQUESTS_MSG = (
    "Missing dependency 'requests'. "
    "Install with:\n python3 -m pip install -r tools/requirements.txt"
)

try:
    import requests
except ImportError:
    # Deferred failure, mirroring the optional 'ecdsa' import below: the module
    # stays importable (and --help keeps working); main() and
    # JsonRpcClient.call() report the missing dependency when it is needed.
    requests = None  # type: ignore

try:
    from ecdsa import SigningKey, SECP256k1
except Exception:
    # ecdsa is optional here; only used to generate a test pubkey if --pubkey is absent
    SigningKey = None  # type: ignore
    SECP256k1 = None  # type: ignore


def ensure_http_url(url: str) -> str:
    """Return *url* in a form usable for JSON-RPC over HTTP.

    - ``http://`` / ``https://`` URLs are returned as given.
    - ``ws://`` is rewritten to ``http://`` and ``wss://`` to ``https://``.
    - Anything without one of those schemes gets ``http://`` prepended.

    Surrounding whitespace is stripped and the scheme is matched
    case-insensitively (URL schemes are case-insensitive).
    """
    url = url.strip()
    lowered = url.lower()
    if lowered.startswith(("http://", "https://")):
        return url
    # Accept ws:// scheme too; convert to http for JSON-RPC over HTTP
    if lowered.startswith("ws://"):
        return "http://" + url[len("ws://"):]
    if lowered.startswith("wss://"):
        return "https://" + url[len("wss://"):]
    # Default to http if no scheme
    return "http://" + url


class JsonRpcClient:
    """Minimal JSON-RPC 2.0 client that POSTs one request per call."""

    def __init__(self, url: str):
        """Store the normalized endpoint URL and seed the request-id counter."""
        self.url = ensure_http_url(url)
        # Request ids start from the current time in milliseconds and are
        # incremented before every call.
        self._id = int(time.time() * 1000)

    def call(self, method: str, params: Any) -> Any:
        """Invoke *method* with *params* and return the response's ``result``.

        Raises:
            RuntimeError: if 'requests' is not installed, if the response body
                is not a JSON object, or if it carries a non-null ``error``.
            requests.HTTPError: for a non-success HTTP status
                (via ``raise_for_status``).
        """
        if requests is None:
            raise RuntimeError(_MISSING_REQUESTS_MSG)
        self._id += 1
        payload = {
            "jsonrpc": "2.0",
            "id": self._id,
            "method": method,
            "params": params,
        }
        resp = requests.post(self.url, json=payload, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        if not isinstance(data, dict):
            # A bare list/number/string here would otherwise surface as an
            # opaque TypeError/AttributeError below.
            raise RuntimeError(
                f"Malformed JSON-RPC response for {method}: {data!r}"
            )
        if data.get("error") is not None:
            raise RuntimeError(f"RPC error for {method}: {data['error']}")
        return data.get("result")


def random_compressed_pubkey_hex() -> str:
    """
    Generate a random secp256k1 keypair and return compressed public key hex.
    Requires 'ecdsa'. If unavailable, raise an informative error.

    Raises:
        RuntimeError: if the 'ecdsa' package could not be imported.
    """
    if SigningKey is None:
        raise RuntimeError(
            "ecdsa not installed; either install with:\n"
            " python3 -m pip install -r tools/requirements.txt\n"
            "or pass --pubkey explicitly."
        )
    sk = SigningKey.generate(curve=SECP256k1)
    vk = sk.get_verifying_key()
    try:
        comp = vk.to_string("compressed")
    except TypeError:
        # to_string() rejected the encoding argument: compress manually as a
        # parity prefix (0x02 even y, 0x03 odd y) followed by the 32-byte
        # big-endian x coordinate.
        point = vk.pubkey.point
        x = int(point.x())
        y = int(point.y())
        prefix = b"\x02" if (y % 2 == 0) else b"\x03"
        comp = prefix + x.to_bytes(32, "big")
    return comp.hex()


def _preview(value: Any, limit: int = 120) -> str:
    """Render *value* as text cut to *limit* characters.

    Non-string values are JSON-encoded first. "..." is appended only when
    text was actually cut off.
    """
    text = value if isinstance(value, str) else json.dumps(value)
    if len(text) > limit:
        return text[:limit] + "..."
    return text


def _report_job(client: JsonRpcClient, jid: str) -> None:
    """Query and print status, output and logs for one job.

    Each query is best-effort: a failure is printed and the next query still
    runs.
    """
    print(f"[rpc] get_job_status({jid}):", end=" ")
    try:
        st = client.call("get_job_status", [jid])
        print("OK")
        print(f" status: {st}")
    except Exception as e:
        print(f"ERROR: {e}")

    print(f"[rpc] get_job_output({jid}):", end=" ")
    try:
        out = client.call("get_job_output", [jid])
        print("OK")
        print(f" output: {_preview(out)}")
    except Exception as e:
        print(f"ERROR: {e}")

    print(f"[rpc] get_job_logs({jid}):", end=" ")
    try:
        logs_obj = client.call("get_job_logs", [jid])  # { logs: String | null }
        print("OK")
        logs = logs_obj.get("logs") if isinstance(logs_obj, dict) else None
        if logs is None:
            print(" logs: (no logs)")
        else:
            print(f" logs: {_preview(logs)}")
    except Exception as e:
        print(f"ERROR: {e}")


def main() -> int:
    """Run the smoke tests and return the process exit code.

    Returns 1 if 'requests' is missing, if a test pubkey cannot be generated,
    or if fetch_nonce, whoami or list_jobs fails; otherwise 0. Failures of the
    per-job detail queries are printed but do not change the exit code.
    """
    parser = argparse.ArgumentParser(description="Non-destructive RPC smoke tests")
    parser.add_argument(
        "--url",
        default=os.environ.get("RPC_URL", "http://127.0.0.1:9944"),
        help="RPC server URL (http[s]://host:port or ws[s]://host:port)",
    )
    parser.add_argument(
        "--pubkey",
        help="Compressed secp256k1 public key hex (33 bytes, 66 hex chars)",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=3,
        help="Number of job IDs to detail from list_jobs()",
    )
    args = parser.parse_args()

    # Checked after argument parsing so that --help works without 'requests'.
    if requests is None:
        print(_MISSING_REQUESTS_MSG, file=sys.stderr)
        return 1

    client = JsonRpcClient(args.url)
    print(f"[rpc] URL: {client.url}")

    # 1) fetch_nonce
    try:
        pubkey = args.pubkey or random_compressed_pubkey_hex()
    except RuntimeError as e:
        # ecdsa missing and no --pubkey given: report instead of a traceback.
        print(f"ERROR: {e}", file=sys.stderr)
        return 1
    print(f"[rpc] fetch_nonce(pubkey={pubkey[:10]}...):", end=" ")
    try:
        nonce = client.call("fetch_nonce", [pubkey])
        print("OK")
        print(f" nonce: {nonce}")
    except Exception as e:
        print(f"ERROR: {e}")
        return 1

    # 2) whoami
    print("[rpc] whoami():", end=" ")
    try:
        who = client.call("whoami", [])
        print("OK")
        print(f" whoami: {who}")
    except Exception as e:
        print(f"ERROR: {e}")
        return 1

    # 3) list_jobs
    print("[rpc] list_jobs():", end=" ")
    try:
        job_ids = client.call("list_jobs", [])
        if not isinstance(job_ids, list):
            raise RuntimeError(
                f"expected a list of job IDs, got {type(job_ids).__name__}"
            )
        # A negative --limit is treated as 0.
        selected: List[str] = job_ids[: max(0, args.limit)]
        print("OK")
        print(f" total: {len(job_ids)}")
        for i, jid in enumerate(selected):
            print(f" [{i}] {jid}")
    except Exception as e:
        print(f"ERROR: {e}")
        return 1

    # 4) For a few jobs, query status/output/logs
    detail_count = 0
    for jid in selected:
        _report_job(client, jid)
        detail_count += 1

    print("\nSmoke tests complete.")
    print("Summary:")
    print(" whoami tested")
    print(" fetch_nonce tested (pubkey provided/generated)")
    print(" list_jobs tested (count printed)")
    print(f" detailed queries for up to {detail_count} job(s) (status/output/logs)")
    return 0


if __name__ == "__main__":
    sys.exit(main())