Add redis storage driver
Signed-off-by: Lee Smet <lee.smet@hotmail.com>
This commit is contained in:
		
							
								
								
									
										278
									
								
								src/storage/redis.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										278
									
								
								src/storage/redis.rs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,278 @@
 | 
			
		||||
use std::collections::HashMap as StdHashMap;
 | 
			
		||||
 | 
			
		||||
use redis::{AsyncCommands, aio::ConnectionManager};
 | 
			
		||||
use serde::Serialize;
 | 
			
		||||
use serde::de::DeserializeOwned;
 | 
			
		||||
use serde_json::{Map as JsonMap, Value};
 | 
			
		||||
use tokio::sync::Mutex;
 | 
			
		||||
 | 
			
		||||
use crate::models::{Actor, Context, Flow, Job, Message, Runner};
 | 
			
		||||
 | 
			
		||||
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
 | 
			
		||||
 | 
			
		||||
/// Async Redis driver that saves/loads every model as a Redis hash (HSET),
 | 
			
		||||
/// using canonical keys as specified in the specs.
 | 
			
		||||
/// - Complex fields (arrays, maps, nested structs) are JSON-encoded per field
 | 
			
		||||
/// - Scalars are written as plain strings (numbers/bools as their string representation)
 | 
			
		||||
/// - On load, each field value is first attempted to parse as JSON; if that fails it is treated as a plain string
 | 
			
		||||
pub struct RedisDriver {
 | 
			
		||||
    /// Base address, e.g. "127.0.0.1:6379" or "redis://127.0.0.1:6379"
 | 
			
		||||
    base_addr: String,
 | 
			
		||||
    /// Cache of connection managers per DB index
 | 
			
