Add redis driver

Signed-off-by: Lee Smet <lee.smet@hotmail.com>
This commit is contained in:
Lee Smet
2025-08-20 15:24:55 +02:00
parent 9830abc2dc
commit 2531885b69
10 changed files with 747 additions and 1 deletions

View File

@@ -1,2 +1,6 @@
pub mod models;
pub mod redis_driver;
mod time;
pub use redis_driver::RedisDriver;
pub use redis_driver::Result as RedisResult;

View File

@@ -13,3 +13,13 @@ pub struct Actor {
created_at: Timestamp,
updated_at: Timestamp,
}
impl Actor {
pub fn redis_key(&self) -> String {
format!("actor:{}", self.id)
}
pub fn key(id: u32) -> String {
format!("actor:{}", id)
}
}

View File

@@ -15,3 +15,13 @@ pub struct Context {
created_at: Timestamp,
upddated_at: Timestamp,
}
impl Context {
pub fn redis_key(&self) -> String {
format!("context:{}", self.id)
}
pub fn key(id: u32) -> String {
format!("context:{}", id)
}
}

View File

@@ -31,3 +31,13 @@ pub enum FlowStatus {
Error,
Finished,
}
impl Flow {
pub fn redis_key(&self) -> String {
format!("flow:{}", self.id)
}
pub fn key(id: u32) -> String {
format!("flow:{}", id)
}
}

View File

@@ -36,3 +36,13 @@ pub enum JobStatus {
Error,
Finished,
}
impl Job {
pub fn redis_key(&self) -> String {
format!("job:{}:{}", self.caller_id, self.id)
}
pub fn key(caller_id: u32, id: u32) -> String {
format!("job:{}:{}", caller_id, id)
}
}

View File

@@ -52,3 +52,13 @@ pub enum MessageFormatType {
}
type Log = String;
impl Message {
pub fn redis_key(&self) -> String {
format!("message:{}:{}", self.caller_id, self.id)
}
pub fn key(caller_id: u32, id: u32) -> String {
format!("message:{}:{}", caller_id, id)
}
}

View File

@@ -26,3 +26,13 @@ pub enum RunnerType {
Osis,
Rust,
}
impl Runner {
pub fn redis_key(&self) -> String {
format!("runner:{}", self.id)
}
pub fn key(id: u32) -> String {
format!("runner:{}", id)
}
}

115
src/redis_driver.rs Normal file
View File

@@ -0,0 +1,115 @@
use std::net::SocketAddr;
use redis::{AsyncCommands, Client, RedisError, aio::ConnectionManager};
use serde::{Serialize, de::DeserializeOwned};
use crate::models::{Actor, Context, Flow, Job, Message, Runner};
pub type Result<T> = std::result::Result<T, RedisError>;
/// Async Redis driver that stores models as a single JSON blob under HSET field "data".
/// Keys follow the canonical patterns defined in specs/models.md.
pub struct RedisDriver {
manager: ConnectionManager,
}
impl RedisDriver {
/// Connect using a full Redis URL, e.g. "redis://127.0.0.1:6379".
pub async fn connect_url(url: impl AsRef<str>) -> Result<Self> {
let client = Client::open(url.as_ref())?;
let conn = client.get_connection_manager().await?;
Ok(Self { manager: conn })
}
/// Connect using a SocketAddr, e.g. 127.0.0.1:6379.
pub async fn connect_addr(addr: SocketAddr) -> Result<Self> {
Self::connect_url(Self::url_from_addr(addr)).await
}
fn url_from_addr(addr: SocketAddr) -> String {
format!("redis://{}", addr)
}
/// Save any Serialize value as JSON into HSET key field "data".
pub async fn save_json<T: Serialize>(&self, key: &str, value: &T) -> Result<()> {
let json = serde_json::to_string(value).map_err(|e| {
RedisError::from((
redis::ErrorKind::TypeError,
"serde_json::to_string failed",
format!("{}", e),
))
})?;
let mut conn = self.manager.clone();
let _: () = conn.hset(key, "data", json).await?;
Ok(())
}
/// Load any Deserialize value from HGET key field "data".
pub async fn load_json<T: DeserializeOwned>(&self, key: &str) -> Result<T> {
let mut conn = self.manager.clone();
let json: String = conn.hget(key, "data").await?;
let value = serde_json::from_str::<T>(&json).map_err(|e| {
RedisError::from((
redis::ErrorKind::TypeError,
"serde_json::from_str failed",
format!("{}", e),
))
})?;
Ok(value)
}
// Actor
pub async fn save_actor(&self, actor: &Actor) -> Result<()> {
self.save_json(&actor.redis_key(), actor).await
}
pub async fn load_actor(&self, id: u32) -> Result<Actor> {
self.load_json(&Actor::key(id)).await
}
// Context
pub async fn save_context(&self, ctx: &Context) -> Result<()> {
self.save_json(&ctx.redis_key(), ctx).await
}
pub async fn load_context(&self, id: u32) -> Result<Context> {
self.load_json(&Context::key(id)).await
}
// Flow
pub async fn save_flow(&self, flow: &Flow) -> Result<()> {
self.save_json(&flow.redis_key(), flow).await
}
pub async fn load_flow(&self, id: u32) -> Result<Flow> {
self.load_json(&Flow::key(id)).await
}
// Runner
pub async fn save_runner(&self, runner: &Runner) -> Result<()> {
self.save_json(&runner.redis_key(), runner).await
}
pub async fn load_runner(&self, id: u32) -> Result<Runner> {
self.load_json(&Runner::key(id)).await
}
// Job
pub async fn save_job(&self, job: &Job) -> Result<()> {
self.save_json(&job.redis_key(), job).await
}
pub async fn load_job(&self, caller_id: u32, id: u32) -> Result<Job> {
self.load_json(&Job::key(caller_id, id)).await
}
// Message
pub async fn save_message(&self, msg: &Message) -> Result<()> {
self.save_json(&msg.redis_key(), msg).await
}
pub async fn load_message(&self, caller_id: u32, id: u32) -> Result<Message> {
self.load_json(&Message::key(caller_id, id)).await
}
}