forked from herocode/horus
move repos into monorepo
This commit is contained in:
827
bin/coordinator/src/storage/redis.rs
Normal file
827
bin/coordinator/src/storage/redis.rs
Normal file
@@ -0,0 +1,827 @@
|
||||
use std::collections::HashMap as StdHashMap;
|
||||
|
||||
use redis::{AsyncCommands, aio::ConnectionManager};
|
||||
use serde::Serialize;
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde_json::{Map as JsonMap, Value};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::models::{
|
||||
Actor, Context, Flow, FlowStatus, Job, JobStatus, Message, MessageStatus, Runner,
|
||||
TransportStatus,
|
||||
};
|
||||
use tracing::{error, warn};
|
||||
|
||||
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
||||
|
||||
/// Async Redis driver that saves/loads every model as a Redis hash (HSET),
|
||||
/// using canonical keys as specified in the specs.
|
||||
/// - Complex fields (arrays, maps, nested structs) are JSON-encoded per field
|
||||
/// - Scalars are written as plain strings (numbers/bools as their string representation)
|
||||
/// - On load, each field value is first attempted to parse as JSON; if that fails it is treated as a plain string
|
||||
pub struct RedisDriver {
|
||||
/// Base address, e.g. "127.0.0.1:6379" or "redis://127.0.0.1:6379"
|
||||
base_addr: String,
|
||||
/// Cache of connection managers per DB index
|
||||
managers: Mutex<StdHashMap<u32, ConnectionManager>>,
|
||||
}
|
||||
|
||||
impl RedisDriver {
|
||||
/// Create a new driver for the given Redis address.
|
||||
/// Accepts either "host:port" or "redis://host:port"
|
||||
pub async fn new(addr: impl Into<String>) -> Result<Self> {
|
||||
let raw = addr.into();
|
||||
let base_addr = if raw.starts_with("redis://") {
|
||||
raw
|
||||
} else {
|
||||
format!("redis://{}", raw)
|
||||
};
|
||||
Ok(Self {
|
||||
base_addr,
|
||||
managers: Mutex::new(StdHashMap::new()),
|
||||
})
|
||||
}
|
||||
|
||||
/// Get or create a ConnectionManager for the given DB index.
|
||||
async fn manager_for_db(&self, db: u32) -> Result<ConnectionManager> {
|
||||
{
|
||||
// Fast path: check existing
|
||||
let guard = self.managers.lock().await;
|
||||
if let Some(cm) = guard.get(&db) {
|
||||
return Ok(cm.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// Slow path: create a new manager and cache it
|
||||
let url = format!("{}/{}", self.base_addr.trim_end_matches('/'), db);
|
||||
let client = redis::Client::open(url.as_str()).map_err(|e| {
|
||||
error!(%url, db=%db, error=%e, "Redis client open failed");
|
||||
e
|
||||
})?;
|
||||
let cm = client.get_connection_manager().await.map_err(|e| {
|
||||
error!(%url, db=%db, error=%e, "Redis connection manager init failed");
|
||||
e
|
||||
})?;
|
||||
|
||||
let mut guard = self.managers.lock().await;
|
||||
let entry = guard.entry(db).or_insert(cm);
|
||||
Ok(entry.clone())
|
||||
}
|
||||
|
||||
// -----------------------------
|
||||
// Generic helpers (serde <-> HSET)
|
||||
// -----------------------------
|
||||
|
||||
fn struct_to_hset_pairs<T: Serialize>(value: &T) -> Result<Vec<(String, String)>> {
|
||||
let json = serde_json::to_value(value)?;
|
||||
let obj = json
|
||||
.as_object()
|
||||
.ok_or("Model must serialize to a JSON object")?;
|
||||
let mut pairs = Vec::with_capacity(obj.len());
|
||||
for (k, v) in obj {
|
||||
let s = match v {
|
||||
Value::Array(_) | Value::Object(_) => serde_json::to_string(v)?, // complex - store JSON
|
||||
Value::String(s) => s.clone(), // string - plain
|
||||
Value::Number(n) => n.to_string(), // number - plain
|
||||
Value::Bool(b) => b.to_string(), // bool - plain
|
||||
Value::Null => "null".to_string(), // null sentinel
|
||||
};
|
||||
pairs.push((k.clone(), s));
|
||||
}
|
||||
Ok(pairs)
|
||||
}
|
||||
|
||||
fn hmap_to_struct<T: DeserializeOwned>(map: StdHashMap<String, String>) -> Result<T> {
|
||||
let mut obj = JsonMap::with_capacity(map.len());
|
||||
for (k, s) in map {
|
||||
// Try parse as JSON first (works for arrays, objects, numbers, booleans, null)
|
||||
// If that fails, fallback to string.
|
||||
match serde_json::from_str::<Value>(&s) {
|
||||
Ok(v) => {
|
||||
obj.insert(k, v);
|
||||
}
|
||||
Err(_) => {
|
||||
obj.insert(k, Value::String(s));
|
||||
}
|
||||
}
|
||||
}
|
||||
let json = Value::Object(obj);
|
||||
let model = serde_json::from_value(json)?;
|
||||
Ok(model)
|
||||
}
|
||||
|
||||
async fn hset_model<T: Serialize>(&self, db: u32, key: &str, model: &T) -> Result<()> {
|
||||
let mut cm = self.manager_for_db(db).await?;
|
||||
let pairs = Self::struct_to_hset_pairs(model).map_err(|e| {
|
||||
error!(db=%db, key=%key, error=%e, "Serialize model to HSET pairs failed");
|
||||
e
|
||||
})?;
|
||||
// Ensure no stale fields
|
||||
let del_res: redis::RedisResult<u64> = cm.del(key).await;
|
||||
if let Err(e) = del_res {
|
||||
warn!(db=%db, key=%key, error=%e, "DEL before HSET failed");
|
||||
}
|
||||
// Write all fields
|
||||
let _: () = cm.hset_multiple(key, &pairs).await.map_err(|e| {
|
||||
error!(db=%db, key=%key, error=%e, "HSET multiple failed");
|
||||
e
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn hget_model<T: DeserializeOwned>(&self, db: u32, key: &str) -> Result<T> {
|
||||
let mut cm = self.manager_for_db(db).await?;
|
||||
let map: StdHashMap<String, String> = cm.hgetall(key).await.map_err(|e| {
|
||||
error!(db=%db, key=%key, error=%e, "HGETALL failed");
|
||||
e
|
||||
})?;
|
||||
if map.is_empty() {
|
||||
// NotFound is expected in some flows; don't log as error
|
||||
return Err(format!("Key not found: {}", key).into());
|
||||
}
|
||||
Self::hmap_to_struct(map).map_err(|e| {
|
||||
error!(db=%db, key=%key, error=%e, "Deserialize model from HGETALL failed");
|
||||
e
|
||||
})
|
||||
}
|
||||
|
||||
// -----------------------------
|
||||
// Key helpers (canonical keys)
|
||||
// -----------------------------
|
||||
|
||||
fn actor_key(id: u32) -> String {
|
||||
format!("actor:{}", id)
|
||||
}
|
||||
|
||||
fn context_key(id: u32) -> String {
|
||||
format!("context:{}", id)
|
||||
}
|
||||
|
||||
fn flow_key(id: u32) -> String {
|
||||
format!("flow:{}", id)
|
||||
}
|
||||
|
||||
fn runner_key(id: u32) -> String {
|
||||
format!("runner:{}", id)
|
||||
}
|
||||
|
||||
fn job_key(caller_id: u32, id: u32) -> String {
|
||||
format!("job:{}:{}", caller_id, id)
|
||||
}
|
||||
|
||||
fn message_key(caller_id: u32, id: u32) -> String {
|
||||
format!("message:{}:{}", caller_id, id)
|
||||
}
|
||||
|
||||
// -----------------------------
|
||||
// Context (DB = context.id)
|
||||
// -----------------------------
|
||||
|
||||
/// Save a Context in its own DB (db index = context.id)
|
||||
pub async fn save_context(&self, ctx: &Context) -> Result<()> {
|
||||
// We don't have field access; compute db and key via JSON to avoid changing model definitions.
|
||||
// Extract "id" from serialized JSON object.
|
||||
let json = serde_json::to_value(ctx)?;
|
||||
let id = json
|
||||
.get("id")
|
||||
.and_then(|v| v.as_u64())
|
||||
.ok_or("Context.id missing or not a number")? as u32;
|
||||
let key = Self::context_key(id);
|
||||
// Write the context hash in its own DB
|
||||
self.hset_model(id, &key, ctx).await?;
|
||||
// Register this context id in the global registry (DB 0)
|
||||
let _ = self.register_context_id(id).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Load a Context from its own DB (db index = id)
|
||||
pub async fn load_context(&self, id: u32) -> Result<Context> {
|
||||
let key = Self::context_key(id);
|
||||
self.hget_model(id, &key).await
|
||||
}
|
||||
|
||||
// -----------------------------
|
||||
// Actor
|
||||
// -----------------------------
|
||||
|
||||
/// Save an Actor to the given DB (tenant/context DB)
|
||||
pub async fn save_actor(&self, db: u32, actor: &Actor) -> Result<()> {
|
||||
let json = serde_json::to_value(actor)?;
|
||||
let id = json
|
||||
.get("id")
|
||||
.and_then(|v| v.as_u64())
|
||||
.ok_or("Actor.id missing or not a number")? as u32;
|
||||
let key = Self::actor_key(id);
|
||||
self.hset_model(db, &key, actor).await
|
||||
}
|
||||
|
||||
/// Load an Actor by id from the given DB
|
||||
pub async fn load_actor(&self, db: u32, id: u32) -> Result<Actor> {
|
||||
let key = Self::actor_key(id);
|
||||
self.hget_model(db, &key).await
|
||||
}
|
||||
/// Save an Actor globally in DB 0 (Actor is context-independent)
|
||||
pub async fn save_actor_global(&self, actor: &Actor) -> Result<()> {
|
||||
let json = serde_json::to_value(actor)?;
|
||||
let id = json
|
||||
.get("id")
|
||||
.and_then(|v| v.as_u64())
|
||||
.ok_or("Actor.id missing or not a number")? as u32;
|
||||
let key = Self::actor_key(id);
|
||||
self.hset_model(0, &key, actor).await
|
||||
}
|
||||
|
||||
/// Load an Actor globally from DB 0 by id
|
||||
pub async fn load_actor_global(&self, id: u32) -> Result<Actor> {
|
||||
let key = Self::actor_key(id);
|
||||
self.hget_model(0, &key).await
|
||||
}
|
||||
|
||||
// -----------------------------
|
||||
// Runner
|
||||
// -----------------------------
|
||||
|
||||
pub async fn save_runner(&self, db: u32, runner: &Runner) -> Result<()> {
|
||||
let json = serde_json::to_value(runner)?;
|
||||
let id = json
|
||||
.get("id")
|
||||
.and_then(|v| v.as_u64())
|
||||
.ok_or("Runner.id missing or not a number")? as u32;
|
||||
let key = Self::runner_key(id);
|
||||
self.hset_model(db, &key, runner).await
|
||||
}
|
||||
|
||||
pub async fn load_runner(&self, db: u32, id: u32) -> Result<Runner> {
|
||||
let key = Self::runner_key(id);
|
||||
self.hget_model(db, &key).await
|
||||
}
|
||||
|
||||
// -----------------------------
|
||||
// Flow
|
||||
// -----------------------------
|
||||
|
||||
pub async fn save_flow(&self, db: u32, flow: &Flow) -> Result<()> {
|
||||
let json = serde_json::to_value(flow)?;
|
||||
let id = json
|
||||
.get("id")
|
||||
.and_then(|v| v.as_u64())
|
||||
.ok_or("Flow.id missing or not a number")? as u32;
|
||||
let key = Self::flow_key(id);
|
||||
self.hset_model(db, &key, flow).await
|
||||
}
|
||||
|
||||
pub async fn load_flow(&self, db: u32, id: u32) -> Result<Flow> {
|
||||
let key = Self::flow_key(id);
|
||||
self.hget_model(db, &key).await
|
||||
}
|
||||
|
||||
// -----------------------------
|
||||
// Job
|
||||
// -----------------------------
|
||||
|
||||
pub async fn save_job(&self, db: u32, job: &Job) -> Result<()> {
|
||||
let json = serde_json::to_value(job)?;
|
||||
let id = json
|
||||
.get("id")
|
||||
.and_then(|v| v.as_u64())
|
||||
.ok_or("Job.id missing or not a number")? as u32;
|
||||
let caller_id = json
|
||||
.get("caller_id")
|
||||
.and_then(|v| v.as_u64())
|
||||
.ok_or("Job.caller_id missing or not a number")? as u32;
|
||||
let key = Self::job_key(caller_id, id);
|
||||
self.hset_model(db, &key, job).await
|
||||
}
|
||||
|
||||
pub async fn load_job(&self, db: u32, caller_id: u32, id: u32) -> Result<Job> {
|
||||
let key = Self::job_key(caller_id, id);
|
||||
self.hget_model(db, &key).await
|
||||
}
|
||||
|
||||
/// Atomically update a job's status and `updated_at` fields.
|
||||
/// - No transition validation is performed.
|
||||
/// - Writes only the two fields via HSET to avoid rewriting the whole model.
|
||||
pub async fn update_job_status(
|
||||
&self,
|
||||
db: u32,
|
||||
caller_id: u32,
|
||||
id: u32,
|
||||
status: JobStatus,
|
||||
) -> Result<()> {
|
||||
let mut cm = self.manager_for_db(db).await?;
|
||||
let key = Self::job_key(caller_id, id);
|
||||
|
||||
// Serialize enum into the same plain string representation stored by create paths
|
||||
let status_str = match serde_json::to_value(&status)? {
|
||||
Value::String(s) => s,
|
||||
v => v.to_string(),
|
||||
};
|
||||
|
||||
let ts = crate::time::current_timestamp();
|
||||
|
||||
let pairs = vec![
|
||||
("status".to_string(), status_str),
|
||||
("updated_at".to_string(), ts.to_string()),
|
||||
];
|
||||
let _: () = cm.hset_multiple(&key, &pairs).await.map_err(|e| {
|
||||
error!(db=%db, key=%key, error=%e, "HSET update_job_status failed");
|
||||
e
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// -----------------------------
|
||||
// Message
|
||||
// -----------------------------
|
||||
|
||||
pub async fn save_message(&self, db: u32, message: &Message) -> Result<()> {
|
||||
let json = serde_json::to_value(message)?;
|
||||
let id = json
|
||||
.get("id")
|
||||
.and_then(|v| v.as_u64())
|
||||
.ok_or("Message.id missing or not a number")? as u32;
|
||||
let caller_id = json
|
||||
.get("caller_id")
|
||||
.and_then(|v| v.as_u64())
|
||||
.ok_or("Message.caller_id missing or not a number")? as u32;
|
||||
let key = Self::message_key(caller_id, id);
|
||||
self.hset_model(db, &key, message).await
|
||||
}
|
||||
|
||||
pub async fn load_message(&self, db: u32, caller_id: u32, id: u32) -> Result<Message> {
|
||||
let key = Self::message_key(caller_id, id);
|
||||
self.hget_model(db, &key).await
|
||||
}
|
||||
|
||||
// -----------------------------
|
||||
// Partial update helpers
|
||||
// -----------------------------
|
||||
|
||||
/// Flow: update only status and updated_at
|
||||
pub async fn update_flow_status(&self, db: u32, id: u32, status: FlowStatus) -> Result<()> {
|
||||
let mut cm = self.manager_for_db(db).await?;
|
||||
let key = Self::flow_key(id);
|
||||
|
||||
let status_str = match serde_json::to_value(&status)? {
|
||||
Value::String(s) => s,
|
||||
v => v.to_string(),
|
||||
};
|
||||
let ts = crate::time::current_timestamp();
|
||||
|
||||
let pairs = vec![
|
||||
("status".to_string(), status_str),
|
||||
("updated_at".to_string(), ts.to_string()),
|
||||
];
|
||||
let _: () = cm.hset_multiple(&key, &pairs).await.map_err(|e| {
|
||||
error!(db=%db, key=%key, error=%e, "HSET update_flow_status failed");
|
||||
e
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Message: update only status and updated_at
|
||||
pub async fn update_message_status(
|
||||
&self,
|
||||
db: u32,
|
||||
caller_id: u32,
|
||||
id: u32,
|
||||
status: MessageStatus,
|
||||
) -> Result<()> {
|
||||
let mut cm = self.manager_for_db(db).await?;
|
||||
let key = Self::message_key(caller_id, id);
|
||||
|
||||
let status_str = match serde_json::to_value(&status)? {
|
||||
Value::String(s) => s,
|
||||
v => v.to_string(),
|
||||
};
|
||||
let ts = crate::time::current_timestamp();
|
||||
|
||||
let pairs = vec![
|
||||
("status".to_string(), status_str),
|
||||
("updated_at".to_string(), ts.to_string()),
|
||||
];
|
||||
let _: () = cm.hset_multiple(&key, &pairs).await.map_err(|e| {
|
||||
error!(db=%db, key=%key, error=%e, "HSET update_message_status failed");
|
||||
e
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Message: update transport_id / transport_status (optionally) and bump updated_at
|
||||
pub async fn update_message_transport(
|
||||
&self,
|
||||
db: u32,
|
||||
caller_id: u32,
|
||||
id: u32,
|
||||
transport_id: Option<String>,
|
||||
transport_status: Option<TransportStatus>,
|
||||
) -> Result<()> {
|
||||
let mut cm = self.manager_for_db(db).await?;
|
||||
let key = Self::message_key(caller_id, id);
|
||||
|
||||
let mut pairs: Vec<(String, String)> = Vec::new();
|
||||
|
||||
if let Some(tid) = transport_id {
|
||||
pairs.push(("transport_id".to_string(), tid));
|
||||
}
|
||||
|
||||
if let Some(ts_status) = transport_status {
|
||||
let status_str = match serde_json::to_value(&ts_status)? {
|
||||
Value::String(s) => s,
|
||||
v => v.to_string(),
|
||||
};
|
||||
pairs.push(("transport_status".to_string(), status_str));
|
||||
}
|
||||
|
||||
// Always bump updated_at
|
||||
let ts = crate::time::current_timestamp();
|
||||
pairs.push(("updated_at".to_string(), ts.to_string()));
|
||||
|
||||
let _: () = cm.hset_multiple(&key, &pairs).await.map_err(|e| {
|
||||
error!(db=%db, key=%key, error=%e, "HSET update_message_transport failed");
|
||||
e
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Flow: merge env_vars map and bump updated_at
|
||||
pub async fn update_flow_env_vars_merge(
|
||||
&self,
|
||||
db: u32,
|
||||
id: u32,
|
||||
patch: StdHashMap<String, String>,
|
||||
) -> Result<()> {
|
||||
let mut cm = self.manager_for_db(db).await?;
|
||||
let key = Self::flow_key(id);
|
||||
|
||||
let current: Option<String> = cm.hget(&key, "env_vars").await.ok();
|
||||
let mut obj = match current
|
||||
.and_then(|s| serde_json::from_str::<Value>(&s).ok())
|
||||
.and_then(|v| v.as_object().cloned())
|
||||
{
|
||||
Some(m) => m,
|
||||
None => JsonMap::new(),
|
||||
};
|
||||
|
||||
for (k, v) in patch {
|
||||
obj.insert(k, Value::String(v));
|
||||
}
|
||||
|
||||
let env_vars_str = Value::Object(obj).to_string();
|
||||
let ts = crate::time::current_timestamp();
|
||||
let pairs = vec![
|
||||
("env_vars".to_string(), env_vars_str),
|
||||
("updated_at".to_string(), ts.to_string()),
|
||||
];
|
||||
let _: () = cm.hset_multiple(&key, &pairs).await.map_err(|e| {
|
||||
error!(db=%db, key=%key, error=%e, "HSET update_flow_env_vars_merge failed");
|
||||
e
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Flow: merge result map and bump updated_at
|
||||
pub async fn update_flow_result_merge(
|
||||
&self,
|
||||
db: u32,
|
||||
id: u32,
|
||||
patch: StdHashMap<String, String>,
|
||||
) -> Result<()> {
|
||||
let mut cm = self.manager_for_db(db).await?;
|
||||
let key = Self::flow_key(id);
|
||||
|
||||
let current: Option<String> = cm.hget(&key, "result").await.ok();
|
||||
let mut obj = match current
|
||||
.and_then(|s| serde_json::from_str::<Value>(&s).ok())
|
||||
.and_then(|v| v.as_object().cloned())
|
||||
{
|
||||
Some(m) => m,
|
||||
None => JsonMap::new(),
|
||||
};
|
||||
|
||||
for (k, v) in patch {
|
||||
obj.insert(k, Value::String(v));
|
||||
}
|
||||
|
||||
let result_str = Value::Object(obj).to_string();
|
||||
let ts = crate::time::current_timestamp();
|
||||
let pairs = vec![
|
||||
("result".to_string(), result_str),
|
||||
("updated_at".to_string(), ts.to_string()),
|
||||
];
|
||||
let _: () = cm.hset_multiple(&key, &pairs).await.map_err(|e| {
|
||||
error!(db=%db, key=%key, error=%e, "HSET update_flow_result_merge failed");
|
||||
e
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Job: merge env_vars map and bump updated_at
|
||||
pub async fn update_job_env_vars_merge(
|
||||
&self,
|
||||
db: u32,
|
||||
caller_id: u32,
|
||||
id: u32,
|
||||
patch: StdHashMap<String, String>,
|
||||
) -> Result<()> {
|
||||
let mut cm = self.manager_for_db(db).await?;
|
||||
let key = Self::job_key(caller_id, id);
|
||||
|
||||
let current: Option<String> = cm.hget(&key, "env_vars").await.ok();
|
||||
let mut obj = match current
|
||||
.and_then(|s| serde_json::from_str::<Value>(&s).ok())
|
||||
.and_then(|v| v.as_object().cloned())
|
||||
{
|
||||
Some(m) => m,
|
||||
None => JsonMap::new(),
|
||||
};
|
||||
|
||||
for (k, v) in patch {
|
||||
obj.insert(k, Value::String(v));
|
||||
}
|
||||
|
||||
let env_vars_str = Value::Object(obj).to_string();
|
||||
let ts = crate::time::current_timestamp();
|
||||
let pairs = vec![
|
||||
("env_vars".to_string(), env_vars_str),
|
||||
("updated_at".to_string(), ts.to_string()),
|
||||
];
|
||||
let _: () = cm.hset_multiple(&key, &pairs).await.map_err(|e| {
|
||||
error!(db=%db, key=%key, error=%e, "HSET update_job_env_vars_merge failed");
|
||||
e
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Job: merge result map and bump updated_at
|
||||
pub async fn update_job_result_merge(
|
||||
&self,
|
||||
db: u32,
|
||||
caller_id: u32,
|
||||
id: u32,
|
||||
patch: StdHashMap<String, String>,
|
||||
) -> Result<()> {
|
||||
let mut cm = self.manager_for_db(db).await?;
|
||||
let key = Self::job_key(caller_id, id);
|
||||
|
||||
let current: Option<String> = cm.hget(&key, "result").await.ok();
|
||||
let mut obj = match current
|
||||
.and_then(|s| serde_json::from_str::<Value>(&s).ok())
|
||||
.and_then(|v| v.as_object().cloned())
|
||||
{
|
||||
Some(m) => m,
|
||||
None => JsonMap::new(),
|
||||
};
|
||||
|
||||
for (k, v) in patch {
|
||||
obj.insert(k, Value::String(v));
|
||||
}
|
||||
|
||||
let result_str = Value::Object(obj).to_string();
|
||||
let ts = crate::time::current_timestamp();
|
||||
let pairs = vec![
|
||||
("result".to_string(), result_str),
|
||||
("updated_at".to_string(), ts.to_string()),
|
||||
];
|
||||
let _: () = cm.hset_multiple(&key, &pairs).await.map_err(|e| {
|
||||
error!(db=%db, key=%key, error=%e, "HSET update_job_result_merge failed");
|
||||
e
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Flow: set jobs list and bump updated_at
|
||||
pub async fn update_flow_jobs_set(&self, db: u32, id: u32, new_jobs: Vec<u32>) -> Result<()> {
|
||||
let mut cm = self.manager_for_db(db).await?;
|
||||
let key = Self::flow_key(id);
|
||||
|
||||
let jobs_str = serde_json::to_string(&new_jobs)?;
|
||||
let ts = crate::time::current_timestamp();
|
||||
let pairs = vec![
|
||||
("jobs".to_string(), jobs_str),
|
||||
("updated_at".to_string(), ts.to_string()),
|
||||
];
|
||||
let _: () = cm.hset_multiple(&key, &pairs).await.map_err(|e| {
|
||||
error!(db=%db, key=%key, error=%e, "HSET update_flow_jobs_set failed");
|
||||
e
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Message: append logs (no dedup) and bump updated_at
|
||||
pub async fn append_message_logs(
|
||||
&self,
|
||||
db: u32,
|
||||
caller_id: u32,
|
||||
id: u32,
|
||||
new_logs: Vec<String>,
|
||||
) -> Result<()> {
|
||||
let mut cm = self.manager_for_db(db).await?;
|
||||
let key = Self::message_key(caller_id, id);
|
||||
|
||||
let current: Option<String> = cm.hget(&key, "logs").await.ok();
|
||||
let mut arr: Vec<Value> = current
|
||||
.and_then(|s| serde_json::from_str::<Value>(&s).ok())
|
||||
.and_then(|v| v.as_array().cloned())
|
||||
.unwrap_or_default();
|
||||
|
||||
for l in new_logs {
|
||||
arr.push(Value::String(l));
|
||||
}
|
||||
|
||||
let logs_str = Value::Array(arr).to_string();
|
||||
let ts = crate::time::current_timestamp();
|
||||
let pairs = vec![
|
||||
("logs".to_string(), logs_str),
|
||||
("updated_at".to_string(), ts.to_string()),
|
||||
];
|
||||
let _: () = cm.hset_multiple(&key, &pairs).await.map_err(|e| {
|
||||
error!(db=%db, key=%key, error=%e, "HSET append_message_logs failed");
|
||||
e
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// -----------------------------
|
||||
// Queues (lists)
|
||||
// -----------------------------
|
||||
|
||||
/// Push a value onto a Redis list using LPUSH in the given DB.
|
||||
pub async fn lpush_list(&self, db: u32, list: &str, value: &str) -> Result<()> {
|
||||
let mut cm = self.manager_for_db(db).await?;
|
||||
let _: i64 = cm.lpush(list, value).await.map_err(|e| {
|
||||
error!(db=%db, list=%list, value=%value, error=%e, "LPUSH failed");
|
||||
e
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Enqueue a message key onto the outbound queue (msg_out).
|
||||
/// The value is the canonical message key "message:{caller_id}:{id}".
|
||||
pub async fn enqueue_msg_out(&self, db: u32, caller_id: u32, id: u32) -> Result<()> {
|
||||
let key = Self::message_key(caller_id, id);
|
||||
self.lpush_list(db, "msg_out", &key).await
|
||||
}
|
||||
|
||||
/// Block-pop from msg_out with timeout (seconds). Returns the message key if present.
|
||||
/// Uses BRPOP so that the queue behaves FIFO with LPUSH producer.
|
||||
pub async fn brpop_msg_out(&self, db: u32, timeout_secs: usize) -> Result<Option<String>> {
|
||||
let mut cm = self.manager_for_db(db).await?;
|
||||
// BRPOP returns (list, element) on success
|
||||
let res: Option<(String, String)> = redis::cmd("BRPOP")
|
||||
.arg("msg_out")
|
||||
.arg(timeout_secs)
|
||||
.query_async(&mut cm)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!(db=%db, timeout_secs=%timeout_secs, error=%e, "BRPOP failed");
|
||||
e
|
||||
})?;
|
||||
Ok(res.map(|(_, v)| v))
|
||||
}
|
||||
|
||||
/// Scan all runner:* keys in this DB and return the deserialized Runner entries.
|
||||
pub async fn scan_runners(&self, db: u32) -> Result<Vec<Runner>> {
|
||||
let mut cm = self.manager_for_db(db).await?;
|
||||
let mut out: Vec<Runner> = Vec::new();
|
||||
let mut cursor: u64 = 0;
|
||||
loop {
|
||||
let (next, keys): (u64, Vec<String>) = redis::cmd("SCAN")
|
||||
.arg(cursor)
|
||||
.arg("MATCH")
|
||||
.arg("runner:*")
|
||||
.arg("COUNT")
|
||||
.arg(100)
|
||||
.query_async(&mut cm)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!(db=%db, cursor=%cursor, error=%e, "SCAN failed");
|
||||
e
|
||||
})?;
|
||||
for k in keys {
|
||||
if let Ok(r) = self.hget_model::<Runner>(db, &k).await {
|
||||
out.push(r);
|
||||
}
|
||||
}
|
||||
if next == 0 {
|
||||
break;
|
||||
}
|
||||
cursor = next;
|
||||
}
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
// -----------------------------
|
||||
// Global registry (DB 0) for Context IDs
|
||||
// -----------------------------
|
||||
|
||||
/// Register a context id in the global set "contexts" stored in DB 0.
|
||||
pub async fn register_context_id(&self, id: u32) -> Result<()> {
|
||||
let mut cm = self.manager_for_db(0).await?;
|
||||
let _: i64 = redis::cmd("SADD")
|
||||
.arg("contexts")
|
||||
.arg(id)
|
||||
.query_async(&mut cm)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!(db=0, context_id=%id, error=%e, "SADD contexts failed");
|
||||
e
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// List all registered context ids from the global set in DB 0.
|
||||
pub async fn list_context_ids(&self) -> Result<Vec<u32>> {
|
||||
let mut cm = self.manager_for_db(0).await?;
|
||||
// Using SMEMBERS and parsing into u32
|
||||
let vals: Vec<String> = redis::cmd("SMEMBERS")
|
||||
.arg("contexts")
|
||||
.query_async(&mut cm)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!(db=0, error=%e, "SMEMBERS contexts failed");
|
||||
e
|
||||
})?;
|
||||
let mut out = Vec::with_capacity(vals.len());
|
||||
for v in vals {
|
||||
if let Ok(n) = v.parse::<u32>() {
|
||||
out.push(n);
|
||||
}
|
||||
}
|
||||
out.sort_unstable();
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
// -----------------------------
|
||||
// Supervisor correlation mapping (DB 0)
|
||||
// Key: "supcorr:{inner_id_decimal}"
|
||||
// Value: JSON {"context_id":u32,"caller_id":u32,"job_id":u32,"message_id":u32}
|
||||
// TTL: 1 hour to avoid leaks in case of crashes
|
||||
pub async fn supcorr_set(
|
||||
&self,
|
||||
inner_id: u64,
|
||||
context_id: u32,
|
||||
caller_id: u32,
|
||||
job_id: u32,
|
||||
message_id: u32,
|
||||
) -> Result<()> {
|
||||
let mut cm = self.manager_for_db(0).await?;
|
||||
let key = format!("supcorr:{}", inner_id);
|
||||
let val = serde_json::json!({
|
||||
"context_id": context_id,
|
||||
"caller_id": caller_id,
|
||||
"job_id": job_id,
|
||||
"message_id": message_id,
|
||||
})
|
||||
.to_string();
|
||||
// SET key val EX 3600
|
||||
let _: () = redis::cmd("SET")
|
||||
.arg(&key)
|
||||
.arg(&val)
|
||||
.arg("EX")
|
||||
.arg(3600)
|
||||
.query_async(&mut cm)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!(db=0, key=%key, error=%e, "SET supcorr_set failed");
|
||||
e
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn supcorr_get(&self, inner_id: u64) -> Result<Option<(u32, u32, u32, u32)>> {
|
||||
let mut cm = self.manager_for_db(0).await?;
|
||||
let key = format!("supcorr:{}", inner_id);
|
||||
let res: Option<String> = redis::cmd("GET")
|
||||
.arg(&key)
|
||||
.query_async(&mut cm)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!(db=0, key=%key, error=%e, "GET supcorr_get failed");
|
||||
e
|
||||
})?;
|
||||
if let Some(s) = res {
|
||||
let v: Value = serde_json::from_str(&s)?;
|
||||
let ctx = v.get("context_id").and_then(|x| x.as_u64()).unwrap_or(0) as u32;
|
||||
let caller = v.get("caller_id").and_then(|x| x.as_u64()).unwrap_or(0) as u32;
|
||||
let job = v.get("job_id").and_then(|x| x.as_u64()).unwrap_or(0) as u32;
|
||||
let msg = v.get("message_id").and_then(|x| x.as_u64()).unwrap_or(0) as u32;
|
||||
return Ok(Some((ctx, caller, job, msg)));
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
pub async fn supcorr_del(&self, inner_id: u64) -> Result<()> {
|
||||
let mut cm = self.manager_for_db(0).await?;
|
||||
let key = format!("supcorr:{}", inner_id);
|
||||
let _: i64 = redis::cmd("DEL")
|
||||
.arg(&key)
|
||||
.query_async(&mut cm)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!(db=0, key=%key, error=%e, "DEL supcorr_del failed");
|
||||
e
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user