Implemented EXPIRE/PEXPIRE/PERSIST
This commit is contained in:
parent
b644bf873f
commit
34808fc1c9
@ -37,6 +37,9 @@ pub enum Cmd {
|
|||||||
HScan(String, u64, Option<String>, Option<u64>), // key, cursor, pattern, count
|
HScan(String, u64, Option<String>, Option<u64>), // key, cursor, pattern, count
|
||||||
Scan(u64, Option<String>, Option<u64>), // cursor, pattern, count
|
Scan(u64, Option<String>, Option<u64>), // cursor, pattern, count
|
||||||
Ttl(String),
|
Ttl(String),
|
||||||
|
Expire(String, i64),
|
||||||
|
PExpire(String, i64),
|
||||||
|
Persist(String),
|
||||||
Exists(String),
|
Exists(String),
|
||||||
ExistsMulti(Vec<String>),
|
ExistsMulti(Vec<String>),
|
||||||
DelMulti(Vec<String>),
|
DelMulti(Vec<String>),
|
||||||
@ -337,6 +340,26 @@ impl Cmd {
|
|||||||
}
|
}
|
||||||
Cmd::Ttl(cmd[1].clone())
|
Cmd::Ttl(cmd[1].clone())
|
||||||
}
|
}
|
||||||
|
"expire" => {
|
||||||
|
if cmd.len() != 3 {
|
||||||
|
return Err(DBError("wrong number of arguments for EXPIRE command".to_string()));
|
||||||
|
}
|
||||||
|
let secs = cmd[2].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
|
||||||
|
Cmd::Expire(cmd[1].clone(), secs)
|
||||||
|
}
|
||||||
|
"pexpire" => {
|
||||||
|
if cmd.len() != 3 {
|
||||||
|
return Err(DBError("wrong number of arguments for PEXPIRE command".to_string()));
|
||||||
|
}
|
||||||
|
let ms = cmd[2].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
|
||||||
|
Cmd::PExpire(cmd[1].clone(), ms)
|
||||||
|
}
|
||||||
|
"persist" => {
|
||||||
|
if cmd.len() != 2 {
|
||||||
|
return Err(DBError("wrong number of arguments for PERSIST command".to_string()));
|
||||||
|
}
|
||||||
|
Cmd::Persist(cmd[1].clone())
|
||||||
|
}
|
||||||
"exists" => {
|
"exists" => {
|
||||||
if cmd.len() < 2 {
|
if cmd.len() < 2 {
|
||||||
return Err(DBError(format!("wrong number of arguments for EXISTS command")));
|
return Err(DBError(format!("wrong number of arguments for EXISTS command")));
|
||||||
@ -573,6 +596,9 @@ impl Cmd {
|
|||||||
Cmd::HScan(key, cursor, pattern, count) => hscan_cmd(server, &key, &cursor, pattern.as_deref(), &count).await,
|
Cmd::HScan(key, cursor, pattern, count) => hscan_cmd(server, &key, &cursor, pattern.as_deref(), &count).await,
|
||||||
Cmd::Scan(cursor, pattern, count) => scan_cmd(server, &cursor, pattern.as_deref(), &count).await,
|
Cmd::Scan(cursor, pattern, count) => scan_cmd(server, &cursor, pattern.as_deref(), &count).await,
|
||||||
Cmd::Ttl(key) => ttl_cmd(server, &key).await,
|
Cmd::Ttl(key) => ttl_cmd(server, &key).await,
|
||||||
|
Cmd::Expire(key, secs) => expire_cmd(server, &key, secs).await,
|
||||||
|
Cmd::PExpire(key, ms) => pexpire_cmd(server, &key, ms).await,
|
||||||
|
Cmd::Persist(key) => persist_cmd(server, &key).await,
|
||||||
Cmd::Exists(key) => exists_cmd(server, &key).await,
|
Cmd::Exists(key) => exists_cmd(server, &key).await,
|
||||||
Cmd::ExistsMulti(keys) => exists_multi_cmd(server, &keys).await,
|
Cmd::ExistsMulti(keys) => exists_multi_cmd(server, &keys).await,
|
||||||
Cmd::Quit => Ok(Protocol::SimpleString("OK".to_string())),
|
Cmd::Quit => Ok(Protocol::SimpleString("OK".to_string())),
|
||||||
@ -1151,6 +1177,36 @@ async fn exists_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EXPIRE key seconds -> 1 if timeout set, 0 otherwise
|
||||||
|
async fn expire_cmd(server: &Server, key: &str, secs: i64) -> Result<Protocol, DBError> {
|
||||||
|
if secs < 0 {
|
||||||
|
return Ok(Protocol::SimpleString("0".to_string()));
|
||||||
|
}
|
||||||
|
match server.current_storage()?.expire_seconds(key, secs as u64) {
|
||||||
|
Ok(applied) => Ok(Protocol::SimpleString(if applied { "1" } else { "0" }.to_string())),
|
||||||
|
Err(e) => Ok(Protocol::err(&e.0)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// PEXPIRE key milliseconds -> 1 if timeout set, 0 otherwise
|
||||||
|
async fn pexpire_cmd(server: &Server, key: &str, ms: i64) -> Result<Protocol, DBError> {
|
||||||
|
if ms < 0 {
|
||||||
|
return Ok(Protocol::SimpleString("0".to_string()));
|
||||||
|
}
|
||||||
|
match server.current_storage()?.pexpire_millis(key, ms as u128) {
|
||||||
|
Ok(applied) => Ok(Protocol::SimpleString(if applied { "1" } else { "0" }.to_string())),
|
||||||
|
Err(e) => Ok(Protocol::err(&e.0)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// PERSIST key -> 1 if timeout removed, 0 otherwise
|
||||||
|
async fn persist_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
|
||||||
|
match server.current_storage()?.persist(key) {
|
||||||
|
Ok(removed) => Ok(Protocol::SimpleString(if removed { "1" } else { "0" }.to_string())),
|
||||||
|
Err(e) => Ok(Protocol::err(&e.0)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn client_setname_cmd(server: &mut Server, name: &str) -> Result<Protocol, DBError> {
|
async fn client_setname_cmd(server: &mut Server, name: &str) -> Result<Protocol, DBError> {
|
||||||
server.client_name = Some(name.to_string());
|
server.client_name = Some(name.to_string());
|
||||||
Ok(Protocol::SimpleString("OK".to_string()))
|
Ok(Protocol::SimpleString("OK".to_string()))
|
||||||
|
@ -98,6 +98,72 @@ impl Storage {
|
|||||||
None => Ok(false), // Key does not exist
|
None => Ok(false), // Key does not exist
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// -------- Expiration helpers (string keys only, consistent with TTL/EXISTS) --------
|
||||||
|
|
||||||
|
// Set expiry in seconds; returns true if applied (key exists and is string), false otherwise
|
||||||
|
pub fn expire_seconds(&self, key: &str, secs: u64) -> Result<bool, DBError> {
|
||||||
|
// Determine eligibility first to avoid holding borrows across commit
|
||||||
|
let mut applied = false;
|
||||||
|
let write_txn = self.db.begin_write()?;
|
||||||
|
{
|
||||||
|
let types_table = write_txn.open_table(TYPES_TABLE)?;
|
||||||
|
let is_string = types_table
|
||||||
|
.get(key)?
|
||||||
|
.map(|v| v.value() == "string")
|
||||||
|
.unwrap_or(false);
|
||||||
|
if is_string {
|
||||||
|
let mut expiration_table = write_txn.open_table(EXPIRATION_TABLE)?;
|
||||||
|
let expires_at = now_in_millis() + (secs as u128) * 1000;
|
||||||
|
expiration_table.insert(key, &(expires_at as u64))?;
|
||||||
|
applied = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
write_txn.commit()?;
|
||||||
|
Ok(applied)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set expiry in milliseconds; returns true if applied (key exists and is string), false otherwise
|
||||||
|
pub fn pexpire_millis(&self, key: &str, ms: u128) -> Result<bool, DBError> {
|
||||||
|
let mut applied = false;
|
||||||
|
let write_txn = self.db.begin_write()?;
|
||||||
|
{
|
||||||
|
let types_table = write_txn.open_table(TYPES_TABLE)?;
|
||||||
|
let is_string = types_table
|
||||||
|
.get(key)?
|
||||||
|
.map(|v| v.value() == "string")
|
||||||
|
.unwrap_or(false);
|
||||||
|
if is_string {
|
||||||
|
let mut expiration_table = write_txn.open_table(EXPIRATION_TABLE)?;
|
||||||
|
let expires_at = now_in_millis() + ms;
|
||||||
|
expiration_table.insert(key, &(expires_at as u64))?;
|
||||||
|
applied = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
write_txn.commit()?;
|
||||||
|
Ok(applied)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove expiry if present; returns true if removed, false otherwise
|
||||||
|
pub fn persist(&self, key: &str) -> Result<bool, DBError> {
|
||||||
|
let mut removed = false;
|
||||||
|
let write_txn = self.db.begin_write()?;
|
||||||
|
{
|
||||||
|
let types_table = write_txn.open_table(TYPES_TABLE)?;
|
||||||
|
let is_string = types_table
|
||||||
|
.get(key)?
|
||||||
|
.map(|v| v.value() == "string")
|
||||||
|
.unwrap_or(false);
|
||||||
|
if is_string {
|
||||||
|
let mut expiration_table = write_txn.open_table(EXPIRATION_TABLE)?;
|
||||||
|
if expiration_table.remove(key)?.is_some() {
|
||||||
|
removed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
write_txn.commit()?;
|
||||||
|
Ok(removed)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Utility function for glob pattern matching
|
// Utility function for glob pattern matching
|
||||||
|
600
herodb/tests/usage_suite.rs
Normal file
600
herodb/tests/usage_suite.rs
Normal file
@ -0,0 +1,600 @@
|
|||||||
|
use herodb::{options::DBOption, server::Server};
|
||||||
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
use tokio::net::TcpStream;
|
||||||
|
use tokio::time::{sleep, Duration};
|
||||||
|
|
||||||
|
// =========================
|
||||||
|
// Helpers
|
||||||
|
// =========================
|
||||||
|
|
||||||
|
async fn start_test_server(test_name: &str) -> (Server, u16) {
|
||||||
|
use std::sync::atomic::{AtomicU16, Ordering};
|
||||||
|
static PORT_COUNTER: AtomicU16 = AtomicU16::new(17100);
|
||||||
|
let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst);
|
||||||
|
|
||||||
|
let test_dir = format!("/tmp/herodb_usage_suite_{}", test_name);
|
||||||
|
let _ = std::fs::remove_dir_all(&test_dir);
|
||||||
|
std::fs::create_dir_all(&test_dir).unwrap();
|
||||||
|
|
||||||
|
let option = DBOption {
|
||||||
|
dir: test_dir,
|
||||||
|
port,
|
||||||
|
debug: false,
|
||||||
|
encrypt: false,
|
||||||
|
encryption_key: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let server = Server::new(option).await;
|
||||||
|
(server, port)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn spawn_listener(server: Server, port: u16) {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
|
||||||
|
.await
|
||||||
|
.expect("bind listener");
|
||||||
|
loop {
|
||||||
|
match listener.accept().await {
|
||||||
|
Ok((stream, _)) => {
|
||||||
|
let mut s_clone = server.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let _ = s_clone.handle(stream).await;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(_e) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build RESP array for args ["PING"] -> "*1\r\n$4\r\nPING\r\n"
|
||||||
|
fn build_resp(args: &[&str]) -> String {
|
||||||
|
let mut s = format!("*{}\r\n", args.len());
|
||||||
|
for a in args {
|
||||||
|
s.push_str(&format!("${}\r\n{}\r\n", a.len(), a));
|
||||||
|
}
|
||||||
|
s
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn connect(port: u16) -> TcpStream {
|
||||||
|
let mut attempts = 0;
|
||||||
|
loop {
|
||||||
|
match TcpStream::connect(format!("127.0.0.1:{}", port)).await {
|
||||||
|
Ok(s) => return s,
|
||||||
|
Err(_) if attempts < 30 => {
|
||||||
|
attempts += 1;
|
||||||
|
sleep(Duration::from_millis(100)).await;
|
||||||
|
}
|
||||||
|
Err(e) => panic!("Failed to connect: {}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_cmd(stream: &mut TcpStream, args: &[&str]) -> String {
|
||||||
|
let req = build_resp(args);
|
||||||
|
stream.write_all(req.as_bytes()).await.unwrap();
|
||||||
|
|
||||||
|
// Single read is enough for these small replies
|
||||||
|
let mut buf = vec![0u8; 8192];
|
||||||
|
let n = stream.read(&mut buf).await.unwrap();
|
||||||
|
String::from_utf8_lossy(&buf[..n]).to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assert helpers with clearer output
|
||||||
|
fn assert_contains(haystack: &str, needle: &str, ctx: &str) {
|
||||||
|
assert!(
|
||||||
|
haystack.contains(needle),
|
||||||
|
"ASSERT CONTAINS failed: '{}' not found in response.\nContext: {}\nResponse:\n{}",
|
||||||
|
needle,
|
||||||
|
ctx,
|
||||||
|
haystack
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn assert_eq_resp(actual: &str, expected: &str, ctx: &str) {
|
||||||
|
assert!(
|
||||||
|
actual == expected,
|
||||||
|
"ASSERT EQUAL failed.\nContext: {}\nExpected:\n{:?}\nActual:\n{:?}",
|
||||||
|
ctx,
|
||||||
|
expected,
|
||||||
|
actual
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Extract the payload of a single RESP Bulk String reply.
|
||||||
|
/// Example input:
|
||||||
|
/// "$5\r\nhello\r\n" -> Some("hello".to_string())
|
||||||
|
fn extract_bulk_payload(resp: &str) -> Option<String> {
|
||||||
|
// find first CRLF after "$len"
|
||||||
|
let first = resp.find("\r\n")?;
|
||||||
|
let after = &resp[(first + 2)..];
|
||||||
|
// find next CRLF ending payload
|
||||||
|
let second = after.find("\r\n")?;
|
||||||
|
Some(after[..second].to_string())
|
||||||
|
}
|
||||||
|
|
||||||
|
// =========================
|
||||||
|
// Test suites
|
||||||
|
// =========================
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_01_connection_and_info() {
|
||||||
|
let (server, port) = start_test_server("conn_info").await;
|
||||||
|
spawn_listener(server, port).await;
|
||||||
|
sleep(Duration::from_millis(150)).await;
|
||||||
|
|
||||||
|
let mut s = connect(port).await;
|
||||||
|
|
||||||
|
// redis-cli may send COMMAND DOCS, our server replies empty array; harmless.
|
||||||
|
let pong = send_cmd(&mut s, &["PING"]).await;
|
||||||
|
assert_contains(&pong, "PONG", "PING should return PONG");
|
||||||
|
|
||||||
|
let echo = send_cmd(&mut s, &["ECHO", "hello"]).await;
|
||||||
|
assert_contains(&echo, "hello", "ECHO hello");
|
||||||
|
|
||||||
|
// INFO (general)
|
||||||
|
let info = send_cmd(&mut s, &["INFO"]).await;
|
||||||
|
assert_contains(&info, "redis_version", "INFO should include redis_version");
|
||||||
|
|
||||||
|
// INFO REPLICATION (static stub)
|
||||||
|
let repl = send_cmd(&mut s, &["INFO", "replication"]).await;
|
||||||
|
assert_contains(&repl, "role:master", "INFO replication role");
|
||||||
|
|
||||||
|
// CONFIG GET subset
|
||||||
|
let cfg = send_cmd(&mut s, &["CONFIG", "GET", "databases"]).await;
|
||||||
|
assert_contains(&cfg, "databases", "CONFIG GET databases");
|
||||||
|
assert_contains(&cfg, "16", "CONFIG GET databases value");
|
||||||
|
|
||||||
|
// CLIENT name
|
||||||
|
let setname = send_cmd(&mut s, &["CLIENT", "SETNAME", "myapp"]).await;
|
||||||
|
assert_contains(&setname, "OK", "CLIENT SETNAME");
|
||||||
|
|
||||||
|
let getname = send_cmd(&mut s, &["CLIENT", "GETNAME"]).await;
|
||||||
|
assert_contains(&getname, "myapp", "CLIENT GETNAME");
|
||||||
|
|
||||||
|
// SELECT db
|
||||||
|
let sel = send_cmd(&mut s, &["SELECT", "0"]).await;
|
||||||
|
assert_contains(&sel, "OK", "SELECT 0");
|
||||||
|
|
||||||
|
// QUIT should close connection after sending OK
|
||||||
|
let quit = send_cmd(&mut s, &["QUIT"]).await;
|
||||||
|
assert_contains(&quit, "OK", "QUIT should return OK");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_02_strings_and_expiry() {
|
||||||
|
let (server, port) = start_test_server("strings").await;
|
||||||
|
spawn_listener(server, port).await;
|
||||||
|
sleep(Duration::from_millis(150)).await;
|
||||||
|
|
||||||
|
let mut s = connect(port).await;
|
||||||
|
|
||||||
|
// SET / GET
|
||||||
|
let set = send_cmd(&mut s, &["SET", "user:1", "alice"]).await;
|
||||||
|
assert_contains(&set, "OK", "SET user:1 alice");
|
||||||
|
|
||||||
|
let get = send_cmd(&mut s, &["GET", "user:1"]).await;
|
||||||
|
assert_contains(&get, "alice", "GET user:1");
|
||||||
|
|
||||||
|
// EXISTS / DEL
|
||||||
|
let ex1 = send_cmd(&mut s, &["EXISTS", "user:1"]).await;
|
||||||
|
assert_contains(&ex1, "1", "EXISTS user:1");
|
||||||
|
|
||||||
|
let del = send_cmd(&mut s, &["DEL", "user:1"]).await;
|
||||||
|
assert_contains(&del, "1", "DEL user:1");
|
||||||
|
|
||||||
|
let ex0 = send_cmd(&mut s, &["EXISTS", "user:1"]).await;
|
||||||
|
assert_contains(&ex0, "0", "EXISTS after DEL");
|
||||||
|
|
||||||
|
// INCR behavior
|
||||||
|
let i1 = send_cmd(&mut s, &["INCR", "count"]).await;
|
||||||
|
assert_contains(&i1, "1", "INCR new key -> 1");
|
||||||
|
let i2 = send_cmd(&mut s, &["INCR", "count"]).await;
|
||||||
|
assert_contains(&i2, "2", "INCR existing -> 2");
|
||||||
|
let _ = send_cmd(&mut s, &["SET", "notnum", "abc"]).await;
|
||||||
|
let ierr = send_cmd(&mut s, &["INCR", "notnum"]).await;
|
||||||
|
assert_contains(&ierr, "ERR", "INCR on non-numeric should ERR");
|
||||||
|
|
||||||
|
// Expiration via SET EX
|
||||||
|
let setex = send_cmd(&mut s, &["SET", "tmp:1", "boom", "EX", "1"]).await;
|
||||||
|
assert_contains(&setex, "OK", "SET tmp:1 EX 1");
|
||||||
|
|
||||||
|
let g_immediate = send_cmd(&mut s, &["GET", "tmp:1"]).await;
|
||||||
|
assert_contains(&g_immediate, "boom", "GET tmp:1 immediately");
|
||||||
|
|
||||||
|
let ttl = send_cmd(&mut s, &["TTL", "tmp:1"]).await;
|
||||||
|
// Implementation returns a SimpleString, accept any numeric content
|
||||||
|
assert!(
|
||||||
|
ttl.contains("1") || ttl.contains("0"),
|
||||||
|
"TTL should be 1 or 0, got: {}",
|
||||||
|
ttl
|
||||||
|
);
|
||||||
|
|
||||||
|
sleep(Duration::from_millis(1100)).await;
|
||||||
|
let g_after = send_cmd(&mut s, &["GET", "tmp:1"]).await;
|
||||||
|
assert_contains(&g_after, "$-1", "GET tmp:1 after expiry -> Null");
|
||||||
|
|
||||||
|
// TYPE
|
||||||
|
let _ = send_cmd(&mut s, &["SET", "t", "v"]).await;
|
||||||
|
let ty = send_cmd(&mut s, &["TYPE", "t"]).await;
|
||||||
|
assert_contains(&ty, "string", "TYPE string key");
|
||||||
|
let ty_none = send_cmd(&mut s, &["TYPE", "noexist"]).await;
|
||||||
|
assert_contains(&ty_none, "none", "TYPE nonexistent");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_03_scan_and_keys() {
|
||||||
|
let (server, port) = start_test_server("scan").await;
|
||||||
|
spawn_listener(server, port).await;
|
||||||
|
sleep(Duration::from_millis(150)).await;
|
||||||
|
|
||||||
|
let mut s = connect(port).await;
|
||||||
|
|
||||||
|
for i in 0..5 {
|
||||||
|
let _ = send_cmd(&mut s, &["SET", &format!("key{}", i), &format!("value{}", i)]).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
let scan = send_cmd(&mut s, &["SCAN", "0", "MATCH", "key*", "COUNT", "10"]).await;
|
||||||
|
assert_contains(&scan, "key0", "SCAN should return keys with MATCH");
|
||||||
|
assert_contains(&scan, "key4", "SCAN should return last key");
|
||||||
|
|
||||||
|
let keys = send_cmd(&mut s, &["KEYS", "*"]).await;
|
||||||
|
assert_contains(&keys, "key0", "KEYS * includes key0");
|
||||||
|
assert_contains(&keys, "key4", "KEYS * includes key4");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_04_hashes_suite() {
|
||||||
|
let (server, port) = start_test_server("hashes").await;
|
||||||
|
spawn_listener(server, port).await;
|
||||||
|
sleep(Duration::from_millis(150)).await;
|
||||||
|
|
||||||
|
let mut s = connect(port).await;
|
||||||
|
|
||||||
|
// HSET (single, returns number of new fields)
|
||||||
|
let h1 = send_cmd(&mut s, &["HSET", "profile:1", "name", "alice"]).await;
|
||||||
|
assert_contains(&h1, "1", "HSET new field -> 1");
|
||||||
|
|
||||||
|
// HGET
|
||||||
|
let hg = send_cmd(&mut s, &["HGET", "profile:1", "name"]).await;
|
||||||
|
assert_contains(&hg, "alice", "HGET existing field");
|
||||||
|
|
||||||
|
// HSET multiple
|
||||||
|
let h2 = send_cmd(&mut s, &["HSET", "profile:1", "age", "30", "city", "paris"]).await;
|
||||||
|
assert_contains(&h2, "2", "HSET added 2 new fields");
|
||||||
|
|
||||||
|
// HMGET
|
||||||
|
let hmg = send_cmd(&mut s, &["HMGET", "profile:1", "name", "age", "city", "nope"]).await;
|
||||||
|
assert_contains(&hmg, "alice", "HMGET name");
|
||||||
|
assert_contains(&hmg, "30", "HMGET age");
|
||||||
|
assert_contains(&hmg, "paris", "HMGET city");
|
||||||
|
assert_contains(&hmg, "$-1", "HMGET non-existent -> Null");
|
||||||
|
|
||||||
|
// HGETALL
|
||||||
|
let hga = send_cmd(&mut s, &["HGETALL", "profile:1"]).await;
|
||||||
|
assert_contains(&hga, "name", "HGETALL contains name");
|
||||||
|
assert_contains(&hga, "alice", "HGETALL contains alice");
|
||||||
|
|
||||||
|
// HLEN
|
||||||
|
let hlen = send_cmd(&mut s, &["HLEN", "profile:1"]).await;
|
||||||
|
assert_contains(&hlen, "3", "HLEN is 3");
|
||||||
|
|
||||||
|
// HEXISTS
|
||||||
|
let hex1 = send_cmd(&mut s, &["HEXISTS", "profile:1", "age"]).await;
|
||||||
|
assert_contains(&hex1, "1", "HEXISTS age true");
|
||||||
|
let hex0 = send_cmd(&mut s, &["HEXISTS", "profile:1", "nope"]).await;
|
||||||
|
assert_contains(&hex0, "0", "HEXISTS nope false");
|
||||||
|
|
||||||
|
// HKEYS / HVALS
|
||||||
|
let hkeys = send_cmd(&mut s, &["HKEYS", "profile:1"]).await;
|
||||||
|
assert_contains(&hkeys, "name", "HKEYS includes name");
|
||||||
|
let hvals = send_cmd(&mut s, &["HVALS", "profile:1"]).await;
|
||||||
|
assert_contains(&hvals, "alice", "HVALS includes alice");
|
||||||
|
|
||||||
|
// HSETNX
|
||||||
|
let hnx0 = send_cmd(&mut s, &["HSETNX", "profile:1", "name", "bob"]).await;
|
||||||
|
assert_contains(&hnx0, "0", "HSETNX existing field -> 0");
|
||||||
|
let hnx1 = send_cmd(&mut s, &["HSETNX", "profile:1", "nickname", "ali"]).await;
|
||||||
|
assert_contains(&hnx1, "1", "HSETNX new field -> 1");
|
||||||
|
|
||||||
|
// HSCAN
|
||||||
|
let hscan = send_cmd(&mut s, &["HSCAN", "profile:1", "0", "MATCH", "n*", "COUNT", "10"]).await;
|
||||||
|
assert_contains(&hscan, "name", "HSCAN matches fields starting with n");
|
||||||
|
assert_contains(&hscan, "nickname", "HSCAN nickname present");
|
||||||
|
|
||||||
|
// HDEL
|
||||||
|
let hdel = send_cmd(&mut s, &["HDEL", "profile:1", "city", "age"]).await;
|
||||||
|
assert_contains(&hdel, "2", "HDEL removed two fields");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_05_lists_suite_including_blpop() {
|
||||||
|
let (server, port) = start_test_server("lists").await;
|
||||||
|
spawn_listener(server, port).await;
|
||||||
|
sleep(Duration::from_millis(150)).await;
|
||||||
|
|
||||||
|
let mut a = connect(port).await;
|
||||||
|
|
||||||
|
// LPUSH / RPUSH / LLEN
|
||||||
|
let lp = send_cmd(&mut a, &["LPUSH", "q:jobs", "a", "b"]).await;
|
||||||
|
assert_contains(&lp, "2", "LPUSH added 2, length 2");
|
||||||
|
|
||||||
|
let rp = send_cmd(&mut a, &["RPUSH", "q:jobs", "c"]).await;
|
||||||
|
assert_contains(&rp, "3", "RPUSH now length 3");
|
||||||
|
|
||||||
|
let llen = send_cmd(&mut a, &["LLEN", "q:jobs"]).await;
|
||||||
|
assert_contains(&llen, "3", "LLEN 3");
|
||||||
|
|
||||||
|
// LINDEX / LRANGE
|
||||||
|
let lidx = send_cmd(&mut a, &["LINDEX", "q:jobs", "0"]).await;
|
||||||
|
assert_eq_resp(&lidx, "$1\r\nb\r\n", "LINDEX q:jobs 0 should be b");
|
||||||
|
|
||||||
|
let lr = send_cmd(&mut a, &["LRANGE", "q:jobs", "0", "-1"]).await;
|
||||||
|
assert_eq_resp(&lr, "*3\r\n$1\r\nb\r\n$1\r\na\r\n$1\r\nc\r\n", "LRANGE q:jobs 0 -1 should be [b,a,c]");
|
||||||
|
|
||||||
|
// LTRIM
|
||||||
|
let ltrim = send_cmd(&mut a, &["LTRIM", "q:jobs", "0", "1"]).await;
|
||||||
|
assert_contains(<rim, "OK", "LTRIM OK");
|
||||||
|
let lr_post = send_cmd(&mut a, &["LRANGE", "q:jobs", "0", "-1"]).await;
|
||||||
|
assert_eq_resp(&lr_post, "*2\r\n$1\r\nb\r\n$1\r\na\r\n", "After LTRIM, list [b,a]");
|
||||||
|
|
||||||
|
// LREM remove first occurrence of b
|
||||||
|
let lrem = send_cmd(&mut a, &["LREM", "q:jobs", "1", "b"]).await;
|
||||||
|
assert_contains(&lrem, "1", "LREM removed 1");
|
||||||
|
|
||||||
|
// LPOP and RPOP
|
||||||
|
let lpop1 = send_cmd(&mut a, &["LPOP", "q:jobs"]).await;
|
||||||
|
assert_contains(&lpop1, "$1\r\na\r\n", "LPOP returns a");
|
||||||
|
let rpop_empty = send_cmd(&mut a, &["RPOP", "q:jobs"]).await; // empty now
|
||||||
|
assert_contains(&rpop_empty, "$-1", "RPOP on empty -> Null");
|
||||||
|
|
||||||
|
// LPOP with count on empty -> []
|
||||||
|
let lpop0 = send_cmd(&mut a, &["LPOP", "q:jobs", "2"]).await;
|
||||||
|
assert_eq_resp(&lpop0, "*0\r\n", "LPOP with count on empty returns empty array");
|
||||||
|
|
||||||
|
// BLPOP: block on one client, push from another
|
||||||
|
let c1 = connect(port).await;
|
||||||
|
let mut c2 = connect(port).await;
|
||||||
|
|
||||||
|
// Start BLPOP on c1
|
||||||
|
let blpop_task = tokio::spawn(async move {
|
||||||
|
let mut c1_local = c1;
|
||||||
|
send_cmd(&mut c1_local, &["BLPOP", "q:block", "5"]).await
|
||||||
|
});
|
||||||
|
|
||||||
|
// Give it time to register waiter
|
||||||
|
sleep(Duration::from_millis(150)).await;
|
||||||
|
|
||||||
|
// Push from c2 to wake BLPOP
|
||||||
|
let _ = send_cmd(&mut c2, &["LPUSH", "q:block", "x"]).await;
|
||||||
|
|
||||||
|
// Await BLPOP result
|
||||||
|
let blpop_res = blpop_task.await.expect("BLPOP task join");
|
||||||
|
assert_contains(&blpop_res, "q:block", "BLPOP returned key");
|
||||||
|
assert_contains(&blpop_res, "x", "BLPOP returned element");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_06_flushdb_suite() {
|
||||||
|
let (server, port) = start_test_server("flushdb").await;
|
||||||
|
spawn_listener(server, port).await;
|
||||||
|
sleep(Duration::from_millis(150)).await;
|
||||||
|
|
||||||
|
let mut s = connect(port).await;
|
||||||
|
|
||||||
|
let _ = send_cmd(&mut s, &["SET", "k1", "v1"]).await;
|
||||||
|
let _ = send_cmd(&mut s, &["HSET", "h1", "f", "v"]).await;
|
||||||
|
let _ = send_cmd(&mut s, &["LPUSH", "l1", "a"]).await;
|
||||||
|
|
||||||
|
let keys_before = send_cmd(&mut s, &["KEYS", "*"]).await;
|
||||||
|
assert_contains(&keys_before, "k1", "have string key before FLUSHDB");
|
||||||
|
assert_contains(&keys_before, "h1", "have hash key before FLUSHDB");
|
||||||
|
assert_contains(&keys_before, "l1", "have list key before FLUSHDB");
|
||||||
|
|
||||||
|
let fl = send_cmd(&mut s, &["FLUSHDB"]).await;
|
||||||
|
assert_contains(&fl, "OK", "FLUSHDB OK");
|
||||||
|
|
||||||
|
let keys_after = send_cmd(&mut s, &["KEYS", "*"]).await;
|
||||||
|
assert_eq_resp(&keys_after, "*0\r\n", "DB should be empty after FLUSHDB");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_07_age_stateless_suite() {
|
||||||
|
let (server, port) = start_test_server("age_stateless").await;
|
||||||
|
spawn_listener(server, port).await;
|
||||||
|
sleep(Duration::from_millis(150)).await;
|
||||||
|
|
||||||
|
let mut s = connect(port).await;
|
||||||
|
|
||||||
|
// GENENC -> [recipient, identity]
|
||||||
|
let gen = send_cmd(&mut s, &["AGE", "GENENC"]).await;
|
||||||
|
assert!(
|
||||||
|
gen.starts_with("*2\r\n$"),
|
||||||
|
"AGE GENENC should return array [recipient, identity], got:\n{}",
|
||||||
|
gen
|
||||||
|
);
|
||||||
|
|
||||||
|
// Parse simple RESP array of two bulk strings to extract keys
|
||||||
|
fn parse_two_bulk_array(resp: &str) -> (String, String) {
|
||||||
|
// naive parse for tests
|
||||||
|
let mut lines = resp.lines();
|
||||||
|
let _ = lines.next(); // *2
|
||||||
|
// $len
|
||||||
|
let _ = lines.next();
|
||||||
|
let recip = lines.next().unwrap_or("").to_string();
|
||||||
|
let _ = lines.next();
|
||||||
|
let ident = lines.next().unwrap_or("").to_string();
|
||||||
|
(recip, ident)
|
||||||
|
}
|
||||||
|
let (recipient, identity) = parse_two_bulk_array(&gen);
|
||||||
|
assert!(
|
||||||
|
recipient.starts_with("age1") && identity.starts_with("AGE-SECRET-KEY-1"),
|
||||||
|
"Unexpected AGE key formats.\nrecipient: {}\nidentity: {}",
|
||||||
|
recipient,
|
||||||
|
identity
|
||||||
|
);
|
||||||
|
|
||||||
|
// ENCRYPT / DECRYPT
|
||||||
|
let ct = send_cmd(&mut s, &["AGE", "ENCRYPT", &recipient, "hello world"]).await;
|
||||||
|
let ct_b64 = extract_bulk_payload(&ct).expect("Failed to parse bulk payload from ENCRYPT");
|
||||||
|
let pt = send_cmd(&mut s, &["AGE", "DECRYPT", &identity, &ct_b64]).await;
|
||||||
|
assert_contains(&pt, "hello world", "AGE DECRYPT round-trip");
|
||||||
|
|
||||||
|
// GENSIGN -> [verify_pub_b64, sign_secret_b64]
|
||||||
|
let gensign = send_cmd(&mut s, &["AGE", "GENSIGN"]).await;
|
||||||
|
let (verify_pub, sign_secret) = parse_two_bulk_array(&gensign);
|
||||||
|
assert!(
|
||||||
|
!verify_pub.is_empty() && !sign_secret.is_empty(),
|
||||||
|
"GENSIGN returned empty keys"
|
||||||
|
);
|
||||||
|
|
||||||
|
// SIGN / VERIFY
|
||||||
|
let sig = send_cmd(&mut s, &["AGE", "SIGN", &sign_secret, "msg"]).await;
|
||||||
|
let sig_b64 = extract_bulk_payload(&sig).expect("Failed to parse bulk payload from SIGN");
|
||||||
|
let v_ok = send_cmd(&mut s, &["AGE", "VERIFY", &verify_pub, "msg", &sig_b64]).await;
|
||||||
|
assert_contains(&v_ok, "1", "VERIFY should be 1 for valid signature");
|
||||||
|
|
||||||
|
let v_bad = send_cmd(&mut s, &["AGE", "VERIFY", &verify_pub, "tampered", &sig_b64]).await;
|
||||||
|
assert_contains(&v_bad, "0", "VERIFY should be 0 for invalid message/signature");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_08_age_persistent_named_suite() {
|
||||||
|
let (server, port) = start_test_server("age_persistent").await;
|
||||||
|
spawn_listener(server, port).await;
|
||||||
|
sleep(Duration::from_millis(150)).await;
|
||||||
|
|
||||||
|
let mut s = connect(port).await;
|
||||||
|
|
||||||
|
// KEYGEN + ENCRYPTNAME/DECRYPTNAME
|
||||||
|
let kg = send_cmd(&mut s, &["AGE", "KEYGEN", "app1"]).await;
|
||||||
|
assert!(
|
||||||
|
kg.starts_with("*2\r\n"),
|
||||||
|
"AGE KEYGEN should return [recipient, identity], got:\n{}",
|
||||||
|
kg
|
||||||
|
);
|
||||||
|
|
||||||
|
let ct = send_cmd(&mut s, &["AGE", "ENCRYPTNAME", "app1", "hello"]).await;
|
||||||
|
let ct_b64 = extract_bulk_payload(&ct).expect("Failed to parse bulk payload from ENCRYPTNAME");
|
||||||
|
let pt = send_cmd(&mut s, &["AGE", "DECRYPTNAME", "app1", &ct_b64]).await;
|
||||||
|
assert_contains(&pt, "hello", "DECRYPTNAME round-trip");
|
||||||
|
|
||||||
|
// SIGNKEYGEN + SIGNNAME/VERIFYNAME
|
||||||
|
let skg = send_cmd(&mut s, &["AGE", "SIGNKEYGEN", "app1"]).await;
|
||||||
|
assert!(
|
||||||
|
skg.starts_with("*2\r\n"),
|
||||||
|
"AGE SIGNKEYGEN should return [verify_pub, sign_secret], got:\n{}",
|
||||||
|
skg
|
||||||
|
);
|
||||||
|
|
||||||
|
let sig = send_cmd(&mut s, &["AGE", "SIGNNAME", "app1", "m"] ).await;
|
||||||
|
let sig_b64 = extract_bulk_payload(&sig).expect("Failed to parse bulk payload from SIGNNAME");
|
||||||
|
let v1 = send_cmd(&mut s, &["AGE", "VERIFYNAME", "app1", "m", &sig_b64]).await;
|
||||||
|
assert_contains(&v1, "1", "VERIFYNAME valid => 1");
|
||||||
|
|
||||||
|
let v0 = send_cmd(&mut s, &["AGE", "VERIFYNAME", "app1", "bad", &sig_b64]).await;
|
||||||
|
assert_contains(&v0, "0", "VERIFYNAME invalid => 0");
|
||||||
|
|
||||||
|
// AGE LIST
|
||||||
|
let lst = send_cmd(&mut s, &["AGE", "LIST"]).await;
|
||||||
|
assert_contains(&lst, "encpub", "AGE LIST label encpub");
|
||||||
|
assert_contains(&lst, "app1", "AGE LIST includes app1");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_10_expire_pexpire_persist() {
|
||||||
|
let (server, port) = start_test_server("expire_suite").await;
|
||||||
|
spawn_listener(server, port).await;
|
||||||
|
sleep(Duration::from_millis(150)).await;
|
||||||
|
|
||||||
|
let mut s = connect(port).await;
|
||||||
|
|
||||||
|
// EXPIRE: seconds
|
||||||
|
let _ = send_cmd(&mut s, &["SET", "exp:s", "v"]).await;
|
||||||
|
let ex = send_cmd(&mut s, &["EXPIRE", "exp:s", "1"]).await;
|
||||||
|
assert_contains(&ex, "1", "EXPIRE exp:s 1 -> 1 (applied)");
|
||||||
|
let ttl1 = send_cmd(&mut s, &["TTL", "exp:s"]).await;
|
||||||
|
assert!(
|
||||||
|
ttl1.contains("1") || ttl1.contains("0"),
|
||||||
|
"TTL exp:s should be 1 or 0, got: {}",
|
||||||
|
ttl1
|
||||||
|
);
|
||||||
|
sleep(Duration::from_millis(1100)).await;
|
||||||
|
let get_after = send_cmd(&mut s, &["GET", "exp:s"]).await;
|
||||||
|
assert_contains(&get_after, "$-1", "GET after expiry should be Null");
|
||||||
|
let ttl_after = send_cmd(&mut s, &["TTL", "exp:s"]).await;
|
||||||
|
assert_contains(&ttl_after, "-2", "TTL after expiry -> -2");
|
||||||
|
let exists_after = send_cmd(&mut s, &["EXISTS", "exp:s"]).await;
|
||||||
|
assert_contains(&exists_after, "0", "EXISTS after expiry -> 0");
|
||||||
|
|
||||||
|
// PEXPIRE: milliseconds
|
||||||
|
let _ = send_cmd(&mut s, &["SET", "exp:ms", "v"]).await;
|
||||||
|
let pex = send_cmd(&mut s, &["PEXPIRE", "exp:ms", "1500"]).await;
|
||||||
|
assert_contains(&pex, "1", "PEXPIRE exp:ms 1500 -> 1 (applied)");
|
||||||
|
let ttl_ms1 = send_cmd(&mut s, &["TTL", "exp:ms"]).await;
|
||||||
|
assert!(
|
||||||
|
ttl_ms1.contains("1") || ttl_ms1.contains("0"),
|
||||||
|
"TTL exp:ms should be 1 or 0 soon after PEXPIRE, got: {}",
|
||||||
|
ttl_ms1
|
||||||
|
);
|
||||||
|
sleep(Duration::from_millis(1600)).await;
|
||||||
|
let exists_ms_after = send_cmd(&mut s, &["EXISTS", "exp:ms"]).await;
|
||||||
|
assert_contains(&exists_ms_after, "0", "EXISTS exp:ms after ms expiry -> 0");
|
||||||
|
|
||||||
|
// PERSIST: remove expiration
|
||||||
|
let _ = send_cmd(&mut s, &["SET", "exp:persist", "v"]).await;
|
||||||
|
let _ = send_cmd(&mut s, &["EXPIRE", "exp:persist", "5"]).await;
|
||||||
|
let ttl_pre = send_cmd(&mut s, &["TTL", "exp:persist"]).await;
|
||||||
|
assert!(
|
||||||
|
ttl_pre.contains("5") || ttl_pre.contains("4") || ttl_pre.contains("3") || ttl_pre.contains("2") || ttl_pre.contains("1") || ttl_pre.contains("0"),
|
||||||
|
"TTL exp:persist should be >=0 before persist, got: {}",
|
||||||
|
ttl_pre
|
||||||
|
);
|
||||||
|
let persist1 = send_cmd(&mut s, &["PERSIST", "exp:persist"]).await;
|
||||||
|
assert_contains(&persist1, "1", "PERSIST should remove expiration");
|
||||||
|
let ttl_post = send_cmd(&mut s, &["TTL", "exp:persist"]).await;
|
||||||
|
assert_contains(&ttl_post, "-1", "TTL after PERSIST -> -1 (no expiration)");
|
||||||
|
// Second persist should return 0 (nothing to remove)
|
||||||
|
let persist2 = send_cmd(&mut s, &["PERSIST", "exp:persist"]).await;
|
||||||
|
assert_contains(&persist2, "0", "PERSIST again -> 0 (no expiration to remove)");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_09_mget_mset_and_variadic_exists_del() {
|
||||||
|
let (server, port) = start_test_server("mget_mset_variadic").await;
|
||||||
|
spawn_listener(server, port).await;
|
||||||
|
sleep(Duration::from_millis(150)).await;
|
||||||
|
|
||||||
|
let mut s = connect(port).await;
|
||||||
|
|
||||||
|
// MSET multiple keys
|
||||||
|
let mset = send_cmd(&mut s, &["MSET", "k1", "v1", "k2", "v2", "k3", "v3"]).await;
|
||||||
|
assert_contains(&mset, "OK", "MSET k1 v1 k2 v2 k3 v3 -> OK");
|
||||||
|
|
||||||
|
// MGET should return values and Null for missing
|
||||||
|
let mget = send_cmd(&mut s, &["MGET", "k1", "k2", "nope", "k3"]).await;
|
||||||
|
// Expect an array with 4 entries; verify payloads
|
||||||
|
assert_contains(&mget, "v1", "MGET k1");
|
||||||
|
assert_contains(&mget, "v2", "MGET k2");
|
||||||
|
assert_contains(&mget, "v3", "MGET k3");
|
||||||
|
assert_contains(&mget, "$-1", "MGET missing returns Null");
|
||||||
|
|
||||||
|
// EXISTS variadic: count how many exist
|
||||||
|
let exists_multi = send_cmd(&mut s, &["EXISTS", "k1", "nope", "k3"]).await;
|
||||||
|
// Server returns SimpleString numeric, e.g. +2
|
||||||
|
assert_contains(&exists_multi, "2", "EXISTS k1 nope k3 -> 2");
|
||||||
|
|
||||||
|
// DEL variadic: delete multiple keys, return count deleted
|
||||||
|
let del_multi = send_cmd(&mut s, &["DEL", "k1", "k3", "nope"]).await;
|
||||||
|
assert_contains(&del_multi, "2", "DEL k1 k3 nope -> 2");
|
||||||
|
|
||||||
|
// Verify deletion
|
||||||
|
let exists_after = send_cmd(&mut s, &["EXISTS", "k1", "k3"]).await;
|
||||||
|
assert_contains(&exists_after, "0", "EXISTS k1 k3 after DEL -> 0");
|
||||||
|
|
||||||
|
// MGET after deletion should include Nulls for deleted keys
|
||||||
|
let mget_after = send_cmd(&mut s, &["MGET", "k1", "k2", "k3"]).await;
|
||||||
|
assert_contains(&mget_after, "$-1", "MGET k1 after DEL -> Null");
|
||||||
|
assert_contains(&mget_after, "v2", "MGET k2 remains");
|
||||||
|
assert_contains(&mget_after, "$-1", "MGET k3 after DEL -> Null");
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user