baobab/tools/rpc_smoke_test.py
Maxime Van Hees 0ebda7c1aa Updates
2025-08-14 14:14:34 +02:00

204 lines
6.6 KiB
Python

#!/usr/bin/env python3
"""
Non-destructive JSON-RPC smoke tests against the OpenRPC server.
Installs:
python3 -m pip install -r tools/requirements.txt
Usage:
# Default URL http://127.0.0.1:9944
python tools/rpc_smoke_test.py
# Specify a different URL
python tools/rpc_smoke_test.py --url http://127.0.0.1:9944
# Provide a specific pubkey for fetch_nonce (compressed 33-byte hex)
python tools/rpc_smoke_test.py --pubkey 02deadbeef...
# Lookup details for first N jobs returned by list_jobs
python tools/rpc_smoke_test.py --limit 5
What it tests (non-destructive):
- fetch_nonce(pubkey) -> returns a nonce string from the server auth manager
- whoami() -> returns a JSON string with basic server info
- list_jobs() -> returns job IDs only (no mutation)
- get_job_status(id) -> reads status (for up to --limit items)
- get_job_output(id) -> reads output (for up to --limit items)
- get_job_logs(id) -> reads logs (for up to --limit items)
Notes:
- If you don't pass --pubkey, this script will generate a random secp256k1 keypair
and derive a compressed public key (no persistence, just for testing fetch_nonce).
"""
import argparse
import json
import os
import random
import sys
import time
from typing import Any, Dict, List, Optional
try:
import requests
except Exception:
print("Missing dependency 'requests'. Install with:\n python3 -m pip install -r tools/requirements.txt", file=sys.stderr)
raise
try:
from ecdsa import SigningKey, SECP256k1
except Exception:
# ecdsa is optional here; only used to generate a test pubkey if --pubkey is absent
SigningKey = None # type: ignore
def ensure_http_url(url: str) -> str:
if url.startswith("http://") or url.startswith("https://"):
return url
# Accept ws:// scheme too; convert to http for JSON-RPC over HTTP
if url.startswith("ws://"):
return "http://" + url[len("ws://") :]
if url.startswith("wss://"):
return "https://" + url[len("wss://") :]
# Default to http if no scheme
return "http://" + url
class JsonRpcClient:
def __init__(self, url: str):
self.url = ensure_http_url(url)
self._id = int(time.time() * 1000)
def call(self, method: str, params: Any) -> Any:
self._id += 1
payload = {
"jsonrpc": "2.0",
"id": self._id,
"method": method,
"params": params,
}
resp = requests.post(self.url, json=payload, timeout=30)
resp.raise_for_status()
data = resp.json()
if "error" in data and data["error"] is not None:
raise RuntimeError(f"RPC error for {method}: {data['error']}")
return data.get("result")
def random_compressed_pubkey_hex() -> str:
"""
Generate a random secp256k1 keypair and return compressed public key hex.
Requires 'ecdsa'. If unavailable, raise an informative error.
"""
if SigningKey is None:
raise RuntimeError(
"ecdsa not installed; either install with:\n"
" python3 -m pip install -r tools/requirements.txt\n"
"or pass --pubkey explicitly."
)
sk = SigningKey.generate(curve=SECP256k1)
vk = sk.get_verifying_key()
try:
comp = vk.to_string("compressed")
except TypeError:
# Manual compression
p = vk.pubkey.point
x = p.x()
y = p.y()
prefix = b"\x02" if (y % 2 == 0) else b"\x03"
comp = prefix + x.to_bytes(32, "big")
return comp.hex()
def main() -> int:
parser = argparse.ArgumentParser(description="Non-destructive RPC smoke tests")
parser.add_argument("--url", default=os.environ.get("RPC_URL", "http://127.0.0.1:9944"),
help="RPC server URL (http[s]://host:port or ws[s]://host:port)")
parser.add_argument("--pubkey", help="Compressed secp256k1 public key hex (33 bytes, 66 hex chars)")
parser.add_argument("--limit", type=int, default=3, help="Number of job IDs to detail from list_jobs()")
args = parser.parse_args()
client = JsonRpcClient(args.url)
print(f"[rpc] URL: {client.url}")
# 1) fetch_nonce
pubkey = args.pubkey or random_compressed_pubkey_hex()
print(f"[rpc] fetch_nonce(pubkey={pubkey[:10]}...):", end=" ")
try:
nonce = client.call("fetch_nonce", [pubkey])
print("OK")
print(f" nonce: {nonce}")
except Exception as e:
print(f"ERROR: {e}")
return 1
# 2) whoami
print("[rpc] whoami():", end=" ")
try:
who = client.call("whoami", [])
print("OK")
print(f" whoami: {who}")
except Exception as e:
print(f"ERROR: {e}")
return 1
# 3) list_jobs
print("[rpc] list_jobs():", end=" ")
try:
job_ids: List[str] = client.call("list_jobs", [])
print("OK")
print(f" total: {len(job_ids)}")
for i, jid in enumerate(job_ids[: max(0, args.limit)]):
print(f" [{i}] {jid}")
except Exception as e:
print(f"ERROR: {e}")
return 1
# 4) For a few jobs, query status/output/logs
detail_count = 0
for jid in job_ids[: max(0, args.limit)] if 'job_ids' in locals() else []:
print(f"[rpc] get_job_status({jid}):", end=" ")
try:
st = client.call("get_job_status", [jid])
print("OK")
print(f" status: {st}")
except Exception as e:
print(f"ERROR: {e}")
print(f"[rpc] get_job_output({jid}):", end=" ")
try:
out = client.call("get_job_output", [jid])
print("OK")
snippet = (out if isinstance(out, str) else json.dumps(out))[:120]
print(f" output: {snippet}{'...' if len(snippet)==120 else ''}")
except Exception as e:
print(f"ERROR: {e}")
print(f"[rpc] get_job_logs({jid}):", end=" ")
try:
logs_obj = client.call("get_job_logs", [jid]) # { logs: String | null }
print("OK")
logs = logs_obj.get("logs") if isinstance(logs_obj, dict) else None
if logs is None:
print(" logs: (no logs)")
else:
snippet = logs[:120]
print(f" logs: {snippet}{'...' if len(snippet)==120 else ''}")
except Exception as e:
print(f"ERROR: {e}")
detail_count += 1
print("\nSmoke tests complete.")
print("Summary:")
print(f" whoami tested")
print(f" fetch_nonce tested (pubkey provided/generated)")
print(f" list_jobs tested (count printed)")
print(f" detailed queries for up to {detail_count} job(s) (status/output/logs)")
return 0
if __name__ == "__main__":
sys.exit(main())