		||||
    managers: Mutex<StdHashMap<u32, ConnectionManager>>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl RedisDriver {
 | 
			
		||||
    /// Create a new driver for the given Redis address.
 | 
			
		||||
    /// Accepts either "host:port" or "redis://host:port"
 | 
			
		||||
    pub async fn new(addr: impl Into<String>) -> Result<Self> {
 | 
			
		||||
        let raw = addr.into();
 | 
			
		||||
        let base_addr = if raw.starts_with("redis://") {
 | 
			
		||||
            raw
 | 
			
		||||
        } else {
 | 
			
		||||
            format!("redis://{}", raw)
 | 
			
		||||
        };
 | 
			
		||||
        Ok(Self {
 | 
			
		||||
            base_addr,
 | 
			
		||||
            managers: Mutex::new(StdHashMap::new()),
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Get or create a ConnectionManager for the given DB index.
 | 
			
		||||
    async fn manager_for_db(&self, db: u32) -> Result<ConnectionManager> {
 | 
			
		||||
        {
 | 
			
		||||
            // Fast path: check existing
 | 
			
		||||
            let guard = self.managers.lock().await;
 | 
			
		||||
            if let Some(cm) = guard.get(&db) {
 | 
			
		||||
                return Ok(cm.clone());
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // Slow path: create a new manager and cache it
 | 
			
		||||
        let url = format!("{}/{}", self.base_addr.trim_end_matches('/'), db);
 | 
			
		||||
        let client = redis::Client::open(url.as_str())?;
 | 
			
		||||
        let cm = client.get_connection_manager().await?;
 | 
			
		||||
 | 
			
		||||
        let mut guard = self.managers.lock().await;
 | 
			
		||||
        let entry = guard.entry(db).or_insert(cm);
 | 
			
		||||
        Ok(entry.clone())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
    // Generic helpers (serde <-> HSET)
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
 | 
			
		||||
    fn struct_to_hset_pairs<T: Serialize>(value: &T) -> Result<Vec<(String, String)>> {
 | 
			
		||||
        let json = serde_json::to_value(value)?;
 | 
			
		||||
        let obj = json
 | 
			
		||||
            .as_object()
 | 
			
		||||
            .ok_or("Model must serialize to a JSON object")?;
 | 
			
		||||
        let mut pairs = Vec::with_capacity(obj.len());
 | 
			
		||||
        for (k, v) in obj {
 | 
			
		||||
            let s = match v {
 | 
			
		||||
                Value::Array(_) | Value::Object(_) => serde_json::to_string(v)?, // complex - store JSON
 | 
			
		||||
                Value::String(s) => s.clone(),                                   // string - plain
 | 
			
		||||
                Value::Number(n) => n.to_string(),                               // number - plain
 | 
			
		||||
                Value::Bool(b) => b.to_string(),                                 // bool - plain
 | 
			
		||||
                Value::Null => "null".to_string(),                               // null sentinel
 | 
			
		||||
            };
 | 
			
		||||
            pairs.push((k.clone(), s));
 | 
			
		||||
        }
 | 
			
		||||
        Ok(pairs)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn hmap_to_struct<T: DeserializeOwned>(map: StdHashMap<String, String>) -> Result<T> {
 | 
			
		||||
        let mut obj = JsonMap::with_capacity(map.len());
 | 
			
		||||
        for (k, s) in map {
 | 
			
		||||
            // Try parse as JSON first (works for arrays, objects, numbers, booleans, null)
 | 
			
		||||
            // If that fails, fallback to string.
 | 
			
		||||
            match serde_json::from_str::<Value>(&s) {
 | 
			
		||||
                Ok(v) => {
 | 
			
		||||
                    obj.insert(k, v);
 | 
			
		||||
                }
 | 
			
		||||
                Err(_) => {
 | 
			
		||||
                    obj.insert(k, Value::String(s));
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        let json = Value::Object(obj);
 | 
			
		||||
        let model = serde_json::from_value(json)?;
 | 
			
		||||
        Ok(model)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn hset_model<T: Serialize>(&self, db: u32, key: &str, model: &T) -> Result<()> {
 | 
			
		||||
        let mut cm = self.manager_for_db(db).await?;
 | 
			
		||||
        let pairs = Self::struct_to_hset_pairs(model)?;
 | 
			
		||||
        // Ensure no stale fields
 | 
			
		||||
        let _: u64 = cm.del(key).await.unwrap_or(0);
 | 
			
		||||
        // Write all fields
 | 
			
		||||
        let _: usize = cm.hset_multiple(key, &pairs).await?;
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn hget_model<T: DeserializeOwned>(&self, db: u32, key: &str) -> Result<T> {
 | 
			
		||||
        let mut cm = self.manager_for_db(db).await?;
 | 
			
		||||
        let map: StdHashMap<String, String> = cm.hgetall(key).await?;
 | 
			
		||||
        if map.is_empty() {
 | 
			
		||||
            return Err(format!("Key not found: {}", key).into());
 | 
			
		||||
        }
 | 
			
		||||
        Self::hmap_to_struct(map)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
    // Key helpers (canonical keys)
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
 | 
			
		||||
    fn actor_key(id: u32) -> String {
 | 
			
		||||
        format!("actor:{}", id)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn context_key(id: u32) -> String {
 | 
			
		||||
        format!("context:{}", id)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn flow_key(id: u32) -> String {
 | 
			
		||||
        format!("flow:{}", id)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn runner_key(id: u32) -> String {
 | 
			
		||||
        format!("runner:{}", id)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn job_key(caller_id: u32, id: u32) -> String {
 | 
			
		||||
        format!("job:{}:{}", caller_id, id)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fn message_key(caller_id: u32, id: u32) -> String {
 | 
			
		||||
        format!("message:{}:{}", caller_id, id)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
    // Context (DB = context.id)
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
 | 
			
		||||
    /// Save a Context in its own DB (db index = context.id)
 | 
			
		||||
    pub async fn save_context(&self, ctx: &Context) -> Result<()> {
 | 
			
		||||
        // We don't have field access; compute db and key via JSON to avoid changing model definitions.
 | 
			
		||||
        // Extract "id" from serialized JSON object.
 | 
			
		||||
        let json = serde_json::to_value(ctx)?;
 | 
			
		||||
        let id = json
 | 
			
		||||
            .get("id")
 | 
			
		||||
            .and_then(|v| v.as_u64())
 | 
			
		||||
            .ok_or("Context.id missing or not a number")? as u32;
 | 
			
		||||
        let key = Self::context_key(id);
 | 
			
		||||
        self.hset_model(id, &key, ctx).await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Load a Context from its own DB (db index = id)
 | 
			
		||||
    pub async fn load_context(&self, id: u32) -> Result<Context> {
 | 
			
		||||
        let key = Self::context_key(id);
 | 
			
		||||
        self.hget_model(id, &key).await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
    // Actor
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
 | 
			
		||||
    /// Save an Actor to the given DB (tenant/context DB)
 | 
			
		||||
    pub async fn save_actor(&self, db: u32, actor: &Actor) -> Result<()> {
 | 
			
		||||
        let json = serde_json::to_value(actor)?;
 | 
			
		||||
        let id = json
 | 
			
		||||
            .get("id")
 | 
			
		||||
            .and_then(|v| v.as_u64())
 | 
			
		||||
            .ok_or("Actor.id missing or not a number")? as u32;
 | 
			
		||||
        let key = Self::actor_key(id);
 | 
			
		||||
        self.hset_model(db, &key, actor).await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Load an Actor by id from the given DB
 | 
			
		||||
    pub async fn load_actor(&self, db: u32, id: u32) -> Result<Actor> {
 | 
			
		||||
        let key = Self::actor_key(id);
 | 
			
		||||
        self.hget_model(db, &key).await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
    // Runner
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
 | 
			
		||||
    pub async fn save_runner(&self, db: u32, runner: &Runner) -> Result<()> {
 | 
			
		||||
        let json = serde_json::to_value(runner)?;
 | 
			
		||||
        let id = json
 | 
			
		||||
            .get("id")
 | 
			
		||||
            .and_then(|v| v.as_u64())
 | 
			
		||||
            .ok_or("Runner.id missing or not a number")? as u32;
 | 
			
		||||
        let key = Self::runner_key(id);
 | 
			
		||||
        self.hset_model(db, &key, runner).await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn load_runner(&self, db: u32, id: u32) -> Result<Runner> {
 | 
			
		||||
        let key = Self::runner_key(id);
 | 
			
		||||
        self.hget_model(db, &key).await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
    // Flow
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
 | 
			
		||||
    pub async fn save_flow(&self, db: u32, flow: &Flow) -> Result<()> {
 | 
			
		||||
        let json = serde_json::to_value(flow)?;
 | 
			
		||||
        let id = json
 | 
			
		||||
            .get("id")
 | 
			
		||||
            .and_then(|v| v.as_u64())
 | 
			
		||||
            .ok_or("Flow.id missing or not a number")? as u32;
 | 
			
		||||
        let key = Self::flow_key(id);
 | 
			
		||||
        self.hset_model(db, &key, flow).await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn load_flow(&self, db: u32, id: u32) -> Result<Flow> {
 | 
			
		||||
        let key = Self::flow_key(id);
 | 
			
		||||
        self.hget_model(db, &key).await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
    // Job
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
 | 
			
		||||
    pub async fn save_job(&self, db: u32, job: &Job) -> Result<()> {
 | 
			
		||||
        let json = serde_json::to_value(job)?;
 | 
			
		||||
        let id = json
 | 
			
		||||
            .get("id")
 | 
			
		||||
            .and_then(|v| v.as_u64())
 | 
			
		||||
            .ok_or("Job.id missing or not a number")? as u32;
 | 
			
		||||
        let caller_id = json
 | 
			
		||||
            .get("caller_id")
 | 
			
		||||
            .and_then(|v| v.as_u64())
 | 
			
		||||
            .ok_or("Job.caller_id missing or not a number")? as u32;
 | 
			
		||||
        let key = Self::job_key(caller_id, id);
 | 
			
		||||
        self.hset_model(db, &key, job).await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn load_job(&self, db: u32, caller_id: u32, id: u32) -> Result<Job> {
 | 
			
		||||
        let key = Self::job_key(caller_id, id);
 | 
			
		||||
        self.hget_model(db, &key).await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
    // Message
 | 
			
		||||
    // -----------------------------
 | 
			
		||||
 | 
			
		||||
    pub async fn save_message(&self, db: u32, message: &Message) -> Result<()> {
 | 
			
		||||
        let json = serde_json::to_value(message)?;
 | 
			
		||||
        let id = json
 | 
			
		||||
            .get("id")
 | 
			
		||||
            .and_then(|v| v.as_u64())
 | 
			
		||||
            .ok_or("Message.id missing or not a number")? as u32;
 | 
			
		||||
        let caller_id = json
 | 
			
		||||
            .get("caller_id")
 | 
			
		||||
            .and_then(|v| v.as_u64())
 | 
			
		||||
            .ok_or("Message.caller_id missing or not a number")? as u32;
 | 
			
		||||
        let key = Self::message_key(caller_id, id);
 | 
			
		||||
        self.hset_model(db, &key, message).await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn load_message(&self, db: u32, caller_id: u32, id: u32) -> Result<Message> {
 | 
			
		||||
        let key = Self::message_key(caller_id, id);
 | 
			
		||||
        self.hget_model(db, &key).await
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user