Compare commits: 4d1cd3d910...main (6 commits)

- dbb9493bcb
- d921dca75c
- 4a15269442
- 43fd61d662
- 38709e06f3
- 08de312cd9
scripts/jsonrpc_demo.py (new file, 361 lines)
@@ -0,0 +1,361 @@
#!/usr/bin/env python3
"""
Demo script for HeroCoordinator JSON-RPC API.
- Creates an actor
- Verifies by loading the actor
- Creates a context with the actor as admin/reader/executor
- Creates nine jobs (A-I) with dependencies
- Creates a flow referencing those jobs
- Fetches and prints the flow DAG

Usage:
  COORDINATOR_URL=http://127.0.0.1:9652 python3 scripts/jsonrpc_demo.py
Defaults to http://127.0.0.1:9652 if COORDINATOR_URL is not set.
"""
import os
import json
import sys
from urllib import request, error
from typing import Any, Dict, List, Tuple

JSONRPC_VERSION = "2.0"


class JsonRpcClient:
    def __init__(self, url: str):
        self.url = url.rstrip("/")
        self._id = 0

    def call(self, method: str, params: Dict[str, Any]) -> Any:
        self._id += 1
        payload = {
            "jsonrpc": JSONRPC_VERSION,
            "id": self._id,
            "method": method,
            "params": params,
        }
        data = json.dumps(payload).encode("utf-8")
        req = request.Request(self.url, data=data, headers={"Content-Type": "application/json"})
        try:
            with request.urlopen(req) as resp:
                body = resp.read()
        except error.HTTPError as e:
            try:
                details = e.read().decode("utf-8", "ignore")
            except Exception:
                details = ""
            raise RuntimeError(f"HTTP error {e.code}: {details}") from e
        except error.URLError as e:
            raise RuntimeError(f"URL error: {e.reason}") from e

        try:
            obj = json.loads(body.decode("utf-8"))
        except Exception as e:
            raise RuntimeError(f"Invalid JSON response: {body!r}") from e

        # JSON-RPC single response expected
        if isinstance(obj, list):
            raise RuntimeError("Batch responses are not supported in this demo")

        if obj.get("error"):
            raise RuntimeError(f"RPC error: {json.dumps(obj['error'])}")

        return obj.get("result")


def print_header(title: str):
    print("\n" + "=" * 80)
    print(title)
    print("=" * 80)


def pretty_print(obj: Any):
    print(json.dumps(obj, indent=2, sort_keys=True))


def summarize_dag(dag: Dict[str, Any]):
    print_header("Flow DAG Summary")
    flow_id = dag.get("flow_id")
    caller_id = dag.get("caller_id")
    context_id = dag.get("context_id")
    print(f"flow_id={flow_id} caller_id={caller_id} context_id={context_id}")
    edges: List[Tuple[int, int]] = dag.get("edges", [])
    roots: List[int] = dag.get("roots", [])
    leaves: List[int] = dag.get("leaves", [])
    levels: List[List[int]] = dag.get("levels", [])
    nodes: Dict[str, Any] = dag.get("nodes", {})

    print("Edges:")
    for a, b in edges:
        print(f"  {a} -> {b}")

    print(f"Roots: {roots}")
    print(f"Leaves: {leaves}")
    print("Levels:")
    for i, lvl in enumerate(levels):
        print(f"  L{i}: {lvl}")

    # Show nodes and their dependencies (from JobSummary)
    print("Nodes:")
    for k, v in nodes.items():
        depends = v.get("depends", [])
        prerequisites = v.get("prerequisites", [])
        stype = v.get("script_type")
        print(f"  Job {k}: depends={depends} prerequisites={prerequisites} script_type={stype}")


def assert_edges(edges: List[Tuple[int, int]], required: List[Tuple[int, int]]):
    edge_set = {(int(a), int(b)) for a, b in edges}
    missing = [e for e in required if e not in edge_set]
    if missing:
        raise AssertionError(f"Missing expected edges in DAG: {missing}; got={sorted(edge_set)}")


def main():
    url = os.getenv("COORDINATOR_URL", "http://127.0.0.1:9652")
    client = JsonRpcClient(url)

    # Deterministic demo IDs; change if collisions happen
    actor_id = 1001
    context_id = 1  # Redis DB indices are 0-15; keep <= 15
    job_a = 3001
    job_b = 3002
    job_c = 3003
    job_d = 3004
    job_e = 3005
    job_f = 3006
    job_g = 3007
    job_h = 3008
    job_i = 3009
    flow_id = 4001

    print_header("actor.create")
    actor = client.call("actor.create", {
        "actor": {
            "id": actor_id,
            "pubkey": "demo-pubkey",
            "address": ["127.0.0.1"]
        }
    })
    pretty_print(actor)

    print_header("actor.load")
    actor_loaded = client.call("actor.load", {"id": actor_id})
    pretty_print(actor_loaded)

    print_header("context.create")
    context = client.call("context.create", {
        "context": {
            "id": context_id,
            "admins": [actor_id],
            "readers": [actor_id],
            "executors": [actor_id]
        }
    })
    pretty_print(context)

    print_header("job.create - A (root)")
    jobA = client.call("job.create", {
        "context_id": context_id,
        "job": {
            "id": job_a,
            "caller_id": actor_id,
            "context_id": context_id,
            "script": "print('A')",
            "script_type": "Python",
            "timeout": 30,
            "retries": 0,
            "env_vars": {},
            "prerequisites": [],
            "depends": []
        }
    })
    pretty_print(jobA)

    print_header("job.create - B (root)")
    jobB = client.call("job.create", {
        "context_id": context_id,
        "job": {
            "id": job_b,
            "caller_id": actor_id,
            "context_id": context_id,
            "script": "print('B')",
            "script_type": "Python",
            "timeout": 30,
            "retries": 0,
            "env_vars": {},
            "prerequisites": [],
            "depends": []
        }
    })
    pretty_print(jobB)

    print_header("job.create - C (depends on A and B)")
    jobC = client.call("job.create", {
        "context_id": context_id,
        "job": {
            "id": job_c,
            "caller_id": actor_id,
            "context_id": context_id,
            "script": "print('C')",
            "script_type": "Python",
            "timeout": 30,
            "retries": 0,
            "env_vars": {},
            "prerequisites": [],
            "depends": [job_a, job_b]
        }
    })
    pretty_print(jobC)

    print_header("job.create - D (depends on A)")
    jobD = client.call("job.create", {
        "context_id": context_id,
        "job": {
            "id": job_d,
            "caller_id": actor_id,
            "context_id": context_id,
            "script": "print('D')",
            "script_type": "Python",
            "timeout": 30,
            "retries": 0,
            "env_vars": {},
            "prerequisites": [],
            "depends": [job_a]
        }
    })
    pretty_print(jobD)

    print_header("job.create - E (depends on B)")
    jobE = client.call("job.create", {
        "context_id": context_id,
        "job": {
            "id": job_e,
            "caller_id": actor_id,
            "context_id": context_id,
            "script": "print('E')",
            "script_type": "Python",
            "timeout": 30,
            "retries": 0,
            "env_vars": {},
            "prerequisites": [],
            "depends": [job_b]
        }
    })
    pretty_print(jobE)

    print_header("job.create - F (depends on C and D)")
    jobF = client.call("job.create", {
        "context_id": context_id,
        "job": {
            "id": job_f,
            "caller_id": actor_id,
            "context_id": context_id,
            "script": "print('F')",
            "script_type": "Python",
            "timeout": 30,
            "retries": 0,
            "env_vars": {},
            "prerequisites": [],
            "depends": [job_c, job_d]
        }
    })
    pretty_print(jobF)

    print_header("job.create - G (depends on C and E)")
    jobG = client.call("job.create", {
        "context_id": context_id,
        "job": {
            "id": job_g,
            "caller_id": actor_id,
            "context_id": context_id,
            "script": "print('G')",
            "script_type": "Python",
            "timeout": 30,
            "retries": 0,
            "env_vars": {},
            "prerequisites": [],
            "depends": [job_c, job_e]
        }
    })
    pretty_print(jobG)

    print_header("job.create - H (leaf; depends on F and G)")
    jobH = client.call("job.create", {
        "context_id": context_id,
        "job": {
            "id": job_h,
            "caller_id": actor_id,
            "context_id": context_id,
            "script": "print('H')",
            "script_type": "Python",
            "timeout": 30,
            "retries": 0,
            "env_vars": {},
            "prerequisites": [],
            "depends": [job_f, job_g]
        }
    })
    pretty_print(jobH)

    print_header("job.create - I (leaf; depends on F and G)")
    jobI = client.call("job.create", {
        "context_id": context_id,
        "job": {
            "id": job_i,
            "caller_id": actor_id,
            "context_id": context_id,
            "script": "print('I')",
            "script_type": "Python",
            "timeout": 30,
            "retries": 0,
            "env_vars": {},
            "prerequisites": [],
            "depends": [job_f, job_g]
        }
    })
    pretty_print(jobI)

    print_header("flow.create")
    flow = client.call("flow.create", {
        "context_id": context_id,
        "flow": {
            "id": flow_id,
            "caller_id": actor_id,
            "context_id": context_id,
            "jobs": [job_a, job_b, job_c, job_d, job_e, job_f, job_g, job_h, job_i],
            "env_vars": {}
        }
    })
    pretty_print(flow)

    print_header("flow.dag")
    dag = client.call("flow.dag", {"context_id": context_id, "id": flow_id})
    summarize_dag(dag)

    # Validate roots and leaves
    got_roots = list(map(int, dag.get("roots", [])))
    if sorted(got_roots) != sorted([job_a, job_b]):
        print("WARNING: Unexpected roots:", got_roots, file=sys.stderr)

    got_leaves = {int(x) for x in dag.get("leaves", [])}
    expected_leaves = {job_h, job_i}
    if got_leaves != expected_leaves:
        print("WARNING: Unexpected leaves:", got_leaves, "expected:", expected_leaves, file=sys.stderr)

    # Check edges reflect the expanded DAG
    expected_edges = [
        (job_a, job_c), (job_b, job_c),
        (job_a, job_d), (job_b, job_e),
        (job_c, job_f), (job_d, job_f),
        (job_c, job_g), (job_e, job_g),
        (job_f, job_h), (job_g, job_h),
        (job_f, job_i), (job_g, job_i),
    ]
    try:
        assert_edges(dag.get("edges", []), expected_edges)
        print("DAG edges contain expected dependencies:", expected_edges)
    except AssertionError as e:
        print("WARNING:", e, file=sys.stderr)


if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print_header("Error")
        print(str(e))
        sys.exit(1)
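The script drives everything through JsonRpcClient.call, which wraps each method in a plain JSON-RPC 2.0 envelope POSTed over HTTP. A minimal sketch of the same exchange without the helper class; the URL, context ID, and flow ID simply mirror the demo defaults above, and a coordinator with the flow already created is assumed:

# Minimal sketch: one-off flow.dag query using only the standard library.
# Assumes the demo above has run, so context 1 and flow 4001 exist.
import json
from urllib import request

payload = {"jsonrpc": "2.0", "id": 1, "method": "flow.dag",
           "params": {"context_id": 1, "id": 4001}}
req = request.Request("http://127.0.0.1:9652",
                      data=json.dumps(payload).encode("utf-8"),
                      headers={"Content-Type": "application/json"})
with request.urlopen(req) as resp:
    reply = json.loads(resp.read())
# A JSON-RPC response carries either "result" or "error", never both.
print(reply.get("result") or reply.get("error"))

If the coordinator returns the DAG the demo expects, the levels come out as {A, B} -> {C, D, E} -> {F, G} -> {H, I}, with A and B the roots and H and I the leaves, matching expected_edges above.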
@@ -2,8 +2,8 @@ use clap::Parser;
 use std::net::{IpAddr, SocketAddr};
 use std::sync::Arc;

-use tracing::{error, info, warn};
-use tracing_subscriber::{EnvFilter, fmt};
+use tracing::{error, info};
+use tracing_subscriber::EnvFilter;
 #[derive(Debug, Clone, Parser)]
 #[command(
     name = "herocoordinator",
@@ -25,8 +25,8 @@ struct Cli {
         long = "mycelium-port",
         short = 'p',
         env = "MYCELIUM_PORT",
-        default_value_t = 9651u16,
-        help = "Port for Mycelium JSON-RPC (default: 9651)"
+        default_value_t = 8990u16,
+        help = "Port for Mycelium JSON-RPC (default: 8990)"
     )]
     mycelium_port: u16,

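Per the clap attributes above, the old port stays reachable per run: `herocoordinator --mycelium-port 9651` (short form `-p 9651`, or `MYCELIUM_PORT=9651` in the environment) overrides the new 8990 default. The binary name is assumed here from the `name = "herocoordinator"` attribute.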
src/router.rs (172 changed lines)
@@ -8,7 +8,7 @@ use crate::{
     models::{Job, JobStatus, Message, MessageStatus, ScriptType, TransportStatus},
     service::AppService,
 };
-use tracing::{error, info, warn};
+use tracing::{error, info};

 #[derive(Clone, Debug)]
 pub struct RouterConfig {
@@ -227,62 +227,63 @@ async fn deliver_one(
         // Stop on terminal states
         if matches!(s, TransportStatus::Delivered | TransportStatus::Read) {
             // On Read, fetch supervisor job.status and update local job/message if terminal
-            if matches!(s, TransportStatus::Read) {
-                if let Some(job_id) = job_id_opt {
-                    let sup = SupervisorClient::new_with_client(
-                        client.clone(),
-                        sup_dest.clone(),
-                        sup_topic.clone(),
-                        None,
-                    );
-                    match sup.job_status_sync(job_id.to_string(), 10).await {
-                        Ok(remote_status) => {
-                            if let Some((mapped, terminal)) =
-                                map_supervisor_job_status(&remote_status)
-                            {
-                                if terminal {
-                                    let _ = service_poll
-                                        .update_job_status_unchecked(
-                                            context_id,
-                                            caller_id,
-                                            job_id,
-                                            mapped.clone(),
-                                        )
-                                        .await;
+            if matches!(s, TransportStatus::Read)
+                && let Some(job_id) = job_id_opt
+            {
+                let sup = SupervisorClient::new_with_client(
+                    client.clone(),
+                    sup_dest.clone(),
+                    sup_topic.clone(),
+                    None,
+                );
+                match sup.job_status_sync(job_id.to_string(), 10).await {
+                    Ok(remote_status) => {
+                        if let Some((mapped, terminal)) =
+                            map_supervisor_job_status(&remote_status)
+                        {
+                            if terminal {
+                                let _ = service_poll
+                                    .update_job_status_unchecked(
+                                        context_id,
+                                        caller_id,
+                                        job_id,
+                                        mapped.clone(),
+                                    )
+                                    .await;

-                                    // After terminal status, fetch supervisor job.result and store into Job.result
-                                    let sup = SupervisorClient::new_with_client(
-                                        client.clone(),
-                                        sup_dest.clone(),
-                                        sup_topic.clone(),
-                                        None,
-                                    );
-                                    match sup
-                                        .job_result_sync(
-                                            job_id.to_string(),
-                                            job_result_reply_timeout,
-                                        )
-                                        .await
-                                    {
-                                        Ok(result_map) => {
-                                            // Persist the result into the Job.result map (merge)
-                                            let _ = service_poll
-                                                .update_job_result_merge_unchecked(
-                                                    context_id,
-                                                    caller_id,
-                                                    job_id,
-                                                    result_map.clone(),
-                                                )
-                                                .await;
-                                            // Log which key was stored (success or error)
-                                            let key = result_map
-                                                .keys()
-                                                .next()
-                                                .cloned()
-                                                .unwrap_or_else(|| {
-                                                    "unknown".to_string()
-                                                });
-                                            let _ = service_poll
+                                // After terminal status, fetch supervisor job.result and store into Job.result
+                                let sup = SupervisorClient::new_with_client(
+                                    client.clone(),
+                                    sup_dest.clone(),
+                                    sup_topic.clone(),
+                                    None,
+                                );
+                                match sup
+                                    .job_result_sync(
+                                        job_id.to_string(),
+                                        job_result_reply_timeout,
+                                    )
+                                    .await
+                                {
+                                    Ok(result_map) => {
+                                        // Persist the result into the Job.result map (merge)
+                                        let _ = service_poll
+                                            .update_job_result_merge_unchecked(
+                                                context_id,
+                                                caller_id,
+                                                job_id,
+                                                result_map.clone(),
+                                            )
+                                            .await;
+                                        // Log which key was stored (success or error)
+                                        let key = result_map
+                                            .keys()
+                                            .next()
+                                            .cloned()
+                                            .unwrap_or_else(|| {
+                                                "unknown".to_string()
+                                            });
+                                        let _ = service_poll
                                             .append_message_logs(
                                                 context_id,
                                                 caller_id,
@@ -293,9 +294,9 @@ async fn deliver_one(
                                                 )],
                                             )
                                             .await;
-                                        }
-                                        Err(e) => {
-                                            let _ = service_poll
+                                    }
+                                    Err(e) => {
+                                        let _ = service_poll
                                             .append_message_logs(
                                                 context_id,
                                                 caller_id,
@@ -306,19 +307,19 @@ async fn deliver_one(
                                                 )],
                                             )
                                             .await;
                                     }
                                 }
                             }

-                                // Mark message as processed
-                                let _ = service_poll
-                                    .update_message_status(
-                                        context_id,
-                                        caller_id,
-                                        id,
-                                        MessageStatus::Processed,
-                                    )
-                                    .await;
-                                let _ = service_poll
+                            // Mark message as processed
+                            let _ = service_poll
+                                .update_message_status(
+                                    context_id,
+                                    caller_id,
+                                    id,
+                                    MessageStatus::Processed,
+                                )
+                                .await;
+                            let _ = service_poll
                                 .append_message_logs(
                                     context_id,
                                     caller_id,
@@ -329,32 +330,31 @@ async fn deliver_one(
                                     )],
                                 )
                                 .await;
                             }
-                        } else {
-                            let _ = service_poll
-                                .append_message_logs(
-                                    context_id,
-                                    caller_id,
-                                    id,
-                                    vec![format!(
-                                        "Unknown supervisor status '{}' for job {}",
-                                        remote_status, job_id
-                                    )],
-                                )
-                                .await;
-                        }
-                    }
-                    Err(e) => {
-                        let _ = service_poll
-                            .append_message_logs(
-                                context_id,
-                                caller_id,
-                                id,
-                                vec![format!("job.status sync error: {}", e)],
-                            )
-                            .await;
-                    }
-                }
-            }
-        }
+                    } else {
+                        let _ = service_poll
+                            .append_message_logs(
+                                context_id,
+                                caller_id,
+                                id,
+                                vec![format!(
+                                    "Unknown supervisor status '{}' for job {}",
+                                    remote_status, job_id
+                                )],
+                            )
+                            .await;
+                    }
+                }
+                Err(e) => {
+                    let _ = service_poll
+                        .append_message_logs(
+                            context_id,
+                            caller_id,
+                            id,
+                            vec![format!("job.status sync error: {}", e)],
+                        )
+                        .await;
+                }
+            }
+        }
             break;
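The flattened `if matches!(s, TransportStatus::Read) && let Some(job_id) = job_id_opt` condition uses let-chains, which presumes a toolchain recent enough to support them on the 2024 edition; on older compilers the nested `if let` form on the minus side of the hunk is the portable equivalent.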
@@ -10,7 +10,7 @@ use crate::models::{
     Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageStatus, Runner,
     TransportStatus,
 };
-use tracing::{debug, error, info, trace, warn};
+use tracing::{error, warn};

 type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

@@ -122,7 +122,7 @@ impl RedisDriver {
             warn!(db=%db, key=%key, error=%e, "DEL before HSET failed");
         }
         // Write all fields
-        let _: usize = cm.hset_multiple(key, &pairs).await.map_err(|e| {
+        let _: () = cm.hset_multiple(key, &pairs).await.map_err(|e| {
             error!(db=%db, key=%key, error=%e, "HSET multiple failed");
             e
         })?;
@@ -323,7 +323,7 @@ impl RedisDriver {
             ("status".to_string(), status_str),
             ("updated_at".to_string(), ts.to_string()),
         ];
-        let _: usize = cm.hset_multiple(&key, &pairs).await.map_err(|e| {
+        let _: () = cm.hset_multiple(&key, &pairs).await.map_err(|e| {
             error!(db=%db, key=%key, error=%e, "HSET update_job_status failed");
             e
         })?;
@@ -372,7 +372,7 @@ impl RedisDriver {
             ("status".to_string(), status_str),
             ("updated_at".to_string(), ts.to_string()),
         ];
-        let _: usize = cm.hset_multiple(&key, &pairs).await.map_err(|e| {
+        let _: () = cm.hset_multiple(&key, &pairs).await.map_err(|e| {
             error!(db=%db, key=%key, error=%e, "HSET update_flow_status failed");
             e
         })?;
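Retyping the ignored reply from `usize` to `()` is the substantive change in these hunks: with the redis crate the caller chooses the reply type, and a server answer that is not an integer (HMSET-style commands reply with a simple OK) would make the `usize` conversion fail at runtime, whereas `()` accepts any reply. That reading is inferred from the diff rather than stated in it.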