Implemented EXPIREAT and PEXPIREAT

This commit is contained in:
Maxime Van Hees 2025-08-19 16:21:43 +02:00
parent b9a9f3e6d6
commit 892e6e2b90
3 changed files with 117 additions and 0 deletions

View File

@ -44,6 +44,8 @@ pub enum Cmd {
Ttl(String), Ttl(String),
Expire(String, i64), Expire(String, i64),
PExpire(String, i64), PExpire(String, i64),
ExpireAt(String, i64),
PExpireAt(String, i64),
Persist(String), Persist(String),
Exists(String), Exists(String),
ExistsMulti(Vec<String>), ExistsMulti(Vec<String>),
@ -417,6 +419,20 @@ impl Cmd {
let ms = cmd[2].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?; let ms = cmd[2].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
Cmd::PExpire(cmd[1].clone(), ms) Cmd::PExpire(cmd[1].clone(), ms)
} }
"expireat" => {
if cmd.len() != 3 {
return Err(DBError("wrong number of arguments for EXPIREAT command".to_string()));
}
let ts = cmd[2].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
Cmd::ExpireAt(cmd[1].clone(), ts)
}
"pexpireat" => {
if cmd.len() != 3 {
return Err(DBError("wrong number of arguments for PEXPIREAT command".to_string()));
}
let ts_ms = cmd[2].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
Cmd::PExpireAt(cmd[1].clone(), ts_ms)
}
"persist" => { "persist" => {
if cmd.len() != 2 { if cmd.len() != 2 {
return Err(DBError("wrong number of arguments for PERSIST command".to_string())); return Err(DBError("wrong number of arguments for PERSIST command".to_string()));
@ -676,6 +692,8 @@ impl Cmd {
Cmd::Ttl(key) => ttl_cmd(server, &key).await, Cmd::Ttl(key) => ttl_cmd(server, &key).await,
Cmd::Expire(key, secs) => expire_cmd(server, &key, secs).await, Cmd::Expire(key, secs) => expire_cmd(server, &key, secs).await,
Cmd::PExpire(key, ms) => pexpire_cmd(server, &key, ms).await, Cmd::PExpire(key, ms) => pexpire_cmd(server, &key, ms).await,
Cmd::ExpireAt(key, ts_secs) => expireat_cmd(server, &key, ts_secs).await,
Cmd::PExpireAt(key, ts_ms) => pexpireat_cmd(server, &key, ts_ms).await,
Cmd::Persist(key) => persist_cmd(server, &key).await, Cmd::Persist(key) => persist_cmd(server, &key).await,
Cmd::Exists(key) => exists_cmd(server, &key).await, Cmd::Exists(key) => exists_cmd(server, &key).await,
Cmd::ExistsMulti(keys) => exists_multi_cmd(server, &keys).await, Cmd::ExistsMulti(keys) => exists_multi_cmd(server, &keys).await,
@ -1456,6 +1474,21 @@ async fn persist_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
Err(e) => Ok(Protocol::err(&e.0)), Err(e) => Ok(Protocol::err(&e.0)),
} }
} }
// EXPIREAT key timestamp-seconds -> 1 if timeout set, 0 otherwise
async fn expireat_cmd(server: &Server, key: &str, ts_secs: i64) -> Result<Protocol, DBError> {
match server.current_storage()?.expire_at_seconds(key, ts_secs) {
Ok(applied) => Ok(Protocol::SimpleString(if applied { "1" } else { "0" }.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
// PEXPIREAT key timestamp-milliseconds -> 1 if timeout set, 0 otherwise
async fn pexpireat_cmd(server: &Server, key: &str, ts_ms: i64) -> Result<Protocol, DBError> {
match server.current_storage()?.pexpire_at_millis(key, ts_ms) {
Ok(applied) => Ok(Protocol::SimpleString(if applied { "1" } else { "0" }.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn client_setname_cmd(server: &mut Server, name: &str) -> Result<Protocol, DBError> { async fn client_setname_cmd(server: &mut Server, name: &str) -> Result<Protocol, DBError> {
server.client_name = Some(name.to_string()); server.client_name = Some(name.to_string());

View File

@ -164,6 +164,50 @@ impl Storage {
write_txn.commit()?; write_txn.commit()?;
Ok(removed) Ok(removed)
} }
// Absolute EXPIREAT in seconds since epoch
// Returns true if applied (key exists and is string), false otherwise
pub fn expire_at_seconds(&self, key: &str, ts_secs: i64) -> Result<bool, DBError> {
let mut applied = false;
let write_txn = self.db.begin_write()?;
{
let types_table = write_txn.open_table(TYPES_TABLE)?;
let is_string = types_table
.get(key)?
.map(|v| v.value() == "string")
.unwrap_or(false);
if is_string {
let mut expiration_table = write_txn.open_table(EXPIRATION_TABLE)?;
let expires_at_ms: u128 = if ts_secs <= 0 { 0 } else { (ts_secs as u128) * 1000 };
expiration_table.insert(key, &((expires_at_ms as u64)))?;
applied = true;
}
}
write_txn.commit()?;
Ok(applied)
}
// Absolute PEXPIREAT in milliseconds since epoch
// Returns true if applied (key exists and is string), false otherwise
pub fn pexpire_at_millis(&self, key: &str, ts_ms: i64) -> Result<bool, DBError> {
let mut applied = false;
let write_txn = self.db.begin_write()?;
{
let types_table = write_txn.open_table(TYPES_TABLE)?;
let is_string = types_table
.get(key)?
.map(|v| v.value() == "string")
.unwrap_or(false);
if is_string {
let mut expiration_table = write_txn.open_table(EXPIRATION_TABLE)?;
let expires_at_ms: u128 = if ts_ms <= 0 { 0 } else { ts_ms as u128 };
expiration_table.insert(key, &((expires_at_ms as u64)))?;
applied = true;
}
}
write_txn.commit()?;
Ok(applied)
}
} }
// Utility function for glob pattern matching // Utility function for glob pattern matching

View File

@ -850,3 +850,43 @@ async fn test_13_dbsize() {
let n_final = send_cmd(&mut s, &["DBSIZE"]).await; let n_final = send_cmd(&mut s, &["DBSIZE"]).await;
assert_contains(&n_final, "0", "DBSIZE after deleting all keys should be 0"); assert_contains(&n_final, "0", "DBSIZE after deleting all keys should be 0");
} }
#[tokio::test]
async fn test_14_expireat_pexpireat() {
use std::time::{SystemTime, UNIX_EPOCH};
let (server, port) = start_test_server("expireat_suite").await;
spawn_listener(server, port).await;
sleep(Duration::from_millis(150)).await;
let mut s = connect(port).await;
// EXPIREAT: seconds since epoch
let now_secs = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() as i64;
let _ = send_cmd(&mut s, &["SET", "exp:at:s", "v"]).await;
let exat = send_cmd(&mut s, &["EXPIREAT", "exp:at:s", &format!("{}", now_secs + 1)]).await;
assert_contains(&exat, "1", "EXPIREAT exp:at:s now+1s -> 1 (applied)");
let ttl1 = send_cmd(&mut s, &["TTL", "exp:at:s"]).await;
assert!(
ttl1.contains("1") || ttl1.contains("0"),
"TTL exp:at:s should be 1 or 0 shortly after EXPIREAT, got: {}",
ttl1
);
sleep(Duration::from_millis(1200)).await;
let exists_after_exat = send_cmd(&mut s, &["EXISTS", "exp:at:s"]).await;
assert_contains(&exists_after_exat, "0", "EXISTS exp:at:s after EXPIREAT expiry -> 0");
// PEXPIREAT: milliseconds since epoch
let now_ms = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as i64;
let _ = send_cmd(&mut s, &["SET", "exp:at:ms", "v"]).await;
let pexat = send_cmd(&mut s, &["PEXPIREAT", "exp:at:ms", &format!("{}", now_ms + 450)]).await;
assert_contains(&pexat, "1", "PEXPIREAT exp:at:ms now+450ms -> 1 (applied)");
let ttl2 = send_cmd(&mut s, &["TTL", "exp:at:ms"]).await;
assert!(
ttl2.contains("0") || ttl2.contains("1"),
"TTL exp:at:ms should be 0..1 soon after PEXPIREAT, got: {}",
ttl2
);
sleep(Duration::from_millis(600)).await;
let exists_after_pexat = send_cmd(&mut s, &["EXISTS", "exp:at:ms"]).await;
assert_contains(&exists_after_pexat, "0", "EXISTS exp:at:ms after PEXPIREAT expiry -> 0");
}