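// Integration test suite for the herodb server: each test starts a server on its own
// port and drives it over TCP with RESP-encoded commands.
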
use herodb::{options::DBOption, server::Server};
 | 
						|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
 | 
						|
use tokio::net::TcpStream;
 | 
						|
use tokio::time::{sleep, Duration};
 | 
						|
 | 
						|
// =========================
 | 
						|
// Helpers
 | 
						|
// =========================
 | 
						|
 | 
						|
async fn start_test_server(test_name: &str) -> (Server, u16) {
 | 
						|
    use std::sync::atomic::{AtomicU16, Ordering};
 | 
						|
    static PORT_COUNTER: AtomicU16 = AtomicU16::new(17100);
 | 
						|
    let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst);
 | 
						|
 | 
						|
    let test_dir = format!("/tmp/herodb_usage_suite_{}", test_name);
 | 
						|
    let _ = std::fs::remove_dir_all(&test_dir);
 | 
						|
    std::fs::create_dir_all(&test_dir).unwrap();
 | 
						|
 | 
						|
    let option = DBOption {
 | 
						|
        dir: test_dir,
 | 
						|
        port,
 | 
						|
        debug: false,
 | 
						|
        encrypt: false,
 | 
						|
        encryption_key: None,
 | 
						|
        backend: herodb::options::BackendType::Redb,
 | 
						|
    };
 | 
						|
 | 
						|
    let server = Server::new(option).await;
 | 
						|
    (server, port)
 | 
						|
}
 | 
						|
 | 
						|
/// Binds `127.0.0.1:<port>` and starts a background accept loop for `server`.
///
/// The socket is bound *before* the accept loop is spawned, so once this
/// future resolves the port is already accepting connections and a bind
/// failure panics in the calling test instead of inside a detached task.
///
/// Each accepted connection is handed to its own clone of `server` in a
/// separate task; the result of `handle` is ignored. The accept loop stops on
/// the first accept error.
///
/// # Panics
///
/// Panics if the listener cannot be bound.
async fn spawn_listener(server: Server, port: u16) {
    let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
        .await
        .expect("bind listener");

    tokio::spawn(async move {
        while let Ok((stream, _)) = listener.accept().await {
            let mut s_clone = server.clone();
            tokio::spawn(async move {
                let _ = s_clone.handle(stream).await;
            });
        }
    });
}
 | 
						|
 | 
						|
/// Encodes `args` as a RESP array of bulk strings.
///
/// For example `["PING"]` becomes `"*1\r\n$4\r\nPING\r\n"`. Each bulk length is
/// the argument's length in bytes, not in characters.
fn build_resp(args: &[&str]) -> String {
    use std::fmt::Write as _;

    let mut out = format!("*{}\r\n", args.len());
    for arg in args {
        // Appending in place avoids a temporary String per argument.
        write!(out, "${}\r\n{}\r\n", arg.len(), arg).expect("writing to a String cannot fail");
    }
    out
}
 | 
						|
 | 
						|
async fn connect(port: u16) -> TcpStream {
 | 
						|
    let mut attempts = 0;
 | 
						|
    loop {
 | 
						|
        match TcpStream::connect(format!("127.0.0.1:{}", port)).await {
 | 
						|
            Ok(s) => return s,
 | 
						|
            Err(_) if attempts < 30 => {
 | 
						|
                attempts += 1;
 | 
						|
                sleep(Duration::from_millis(100)).await;
 | 
						|
            }
 | 
						|
            Err(e) => panic!("Failed to connect: {}", e),
 | 
						|
        }
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
/// Returns the index of the first `\r\n` in `buf` at or after `start`.
///
/// The returned index points at the `\r`. Returns `None` when there is no
/// CRLF from `start` onwards, including when `start` is past the end of `buf`.
fn find_crlf(buf: &[u8], start: usize) -> Option<usize> {
    buf.get(start..)?
        .windows(2)
        .position(|pair| pair == b"\r\n")
        .map(|offset| start + offset)
}
 | 
						|
 | 
						|
/// Parses the bytes `buf[start..end]` as a decimal `i64`.
///
/// Returns `None` if the span is not inside `buf`, is not valid UTF-8, or is
/// not a valid integer. An out-of-range span no longer panics.
fn parse_number_i64(buf: &[u8], start: usize, end: usize) -> Option<i64> {
    let digits = buf.get(start..end)?;
    std::str::from_utf8(digits).ok()?.parse().ok()
}
 | 
						|
 | 
						|
// Return number of bytes that make up a complete RESP element starting at 'i', or None if incomplete.
 | 
						|
fn parse_elem(buf: &[u8], i: usize) -> Option<usize> {
 | 
						|
    if i >= buf.len() {
 | 
						|
        return None;
 | 
						|
    }
 | 
						|
    match buf[i] {
 | 
						|
        b'+' | b'-' | b':' => {
 | 
						|
            let end = find_crlf(buf, i + 1)?;
 | 
						|
            Some(end + 2 - i)
 | 
						|
        }
 | 
						|
        b'$' => {
 | 
						|
            let hdr_end = find_crlf(buf, i + 1)?;
 | 
						|
            let n = parse_number_i64(buf, i + 1, hdr_end)?;
 | 
						|
            if n < 0 {
 | 
						|
                // Null bulk string: only header
 | 
						|
                Some(hdr_end + 2 - i)
 | 
						|
            } else {
 | 
						|
                let need = hdr_end + 2 + (n as usize) + 2;
 | 
						|
                if need <= buf.len() {
 | 
						|
                    Some(need - i)
 | 
						|
                } else {
 | 
						|
                    None
 | 
						|
                }
 | 
						|
            }
 | 
						|
        }
 | 
						|
        b'*' => {
 | 
						|
            let hdr_end = find_crlf(buf, i + 1)?;
 | 
						|
            let n = parse_number_i64(buf, i + 1, hdr_end)?;
 | 
						|
            if n < 0 {
 | 
						|
                // Null array: only header
 | 
						|
                Some(hdr_end + 2 - i)
 | 
						|
            } else {
 | 
						|
                let mut j = hdr_end + 2;
 | 
						|
                for _ in 0..(n as usize) {
 | 
						|
                    let consumed = parse_elem(buf, j)?;
 | 
						|
                    j += consumed;
 | 
						|
                }
 | 
						|
                Some(j - i)
 | 
						|
            }
 | 
						|
        }
 | 
						|
        _ => None,
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
fn resp_frame_len(buf: &[u8]) -> Option<usize> {
 | 
						|
    parse_elem(buf, 0)
 | 
						|
}
 | 
						|
 | 
						|
async fn read_full_resp(stream: &mut TcpStream) -> String {
 | 
						|
    let mut buf: Vec<u8> = Vec::with_capacity(8192);
 | 
						|
    let mut tmp = vec![0u8; 4096];
 | 
						|
 | 
						|
    loop {
 | 
						|
        if let Some(total) = resp_frame_len(&buf) {
 | 
						|
            if buf.len() >= total {
 | 
						|
                return String::from_utf8_lossy(&buf[..total]).to_string();
 | 
						|
            }
 | 
						|
        }
 | 
						|
 | 
						|
        match tokio::time::timeout(Duration::from_secs(2), stream.read(&mut tmp)).await {
 | 
						|
            Ok(Ok(n)) => {
 | 
						|
                if n == 0 {
 | 
						|
                    if let Some(total) = resp_frame_len(&buf) {
 | 
						|
                        if buf.len() >= total {
 | 
						|
                            return String::from_utf8_lossy(&buf[..total]).to_string();
 | 
						|
                        }
 | 
						|
                    }
 | 
						|
                    return String::from_utf8_lossy(&buf).to_string();
 | 
						|
                }
 | 
						|
                buf.extend_from_slice(&tmp[..n]);
 | 
						|
            }
 | 
						|
            Ok(Err(e)) => panic!("read error: {}", e),
 | 
						|
            Err(_) => panic!("timeout waiting for reply"),
 | 
						|
        }
 | 
						|
 | 
						|
        if buf.len() > 8 * 1024 * 1024 {
 | 
						|
            panic!("reply too large");
 | 
						|
        }
 | 
						|
    }
 | 
						|
}
 | 
						|
 | 
						|
async fn send_cmd(stream: &mut TcpStream, args: &[&str]) -> String {
 | 
						|
    let req = build_resp(args);
 | 
						|
    stream.write_all(req.as_bytes()).await.unwrap();
 | 
						|
    read_full_resp(stream).await
 | 
						|
}
 | 
						|
 | 
						|
// Assert helpers with clearer output
 | 
						|
/// Asserts that `haystack` contains `needle`.
///
/// On failure the panic message includes the missing needle, the caller
/// supplied `ctx` and the full response.
fn assert_contains(haystack: &str, needle: &str, ctx: &str) {
    assert!(
        haystack.contains(needle),
        "ASSERT CONTAINS failed: '{}' not found in response.\nContext: {}\nResponse:\n{}",
        needle,
        ctx,
        haystack
    );
}
 | 
						|
 | 
						|
/// Asserts that `actual` is exactly equal to `expected`.
///
/// On failure the panic message shows `ctx` and both strings in `Debug` form,
/// so control characters such as `\r\n` are visible.
fn assert_eq_resp(actual: &str, expected: &str, ctx: &str) {
    assert!(
        actual == expected,
        "ASSERT EQUAL failed.\nContext: {}\nExpected:\n{:?}\nActual:\n{:?}",
        ctx,
        expected,
        actual
    );
}
 | 
						|
 | 
						|
/// Extracts the payload of a single RESP bulk string reply.
///
/// Example: `"$5\r\nhello\r\n"` yields `Some("hello".to_string())`.
///
/// When the header is a well-formed `$<len>` and exactly `len` bytes followed
/// by CRLF are present, those bytes are returned, so a payload that itself
/// contains `\r\n` is no longer truncated. Otherwise this falls back to
/// returning the text between the first and the second CRLF.
///
/// Returns `None` when no payload line can be found, for example for the null
/// bulk string `"$-1\r\n"`.
fn extract_bulk_payload(resp: &str) -> Option<String> {
    let header_end = resp.find("\r\n")?;
    let body = &resp[header_end + 2..];

    // Preferred path: trust the declared length when it is consistent with
    // the data actually present.
    let declared_len = resp[..header_end]
        .strip_prefix('$')
        .and_then(|digits| digits.parse::<usize>().ok());
    if let Some(len) = declared_len {
        // `get` returns None when `len` is out of range or splits a character,
        // which sends us to the fallback below.
        if let Some(payload) = body.get(..len) {
            if body[len..].starts_with("\r\n") {
                return Some(payload.to_string());
            }
        }
    }

    // Fallback: take everything up to the next CRLF.
    let payload_end = body.find("\r\n")?;
    Some(body[..payload_end].to_string())
}
 | 
						|
 | 
						|
// =========================
 | 
						|
// Test suites
 | 
						|
// =========================
 | 
						|
 | 
						|
#[tokio::test]
 | 
						|
async fn test_01_connection_and_info() {
 | 
						|
    let (server, port) = start_test_server("conn_info").await;
 | 
						|
    spawn_listener(server, port).await;
 | 
						|
    sleep(Duration::from_millis(150)).await;
 | 
						|
 | 
						|
    let mut s = connect(port).await;
 | 
						|
 | 
						|
    // redis-cli may send COMMAND DOCS, our server replies empty array; harmless.
 | 
						|
    let pong = send_cmd(&mut s, &["PING"]).await;
 | 
						|
    assert_contains(&pong, "PONG", "PING should return PONG");
 | 
						|
 | 
						|
    let echo = send_cmd(&mut s, &["ECHO", "hello"]).await;
 | 
						|
    assert_contains(&echo, "hello", "ECHO hello");
 | 
						|
 | 
						|
    // INFO (general)
 | 
						|
    let info = send_cmd(&mut s, &["INFO"]).await;
 | 
						|
    assert_contains(&info, "redis_version", "INFO should include redis_version");
 | 
						|
 | 
						|
    // INFO REPLICATION (static stub)
 | 
						|
    let repl = send_cmd(&mut s, &["INFO", "replication"]).await;
 | 
						|
    assert_contains(&repl, "role:master", "INFO replication role");
 | 
						|
 | 
						|
    // CONFIG GET subset
 | 
						|
    let cfg = send_cmd(&mut s, &["CONFIG", "GET", "databases"]).await;
 | 
						|
    assert_contains(&cfg, "databases", "CONFIG GET databases");
 | 
						|
    assert_contains(&cfg, "16", "CONFIG GET databases value");
 | 
						|
 | 
						|
    // CLIENT name
 | 
						|
    let setname = send_cmd(&mut s, &["CLIENT", "SETNAME", "myapp"]).await;
 | 
						|
    assert_contains(&setname, "OK", "CLIENT SETNAME");
 | 
						|
 | 
						|
    let getname = send_cmd(&mut s, &["CLIENT", "GETNAME"]).await;
 | 
						|
    assert_contains(&getname, "myapp", "CLIENT GETNAME");
 | 
						|
 | 
						|
    // SELECT db
 | 
						|
    let sel = send_cmd(&mut s, &["SELECT", "0"]).await;
 | 
						|
    assert_contains(&sel, "OK", "SELECT 0");
 | 
						|
 | 
						|
    // QUIT should close connection after sending OK
 | 
						|
    let quit = send_cmd(&mut s, &["QUIT"]).await;
 | 
						|
    assert_contains(&quit, "OK", "QUIT should return OK");
 | 
						|
}
 | 
						|
 | 
						|
#[tokio::test]
 | 
						|
async fn test_02_strings_and_expiry() {
 | 
						|
    let (server, port) = start_test_server("strings").await;
 | 
						|
    spawn_listener(server, port).await;
 | 
						|
    sleep(Duration::from_millis(150)).await;
 | 
						|
 | 
						|
    let mut s = connect(port).await;
 | 
						|
 | 
						|
    // SET / GET
 | 
						|
    let set = send_cmd(&mut s, &["SET", "user:1", "alice"]).await;
 | 
						|
    assert_contains(&set, "OK", "SET user:1 alice");
 | 
						|
 | 
						|
    let get = send_cmd(&mut s, &["GET", "user:1"]).await;
 | 
						|
    assert_contains(&get, "alice", "GET user:1");
 | 
						|
 | 
						|
    // EXISTS / DEL
 | 
						|
    let ex1 = send_cmd(&mut s, &["EXISTS", "user:1"]).await;
 | 
						|
    assert_contains(&ex1, "1", "EXISTS user:1");
 | 
						|
 | 
						|
    let del = send_cmd(&mut s, &["DEL", "user:1"]).await;
 | 
						|
    assert_contains(&del, "1", "DEL user:1");
 | 
						|
 | 
						|
    let ex0 = send_cmd(&mut s, &["EXISTS", "user:1"]).await;
 | 
						|
    assert_contains(&ex0, "0", "EXISTS after DEL");
 | 
						|
 | 
						|
    // INCR behavior
 | 
						|
    let i1 = send_cmd(&mut s, &["INCR", "count"]).await;
 | 
						|
    assert_contains(&i1, "1", "INCR new key -> 1");
 | 
						|
    let i2 = send_cmd(&mut s, &["INCR", "count"]).await;
 | 
						|
    assert_contains(&i2, "2", "INCR existing -> 2");
 | 
						|
    let _ = send_cmd(&mut s, &["SET", "notnum", "abc"]).await;
 | 
						|
    let ierr = send_cmd(&mut s, &["INCR", "notnum"]).await;
 | 
						|
    assert_contains(&ierr, "ERR", "INCR on non-numeric should ERR");
 | 
						|
 | 
						|
    // Expiration via SET EX
 | 
						|
    let setex = send_cmd(&mut s, &["SET", "tmp:1", "boom", "EX", "1"]).await;
 | 
						|
    assert_contains(&setex, "OK", "SET tmp:1 EX 1");
 | 
						|
 | 
						|
    let g_immediate = send_cmd(&mut s, &["GET", "tmp:1"]).await;
 | 
						|
    assert_contains(&g_immediate, "boom", "GET tmp:1 immediately");
 | 
						|
 | 
						|
    let ttl = send_cmd(&mut s, &["TTL", "tmp:1"]).await;
 | 
						|
    // Implementation returns a SimpleString, accept any numeric content
 | 
						|
    assert!(
 | 
						|
        ttl.contains("1") || ttl.contains("0"),
 | 
						|
        "TTL should be 1 or 0, got: {}",
 | 
						|
        ttl
 | 
						|
    );
 | 
						|
 | 
						|
    sleep(Duration::from_millis(1100)).await;
 | 
						|
    let g_after = send_cmd(&mut s, &["GET", "tmp:1"]).await;
 | 
						|
    assert_contains(&g_after, "$-1", "GET tmp:1 after expiry -> Null");
 | 
						|
 | 
						|
    // TYPE
 | 
						|
    let _ = send_cmd(&mut s, &["SET", "t", "v"]).await;
 | 
						|
    let ty = send_cmd(&mut s, &["TYPE", "t"]).await;
 | 
						|
    assert_contains(&ty, "string", "TYPE string key");
 | 
						|
    let ty_none = send_cmd(&mut s, &["TYPE", "noexist"]).await;
 | 
						|
    assert_contains(&ty_none, "none", "TYPE nonexistent");
 | 
						|
}
 | 
						|
 | 
						|
#[tokio::test]
 | 
						|
async fn test_03_scan_and_keys() {
 | 
						|
    let (server, port) = start_test_server("scan").await;
 | 
						|
    spawn_listener(server, port).await;
 | 
						|
    sleep(Duration::from_millis(150)).await;
 | 
						|
 | 
						|
    let mut s = connect(port).await;
 | 
						|
 | 
						|
    for i in 0..5 {
 | 
						|
        let _ = send_cmd(&mut s, &["SET", &format!("key{}", i), &format!("value{}", i)]).await;
 | 
						|
    }
 | 
						|
 | 
						|
    let scan = send_cmd(&mut s, &["SCAN", "0", "MATCH", "key*", "COUNT", "10"]).await;
 | 
						|
    assert_contains(&scan, "key0", "SCAN should return keys with MATCH");
 | 
						|
    assert_contains(&scan, "key4", "SCAN should return last key");
 | 
						|
 | 
						|
    let keys = send_cmd(&mut s, &["KEYS", "*"]).await;
 | 
						|
    assert_contains(&keys, "key0", "KEYS * includes key0");
 | 
						|
    assert_contains(&keys, "key4", "KEYS * includes key4");
 | 
						|
}
 | 
						|
 | 
						|
#[tokio::test]
 | 
						|
async fn test_04_hashes_suite() {
 | 
						|
    let (server, port) = start_test_server("hashes").await;
 | 
						|
    spawn_listener(server, port).await;
 | 
						|
    sleep(Duration::from_millis(150)).await;
 | 
						|
 | 
						|
    let mut s = connect(port).await;
 | 
						|
 | 
						|
    // HSET (single, returns number of new fields)
 | 
						|
    let h1 = send_cmd(&mut s, &["HSET", "profile:1", "name", "alice"]).await;
 | 
						|
    assert_contains(&h1, "1", "HSET new field -> 1");
 | 
						|
 | 
						|
    // HGET
 | 
						|
    let hg = send_cmd(&mut s, &["HGET", "profile:1", "name"]).await;
 | 
						|
    assert_contains(&hg, "alice", "HGET existing field");
 | 
						|
 | 
						|
    // HSET multiple
 | 
						|
    let h2 = send_cmd(&mut s, &["HSET", "profile:1", "age", "30", "city", "paris"]).await;
 | 
						|
    assert_contains(&h2, "2", "HSET added 2 new fields");
 | 
						|
 | 
						|
    // HMGET
 | 
						|
    let hmg = send_cmd(&mut s, &["HMGET", "profile:1", "name", "age", "city", "nope"]).await;
 | 
						|
    assert_contains(&hmg, "alice", "HMGET name");
 | 
						|
    assert_contains(&hmg, "30", "HMGET age");
 | 
						|
    assert_contains(&hmg, "paris", "HMGET city");
 | 
						|
    assert_contains(&hmg, "$-1", "HMGET non-existent -> Null");
 | 
						|
 | 
						|
    // HGETALL
 | 
						|
    let hga = send_cmd(&mut s, &["HGETALL", "profile:1"]).await;
 | 
						|
    assert_contains(&hga, "name", "HGETALL contains name");
 | 
						|
    assert_contains(&hga, "alice", "HGETALL contains alice");
 | 
						|
 | 
						|
    // HLEN
 | 
						|
    let hlen = send_cmd(&mut s, &["HLEN", "profile:1"]).await;
 | 
						|
    assert_contains(&hlen, "3", "HLEN is 3");
 | 
						|
 | 
						|
    // HEXISTS
 | 
						|
    let hex1 = send_cmd(&mut s, &["HEXISTS", "profile:1", "age"]).await;
 | 
						|
    assert_contains(&hex1, "1", "HEXISTS age true");
 | 
						|
    let hex0 = send_cmd(&mut s, &["HEXISTS", "profile:1", "nope"]).await;
 | 
						|
    assert_contains(&hex0, "0", "HEXISTS nope false");
 | 
						|
 | 
						|
    // HKEYS / HVALS
 | 
						|
    let hkeys = send_cmd(&mut s, &["HKEYS", "profile:1"]).await;
 | 
						|
    assert_contains(&hkeys, "name", "HKEYS includes name");
 | 
						|
    let hvals = send_cmd(&mut s, &["HVALS", "profile:1"]).await;
 | 
						|
    assert_contains(&hvals, "alice", "HVALS includes alice");
 | 
						|
 | 
						|
    // HSETNX
 | 
						|
    let hnx0 = send_cmd(&mut s, &["HSETNX", "profile:1", "name", "bob"]).await;
 | 
						|
    assert_contains(&hnx0, "0", "HSETNX existing field -> 0");
 | 
						|
    let hnx1 = send_cmd(&mut s, &["HSETNX", "profile:1", "nickname", "ali"]).await;
 | 
						|
    assert_contains(&hnx1, "1", "HSETNX new field -> 1");
 | 
						|
 | 
						|
    // HSCAN
 | 
						|
    let hscan = send_cmd(&mut s, &["HSCAN", "profile:1", "0", "MATCH", "n*", "COUNT", "10"]).await;
 | 
						|
    assert_contains(&hscan, "name", "HSCAN matches fields starting with n");
 | 
						|
    assert_contains(&hscan, "nickname", "HSCAN nickname present");
 | 
						|
 | 
						|
    // HDEL
 | 
						|
    let hdel = send_cmd(&mut s, &["HDEL", "profile:1", "city", "age"]).await;
 | 
						|
    assert_contains(&hdel, "2", "HDEL removed two fields");
 | 
						|
}
 | 
						|
 | 
						|
#[tokio::test]
 | 
						|
async fn test_05_lists_suite_including_blpop() {
 | 
						|
    let (server, port) = start_test_server("lists").await;
 | 
						|
    spawn_listener(server, port).await;
 | 
						|
    sleep(Duration::from_millis(150)).await;
 | 
						|
 | 
						|
    let mut a = connect(port).await;
 | 
						|
 | 
						|
    // LPUSH / RPUSH / LLEN
 | 
						|
    let lp = send_cmd(&mut a, &["LPUSH", "q:jobs", "a", "b"]).await;
 | 
						|
    assert_contains(&lp, "2", "LPUSH added 2, length 2");
 | 
						|
 | 
						|
    let rp = send_cmd(&mut a, &["RPUSH", "q:jobs", "c"]).await;
 | 
						|
    assert_contains(&rp, "3", "RPUSH now length 3");
 | 
						|
 | 
						|
    let llen = send_cmd(&mut a, &["LLEN", "q:jobs"]).await;
 | 
						|
    assert_contains(&llen, "3", "LLEN 3");
 | 
						|
 | 
						|
    // LINDEX / LRANGE
 | 
						|
    let lidx = send_cmd(&mut a, &["LINDEX", "q:jobs", "0"]).await;
 | 
						|
    assert_eq_resp(&lidx, "$1\r\nb\r\n", "LINDEX q:jobs 0 should be b");
 | 
						|
 | 
						|
    let lr = send_cmd(&mut a, &["LRANGE", "q:jobs", "0", "-1"]).await;
 | 
						|
    assert_eq_resp(&lr, "*3\r\n$1\r\nb\r\n$1\r\na\r\n$1\r\nc\r\n", "LRANGE q:jobs 0 -1 should be [b,a,c]");
 | 
						|
 | 
						|
    // LTRIM
 | 
						|
    let ltrim = send_cmd(&mut a, &["LTRIM", "q:jobs", "0", "1"]).await;
 | 
						|
    assert_contains(<rim, "OK", "LTRIM OK");
 | 
						|
    let lr_post = send_cmd(&mut a, &["LRANGE", "q:jobs", "0", "-1"]).await;
 | 
						|
    assert_eq_resp(&lr_post, "*2\r\n$1\r\nb\r\n$1\r\na\r\n", "After LTRIM, list [b,a]");
 | 
						|
 | 
						|
    // LREM remove first occurrence of b
 | 
						|
    let lrem = send_cmd(&mut a, &["LREM", "q:jobs", "1", "b"]).await;
 | 
						|
    assert_contains(&lrem, "1", "LREM removed 1");
 | 
						|
 | 
						|
    // LPOP and RPOP
 | 
						|
    let lpop1 = send_cmd(&mut a, &["LPOP", "q:jobs"]).await;
 | 
						|
    assert_contains(&lpop1, "$1\r\na\r\n", "LPOP returns a");
 | 
						|
    let rpop_empty = send_cmd(&mut a, &["RPOP", "q:jobs"]).await; // empty now
 | 
						|
    assert_contains(&rpop_empty, "$-1", "RPOP on empty -> Null");
 | 
						|
 | 
						|
    // LPOP with count on empty -> []
 | 
						|
    let lpop0 = send_cmd(&mut a, &["LPOP", "q:jobs", "2"]).await;
 | 
						|
    assert_eq_resp(&lpop0, "*0\r\n", "LPOP with count on empty returns empty array");
 | 
						|
 | 
						|
    // BLPOP: block on one client, push from another
 | 
						|
    let c1 = connect(port).await;
 | 
						|
    let mut c2 = connect(port).await;
 | 
						|
 | 
						|
    // Start BLPOP on c1
 | 
						|
    let blpop_task = tokio::spawn(async move {
 | 
						|
        let mut c1_local = c1;
 | 
						|
        send_cmd(&mut c1_local, &["BLPOP", "q:block", "5"]).await
 | 
						|
    });
 | 
						|
 | 
						|
    // Give it time to register waiter
 | 
						|
    sleep(Duration::from_millis(150)).await;
 | 
						|
 | 
						|
    // Push from c2 to wake BLPOP
 | 
						|
    let _ = send_cmd(&mut c2, &["LPUSH", "q:block", "x"]).await;
 | 
						|
 | 
						|
    // Await BLPOP result
 | 
						|
    let blpop_res = blpop_task.await.expect("BLPOP task join");
 | 
						|
    assert_contains(&blpop_res, "q:block", "BLPOP returned key");
 | 
						|
    assert_contains(&blpop_res, "x", "BLPOP returned element");
 | 
						|
}
 | 
						|
 | 
						|
#[tokio::test]
 | 
						|
async fn test_06_flushdb_suite() {
 | 
						|
    let (server, port) = start_test_server("flushdb").await;
 | 
						|
    spawn_listener(server, port).await;
 | 
						|
    sleep(Duration::from_millis(150)).await;
 | 
						|
 | 
						|
    let mut s = connect(port).await;
 | 
						|
 | 
						|
    let _ = send_cmd(&mut s, &["SET", "k1", "v1"]).await;
 | 
						|
    let _ = send_cmd(&mut s, &["HSET", "h1", "f", "v"]).await;
 | 
						|
    let _ = send_cmd(&mut s, &["LPUSH", "l1", "a"]).await;
 | 
						|
 | 
						|
    let keys_before = send_cmd(&mut s, &["KEYS", "*"]).await;
 | 
						|
    assert_contains(&keys_before, "k1", "have string key before FLUSHDB");
 | 
						|
    assert_contains(&keys_before, "h1", "have hash key before FLUSHDB");
 | 
						|
    assert_contains(&keys_before, "l1", "have list key before FLUSHDB");
 | 
						|
 | 
						|
    let fl = send_cmd(&mut s, &["FLUSHDB"]).await;
 | 
						|
    assert_contains(&fl, "OK", "FLUSHDB OK");
 | 
						|
 | 
						|
    let keys_after = send_cmd(&mut s, &["KEYS", "*"]).await;
 | 
						|
    assert_eq_resp(&keys_after, "*0\r\n", "DB should be empty after FLUSHDB");
 | 
						|
}
 | 
						|
 | 
						|
#[tokio::test]
 | 
						|
async fn test_07_age_stateless_suite() {
 | 
						|
    let (server, port) = start_test_server("age_stateless").await;
 | 
						|
    spawn_listener(server, port).await;
 | 
						|
    sleep(Duration::from_millis(150)).await;
 | 
						|
 | 
						|
    let mut s = connect(port).await;
 | 
						|
 | 
						|
    // GENENC -> [recipient, identity]
 | 
						|
    let genenc = send_cmd(&mut s, &["AGE", "GENENC"]).await;
 | 
						|
    assert!(
 | 
						|
        genenc.starts_with("*2\r\n$"),
 | 
						|
        "AGE GENENC should return array [recipient, identity], got:\n{}",
 | 
						|
        genenc
 | 
						|
    );
 | 
						|
 | 
						|
    // Parse simple RESP array of two bulk strings to extract keys
 | 
						|
    fn parse_two_bulk_array(resp: &str) -> (String, String) {
 | 
						|
        // naive parse for tests
 | 
						|
        let mut lines = resp.lines();
 | 
						|
        let _ = lines.next(); // *2
 | 
						|
        // $len
 | 
						|
        let _ = lines.next();
 | 
						|
        let recip = lines.next().unwrap_or("").to_string();
 | 
						|
        let _ = lines.next();
 | 
						|
        let ident = lines.next().unwrap_or("").to_string();
 | 
						|
        (recip, ident)
 | 
						|
    }
 | 
						|
    let (recipient, identity) = parse_two_bulk_array(&genenc);
 | 
						|
    assert!(
 | 
						|
        recipient.starts_with("age1") && identity.starts_with("AGE-SECRET-KEY-1"),
 | 
						|
        "Unexpected AGE key formats.\nrecipient: {}\nidentity: {}",
 | 
						|
        recipient,
 | 
						|
        identity
 | 
						|
    );
 | 
						|
 | 
						|
    // ENCRYPT / DECRYPT
 | 
						|
    let ct = send_cmd(&mut s, &["AGE", "ENCRYPT", &recipient, "hello world"]).await;
 | 
						|
    let ct_b64 = extract_bulk_payload(&ct).expect("Failed to parse bulk payload from ENCRYPT");
 | 
						|
    let pt = send_cmd(&mut s, &["AGE", "DECRYPT", &identity, &ct_b64]).await;
 | 
						|
    assert_contains(&pt, "hello world", "AGE DECRYPT round-trip");
 | 
						|
 | 
						|
    // GENSIGN -> [verify_pub_b64, sign_secret_b64]
 | 
						|
    let gensign = send_cmd(&mut s, &["AGE", "GENSIGN"]).await;
 | 
						|
    let (verify_pub, sign_secret) = parse_two_bulk_array(&gensign);
 | 
						|
    assert!(
 | 
						|
        !verify_pub.is_empty() && !sign_secret.is_empty(),
 | 
						|
        "GENSIGN returned empty keys"
 | 
						|
    );
 | 
						|
 | 
						|
    // SIGN / VERIFY
 | 
						|
    let sig = send_cmd(&mut s, &["AGE", "SIGN", &sign_secret, "msg"]).await;
 | 
						|
    let sig_b64 = extract_bulk_payload(&sig).expect("Failed to parse bulk payload from SIGN");
 | 
						|
    let v_ok = send_cmd(&mut s, &["AGE", "VERIFY", &verify_pub, "msg", &sig_b64]).await;
 | 
						|
    assert_contains(&v_ok, "1", "VERIFY should be 1 for valid signature");
 | 
						|
 | 
						|
    let v_bad = send_cmd(&mut s, &["AGE", "VERIFY", &verify_pub, "tampered", &sig_b64]).await;
 | 
						|
    assert_contains(&v_bad, "0", "VERIFY should be 0 for invalid message/signature");
 | 
						|
}
 | 
						|
 | 
						|
#[tokio::test]
 | 
						|
async fn test_08_age_persistent_named_suite() {
 | 
						|
    let (server, port) = start_test_server("age_persistent").await;
 | 
						|
    spawn_listener(server, port).await;
 | 
						|
    sleep(Duration::from_millis(150)).await;
 | 
						|
 | 
						|
    let mut s = connect(port).await;
 | 
						|
 | 
						|
    // KEYGEN + ENCRYPTNAME/DECRYPTNAME
 | 
						|
    let kg = send_cmd(&mut s, &["AGE", "KEYGEN", "app1"]).await;
 | 
						|
    assert!(
 | 
						|
        kg.starts_with("*2\r\n"),
 | 
						|
        "AGE KEYGEN should return [recipient, identity], got:\n{}",
 | 
						|
        kg
 | 
						|
    );
 | 
						|
 | 
						|
    let ct = send_cmd(&mut s, &["AGE", "ENCRYPTNAME", "app1", "hello"]).await;
 | 
						|
    let ct_b64 = extract_bulk_payload(&ct).expect("Failed to parse bulk payload from ENCRYPTNAME");
 | 
						|
    let pt = send_cmd(&mut s, &["AGE", "DECRYPTNAME", "app1", &ct_b64]).await;
 | 
						|
    assert_contains(&pt, "hello", "DECRYPTNAME round-trip");
 | 
						|
 | 
						|
    // SIGNKEYGEN + SIGNNAME/VERIFYNAME
 | 
						|
    let skg = send_cmd(&mut s, &["AGE", "SIGNKEYGEN", "app1"]).await;
 | 
						|
    assert!(
 | 
						|
        skg.starts_with("*2\r\n"),
 | 
						|
        "AGE SIGNKEYGEN should return [verify_pub, sign_secret], got:\n{}",
 | 
						|
        skg
 | 
						|
    );
 | 
						|
 | 
						|
    let sig = send_cmd(&mut s, &["AGE", "SIGNNAME", "app1", "m"] ).await;
 | 
						|
    let sig_b64 = extract_bulk_payload(&sig).expect("Failed to parse bulk payload from SIGNNAME");
 | 
						|
    let v1 = send_cmd(&mut s, &["AGE", "VERIFYNAME", "app1", "m", &sig_b64]).await;
 | 
						|
    assert_contains(&v1, "1", "VERIFYNAME valid => 1");
 | 
						|
 | 
						|
    let v0 = send_cmd(&mut s, &["AGE", "VERIFYNAME", "app1", "bad", &sig_b64]).await;
 | 
						|
    assert_contains(&v0, "0", "VERIFYNAME invalid => 0");
 | 
						|
 | 
						|
    // AGE LIST
 | 
						|
    let lst = send_cmd(&mut s, &["AGE", "LIST"]).await;
 | 
						|
    assert_contains(&lst, "encpub", "AGE LIST label encpub");
 | 
						|
    assert_contains(&lst, "app1", "AGE LIST includes app1");
 | 
						|
}
 | 
						|
 | 
						|
#[tokio::test]
 | 
						|
async fn test_10_expire_pexpire_persist() {
 | 
						|
   let (server, port) = start_test_server("expire_suite").await;
 | 
						|
   spawn_listener(server, port).await;
 | 
						|
   sleep(Duration::from_millis(150)).await;
 | 
						|
 | 
						|
   let mut s = connect(port).await;
 | 
						|
 | 
						|
   // EXPIRE: seconds
 | 
						|
   let _ = send_cmd(&mut s, &["SET", "exp:s", "v"]).await;
 | 
						|
   let ex = send_cmd(&mut s, &["EXPIRE", "exp:s", "1"]).await;
 | 
						|
   assert_contains(&ex, "1", "EXPIRE exp:s 1 -> 1 (applied)");
 | 
						|
   let ttl1 = send_cmd(&mut s, &["TTL", "exp:s"]).await;
 | 
						|
   assert!(
 | 
						|
       ttl1.contains("1") || ttl1.contains("0"),
 | 
						|
       "TTL exp:s should be 1 or 0, got: {}",
 | 
						|
       ttl1
 | 
						|
   );
 | 
						|
   sleep(Duration::from_millis(1100)).await;
 | 
						|
   let get_after = send_cmd(&mut s, &["GET", "exp:s"]).await;
 | 
						|
   assert_contains(&get_after, "$-1", "GET after expiry should be Null");
 | 
						|
   let ttl_after = send_cmd(&mut s, &["TTL", "exp:s"]).await;
 | 
						|
   assert_contains(&ttl_after, "-2", "TTL after expiry -> -2");
 | 
						|
   let exists_after = send_cmd(&mut s, &["EXISTS", "exp:s"]).await;
 | 
						|
   assert_contains(&exists_after, "0", "EXISTS after expiry -> 0");
 | 
						|
 | 
						|
   // PEXPIRE: milliseconds
 | 
						|
   let _ = send_cmd(&mut s, &["SET", "exp:ms", "v"]).await;
 | 
						|
   let pex = send_cmd(&mut s, &["PEXPIRE", "exp:ms", "1500"]).await;
 | 
						|
   assert_contains(&pex, "1", "PEXPIRE exp:ms 1500 -> 1 (applied)");
 | 
						|
   let ttl_ms1 = send_cmd(&mut s, &["TTL", "exp:ms"]).await;
 | 
						|
   assert!(
 | 
						|
       ttl_ms1.contains("1") || ttl_ms1.contains("0"),
 | 
						|
       "TTL exp:ms should be 1 or 0 soon after PEXPIRE, got: {}",
 | 
						|
       ttl_ms1
 | 
						|
   );
 | 
						|
   sleep(Duration::from_millis(1600)).await;
 | 
						|
   let exists_ms_after = send_cmd(&mut s, &["EXISTS", "exp:ms"]).await;
 | 
						|
   assert_contains(&exists_ms_after, "0", "EXISTS exp:ms after ms expiry -> 0");
 | 
						|
 | 
						|
   // PERSIST: remove expiration
 | 
						|
   let _ = send_cmd(&mut s, &["SET", "exp:persist", "v"]).await;
 | 
						|
   let _ = send_cmd(&mut s, &["EXPIRE", "exp:persist", "5"]).await;
 | 
						|
   let ttl_pre = send_cmd(&mut s, &["TTL", "exp:persist"]).await;
 | 
						|
   assert!(
 | 
						|
       ttl_pre.contains("5") || ttl_pre.contains("4") || ttl_pre.contains("3") || ttl_pre.contains("2") || ttl_pre.contains("1") || ttl_pre.contains("0"),
 | 
						|
       "TTL exp:persist should be >=0 before persist, got: {}",
 | 
						|
       ttl_pre
 | 
						|
   );
 | 
						|
   let persist1 = send_cmd(&mut s, &["PERSIST", "exp:persist"]).await;
 | 
						|
   assert_contains(&persist1, "1", "PERSIST should remove expiration");
 | 
						|
   let ttl_post = send_cmd(&mut s, &["TTL", "exp:persist"]).await;
 | 
						|
   assert_contains(&ttl_post, "-1", "TTL after PERSIST -> -1 (no expiration)");
 | 
						|
   // Second persist should return 0 (nothing to remove)
 | 
						|
   let persist2 = send_cmd(&mut s, &["PERSIST", "exp:persist"]).await;
 | 
						|
   assert_contains(&persist2, "0", "PERSIST again -> 0 (no expiration to remove)");
 | 
						|
}
 | 
						|
 | 
						|
#[tokio::test]
 | 
						|
async fn test_11_set_with_options() {
 | 
						|
    let (server, port) = start_test_server("set_opts").await;
 | 
						|
    spawn_listener(server, port).await;
 | 
						|
    sleep(Duration::from_millis(150)).await;
 | 
						|
 | 
						|
    let mut s = connect(port).await;
 | 
						|
 | 
						|
    // SET with GET on non-existing key -> returns Null, sets value
 | 
						|
    let set_get1 = send_cmd(&mut s, &["SET", "s1", "v1", "GET"]).await;
 | 
						|
    assert_contains(&set_get1, "$-1", "SET s1 v1 GET returns Null when key didn't exist");
 | 
						|
    let g1 = send_cmd(&mut s, &["GET", "s1"]).await;
 | 
						|
    assert_contains(&g1, "v1", "GET s1 after first SET");
 | 
						|
 | 
						|
    // SET with GET should return old value, then set to new
 | 
						|
    let set_get2 = send_cmd(&mut s, &["SET", "s1", "v2", "GET"]).await;
 | 
						|
    assert_contains(&set_get2, "v1", "SET s1 v2 GET returns previous value v1");
 | 
						|
    let g2 = send_cmd(&mut s, &["GET", "s1"]).await;
 | 
						|
    assert_contains(&g2, "v2", "GET s1 now v2");
 | 
						|
 | 
						|
    // NX prevents update when key exists; with GET should return Null and not change
 | 
						|
    let set_nx = send_cmd(&mut s, &["SET", "s1", "v3", "NX", "GET"]).await;
 | 
						|
    assert_contains(&set_nx, "$-1", "SET s1 v3 NX GET returns Null when not set");
 | 
						|
    let g3 = send_cmd(&mut s, &["GET", "s1"]).await;
 | 
						|
    assert_contains(&g3, "v2", "GET s1 remains v2 after NX prevented write");
 | 
						|
 | 
						|
    // NX allows set when key does not exist
 | 
						|
    let set_nx2 = send_cmd(&mut s, &["SET", "s2", "v10", "NX"]).await;
 | 
						|
    assert_contains(&set_nx2, "OK", "SET s2 v10 NX -> OK for new key");
 | 
						|
    let g4 = send_cmd(&mut s, &["GET", "s2"]).await;
 | 
						|
    assert_contains(&g4, "v10", "GET s2 is v10");
 | 
						|
 | 
						|
    // XX requires existing key; with GET returns old value and sets new
 | 
						|
    let set_xx = send_cmd(&mut s, &["SET", "s2", "v11", "XX", "GET"]).await;
 | 
						|
    assert_contains(&set_xx, "v10", "SET s2 v11 XX GET returns previous v10");
 | 
						|
    let g5 = send_cmd(&mut s, &["GET", "s2"]).await;
 | 
						|
    assert_contains(&g5, "v11", "GET s2 is now v11");
 | 
						|
 | 
						|
    // PX expiration path via SET options
 | 
						|
    let set_px = send_cmd(&mut s, &["SET", "s3", "vpx", "PX", "500"]).await;
 | 
						|
    assert_contains(&set_px, "OK", "SET s3 vpx PX 500 -> OK");
 | 
						|
    let ttl_px1 = send_cmd(&mut s, &["TTL", "s3"]).await;
 | 
						|
    assert!(
 | 
						|
        ttl_px1.contains("0") || ttl_px1.contains("1"),
 | 
						|
        "TTL s3 immediately after PX should be 1 or 0, got: {}",
 | 
						|
        ttl_px1
 | 
						|
    );
 | 
						|
    sleep(Duration::from_millis(650)).await;
 | 
						|
    let g6 = send_cmd(&mut s, &["GET", "s3"]).await;
 | 
						|
    assert_contains(&g6, "$-1", "GET s3 after PX expiry -> Null");
 | 
						|
}
 | 
						|
 | 
						|
#[tokio::test]
 | 
						|
async fn test_09_mget_mset_and_variadic_exists_del() {
 | 
						|
   let (server, port) = start_test_server("mget_mset_variadic").await;
 | 
						|
   spawn_listener(server, port).await;
 | 
						|
   sleep(Duration::from_millis(150)).await;
 | 
						|
 | 
						|
   let mut s = connect(port).await;
 | 
						|
 | 
						|
   // MSET multiple keys
 | 
						|
   let mset = send_cmd(&mut s, &["MSET", "k1", "v1", "k2", "v2", "k3", "v3"]).await;
 | 
						|
   assert_contains(&mset, "OK", "MSET k1 v1 k2 v2 k3 v3 -> OK");
 | 
						|
 | 
						|
   // MGET should return values and Null for missing
 | 
						|
   let mget = send_cmd(&mut s, &["MGET", "k1", "k2", "nope", "k3"]).await;
 | 
						|
   // Expect an array with 4 entries; verify payloads
 | 
						|
   assert_contains(&mget, "v1", "MGET k1");
 | 
						|
   assert_contains(&mget, "v2", "MGET k2");
 | 
						|
   assert_contains(&mget, "v3", "MGET k3");
 | 
						|
   assert_contains(&mget, "$-1", "MGET missing returns Null");
 | 
						|
 | 
						|
   // EXISTS variadic: count how many exist
 | 
						|
   let exists_multi = send_cmd(&mut s, &["EXISTS", "k1", "nope", "k3"]).await;
 | 
						|
   // Server returns SimpleString numeric, e.g. +2
 | 
						|
   assert_contains(&exists_multi, "2", "EXISTS k1 nope k3 -> 2");
 | 
						|
 | 
						|
   // DEL variadic: delete multiple keys, return count deleted
 | 
						|
   let del_multi = send_cmd(&mut s, &["DEL", "k1", "k3", "nope"]).await;
 | 
						|
   assert_contains(&del_multi, "2", "DEL k1 k3 nope -> 2");
 | 
						|
 | 
						|
   // Verify deletion
 | 
						|
   let exists_after = send_cmd(&mut s, &["EXISTS", "k1", "k3"]).await;
 | 
						|
   assert_contains(&exists_after, "0", "EXISTS k1 k3 after DEL -> 0");
 | 
						|
 | 
						|
   // MGET after deletion should include Nulls for deleted keys
 | 
						|
   let mget_after = send_cmd(&mut s, &["MGET", "k1", "k2", "k3"]).await;
 | 
						|
   assert_contains(&mget_after, "$-1", "MGET k1 after DEL -> Null");
 | 
						|
   assert_contains(&mget_after, "v2", "MGET k2 remains");
 | 
						|
   assert_contains(&mget_after, "$-1", "MGET k3 after DEL -> Null");
 | 
						|
}
 | 
						|
#[tokio::test]
 | 
						|
async fn test_12_hash_incr() {
 | 
						|
    let (server, port) = start_test_server("hash_incr").await;
 | 
						|
    spawn_listener(server, port).await;
 | 
						|
    sleep(Duration::from_millis(150)).await;
 | 
						|
 | 
						|
    let mut s = connect(port).await;
 | 
						|
 | 
						|
    // Integer increments
 | 
						|
    let _ = send_cmd(&mut s, &["HSET", "hinc", "a", "1"]).await;
 | 
						|
    let r1 = send_cmd(&mut s, &["HINCRBY", "hinc", "a", "2"]).await;
 | 
						|
    assert_contains(&r1, "3", "HINCRBY hinc a 2 -> 3");
 | 
						|
 | 
						|
    let r2 = send_cmd(&mut s, &["HINCRBY", "hinc", "a", "-1"]).await;
 | 
						|
    assert_contains(&r2, "2", "HINCRBY hinc a -1 -> 2");
 | 
						|
 | 
						|
    let r3 = send_cmd(&mut s, &["HINCRBY", "hinc", "b", "5"]).await;
 | 
						|
    assert_contains(&r3, "5", "HINCRBY hinc b 5 -> 5");
 | 
						|
 | 
						|
    // HINCRBY error on non-integer field
 | 
						|
    let _ = send_cmd(&mut s, &["HSET", "hinc", "s", "x"]).await;
 | 
						|
    let r_err = send_cmd(&mut s, &["HINCRBY", "hinc", "s", "1"]).await;
 | 
						|
    assert_contains(&r_err, "ERR", "HINCRBY on non-integer field should ERR");
 | 
						|
 | 
						|
    // Float increments
 | 
						|
    let r4 = send_cmd(&mut s, &["HINCRBYFLOAT", "hinc", "f", "1.5"]).await;
 | 
						|
    assert_contains(&r4, "1.5", "HINCRBYFLOAT hinc f 1.5 -> 1.5");
 | 
						|
 | 
						|
    let r5 = send_cmd(&mut s, &["HINCRBYFLOAT", "hinc", "f", "2.5"]).await;
 | 
						|
    // Could be "4", "4.0", or "4.000000", accept "4" substring
 | 
						|
    assert_contains(&r5, "4", "HINCRBYFLOAT hinc f 2.5 -> 4");
 | 
						|
 | 
						|
    // HINCRBYFLOAT error on non-float field
 | 
						|
    let _ = send_cmd(&mut s, &["HSET", "hinc", "notf", "abc"]).await;
 | 
						|
    let r6 = send_cmd(&mut s, &["HINCRBYFLOAT", "hinc", "notf", "1"]).await;
 | 
						|
    assert_contains(&r6, "ERR", "HINCRBYFLOAT on non-float field should ERR");
 | 
						|
}
 | 
						|
#[tokio::test]
 | 
						|
async fn test_05b_brpop_suite() {
 | 
						|
    let (server, port) = start_test_server("lists_brpop").await;
 | 
						|
    spawn_listener(server, port).await;
 | 
						|
    sleep(Duration::from_millis(150)).await;
 | 
						|
 | 
						|
    let mut a = connect(port).await;
 | 
						|
 | 
						|
    // RPUSH some initial data, BRPOP should take from the right
 | 
						|
    let _ = send_cmd(&mut a, &["RPUSH", "q:rjobs", "1", "2"]).await;
 | 
						|
    let br_nonblock = send_cmd(&mut a, &["BRPOP", "q:rjobs", "0"]).await;
 | 
						|
    // Should pop the rightmost element "2"
 | 
						|
    assert_contains(&br_nonblock, "q:rjobs", "BRPOP returns key");
 | 
						|
    assert_contains(&br_nonblock, "2", "BRPOP returns rightmost element");
 | 
						|
 | 
						|
    // Now test blocking BRPOP: start blocked client, then RPUSH from another client
 | 
						|
    let c1 = connect(port).await;
 | 
						|
    let mut c2 = connect(port).await;
 | 
						|
 | 
						|
    // Start BRPOP on c1
 | 
						|
    let brpop_task = tokio::spawn(async move {
 | 
						|
        let mut c1_local = c1;
 | 
						|
        send_cmd(&mut c1_local, &["BRPOP", "q:blockr", "5"]).await
 | 
						|
    });
 | 
						|
 | 
						|
    // Give it time to register waiter
 | 
						|
    sleep(Duration::from_millis(150)).await;
 | 
						|
 | 
						|
    // Push from right to wake BRPOP
 | 
						|
    let _ = send_cmd(&mut c2, &["RPUSH", "q:blockr", "X"]).await;
 | 
						|
 | 
						|
    // Await BRPOP result
 | 
						|
    let brpop_res = brpop_task.await.expect("BRPOP task join");
 | 
						|
    assert_contains(&brpop_res, "q:blockr", "BRPOP returned key");
 | 
						|
    assert_contains(&brpop_res, "X", "BRPOP returned element");
 | 
						|
}
 | 
						|
#[tokio::test]
 | 
						|
async fn test_13_dbsize() {
 | 
						|
    let (server, port) = start_test_server("dbsize").await;
 | 
						|
    spawn_listener(server, port).await;
 | 
						|
    sleep(Duration::from_millis(150)).await;
 | 
						|
 | 
						|
    let mut s = connect(port).await;
 | 
						|
 | 
						|
    // Initially empty
 | 
						|
    let n0 = send_cmd(&mut s, &["DBSIZE"]).await;
 | 
						|
    assert_contains(&n0, "0", "DBSIZE initial should be 0");
 | 
						|
 | 
						|
    // Add a string, a hash, and a list -> dbsize = 3
 | 
						|
    let _ = send_cmd(&mut s, &["SET", "s", "v"]).await;
 | 
						|
    let _ = send_cmd(&mut s, &["HSET", "h", "f", "v"]).await;
 | 
						|
    let _ = send_cmd(&mut s, &["LPUSH", "l", "a", "b"]).await;
 | 
						|
 | 
						|
    let n3 = send_cmd(&mut s, &["DBSIZE"]).await;
 | 
						|
    assert_contains(&n3, "3", "DBSIZE after adding s,h,l should be 3");
 | 
						|
 | 
						|
    // Expire the string and wait, dbsize should drop to 2
 | 
						|
    let _ = send_cmd(&mut s, &["PEXPIRE", "s", "400"]).await;
 | 
						|
    sleep(Duration::from_millis(500)).await;
 | 
						|
 | 
						|
    let n2 = send_cmd(&mut s, &["DBSIZE"]).await;
 | 
						|
    assert_contains(&n2, "2", "DBSIZE after string expiry should be 2");
 | 
						|
 | 
						|
    // Delete remaining keys and confirm 0
 | 
						|
    let _ = send_cmd(&mut s, &["DEL", "h"]).await;
 | 
						|
    let _ = send_cmd(&mut s, &["DEL", "l"]).await;
 | 
						|
 | 
						|
    let n_final = send_cmd(&mut s, &["DBSIZE"]).await;
 | 
						|
    assert_contains(&n_final, "0", "DBSIZE after deleting all keys should be 0");
 | 
						|
}
 | 
						|
#[tokio::test]
 | 
						|
async fn test_14_expireat_pexpireat() {
 | 
						|
    use std::time::{SystemTime, UNIX_EPOCH};
 | 
						|
 | 
						|
    let (server, port) = start_test_server("expireat_suite").await;
 | 
						|
    spawn_listener(server, port).await;
 | 
						|
    sleep(Duration::from_millis(150)).await;
 | 
						|
 | 
						|
    let mut s = connect(port).await;
 | 
						|
 | 
						|
    // EXPIREAT: seconds since epoch
 | 
						|
    let now_secs = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() as i64;
 | 
						|
    let _ = send_cmd(&mut s, &["SET", "exp:at:s", "v"]).await;
 | 
						|
    let exat = send_cmd(&mut s, &["EXPIREAT", "exp:at:s", &format!("{}", now_secs + 1)]).await;
 | 
						|
    assert_contains(&exat, "1", "EXPIREAT exp:at:s now+1s -> 1 (applied)");
 | 
						|
    let ttl1 = send_cmd(&mut s, &["TTL", "exp:at:s"]).await;
 | 
						|
    assert!(
 | 
						|
        ttl1.contains("1") || ttl1.contains("0"),
 | 
						|
        "TTL exp:at:s should be 1 or 0 shortly after EXPIREAT, got: {}",
 | 
						|
        ttl1
 | 
						|
    );
 | 
						|
    sleep(Duration::from_millis(1200)).await;
 | 
						|
    let exists_after_exat = send_cmd(&mut s, &["EXISTS", "exp:at:s"]).await;
 | 
						|
    assert_contains(&exists_after_exat, "0", "EXISTS exp:at:s after EXPIREAT expiry -> 0");
 | 
						|
 | 
						|
    // PEXPIREAT: milliseconds since epoch
 | 
						|
    let now_ms = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as i64;
 | 
						|
    let _ = send_cmd(&mut s, &["SET", "exp:at:ms", "v"]).await;
 | 
						|
    let pexat = send_cmd(&mut s, &["PEXPIREAT", "exp:at:ms", &format!("{}", now_ms + 450)]).await;
 | 
						|
    assert_contains(&pexat, "1", "PEXPIREAT exp:at:ms now+450ms -> 1 (applied)");
 | 
						|
    let ttl2 = send_cmd(&mut s, &["TTL", "exp:at:ms"]).await;
 | 
						|
    assert!(
 | 
						|
        ttl2.contains("0") || ttl2.contains("1"),
 | 
						|
        "TTL exp:at:ms should be 0..1 soon after PEXPIREAT, got: {}",
 | 
						|
        ttl2
 | 
						|
    );
 | 
						|
    sleep(Duration::from_millis(600)).await;
 | 
						|
    let exists_after_pexat = send_cmd(&mut s, &["EXISTS", "exp:at:ms"]).await;
 | 
						|
    assert_contains(&exists_after_pexat, "0", "EXISTS exp:at:ms after PEXPIREAT expiry -> 0");
 | 
						|
} |