BLPOP + COMMAND + MGET/MSET + DEL/EXISTS + EXPIRE/PEXPIRE/PERSIST + HINCRBY/HINCRBYFLOAT + BRPOP + DBSIZE + EXPIREAT/PEXIREAT implementations #1
@@ -17,6 +17,7 @@ pub enum Cmd {
 | 
			
		||||
    MGet(Vec<String>),
 | 
			
		||||
    MSet(Vec<(String, String)>),
 | 
			
		||||
    Keys,
 | 
			
		||||
    DbSize,
 | 
			
		||||
    ConfigGet(String),
 | 
			
		||||
    Info(Option<String>),
 | 
			
		||||
    Del(String),
 | 
			
		||||
@@ -191,6 +192,12 @@ impl Cmd {
 | 
			
		||||
                                Cmd::Keys
 | 
			
		||||
                            }
 | 
			
		||||
                        }
 | 
			
		||||
                        "dbsize" => {
 | 
			
		||||
                            if cmd.len() != 1 {
 | 
			
		||||
                                return Err(DBError(format!("wrong number of arguments for DBSIZE command")));
 | 
			
		||||
                            }
 | 
			
		||||
                            Cmd::DbSize
 | 
			
		||||
                        }
 | 
			
		||||
                        "info" => {
 | 
			
		||||
                            let section = if cmd.len() == 2 {
 | 
			
		||||
                                Some(cmd[1].clone())
 | 
			
		||||
@@ -634,6 +641,7 @@ impl Cmd {
 | 
			
		||||
            Cmd::DelMulti(keys) => del_multi_cmd(server, &keys).await,
 | 
			
		||||
            Cmd::ConfigGet(name) => config_get_cmd(&name, server),
 | 
			
		||||
            Cmd::Keys => keys_cmd(server).await,
 | 
			
		||||
            Cmd::DbSize => dbsize_cmd(server).await,
 | 
			
		||||
            Cmd::Info(section) => info_cmd(server, §ion).await,
 | 
			
		||||
            Cmd::Type(k) => type_cmd(server, &k).await,
 | 
			
		||||
            Cmd::Incr(key) => incr_cmd(server, &key).await,
 | 
			
		||||
@@ -1060,6 +1068,13 @@ async fn keys_cmd(server: &Server) -> Result<Protocol, DBError> {
 | 
			
		||||
    ))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn dbsize_cmd(server: &Server) -> Result<Protocol, DBError> {
 | 
			
		||||
    match server.current_storage()?.dbsize() {
 | 
			
		||||
        Ok(n) => Ok(Protocol::SimpleString(n.to_string())),
 | 
			
		||||
        Err(e) => Ok(Protocol::err(&e.0)),
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[derive(Serialize)]
 | 
			
		||||
struct ServerInfo {
 | 
			
		||||
    redis_version: String,
 | 
			
		||||
 
 | 
			
		||||
@@ -215,4 +215,31 @@ impl Storage {
 | 
			
		||||
        
 | 
			
		||||
        Ok(keys)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Storage {
 | 
			
		||||
    pub fn dbsize(&self) -> Result<i64, DBError> {
 | 
			
		||||
        let read_txn = self.db.begin_read()?;
 | 
			
		||||
        let types_table = read_txn.open_table(TYPES_TABLE)?;
 | 
			
		||||
        let expiration_table = read_txn.open_table(EXPIRATION_TABLE)?;
 | 
			
		||||
 | 
			
		||||
        let mut count: i64 = 0;
 | 
			
		||||
        let mut iter = types_table.iter()?;
 | 
			
		||||
        while let Some(entry) = iter.next() {
 | 
			
		||||
            let entry = entry?;
 | 
			
		||||
            let key = entry.0.value();
 | 
			
		||||
            let ty = entry.1.value();
 | 
			
		||||
 | 
			
		||||
            if ty == "string" {
 | 
			
		||||
                if let Some(expires_at) = expiration_table.get(key)? {
 | 
			
		||||
                    if now_in_millis() > expires_at.value() as u128 {
 | 
			
		||||
                        // Skip logically expired string keys
 | 
			
		||||
                        continue;
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            count += 1;
 | 
			
		||||
        }
 | 
			
		||||
        Ok(count)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@@ -815,4 +815,38 @@ async fn test_05b_brpop_suite() {
 | 
			
		||||
    let brpop_res = brpop_task.await.expect("BRPOP task join");
 | 
			
		||||
    assert_contains(&brpop_res, "q:blockr", "BRPOP returned key");
 | 
			
		||||
    assert_contains(&brpop_res, "X", "BRPOP returned element");
 | 
			
		||||
}
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn test_13_dbsize() {
 | 
			
		||||
    let (server, port) = start_test_server("dbsize").await;
 | 
			
		||||
    spawn_listener(server, port).await;
 | 
			
		||||
    sleep(Duration::from_millis(150)).await;
 | 
			
		||||
 | 
			
		||||
    let mut s = connect(port).await;
 | 
			
		||||
 | 
			
		||||
    // Initially empty
 | 
			
		||||
    let n0 = send_cmd(&mut s, &["DBSIZE"]).await;
 | 
			
		||||
    assert_contains(&n0, "0", "DBSIZE initial should be 0");
 | 
			
		||||
 | 
			
		||||
    // Add a string, a hash, and a list -> dbsize = 3
 | 
			
		||||
    let _ = send_cmd(&mut s, &["SET", "s", "v"]).await;
 | 
			
		||||
    let _ = send_cmd(&mut s, &["HSET", "h", "f", "v"]).await;
 | 
			
		||||
    let _ = send_cmd(&mut s, &["LPUSH", "l", "a", "b"]).await;
 | 
			
		||||
 | 
			
		||||
    let n3 = send_cmd(&mut s, &["DBSIZE"]).await;
 | 
			
		||||
    assert_contains(&n3, "3", "DBSIZE after adding s,h,l should be 3");
 | 
			
		||||
 | 
			
		||||
    // Expire the string and wait, dbsize should drop to 2
 | 
			
		||||
    let _ = send_cmd(&mut s, &["PEXPIRE", "s", "400"]).await;
 | 
			
		||||
    sleep(Duration::from_millis(500)).await;
 | 
			
		||||
 | 
			
		||||
    let n2 = send_cmd(&mut s, &["DBSIZE"]).await;
 | 
			
		||||
    assert_contains(&n2, "2", "DBSIZE after string expiry should be 2");
 | 
			
		||||
 | 
			
		||||
    // Delete remaining keys and confirm 0
 | 
			
		||||
    let _ = send_cmd(&mut s, &["DEL", "h"]).await;
 | 
			
		||||
    let _ = send_cmd(&mut s, &["DEL", "l"]).await;
 | 
			
		||||
 | 
			
		||||
    let n_final = send_cmd(&mut s, &["DBSIZE"]).await;
 | 
			
		||||
    assert_contains(&n_final, "0", "DBSIZE after deleting all keys should be 0");
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user