Compare commits
3 Commits
de6c799635
...
c38937f1cb
Author | SHA1 | Date | |
---|---|---|---|
|
c38937f1cb
|
||
|
059d5131e7
|
||
|
c6077623b0
|
@@ -3,6 +3,7 @@
|
||||
Supervisor flow demo for HeroCoordinator.
|
||||
|
||||
This script:
|
||||
- Optionally pre-registers a Python runner on the target Supervisor over Mycelium using an admin secret (--admin-secret). If the flag is not set, this step is skipped.
|
||||
- Creates an actor
|
||||
- Creates a context granting the actor admin/reader/executor privileges
|
||||
- Registers a Runner in the context targeting a Supervisor reachable via Mycelium (by public key or IP)
|
||||
@@ -20,10 +21,13 @@ Notes:
|
||||
- Exactly one of --dst-ip or --dst-pk must be provided.
|
||||
- Runner.topic defaults to "supervisor.rpc" (see main.rs).
|
||||
- The router auto-discovers contexts and will deliver job.run messages to the supervisor.
|
||||
- Mycelium URL is read from MYCELIUM_URL (default http://127.0.0.1:8990).
|
||||
- supervisor.register_runner uses static name="python" and queue="python".
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import base64
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
@@ -36,6 +40,9 @@ JSONRPC_VERSION = "2.0"
|
||||
def env_url() -> str:
|
||||
return os.getenv("COORDINATOR_URL", "http://127.0.0.1:9652").rstrip("/")
|
||||
|
||||
def env_mycelium_url() -> str:
|
||||
return os.getenv("MYCELIUM_URL", "http://127.0.0.1:8990").rstrip("/")
|
||||
|
||||
|
||||
class JsonRpcClient:
|
||||
def __init__(self, url: str):
|
||||
@@ -87,6 +94,60 @@ def print_header(title: str):
|
||||
def pretty(obj: Any):
|
||||
print(json.dumps(obj, indent=2, sort_keys=True))
|
||||
|
||||
def mycelium_register_runner(
|
||||
myc: "JsonRpcClient",
|
||||
dst_pk: Optional[str],
|
||||
dst_ip: Optional[str],
|
||||
topic: str,
|
||||
admin_secret: str,
|
||||
name: str = "python",
|
||||
queue: str = "python",
|
||||
timeout: int = 15,
|
||||
) -> Any:
|
||||
"""
|
||||
Send supervisor.register_runner over Mycelium using pushMessage and wait for the reply.
|
||||
- myc: JsonRpcClient for the Mycelium API (MYCELIUM_URL)
|
||||
- dst_pk/dst_ip: destination on the overlay; one of them must be provided
|
||||
- topic: message topic (defaults to supervisor.rpc from args)
|
||||
- admin_secret: supervisor admin secret to authorize the registration
|
||||
- name/queue: static identifiers for the python runner on the supervisor
|
||||
- timeout: seconds to wait for a reply
|
||||
Returns the JSON-RPC 'result' from the supervisor or raises on error/timeout.
|
||||
"""
|
||||
envelope = {
|
||||
"jsonrpc": JSONRPC_VERSION,
|
||||
"id": 1,
|
||||
"method": "register_runner",
|
||||
"params": [{"secret": admin_secret, "name": name, "queue": queue}],
|
||||
}
|
||||
payload_b64 = base64.b64encode(json.dumps(envelope).encode("utf-8")).decode("ascii")
|
||||
topic_b64 = base64.b64encode(topic.encode("utf-8")).decode("ascii")
|
||||
|
||||
if dst_pk:
|
||||
dst = {"pk": dst_pk}
|
||||
elif dst_ip:
|
||||
dst = {"ip": dst_ip}
|
||||
else:
|
||||
raise RuntimeError("Either dst_pk or dst_ip must be provided for Mycelium destination")
|
||||
|
||||
params = {
|
||||
"message": {"dst": dst, "topic": topic_b64, "payload": payload_b64},
|
||||
}
|
||||
resp = myc.call("pushMessage", params)
|
||||
time.sleep(15)
|
||||
|
||||
# Expect an InboundMessage with a payload if a reply was received
|
||||
# if isinstance(resp, dict) and "payload" in resp:
|
||||
# try:
|
||||
# reply = json.loads(base64.b64decode(resp["payload"]).decode("utf-8"))
|
||||
# except Exception as e:
|
||||
# raise RuntimeError(f"Invalid supervisor reply payload: {e}")
|
||||
# if isinstance(reply, dict) and reply.get("error"):
|
||||
# raise RuntimeError(f"Supervisor register_runner error: {json.dumps(reply['error'])}")
|
||||
# return reply.get("result")
|
||||
#
|
||||
# raise RuntimeError("No reply received from supervisor for register_runner (timeout)")
|
||||
|
||||
|
||||
def try_create_or_load(client: JsonRpcClient, create_method: str, create_params: Dict[str, Any],
|
||||
load_method: str, load_params: Dict[str, Any]) -> Any:
|
||||
@@ -124,6 +185,7 @@ def parse_args() -> argparse.Namespace:
|
||||
)
|
||||
p.add_argument("--topic", default="supervisor.rpc", help="Supervisor topic. Default: supervisor.rpc")
|
||||
p.add_argument("--secret", help="Optional supervisor secret used for authenticated supervisor calls")
|
||||
p.add_argument("--admin-secret", help="Supervisor admin secret to pre-register a Python runner over Mycelium. If omitted, pre-registration is skipped.")
|
||||
p.add_argument("--poll-interval", type=float, default=2.0, help="Flow poll interval seconds. Default: 2.0")
|
||||
p.add_argument("--poll-timeout", type=int, default=600, help="Max seconds to wait for flow completion. Default: 600")
|
||||
return p.parse_args()
|
||||
@@ -138,6 +200,9 @@ def main():
|
||||
url = env_url()
|
||||
client = JsonRpcClient(url)
|
||||
|
||||
mycelium_url = env_mycelium_url()
|
||||
mycelium_client = JsonRpcClient(mycelium_url) if getattr(args, "admin_secret", None) else None
|
||||
|
||||
actor_id = int(args.actor_id)
|
||||
context_id = int(args.context_id)
|
||||
runner_id = int(args.runner_id)
|
||||
@@ -189,6 +254,25 @@ def main():
|
||||
runner_pubkey = args.dst_pk if args.dst_pk else ""
|
||||
runner_address = args.dst_ip if args.dst_ip else "127.0.0.1"
|
||||
|
||||
# Optional: pre-register a Python runner on the Supervisor over Mycelium using an admin secret
|
||||
if getattr(args, "admin_secret", None):
|
||||
print_header("supervisor.register_runner (pre-register via Mycelium)")
|
||||
try:
|
||||
mycelium_result = mycelium_register_runner(
|
||||
mycelium_client,
|
||||
args.dst_pk if args.dst_pk else None,
|
||||
args.dst_ip if args.dst_ip else None,
|
||||
topic,
|
||||
args.admin_secret,
|
||||
name="Python",
|
||||
queue="Python",
|
||||
timeout=15,
|
||||
)
|
||||
print("Supervisor register_runner ->", mycelium_result)
|
||||
except Exception as e:
|
||||
print(f"ERROR: Supervisor pre-registration failed: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
print_header("runner.create (or load)")
|
||||
# runner.load requires both context_id and id
|
||||
try:
|
||||
@@ -351,4 +435,4 @@ if __name__ == "__main__":
|
||||
except Exception as e:
|
||||
print_header("Error")
|
||||
print(str(e))
|
||||
sys.exit(1)
|
||||
sys.exit(1)
|
||||
|
@@ -3,6 +3,8 @@ use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
use reqwest::Client as HttpClient;
|
||||
|
||||
use base64::Engine;
|
||||
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
|
||||
use serde_json::{Value, json};
|
||||
use thiserror::Error;
|
||||
|
||||
@@ -53,6 +55,8 @@ impl MyceliumClient {
|
||||
"method": method,
|
||||
"params": [ params ]
|
||||
});
|
||||
|
||||
tracing::info!(%req, "jsonrpc");
|
||||
let resp = self.http.post(&self.base_url).json(&req).send().await?;
|
||||
let status = resp.status();
|
||||
let body: Value = resp.json().await?;
|
||||
@@ -159,6 +163,83 @@ impl MyceliumClient {
|
||||
.and_then(|v| v.as_str())
|
||||
.map(|s| s.to_string())
|
||||
}
|
||||
/// popMessage: retrieve an inbound message if available (optionally filtered by topic).
|
||||
/// - peek: if true, do not remove the message from the queue
|
||||
/// - timeout_secs: seconds to wait for a message (0 returns immediately)
|
||||
/// - topic_plain: optional plain-text topic which will be base64-encoded per Mycelium spec
|
||||
/// Returns:
|
||||
/// - Ok(Some(result_json)) on success, where result_json matches InboundMessage schema
|
||||
/// - Ok(None) when there is no message ready (Mycelium returns error code 204)
|
||||
pub async fn pop_message(
|
||||
&self,
|
||||
peek: Option<bool>,
|
||||
timeout_secs: Option<u64>,
|
||||
topic_plain: Option<&str>,
|
||||
) -> Result<Option<Value>, MyceliumClientError> {
|
||||
// Build params array
|
||||
let mut params_array = vec![];
|
||||
if let Some(p) = peek {
|
||||
params_array.push(serde_json::Value::Bool(p));
|
||||
} else {
|
||||
params_array.push(serde_json::Value::Null)
|
||||
}
|
||||
if let Some(t) = timeout_secs {
|
||||
params_array.push(serde_json::Value::Number(t.into()));
|
||||
} else {
|
||||
params_array.push(serde_json::Value::Null)
|
||||
}
|
||||
if let Some(tp) = topic_plain {
|
||||
let topic_b64 = BASE64_STANDARD.encode(tp.as_bytes());
|
||||
params_array.push(serde_json::Value::String(topic_b64));
|
||||
} else {
|
||||
params_array.push(serde_json::Value::Null)
|
||||
}
|
||||
|
||||
let req = json!({
|
||||
"jsonrpc": "2.0",
|
||||
"id": self.next_id(),
|
||||
"method": "popMessage",
|
||||
"params": serde_json::Value::Array(params_array),
|
||||
});
|
||||
|
||||
tracing::info!(%req, "calling popMessage");
|
||||
|
||||
let resp = self.http.post(&self.base_url).json(&req).send().await?;
|
||||
let status = resp.status();
|
||||
let body: Value = resp.json().await?;
|
||||
|
||||
// Handle JSON-RPC error envelope specially for code 204 (no message ready)
|
||||
if let Some(err) = body.get("error") {
|
||||
let code = err.get("code").and_then(|v| v.as_i64()).unwrap_or(0);
|
||||
let msg = err
|
||||
.get("message")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("unknown error");
|
||||
|
||||
if code == 204 {
|
||||
// No message ready
|
||||
return Ok(None);
|
||||
}
|
||||
if code == 408 {
|
||||
// Align with other transport timeout mapping
|
||||
return Err(MyceliumClientError::TransportTimeout);
|
||||
}
|
||||
return Err(MyceliumClientError::RpcError(format!(
|
||||
"code={code} msg={msg}"
|
||||
)));
|
||||
}
|
||||
|
||||
if !status.is_success() {
|
||||
return Err(MyceliumClientError::RpcError(format!(
|
||||
"HTTP {status}, body {body}"
|
||||
)));
|
||||
}
|
||||
|
||||
let result = body.get("result").ok_or_else(|| {
|
||||
MyceliumClientError::InvalidResponse(format!("missing result in response: {body}"))
|
||||
})?;
|
||||
Ok(Some(result.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@@ -99,6 +99,16 @@ impl SupervisorClient {
|
||||
})
|
||||
}
|
||||
|
||||
/// Build a supervisor JSON-RPC payload but force a specific id (used for correlation).
|
||||
fn build_supervisor_payload_with_id(&self, method: &str, params: Value, id: u64) -> Value {
|
||||
json!({
|
||||
"jsonrpc": "2.0",
|
||||
"id": id,
|
||||
"method": method,
|
||||
"params": params,
|
||||
})
|
||||
}
|
||||
|
||||
fn encode_payload(payload: &Value) -> Result<String, SupervisorClientError> {
|
||||
let s = serde_json::to_string(payload)?;
|
||||
Ok(BASE64_STANDARD.encode(s.as_bytes()))
|
||||
@@ -147,6 +157,42 @@ impl SupervisorClient {
|
||||
)))
|
||||
}
|
||||
|
||||
/// Variant of call that also returns the inner supervisor JSON-RPC id used in the payload.
|
||||
/// This id is required to correlate asynchronous popMessage replies coming from Mycelium.
|
||||
pub async fn call_with_ids(
|
||||
&self,
|
||||
method: &str,
|
||||
params: Value,
|
||||
) -> Result<(String, u64), SupervisorClientError> {
|
||||
let inner_id = self.next_id();
|
||||
let inner = self.build_supervisor_payload_with_id(method, params, inner_id);
|
||||
let payload_b64 = Self::encode_payload(&inner)?;
|
||||
let result = self
|
||||
.mycelium
|
||||
.push_message(
|
||||
&self.destination,
|
||||
&Self::encode_topic(self.topic.as_bytes()),
|
||||
&payload_b64,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let out_id = if let Some(id) = MyceliumClient::extract_message_id_from_result(&result) {
|
||||
id
|
||||
} else if let Some(arr) = result.as_array()
|
||||
&& arr.len() == 1
|
||||
&& let Some(id) = MyceliumClient::extract_message_id_from_result(&arr[0])
|
||||
{
|
||||
id
|
||||
} else {
|
||||
return Err(SupervisorClientError::InvalidResponse(format!(
|
||||
"result did not contain message id: {result}"
|
||||
)));
|
||||
};
|
||||
|
||||
Ok((out_id, inner_id))
|
||||
}
|
||||
|
||||
/// Synchronous variant: wait for a JSON-RPC reply via Mycelium reply_timeout, and return the inner JSON-RPC "result".
|
||||
/// If the supervisor returns an error object, map to RpcError.
|
||||
pub async fn call_sync(
|
||||
@@ -162,7 +208,7 @@ impl SupervisorClient {
|
||||
.mycelium
|
||||
.push_message(
|
||||
&self.destination,
|
||||
&self.topic,
|
||||
&Self::encode_topic(self.topic.as_bytes()),
|
||||
&payload_b64,
|
||||
Some(reply_timeout_secs),
|
||||
)
|
||||
@@ -308,6 +354,19 @@ impl SupervisorClient {
|
||||
self.call("job.run", params).await
|
||||
}
|
||||
|
||||
/// Typed wrapper returning both outbound Mycelium id and inner supervisor JSON-RPC id.
|
||||
pub async fn job_run_with_ids(
|
||||
&self,
|
||||
job: Value,
|
||||
) -> Result<(String, u64), SupervisorClientError> {
|
||||
let secret = self.need_secret()?;
|
||||
let params = json!([{
|
||||
"secret": secret,
|
||||
"job": job
|
||||
}]);
|
||||
self.call_with_ids("job.run", params).await
|
||||
}
|
||||
|
||||
pub async fn job_start(
|
||||
&self,
|
||||
job_id: impl Into<String>,
|
||||
|
@@ -99,7 +99,7 @@ async fn main() {
|
||||
// Shared application state
|
||||
let state = Arc::new(herocoordinator::rpc::AppState::new(service));
|
||||
|
||||
// Start router workers (auto-discovered contexts)
|
||||
// Start router workers (auto-discovered contexts) and a single global inbound listener
|
||||
{
|
||||
let base_url = format!("http://{}:{}", cli.mycelium_ip, cli.mycelium_port);
|
||||
let cfg = herocoordinator::router::RouterConfig {
|
||||
@@ -110,6 +110,10 @@ async fn main() {
|
||||
transport_poll_interval_secs: 2,
|
||||
transport_poll_timeout_secs: 300,
|
||||
};
|
||||
// Global inbound listener for supervisor replies via Mycelium popMessage
|
||||
let _inbound_handle =
|
||||
herocoordinator::router::start_inbound_listener(service_for_router.clone(), cfg.clone());
|
||||
// Per-context outbound delivery loops
|
||||
let _auto_handle = herocoordinator::router::start_router_auto(service_for_router, cfg);
|
||||
}
|
||||
|
||||
|
182
src/router.rs
182
src/router.rs
@@ -1,5 +1,7 @@
|
||||
use std::{collections::HashSet, sync::Arc};
|
||||
|
||||
use base64::Engine;
|
||||
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
|
||||
use serde_json::{Value, json};
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
@@ -153,7 +155,24 @@ async fn deliver_one(
|
||||
let params = build_params(&msg)?;
|
||||
|
||||
// Send
|
||||
let out_id = client.call(&method, params).await?;
|
||||
// If this is a job.run and we have a secret configured on the client,
|
||||
// prefer the typed wrapper that injects the secret into inner supervisor params,
|
||||
// and also capture the inner supervisor JSON-RPC id for correlation.
|
||||
let (out_id, inner_id_opt) = if method == "job.run" {
|
||||
if let Some(j) = msg.job.first() {
|
||||
let jv = job_to_json(j)?;
|
||||
// Returns (outbound message id, inner supervisor JSON-RPC id)
|
||||
let (out, inner) = client.job_run_with_ids(jv).await?;
|
||||
(out, Some(inner))
|
||||
} else {
|
||||
// Fallback: no embedded job, use the generic call
|
||||
let out = client.call(&method, params).await?;
|
||||
(out, None)
|
||||
}
|
||||
} else {
|
||||
let out = client.call(&method, params).await?;
|
||||
(out, None)
|
||||
};
|
||||
|
||||
// Store transport id and initial Sent status
|
||||
let _ = service
|
||||
@@ -171,6 +190,13 @@ async fn deliver_one(
|
||||
.update_message_status(context_id, caller_id, id, MessageStatus::Acknowledged)
|
||||
.await?;
|
||||
|
||||
// Record correlation (inner supervisor JSON-RPC id -> job/message) for inbound popMessage handling
|
||||
if let (Some(inner_id), Some(job_id)) = (inner_id_opt, job_id_opt) {
|
||||
let _ = service
|
||||
.supcorr_set(inner_id, context_id, caller_id, job_id, id)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Spawn transport-status poller
|
||||
{
|
||||
let service_poll = service.clone();
|
||||
@@ -474,3 +500,157 @@ pub fn start_router_auto(service: AppService, cfg: RouterConfig) -> tokio::task:
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Start a single global inbound listener that reads Mycelium popMessage with topic filter,
|
||||
/// decodes supervisor JSON-RPC replies, and updates correlated jobs/messages.
|
||||
/// This listens for async replies like {"result":{"job_queued":...}} carrying the same inner JSON-RPC id.
|
||||
pub fn start_inbound_listener(
|
||||
service: AppService,
|
||||
cfg: RouterConfig,
|
||||
) -> tokio::task::JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
// Initialize Mycelium client (retry loop)
|
||||
let mycelium = loop {
|
||||
match MyceliumClient::new(cfg.base_url.clone()) {
|
||||
Ok(c) => break c,
|
||||
Err(e) => {
|
||||
error!(error=%e, "MyceliumClient init error (inbound listener)");
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
loop {
|
||||
// Poll for inbound supervisor messages on the configured topic
|
||||
match mycelium.pop_message(Some(false), Some(20), None).await {
|
||||
Ok(Some(inb)) => {
|
||||
// Expect InboundMessage with base64 "payload"
|
||||
let Some(payload_b64) = inb.get("payload").and_then(|v| v.as_str()) else {
|
||||
// Not a payload-bearing message; ignore
|
||||
continue;
|
||||
};
|
||||
let Ok(raw) = BASE64_STANDARD.decode(payload_b64.as_bytes()) else {
|
||||
let _ = service
|
||||
.append_message_logs(
|
||||
0, // unknown context yet
|
||||
0,
|
||||
0,
|
||||
vec![
|
||||
"Inbound payload base64 decode error (supervisor reply)".into(),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
continue;
|
||||
};
|
||||
tracing::info!(
|
||||
raw = %String::from_utf8_lossy(&raw),
|
||||
"Read raw messge from mycelium"
|
||||
);
|
||||
let Ok(rpc): Result<Value, _> = serde_json::from_slice(&raw) else {
|
||||
// Invalid JSON payload
|
||||
continue;
|
||||
};
|
||||
|
||||
// Extract inner supervisor JSON-RPC id (number preferred; string fallback)
|
||||
let inner_id_u64 = match rpc.get("id") {
|
||||
Some(Value::Number(n)) => n.as_u64(),
|
||||
Some(Value::String(s)) => s.parse::<u64>().ok(),
|
||||
_ => None,
|
||||
};
|
||||
let Some(inner_id) = inner_id_u64 else {
|
||||
// Cannot correlate without id
|
||||
continue;
|
||||
};
|
||||
|
||||
// Lookup correlation mapping
|
||||
match service.supcorr_get(inner_id).await {
|
||||
Ok(Some((context_id, caller_id, job_id, message_id))) => {
|
||||
// Determine success/error from supervisor JSON-RPC envelope
|
||||
let is_success = rpc
|
||||
.get("result")
|
||||
.map(|res| {
|
||||
res.get("job_queued").is_some()
|
||||
|| res.as_str().map(|s| s == "job_queued").unwrap_or(false)
|
||||
})
|
||||
.unwrap_or(false);
|
||||
|
||||
if is_success {
|
||||
// Set to Dispatched (idempotent) per spec choice, and append log
|
||||
let _ = service
|
||||
.update_job_status_unchecked(
|
||||
context_id,
|
||||
caller_id,
|
||||
job_id,
|
||||
JobStatus::Dispatched,
|
||||
)
|
||||
.await;
|
||||
let _ = service
|
||||
.append_message_logs(
|
||||
context_id,
|
||||
caller_id,
|
||||
message_id,
|
||||
vec![format!(
|
||||
"Supervisor reply for job {}: job_queued",
|
||||
job_id
|
||||
)],
|
||||
)
|
||||
.await;
|
||||
let _ = service.supcorr_del(inner_id).await;
|
||||
} else if let Some(err_obj) = rpc.get("error") {
|
||||
// Error path: set job Error and log details
|
||||
let _ = service
|
||||
.update_job_status_unchecked(
|
||||
context_id,
|
||||
caller_id,
|
||||
job_id,
|
||||
JobStatus::Error,
|
||||
)
|
||||
.await;
|
||||
let _ = service
|
||||
.append_message_logs(
|
||||
context_id,
|
||||
caller_id,
|
||||
message_id,
|
||||
vec![format!(
|
||||
"Supervisor error for job {}: {}",
|
||||
job_id, err_obj
|
||||
)],
|
||||
)
|
||||
.await;
|
||||
let _ = service.supcorr_del(inner_id).await;
|
||||
} else {
|
||||
// Unknown result; keep correlation for a later, clearer reply
|
||||
let _ = service
|
||||
.append_message_logs(
|
||||
context_id,
|
||||
caller_id,
|
||||
message_id,
|
||||
vec![
|
||||
"Supervisor reply did not contain job_queued or error"
|
||||
.to_string(),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
// No correlation found; ignore or log once
|
||||
}
|
||||
Err(e) => {
|
||||
error!(error=%e, "supcorr_get error");
|
||||
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
// No message; continue polling
|
||||
continue;
|
||||
}
|
||||
Err(e) => {
|
||||
error!(error=%e, "popMessage error");
|
||||
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@@ -1161,6 +1161,37 @@ impl AppService {
|
||||
pub async fn scan_runners(&self, context_id: u32) -> Result<Vec<Runner>, BoxError> {
|
||||
self.redis.scan_runners(context_id).await
|
||||
}
|
||||
|
||||
/// Correlation map: store mapping from inner supervisor JSON-RPC id to context/caller/job/message.
|
||||
pub async fn supcorr_set(
|
||||
&self,
|
||||
inner_id: u64,
|
||||
context_id: u32,
|
||||
caller_id: u32,
|
||||
job_id: u32,
|
||||
message_id: u32,
|
||||
) -> Result<(), BoxError> {
|
||||
self.redis
|
||||
.supcorr_set(inner_id, context_id, caller_id, job_id, message_id)
|
||||
.await
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
/// Correlation map: load mapping by inner supervisor JSON-RPC id.
|
||||
pub async fn supcorr_get(
|
||||
&self,
|
||||
inner_id: u64,
|
||||
) -> Result<Option<(u32, u32, u32, u32)>, BoxError> {
|
||||
self.redis
|
||||
.supcorr_get(inner_id)
|
||||
.await
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
||||
/// Correlation map: delete mapping by inner supervisor JSON-RPC id.
|
||||
pub async fn supcorr_del(&self, inner_id: u64) -> Result<(), BoxError> {
|
||||
self.redis.supcorr_del(inner_id).await.map_err(Into::into)
|
||||
}
|
||||
}
|
||||
|
||||
/// Auto-discovery helpers for contexts (wrappers over RedisDriver)
|
||||
|
@@ -751,4 +751,80 @@ impl RedisDriver {
|
||||
out.sort_unstable();
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
// -----------------------------
|
||||
// Supervisor correlation mapping (DB 0)
|
||||
// Key: "supcorr:{inner_id_decimal}"
|
||||
// Value: JSON {"context_id":u32,"caller_id":u32,"job_id":u32,"message_id":u32}
|
||||
// TTL: 1 hour to avoid leaks in case of crashes
|
||||
pub async fn supcorr_set(
|
||||
&self,
|
||||
inner_id: u64,
|
||||
context_id: u32,
|
||||
caller_id: u32,
|
||||
job_id: u32,
|
||||
message_id: u32,
|
||||
) -> Result<()> {
|
||||
let mut cm = self.manager_for_db(0).await?;
|
||||
let key = format!("supcorr:{}", inner_id);
|
||||
let val = serde_json::json!({
|
||||
"context_id": context_id,
|
||||
"caller_id": caller_id,
|
||||
"job_id": job_id,
|
||||
"message_id": message_id,
|
||||
})
|
||||
.to_string();
|
||||
// SET key val EX 3600
|
||||
let _: () = redis::cmd("SET")
|
||||
.arg(&key)
|
||||
.arg(&val)
|
||||
.arg("EX")
|
||||
.arg(3600)
|
||||
.query_async(&mut cm)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!(db=0, key=%key, error=%e, "SET supcorr_set failed");
|
||||
e
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn supcorr_get(
|
||||
&self,
|
||||
inner_id: u64,
|
||||
) -> Result<Option<(u32, u32, u32, u32)>> {
|
||||
let mut cm = self.manager_for_db(0).await?;
|
||||
let key = format!("supcorr:{}", inner_id);
|
||||
let res: Option<String> = redis::cmd("GET")
|
||||
.arg(&key)
|
||||
.query_async(&mut cm)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!(db=0, key=%key, error=%e, "GET supcorr_get failed");
|
||||
e
|
||||
})?;
|
||||
if let Some(s) = res {
|
||||
let v: Value = serde_json::from_str(&s)?;
|
||||
let ctx = v.get("context_id").and_then(|x| x.as_u64()).unwrap_or(0) as u32;
|
||||
let caller = v.get("caller_id").and_then(|x| x.as_u64()).unwrap_or(0) as u32;
|
||||
let job = v.get("job_id").and_then(|x| x.as_u64()).unwrap_or(0) as u32;
|
||||
let msg = v.get("message_id").and_then(|x| x.as_u64()).unwrap_or(0) as u32;
|
||||
return Ok(Some((ctx, caller, job, msg)));
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
pub async fn supcorr_del(&self, inner_id: u64) -> Result<()> {
|
||||
let mut cm = self.manager_for_db(0).await?;
|
||||
let key = format!("supcorr:{}", inner_id);
|
||||
let _: i64 = redis::cmd("DEL")
|
||||
.arg(&key)
|
||||
.query_async(&mut cm)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!(db=0, key=%key, error=%e, "DEL supcorr_del failed");
|
||||
e
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user