Files
herodb/src/cmd.rs

2349 lines
107 KiB
Rust

use crate::{error::DBError, protocol::Protocol, server::Server, embedding::{EmbeddingConfig, EmbeddingProvider}};
use tokio::time::{timeout, Duration};
use futures::future::select_all;
#[derive(Debug, Clone)]
pub enum Cmd {
Ping,
Echo(String),
Select(u64, Option<String>), // db_index, optional_key
Get(String),
Set(String, String),
SetPx(String, String, u128),
SetEx(String, String, u128),
// Advanced SET with options: (key, value, ex_ms, nx, xx, get)
SetOpts(String, String, Option<u128>, bool, bool, bool),
MGet(Vec<String>),
MSet(Vec<(String, String)>),
Keys,
DbSize,
ConfigGet(String),
Info(Option<String>),
Del(String),
Type(String),
Incr(String),
Multi,
Exec,
Discard,
// Hash commands
HSet(String, Vec<(String, String)>),
HGet(String, String),
HGetAll(String),
HDel(String, Vec<String>),
HExists(String, String),
HKeys(String),
HVals(String),
HLen(String),
HMGet(String, Vec<String>),
HSetNx(String, String, String),
HIncrBy(String, String, i64),
HIncrByFloat(String, String, f64),
HScan(String, u64, Option<String>, Option<u64>), // key, cursor, pattern, count
Scan(u64, Option<String>, Option<u64>), // cursor, pattern, count
Ttl(String),
Expire(String, i64),
PExpire(String, i64),
ExpireAt(String, i64),
PExpireAt(String, i64),
Persist(String),
Exists(String),
ExistsMulti(Vec<String>),
DelMulti(Vec<String>),
Quit,
Client(Vec<String>),
ClientSetName(String),
ClientGetName,
Command(Vec<String>),
// List commands
LPush(String, Vec<String>),
RPush(String, Vec<String>),
LPop(String, Option<u64>),
RPop(String, Option<u64>),
BLPop(Vec<String>, f64),
BRPop(Vec<String>, f64),
LLen(String),
LRem(String, i64, String),
LTrim(String, i64, i64),
LIndex(String, i64),
LRange(String, i64, i64),
FlushDb,
Unknow(String),
// AGE (rage) commands — stateless
AgeGenEnc,
AgeGenSign,
AgeGenKey, // unified stateless: returns [verify_b64, signpriv_b64, x25519_pub_b64, x25519_sec_b64]
AgeEncrypt(String, String), // recipient, message
AgeDecrypt(String, String), // identity, ciphertext_b64
AgeSign(String, String), // signing_secret, message
AgeVerify(String, String, String), // verify_pub, message, signature_b64
// Persistent named-key commands
AgeKeygen(String), // name
AgeSignKeygen(String), // name
AgeEncryptName(String, String), // name, message
AgeDecryptName(String, String), // name, ciphertext_b64
AgeSignName(String, String), // name, message
AgeVerifyName(String, String, String), // name, message, signature_b64
AgeList,
// SYM (symmetric) commands — stateless
// Raw 32-byte key provided as base64; ciphertext returned as base64
SymKeygen,
SymEncrypt(String, String), // key_b64, message
SymDecrypt(String, String), // key_b64, ciphertext_b64
// Full-text search commands with schema support
FtCreate {
index_name: String,
schema: Vec<(String, String, Vec<String>)>, // (field_name, field_type, options)
},
FtAdd {
index_name: String,
doc_id: String,
score: f64,
fields: std::collections::HashMap<String, String>,
},
FtSearch {
index_name: String,
query: String,
filters: Vec<(String, String)>, // field, value pairs
limit: Option<usize>,
offset: Option<usize>,
return_fields: Option<Vec<String>>,
},
FtDel(String, String), // index_name, doc_id
FtInfo(String), // index_name
FtDrop(String), // index_name
FtAlter {
index_name: String,
field_name: String,
field_type: String,
options: Vec<String>,
},
FtAggregate {
index_name: String,
query: String,
group_by: Vec<String>,
reducers: Vec<String>,
},
// LanceDB text-first commands (no user-provided vectors)
LanceCreate {
name: String,
dim: usize,
},
LanceStoreText {
name: String,
id: String,
text: String,
meta: Vec<(String, String)>,
},
LanceSearchText {
name: String,
text: String,
k: usize,
filter: Option<String>,
return_fields: Option<Vec<String>>,
},
LanceCreateIndex {
name: String,
index_type: String,
params: Vec<(String, String)>,
},
// Embedding configuration per dataset
LanceEmbeddingConfigSet {
name: String,
provider: String,
model: String,
params: Vec<(String, String)>,
},
LanceEmbeddingConfigGet {
name: String,
},
LanceList,
LanceInfo {
name: String,
},
LanceDel {
name: String,
id: String,
},
LanceDrop {
name: String,
}
}
impl Cmd {
pub fn from(s: &str) -> Result<(Self, Protocol, &str), DBError> {
let (protocol, remaining) = Protocol::from(s)?;
match protocol.clone() {
Protocol::Array(p) => {
let cmd = p.into_iter().map(|x| x.decode()).collect::<Vec<_>>();
if cmd.is_empty() {
return Err(DBError("cmd length is 0".to_string()));
}
Ok((
match cmd[0].to_lowercase().as_str() {
"select" => {
if cmd.len() < 2 || cmd.len() > 4 {
return Err(DBError("wrong number of arguments for SELECT".to_string()));
}
let idx = cmd[1].parse::<u64>().map_err(|_| DBError("ERR DB index is not an integer".to_string()))?;
let key = if cmd.len() == 4 && cmd[2].to_lowercase() == "key" {
Some(cmd[3].clone())
} else if cmd.len() == 2 {
None
} else {
return Err(DBError("ERR syntax error".to_string()));
};
Cmd::Select(idx, key)
}
"echo" => Cmd::Echo(cmd[1].clone()),
"ping" => Cmd::Ping,
"get" => Cmd::Get(cmd[1].clone()),
"set" => {
if cmd.len() < 3 {
return Err(DBError("wrong number of arguments for SET".to_string()));
}
let key = cmd[1].clone();
let val = cmd[2].clone();
// Parse optional flags: EX sec | PX ms | NX | XX | GET
let mut ex_ms: Option<u128> = None;
let mut nx = false;
let mut xx = false;
let mut getflag = false;
let mut i = 3;
while i < cmd.len() {
match cmd[i].to_lowercase().as_str() {
"ex" => {
if i + 1 >= cmd.len() {
return Err(DBError("ERR syntax error".to_string()));
}
let secs: u128 = cmd[i + 1].parse().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
ex_ms = Some(secs * 1000);
i += 2;
}
"px" => {
if i + 1 >= cmd.len() {
return Err(DBError("ERR syntax error".to_string()));
}
let ms: u128 = cmd[i + 1].parse().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
ex_ms = Some(ms);
i += 2;
}
"nx" => { nx = true; i += 1; }
"xx" => { xx = true; i += 1; }
"get" => { getflag = true; i += 1; }
_ => {
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
}
}
}
// If no options, keep legacy behavior
if ex_ms.is_none() && !nx && !xx && !getflag {
Cmd::Set(key, val)
} else {
Cmd::SetOpts(key, val, ex_ms, nx, xx, getflag)
}
}
"setex" => {
if cmd.len() != 4 {
return Err(DBError(format!("wrong number of arguments for SETEX command")));
}
Cmd::SetEx(cmd[1].clone(), cmd[3].clone(), cmd[2].parse().unwrap())
}
"mget" => {
if cmd.len() < 2 {
return Err(DBError("wrong number of arguments for MGET command".to_string()));
}
Cmd::MGet(cmd[1..].to_vec())
}
"mset" => {
if cmd.len() < 3 || ((cmd.len() - 1) % 2 != 0) {
return Err(DBError("wrong number of arguments for MSET command".to_string()));
}
let mut pairs = Vec::new();
let mut i = 1;
while i + 1 < cmd.len() {
pairs.push((cmd[i].clone(), cmd[i + 1].clone()));
i += 2;
}
Cmd::MSet(pairs)
}
"config" => {
if cmd.len() != 3 || cmd[1].to_lowercase() != "get" {
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
} else {
Cmd::ConfigGet(cmd[2].clone())
}
}
"keys" => {
if cmd.len() != 2 || cmd[1] != "*" {
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
} else {
Cmd::Keys
}
}
"dbsize" => {
if cmd.len() != 1 {
return Err(DBError(format!("wrong number of arguments for DBSIZE command")));
}
Cmd::DbSize
}
"info" => {
let section = if cmd.len() == 2 {
Some(cmd[1].clone())
} else {
None
};
Cmd::Info(section)
}
"del" => {
if cmd.len() < 2 {
return Err(DBError(format!("wrong number of arguments for DEL command")));
}
if cmd.len() == 2 {
Cmd::Del(cmd[1].clone())
} else {
Cmd::DelMulti(cmd[1..].to_vec())
}
}
"type" => {
if cmd.len() != 2 {
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
}
Cmd::Type(cmd[1].clone())
}
"incr" => {
if cmd.len() != 2 {
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
}
Cmd::Incr(cmd[1].clone())
}
"multi" => {
if cmd.len() != 1 {
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
}
Cmd::Multi
}
"exec" => {
if cmd.len() != 1 {
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
}
Cmd::Exec
}
"discard" => Cmd::Discard,
// Hash commands
"hset" => {
if cmd.len() < 4 || (cmd.len() - 2) % 2 != 0 {
return Err(DBError(format!("wrong number of arguments for HSET command")));
}
let mut pairs = Vec::new();
let mut i = 2;
while i + 1 < cmd.len() {
pairs.push((cmd[i].clone(), cmd[i + 1].clone()));
i += 2;
}
Cmd::HSet(cmd[1].clone(), pairs)
}
"hget" => {
if cmd.len() != 3 {
return Err(DBError(format!("wrong number of arguments for HGET command")));
}
Cmd::HGet(cmd[1].clone(), cmd[2].clone())
}
"hgetall" => {
if cmd.len() != 2 {
return Err(DBError(format!("wrong number of arguments for HGETALL command")));
}
Cmd::HGetAll(cmd[1].clone())
}
"hdel" => {
if cmd.len() < 3 {
return Err(DBError(format!("wrong number of arguments for HDEL command")));
}
Cmd::HDel(cmd[1].clone(), cmd[2..].to_vec())
}
"hexists" => {
if cmd.len() != 3 {
return Err(DBError(format!("wrong number of arguments for HEXISTS command")));
}
Cmd::HExists(cmd[1].clone(), cmd[2].clone())
}
"hkeys" => {
if cmd.len() != 2 {
return Err(DBError(format!("wrong number of arguments for HKEYS command")));
}
Cmd::HKeys(cmd[1].clone())
}
"hvals" => {
if cmd.len() != 2 {
return Err(DBError(format!("wrong number of arguments for HVALS command")));
}
Cmd::HVals(cmd[1].clone())
}
"hlen" => {
if cmd.len() != 2 {
return Err(DBError(format!("wrong number of arguments for HLEN command")));
}
Cmd::HLen(cmd[1].clone())
}
"hmget" => {
if cmd.len() < 3 {
return Err(DBError(format!("wrong number of arguments for HMGET command")));
}
Cmd::HMGet(cmd[1].clone(), cmd[2..].to_vec())
}
"hsetnx" => {
if cmd.len() != 4 {
return Err(DBError(format!("wrong number of arguments for HSETNX command")));
}
Cmd::HSetNx(cmd[1].clone(), cmd[2].clone(), cmd[3].clone())
}
"hincrby" => {
if cmd.len() != 4 {
return Err(DBError(format!("wrong number of arguments for HINCRBY command")));
}
let delta = cmd[3].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
Cmd::HIncrBy(cmd[1].clone(), cmd[2].clone(), delta)
}
"hincrbyfloat" => {
if cmd.len() != 4 {
return Err(DBError(format!("wrong number of arguments for HINCRBYFLOAT command")));
}
let delta = cmd[3].parse::<f64>().map_err(|_| DBError("ERR value is not a valid float".to_string()))?;
Cmd::HIncrByFloat(cmd[1].clone(), cmd[2].clone(), delta)
}
"hscan" => {
if cmd.len() < 3 {
return Err(DBError(format!("wrong number of arguments for HSCAN command")));
}
let key = cmd[1].clone();
let cursor = cmd[2].parse::<u64>().map_err(|_|
DBError("ERR invalid cursor".to_string()))?;
let mut pattern = None;
let mut count = None;
let mut i = 3;
while i < cmd.len() {
match cmd[i].to_lowercase().as_str() {
"match" => {
if i + 1 >= cmd.len() {
return Err(DBError("ERR syntax error".to_string()));
}
pattern = Some(cmd[i + 1].clone());
i += 2;
}
"count" => {
if i + 1 >= cmd.len() {
return Err(DBError("ERR syntax error".to_string()));
}
count = Some(cmd[i + 1].parse::<u64>().map_err(|_|
DBError("ERR value is not an integer or out of range".to_string()))?);
i += 2;
}
_ => {
return Err(DBError(format!("ERR syntax error")));
}
}
}
Cmd::HScan(key, cursor, pattern, count)
}
"scan" => {
if cmd.len() < 2 {
return Err(DBError(format!("wrong number of arguments for SCAN command")));
}
let cursor = cmd[1].parse::<u64>().map_err(|_|
DBError("ERR invalid cursor".to_string()))?;
let mut pattern = None;
let mut count = None;
let mut i = 2;
while i < cmd.len() {
match cmd[i].to_lowercase().as_str() {
"match" => {
if i + 1 >= cmd.len() {
return Err(DBError("ERR syntax error".to_string()));
}
pattern = Some(cmd[i + 1].clone());
i += 2;
}
"count" => {
if i + 1 >= cmd.len() {
return Err(DBError("ERR syntax error".to_string()));
}
count = Some(cmd[i + 1].parse::<u64>().map_err(|_|
DBError("ERR value is not an integer or out of range".to_string()))?);
i += 2;
}
_ => {
return Err(DBError(format!("ERR syntax error")));
}
}
}
Cmd::Scan(cursor, pattern, count)
}
"ttl" => {
if cmd.len() != 2 {
return Err(DBError(format!("wrong number of arguments for TTL command")));
}
Cmd::Ttl(cmd[1].clone())
}
"expire" => {
if cmd.len() != 3 {
return Err(DBError("wrong number of arguments for EXPIRE command".to_string()));
}
let secs = cmd[2].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
Cmd::Expire(cmd[1].clone(), secs)
}
"pexpire" => {
if cmd.len() != 3 {
return Err(DBError("wrong number of arguments for PEXPIRE command".to_string()));
}
let ms = cmd[2].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
Cmd::PExpire(cmd[1].clone(), ms)
}
"expireat" => {
if cmd.len() != 3 {
return Err(DBError("wrong number of arguments for EXPIREAT command".to_string()));
}
let ts = cmd[2].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
Cmd::ExpireAt(cmd[1].clone(), ts)
}
"pexpireat" => {
if cmd.len() != 3 {
return Err(DBError("wrong number of arguments for PEXPIREAT command".to_string()));
}
let ts_ms = cmd[2].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
Cmd::PExpireAt(cmd[1].clone(), ts_ms)
}
"persist" => {
if cmd.len() != 2 {
return Err(DBError("wrong number of arguments for PERSIST command".to_string()));
}
Cmd::Persist(cmd[1].clone())
}
"exists" => {
if cmd.len() < 2 {
return Err(DBError(format!("wrong number of arguments for EXISTS command")));
}
if cmd.len() == 2 {
Cmd::Exists(cmd[1].clone())
} else {
Cmd::ExistsMulti(cmd[1..].to_vec())
}
}
"quit" => {
if cmd.len() != 1 {
return Err(DBError(format!("wrong number of arguments for QUIT command")));
}
Cmd::Quit
}
"client" => {
if cmd.len() > 1 {
match cmd[1].to_lowercase().as_str() {
"setname" => {
if cmd.len() == 3 {
Cmd::ClientSetName(cmd[2].clone())
} else {
return Err(DBError("wrong number of arguments for 'client setname' command".to_string()));
}
}
"getname" => {
if cmd.len() == 2 {
Cmd::ClientGetName
} else {
return Err(DBError("wrong number of arguments for 'client getname' command".to_string()));
}
}
_ => Cmd::Client(cmd[1..].to_vec()),
}
} else {
Cmd::Client(vec![])
}
}
"command" => {
let args = if cmd.len() > 1 { cmd[1..].to_vec() } else { vec![] };
Cmd::Command(args)
}
"lpush" => {
if cmd.len() < 3 {
return Err(DBError(format!("wrong number of arguments for LPUSH command")));
}
Cmd::LPush(cmd[1].clone(), cmd[2..].to_vec())
}
"rpush" => {
if cmd.len() < 3 {
return Err(DBError(format!("wrong number of arguments for RPUSH command")));
}
Cmd::RPush(cmd[1].clone(), cmd[2..].to_vec())
}
"lpop" => {
if cmd.len() < 2 || cmd.len() > 3 {
return Err(DBError(format!("wrong number of arguments for LPOP command")));
}
let count = if cmd.len() == 3 {
Some(cmd[2].parse::<u64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?)
} else {
None
};
Cmd::LPop(cmd[1].clone(), count)
}
"rpop" => {
if cmd.len() < 2 || cmd.len() > 3 {
return Err(DBError(format!("wrong number of arguments for RPOP command")));
}
let count = if cmd.len() == 3 {
Some(cmd[2].parse::<u64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?)
} else {
None
};
Cmd::RPop(cmd[1].clone(), count)
}
"blpop" => {
if cmd.len() < 3 {
return Err(DBError(format!("wrong number of arguments for BLPOP command")));
}
// keys are all but the last argument
let keys = cmd[1..cmd.len()-1].to_vec();
let timeout_f = cmd[cmd.len()-1]
.parse::<f64>()
.map_err(|_| DBError("ERR timeout is not a number".to_string()))?;
Cmd::BLPop(keys, timeout_f)
}
"brpop" => {
if cmd.len() < 3 {
return Err(DBError(format!("wrong number of arguments for BRPOP command")));
}
// keys are all but the last argument
let keys = cmd[1..cmd.len()-1].to_vec();
let timeout_f = cmd[cmd.len()-1]
.parse::<f64>()
.map_err(|_| DBError("ERR timeout is not a number".to_string()))?;
Cmd::BRPop(keys, timeout_f)
}
"llen" => {
if cmd.len() != 2 {
return Err(DBError(format!("wrong number of arguments for LLEN command")));
}
Cmd::LLen(cmd[1].clone())
}
"lrem" => {
if cmd.len() != 4 {
return Err(DBError(format!("wrong number of arguments for LREM command")));
}
let count = cmd[2].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
Cmd::LRem(cmd[1].clone(), count, cmd[3].clone())
}
"ltrim" => {
if cmd.len() != 4 {
return Err(DBError(format!("wrong number of arguments for LTRIM command")));
}
let start = cmd[2].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
let stop = cmd[3].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
Cmd::LTrim(cmd[1].clone(), start, stop)
}
"lindex" => {
if cmd.len() != 3 {
return Err(DBError(format!("wrong number of arguments for LINDEX command")));
}
let index = cmd[2].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
Cmd::LIndex(cmd[1].clone(), index)
}
"lrange" => {
if cmd.len() != 4 {
return Err(DBError(format!("wrong number of arguments for LRANGE command")));
}
let start = cmd[2].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
let stop = cmd[3].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
Cmd::LRange(cmd[1].clone(), start, stop)
}
"flushdb" => {
if cmd.len() != 1 {
return Err(DBError("wrong number of arguments for FLUSHDB command".to_string()));
}
Cmd::FlushDb
}
"age" => {
if cmd.len() < 2 {
return Err(DBError("wrong number of arguments for AGE".to_string()));
}
match cmd[1].to_lowercase().as_str() {
// stateless
"genenc" => { if cmd.len() != 2 { return Err(DBError("AGE GENENC takes no args".to_string())); }
Cmd::AgeGenEnc }
"gensign" => { if cmd.len() != 2 { return Err(DBError("AGE GENSIGN takes no args".to_string())); }
Cmd::AgeGenSign }
"genkey" => { if cmd.len() != 2 { return Err(DBError("AGE GENKEY takes no args".to_string())); }
Cmd::AgeGenKey }
"encrypt" => { if cmd.len() != 4 { return Err(DBError("AGE ENCRYPT <recipient> <message>".to_string())); }
Cmd::AgeEncrypt(cmd[2].clone(), cmd[3].clone()) }
"decrypt" => { if cmd.len() != 4 { return Err(DBError("AGE DECRYPT <identity> <ciphertext_b64>".to_string())); }
Cmd::AgeDecrypt(cmd[2].clone(), cmd[3].clone()) }
"sign" => { if cmd.len() != 4 { return Err(DBError("AGE SIGN <signing_secret> <message>".to_string())); }
Cmd::AgeSign(cmd[2].clone(), cmd[3].clone()) }
"verify" => { if cmd.len() != 5 { return Err(DBError("AGE VERIFY <verify_pub> <message> <signature_b64>".to_string())); }
Cmd::AgeVerify(cmd[2].clone(), cmd[3].clone(), cmd[4].clone()) }
// persistent names
"keygen" => { if cmd.len() != 3 { return Err(DBError("AGE KEYGEN <name>".to_string())); }
Cmd::AgeKeygen(cmd[2].clone()) }
"signkeygen" => { if cmd.len() != 3 { return Err(DBError("AGE SIGNKEYGEN <name>".to_string())); }
Cmd::AgeSignKeygen(cmd[2].clone()) }
"encryptname" => { if cmd.len() != 4 { return Err(DBError("AGE ENCRYPTNAME <name> <message>".to_string())); }
Cmd::AgeEncryptName(cmd[2].clone(), cmd[3].clone()) }
"decryptname" => { if cmd.len() != 4 { return Err(DBError("AGE DECRYPTNAME <name> <ciphertext_b64>".to_string())); }
Cmd::AgeDecryptName(cmd[2].clone(), cmd[3].clone()) }
"signname" => { if cmd.len() != 4 { return Err(DBError("AGE SIGNNAME <name> <message>".to_string())); }
Cmd::AgeSignName(cmd[2].clone(), cmd[3].clone()) }
"verifyname" => { if cmd.len() != 5 { return Err(DBError("AGE VERIFYNAME <name> <message> <signature_b64>".to_string())); }
Cmd::AgeVerifyName(cmd[2].clone(), cmd[3].clone(), cmd[4].clone()) }
"list" => { if cmd.len() != 2 { return Err(DBError("AGE LIST".to_string())); }
Cmd::AgeList }
_ => return Err(DBError(format!("unsupported AGE subcommand {:?}", cmd))),
}
}
"sym" => {
if cmd.len() < 2 {
return Err(DBError("wrong number of arguments for SYM".to_string()));
}
match cmd[1].to_lowercase().as_str() {
"keygen" => { if cmd.len() != 2 { return Err(DBError("SYM KEYGEN takes no args".to_string())); }
Cmd::SymKeygen }
"encrypt" => { if cmd.len() != 4 { return Err(DBError("SYM ENCRYPT <key_b64> <message>".to_string())); }
Cmd::SymEncrypt(cmd[2].clone(), cmd[3].clone()) }
"decrypt" => { if cmd.len() != 4 { return Err(DBError("SYM DECRYPT <key_b64> <ciphertext_b64>".to_string())); }
Cmd::SymDecrypt(cmd[2].clone(), cmd[3].clone()) }
_ => return Err(DBError(format!("unsupported SYM subcommand {:?}", cmd))),
}
}
"ft.create" => {
if cmd.len() < 4 || cmd[2].to_uppercase() != "SCHEMA" {
return Err(DBError("ERR FT.CREATE requires: indexname SCHEMA field1 type1 [options] ...".to_string()));
}
let index_name = cmd[1].clone();
let mut schema = Vec::new();
let mut i = 3;
while i < cmd.len() {
if i + 1 >= cmd.len() {
return Err(DBError("ERR incomplete field definition".to_string()));
}
let field_name = cmd[i].clone();
let field_type = cmd[i + 1].to_uppercase();
let mut options = Vec::new();
i += 2;
// Parse field options until we hit another field name or end
while i < cmd.len()
&& ["WEIGHT","SORTABLE","NOINDEX","SEPARATOR","CASESENSITIVE"]
.contains(&cmd[i].to_uppercase().as_str())
{
options.push(cmd[i].to_uppercase());
i += 1;
// If this option takes a value, consume it too
if i > 0 && ["SEPARATOR","WEIGHT"].contains(&cmd[i - 1].to_uppercase().as_str()) && i < cmd.len() {
options.push(cmd[i].clone());
i += 1;
}
}
schema.push((field_name, field_type, options));
}
Cmd::FtCreate { index_name, schema }
}
"ft.add" => {
if cmd.len() < 5 {
return Err(DBError("ERR FT.ADD requires: index_name doc_id score field value ...".to_string()));
}
let index_name = cmd[1].clone();
let doc_id = cmd[2].clone();
let score = cmd[3].parse::<f64>().map_err(|_| DBError("ERR score must be a number".to_string()))?;
let mut fields = std::collections::HashMap::new();
let mut i = 4;
while i + 1 < cmd.len() {
fields.insert(cmd[i].clone(), cmd[i + 1].clone());
i += 2;
}
Cmd::FtAdd { index_name, doc_id, score, fields }
}
"ft.search" => {
if cmd.len() < 3 {
return Err(DBError("ERR FT.SEARCH requires: index_name query [options]".to_string()));
}
let index_name = cmd[1].clone();
let query = cmd[2].clone();
let mut filters = Vec::new();
let mut limit = None;
let mut offset = None;
let mut return_fields = None;
let mut i = 3;
while i < cmd.len() {
match cmd[i].to_uppercase().as_str() {
"FILTER" => {
if i + 2 >= cmd.len() {
return Err(DBError("ERR FILTER requires field and value".to_string()));
}
filters.push((cmd[i + 1].clone(), cmd[i + 2].clone()));
i += 3;
}
"LIMIT" => {
if i + 2 >= cmd.len() {
return Err(DBError("ERR LIMIT requires offset and num".to_string()));
}
offset = Some(cmd[i + 1].parse().unwrap_or(0));
limit = Some(cmd[i + 2].parse().unwrap_or(10));
i += 3;
}
"RETURN" => {
if i + 1 >= cmd.len() {
return Err(DBError("ERR RETURN requires field count".to_string()));
}
let count: usize = cmd[i + 1].parse().unwrap_or(0);
i += 2;
let mut fields = Vec::new();
for _ in 0..count {
if i < cmd.len() {
fields.push(cmd[i].clone());
i += 1;
}
}
return_fields = Some(fields);
}
_ => i += 1,
}
}
Cmd::FtSearch { index_name, query, filters, limit, offset, return_fields }
}
"ft.del" => {
if cmd.len() != 3 {
return Err(DBError("ERR FT.DEL requires: index_name doc_id".to_string()));
}
Cmd::FtDel(cmd[1].clone(), cmd[2].clone())
}
"ft.info" => {
if cmd.len() != 2 {
return Err(DBError("ERR FT.INFO requires: index_name".to_string()));
}
Cmd::FtInfo(cmd[1].clone())
}
"ft.drop" => {
if cmd.len() != 2 {
return Err(DBError("ERR FT.DROP requires: index_name".to_string()));
}
Cmd::FtDrop(cmd[1].clone())
}
"ft.alter" => {
if cmd.len() < 5 {
return Err(DBError("ERR FT.ALTER requires: index_name field_name field_type [options]".to_string()));
}
let index_name = cmd[1].clone();
let field_name = cmd[2].clone();
let field_type = cmd[3].clone();
let options = if cmd.len() > 4 { cmd[4..].to_vec() } else { vec![] };
Cmd::FtAlter { index_name, field_name, field_type, options }
}
"ft.aggregate" => {
if cmd.len() < 3 {
return Err(DBError("ERR FT.AGGREGATE requires: index_name query [options]".to_string()));
}
let index_name = cmd[1].clone();
let query = cmd[2].clone();
// Minimal parse for now
let group_by = Vec::new();
let reducers = Vec::new();
Cmd::FtAggregate { index_name, query, group_by, reducers }
}
// ----- LANCE.* commands -----
"lance.create" => {
// LANCE.CREATE name DIM d
if cmd.len() != 4 || cmd[2].to_uppercase() != "DIM" {
return Err(DBError("ERR LANCE.CREATE requires: name DIM <dim>".to_string()));
}
let name = cmd[1].clone();
let dim: usize = cmd[3].parse().map_err(|_| DBError("ERR DIM must be an integer".to_string()))?;
Cmd::LanceCreate { name, dim }
}
"lance.store" => {
// LANCE.STORE name ID <id> TEXT <text> [META k v ...]
if cmd.len() < 6 {
return Err(DBError("ERR LANCE.STORE requires: name ID <id> TEXT <text> [META k v ...]".to_string()));
}
let name = cmd[1].clone();
let mut i = 2;
if cmd[i].to_uppercase() != "ID" || i + 1 >= cmd.len() {
return Err(DBError("ERR LANCE.STORE requires ID <id>".to_string()));
}
let id = cmd[i + 1].clone();
i += 2;
if i >= cmd.len() || cmd[i].to_uppercase() != "TEXT" {
return Err(DBError("ERR LANCE.STORE requires TEXT <text>".to_string()));
}
i += 1;
if i >= cmd.len() {
return Err(DBError("ERR LANCE.STORE requires TEXT <text>".to_string()));
}
let text = cmd[i].clone();
i += 1;
let mut meta: Vec<(String, String)> = Vec::new();
if i < cmd.len() && cmd[i].to_uppercase() == "META" {
i += 1;
while i + 1 < cmd.len() {
meta.push((cmd[i].clone(), cmd[i + 1].clone()));
i += 2;
}
}
Cmd::LanceStoreText { name, id, text, meta }
}
"lance.search" => {
// LANCE.SEARCH name K <k> QUERY <text> [FILTER expr] [RETURN n fields...]
if cmd.len() < 6 {
return Err(DBError("ERR LANCE.SEARCH requires: name K <k> QUERY <text> [FILTER expr] [RETURN n fields...]".to_string()));
}
let name = cmd[1].clone();
if cmd[2].to_uppercase() != "K" {
return Err(DBError("ERR LANCE.SEARCH requires K <k>".to_string()));
}
let k: usize = cmd[3].parse().map_err(|_| DBError("ERR K must be an integer".to_string()))?;
if cmd[4].to_uppercase() != "QUERY" {
return Err(DBError("ERR LANCE.SEARCH requires QUERY <text>".to_string()));
}
let mut i = 5;
if i >= cmd.len() {
return Err(DBError("ERR LANCE.SEARCH requires QUERY <text>".to_string()));
}
let text = cmd[i].clone();
i += 1;
let mut filter: Option<String> = None;
let mut return_fields: Option<Vec<String>> = None;
while i < cmd.len() {
match cmd[i].to_uppercase().as_str() {
"FILTER" => {
if i + 1 >= cmd.len() {
return Err(DBError("ERR FILTER requires an expression".to_string()));
}
filter = Some(cmd[i + 1].clone());
i += 2;
}
"RETURN" => {
if i + 1 >= cmd.len() {
return Err(DBError("ERR RETURN requires field count".to_string()));
}
let n: usize = cmd[i + 1].parse().map_err(|_| DBError("ERR RETURN count must be integer".to_string()))?;
i += 2;
let mut fields = Vec::new();
for _ in 0..n {
if i < cmd.len() {
fields.push(cmd[i].clone());
i += 1;
}
}
return_fields = Some(fields);
}
_ => { i += 1; }
}
}
Cmd::LanceSearchText { name, text, k, filter, return_fields }
}
"lance.createindex" => {
// LANCE.CREATEINDEX name TYPE t [PARAM k v ...]
if cmd.len() < 4 || cmd[2].to_uppercase() != "TYPE" {
return Err(DBError("ERR LANCE.CREATEINDEX requires: name TYPE <type> [PARAM k v ...]".to_string()));
}
let name = cmd[1].clone();
let index_type = cmd[3].clone();
let mut params: Vec<(String, String)> = Vec::new();
let mut i = 4;
if i < cmd.len() && cmd[i].to_uppercase() == "PARAM" {
i += 1;
while i + 1 < cmd.len() {
params.push((cmd[i].clone(), cmd[i + 1].clone()));
i += 2;
}
}
Cmd::LanceCreateIndex { name, index_type, params }
}
"lance.embedding" => {
// LANCE.EMBEDDING CONFIG SET name PROVIDER p MODEL m [PARAM k v ...]
// LANCE.EMBEDDING CONFIG GET name
if cmd.len() < 3 || cmd[1].to_uppercase() != "CONFIG" {
return Err(DBError("ERR LANCE.EMBEDDING requires CONFIG subcommand".to_string()));
}
if cmd.len() >= 4 && cmd[2].to_uppercase() == "SET" {
if cmd.len() < 8 {
return Err(DBError("ERR LANCE.EMBEDDING CONFIG SET requires: SET name PROVIDER p MODEL m [PARAM k v ...]".to_string()));
}
let name = cmd[3].clone();
let mut i = 4;
let mut provider: Option<String> = None;
let mut model: Option<String> = None;
let mut params: Vec<(String, String)> = Vec::new();
while i < cmd.len() {
match cmd[i].to_uppercase().as_str() {
"PROVIDER" => {
if i + 1 >= cmd.len() {
return Err(DBError("ERR PROVIDER requires a value".to_string()));
}
provider = Some(cmd[i + 1].clone());
i += 2;
}
"MODEL" => {
if i + 1 >= cmd.len() {
return Err(DBError("ERR MODEL requires a value".to_string()));
}
model = Some(cmd[i + 1].clone());
i += 2;
}
"PARAM" => {
i += 1;
while i + 1 < cmd.len() {
params.push((cmd[i].clone(), cmd[i + 1].clone()));
i += 2;
}
}
_ => {
// Unknown token; break to avoid infinite loop
i += 1;
}
}
}
let provider = provider.ok_or_else(|| DBError("ERR missing PROVIDER".to_string()))?;
let model = model.ok_or_else(|| DBError("ERR missing MODEL".to_string()))?;
Cmd::LanceEmbeddingConfigSet { name, provider, model, params }
} else if cmd.len() == 4 && cmd[2].to_uppercase() == "GET" {
let name = cmd[3].clone();
Cmd::LanceEmbeddingConfigGet { name }
} else {
return Err(DBError("ERR LANCE.EMBEDDING CONFIG supports: SET ... | GET name".to_string()));
}
}
"lance.list" => {
if cmd.len() != 1 {
return Err(DBError("ERR LANCE.LIST takes no arguments".to_string()));
}
Cmd::LanceList
}
"lance.info" => {
if cmd.len() != 2 {
return Err(DBError("ERR LANCE.INFO requires: name".to_string()));
}
Cmd::LanceInfo { name: cmd[1].clone() }
}
"lance.drop" => {
if cmd.len() != 2 {
return Err(DBError("ERR LANCE.DROP requires: name".to_string()));
}
Cmd::LanceDrop { name: cmd[1].clone() }
}
"lance.del" => {
if cmd.len() != 3 {
return Err(DBError("ERR LANCE.DEL requires: name id".to_string()));
}
Cmd::LanceDel { name: cmd[1].clone(), id: cmd[2].clone() }
}
_ => Cmd::Unknow(cmd[0].clone()),
},
protocol,
remaining
))
}
_ => Err(DBError(format!(
"fail to parse as cmd for {:?}",
protocol
))),
}
}
pub async fn run(self, server: &mut Server) -> Result<Protocol, DBError> {
// Handle queued commands for transactions
if server.queued_cmd.is_some()
&& !matches!(self, Cmd::Exec)
&& !matches!(self, Cmd::Multi)
&& !matches!(self, Cmd::Discard)
{
let protocol = self.clone().to_protocol();
server.queued_cmd.as_mut().unwrap().push((self, protocol));
return Ok(Protocol::SimpleString("QUEUED".to_string()));
}
// Backend gating for Tantivy-only DBs: allow only FT.* and basic control/info commands
// Determine per-selected-db backend via admin meta (not process default).
let is_tantivy_backend = crate::admin_meta::get_database_backend(
&server.option.dir,
server.option.backend.clone(),
&server.option.admin_secret,
server.selected_db,
)
.ok()
.flatten()
.map(|b| matches!(b, crate::options::BackendType::Tantivy))
.unwrap_or(false);
// Determine Lance backend similarly
let is_lance_backend = crate::admin_meta::get_database_backend(
&server.option.dir,
server.option.backend.clone(),
&server.option.admin_secret,
server.selected_db,
)
.ok()
.flatten()
.map(|b| matches!(b, crate::options::BackendType::Lance))
.unwrap_or(false);
if is_tantivy_backend {
match &self {
Cmd::Select(..)
| Cmd::Quit
| Cmd::Client(..)
| Cmd::ClientSetName(..)
| Cmd::ClientGetName
| Cmd::Command(..)
| Cmd::Info(..)
| Cmd::FtCreate { .. }
| Cmd::FtAdd { .. }
| Cmd::FtSearch { .. }
| Cmd::FtDel(..)
| Cmd::FtInfo(..)
| Cmd::FtDrop(..)
| Cmd::FtAlter { .. }
| Cmd::FtAggregate { .. } => {}
_ => {
return Ok(Protocol::err("ERR backend is Tantivy; only FT.* commands are allowed"));
}
}
}
// Lance backend gating: allow only LANCE.* and basic control/info commands
if is_lance_backend {
match &self {
Cmd::Select(..)
| Cmd::Quit
| Cmd::Client(..)
| Cmd::ClientSetName(..)
| Cmd::ClientGetName
| Cmd::Command(..)
| Cmd::Info(..)
| Cmd::LanceCreate { .. }
| Cmd::LanceStoreText { .. }
| Cmd::LanceSearchText { .. }
| Cmd::LanceEmbeddingConfigSet { .. }
| Cmd::LanceEmbeddingConfigGet { .. }
| Cmd::LanceCreateIndex { .. }
| Cmd::LanceList
| Cmd::LanceInfo { .. }
| Cmd::LanceDel { .. }
| Cmd::LanceDrop { .. } => {}
_ => {
return Ok(Protocol::err("ERR backend is Lance; only LANCE.* commands are allowed"));
}
}
}
// If selected DB is not Tantivy, forbid all FT.* commands here.
if !is_tantivy_backend {
match &self {
Cmd::FtCreate { .. }
| Cmd::FtAdd { .. }
| Cmd::FtSearch { .. }
| Cmd::FtDel(..)
| Cmd::FtInfo(..)
| Cmd::FtDrop(..)
| Cmd::FtAlter { .. }
| Cmd::FtAggregate { .. } => {
return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed"));
}
_ => {}
}
}
// If selected DB is not Lance, forbid all LANCE.* commands here.
if !is_lance_backend {
match &self {
Cmd::LanceCreate { .. }
| Cmd::LanceStoreText { .. }
| Cmd::LanceSearchText { .. }
| Cmd::LanceEmbeddingConfigSet { .. }
| Cmd::LanceEmbeddingConfigGet { .. }
| Cmd::LanceCreateIndex { .. }
| Cmd::LanceList
| Cmd::LanceInfo { .. }
| Cmd::LanceDel { .. }
| Cmd::LanceDrop { .. } => {
return Ok(Protocol::err("ERR DB backend is not Lance; LANCE.* commands are not allowed"));
}
_ => {}
}
}
match self {
Cmd::Select(db, key) => select_cmd(server, db, key).await,
Cmd::Ping => Ok(Protocol::SimpleString("PONG".to_string())),
Cmd::Echo(s) => Ok(Protocol::BulkString(s)),
Cmd::Get(k) => get_cmd(server, &k).await,
Cmd::Set(k, v) => set_cmd(server, &k, &v).await,
Cmd::SetPx(k, v, x) => set_px_cmd(server, &k, &v, &x).await,
Cmd::SetEx(k, v, x) => set_ex_cmd(server, &k, &v, &x).await,
Cmd::SetOpts(k, v, ex_ms, nx, xx, getflag) => set_with_opts_cmd(server, &k, &v, ex_ms, nx, xx, getflag).await,
Cmd::MGet(keys) => mget_cmd(server, &keys).await,
Cmd::MSet(pairs) => mset_cmd(server, &pairs).await,
Cmd::Del(k) => del_cmd(server, &k).await,
Cmd::DelMulti(keys) => del_multi_cmd(server, &keys).await,
Cmd::ConfigGet(name) => config_get_cmd(&name, server),
Cmd::Keys => keys_cmd(server).await,
Cmd::DbSize => dbsize_cmd(server).await,
Cmd::Info(section) => info_cmd(server, &section).await,
Cmd::Type(k) => type_cmd(server, &k).await,
Cmd::Incr(key) => incr_cmd(server, &key).await,
Cmd::Multi => {
server.queued_cmd = Some(Vec::<(Cmd, Protocol)>::new());
Ok(Protocol::SimpleString("OK".to_string()))
}
Cmd::Exec => exec_cmd(server).await,
Cmd::Discard => {
if server.queued_cmd.is_some() {
server.queued_cmd = None;
Ok(Protocol::SimpleString("OK".to_string()))
} else {
Ok(Protocol::err("ERR DISCARD without MULTI"))
}
}
// Hash commands
Cmd::HSet(key, pairs) => hset_cmd(server, &key, &pairs).await,
Cmd::HGet(key, field) => hget_cmd(server, &key, &field).await,
Cmd::HGetAll(key) => hgetall_cmd(server, &key).await,
Cmd::HDel(key, fields) => hdel_cmd(server, &key, &fields).await,
Cmd::HExists(key, field) => hexists_cmd(server, &key, &field).await,
Cmd::HKeys(key) => hkeys_cmd(server, &key).await,
Cmd::HVals(key) => hvals_cmd(server, &key).await,
Cmd::HLen(key) => hlen_cmd(server, &key).await,
Cmd::HMGet(key, fields) => hmget_cmd(server, &key, &fields).await,
Cmd::HSetNx(key, field, value) => hsetnx_cmd(server, &key, &field, &value).await,
Cmd::HIncrBy(key, field, delta) => hincrby_cmd(server, &key, &field, delta).await,
Cmd::HIncrByFloat(key, field, delta) => hincrbyfloat_cmd(server, &key, &field, delta).await,
Cmd::HScan(key, cursor, pattern, count) => hscan_cmd(server, &key, &cursor, pattern.as_deref(), &count).await,
Cmd::Scan(cursor, pattern, count) => scan_cmd(server, &cursor, pattern.as_deref(), &count).await,
Cmd::Ttl(key) => ttl_cmd(server, &key).await,
Cmd::Expire(key, secs) => expire_cmd(server, &key, secs).await,
Cmd::PExpire(key, ms) => pexpire_cmd(server, &key, ms).await,
Cmd::ExpireAt(key, ts_secs) => expireat_cmd(server, &key, ts_secs).await,
Cmd::PExpireAt(key, ts_ms) => pexpireat_cmd(server, &key, ts_ms).await,
Cmd::Persist(key) => persist_cmd(server, &key).await,
Cmd::Exists(key) => exists_cmd(server, &key).await,
Cmd::ExistsMulti(keys) => exists_multi_cmd(server, &keys).await,
Cmd::Quit => Ok(Protocol::SimpleString("OK".to_string())),
Cmd::Client(_) => Ok(Protocol::SimpleString("OK".to_string())),
Cmd::ClientSetName(name) => client_setname_cmd(server, &name).await,
Cmd::ClientGetName => client_getname_cmd(server).await,
Cmd::Command(args) => command_cmd(&args),
// List commands
Cmd::LPush(key, elements) => lpush_cmd(server, &key, &elements).await,
Cmd::RPush(key, elements) => rpush_cmd(server, &key, &elements).await,
Cmd::LPop(key, count) => lpop_cmd(server, &key, &count).await,
Cmd::RPop(key, count) => rpop_cmd(server, &key, &count).await,
Cmd::BLPop(keys, timeout) => blpop_cmd(server, &keys, timeout).await,
Cmd::BRPop(keys, timeout) => brpop_cmd(server, &keys, timeout).await,
Cmd::LLen(key) => llen_cmd(server, &key).await,
Cmd::LRem(key, count, element) => lrem_cmd(server, &key, count, &element).await,
Cmd::LTrim(key, start, stop) => ltrim_cmd(server, &key, start, stop).await,
Cmd::LIndex(key, index) => lindex_cmd(server, &key, index).await,
Cmd::LRange(key, start, stop) => lrange_cmd(server, &key, start, stop).await,
Cmd::FlushDb => flushdb_cmd(server).await,
// AGE (rage): stateless
Cmd::AgeGenEnc => Ok(crate::age::cmd_age_genenc().await),
Cmd::AgeGenSign => Ok(crate::age::cmd_age_gensign().await),
Cmd::AgeGenKey => Ok(crate::age::cmd_age_genkey().await),
Cmd::AgeEncrypt(recipient, message) => Ok(crate::age::cmd_age_encrypt(&recipient, &message).await),
Cmd::AgeDecrypt(identity, ct_b64) => Ok(crate::age::cmd_age_decrypt(&identity, &ct_b64).await),
Cmd::AgeSign(secret, message) => Ok(crate::age::cmd_age_sign(&secret, &message).await),
Cmd::AgeVerify(vpub, msg, sig_b64) => Ok(crate::age::cmd_age_verify(&vpub, &msg, &sig_b64).await),
// AGE (rage): persistent named keys
Cmd::AgeKeygen(name) => Ok(crate::age::cmd_age_keygen(server, &name).await),
Cmd::AgeSignKeygen(name) => Ok(crate::age::cmd_age_signkeygen(server, &name).await),
Cmd::AgeEncryptName(name, message) => Ok(crate::age::cmd_age_encrypt_name(server, &name, &message).await),
Cmd::AgeDecryptName(name, ct_b64) => Ok(crate::age::cmd_age_decrypt_name(server, &name, &ct_b64).await),
Cmd::AgeSignName(name, message) => Ok(crate::age::cmd_age_sign_name(server, &name, &message).await),
Cmd::AgeVerifyName(name, message, sig_b64) => Ok(crate::age::cmd_age_verify_name(server, &name, &message, &sig_b64).await),
Cmd::AgeList => Ok(crate::age::cmd_age_list(server).await),
// SYM (symmetric): stateless (Phase 1)
Cmd::SymKeygen => Ok(crate::sym::cmd_sym_keygen().await),
Cmd::SymEncrypt(key_b64, message) => Ok(crate::sym::cmd_sym_encrypt(&key_b64, &message).await),
Cmd::SymDecrypt(key_b64, ct_b64) => Ok(crate::sym::cmd_sym_decrypt(&key_b64, &ct_b64).await),
// Full-text search commands
Cmd::FtCreate { index_name, schema } => {
crate::search_cmd::ft_create_cmd(server, index_name, schema).await
}
Cmd::FtAdd { index_name, doc_id, score, fields } => {
crate::search_cmd::ft_add_cmd(server, index_name, doc_id, score, fields).await
}
Cmd::FtSearch { index_name, query, filters, limit, offset, return_fields } => {
crate::search_cmd::ft_search_cmd(server, index_name, query, filters, limit, offset, return_fields).await
}
Cmd::FtDel(index_name, doc_id) => {
crate::search_cmd::ft_del_cmd(server, index_name, doc_id).await
}
Cmd::FtInfo(index_name) => {
crate::search_cmd::ft_info_cmd(server, index_name).await
}
Cmd::FtDrop(index_name) => {
crate::search_cmd::ft_drop_cmd(server, index_name).await
}
Cmd::FtAlter { .. } => {
Ok(Protocol::err("FT.ALTER not implemented yet"))
}
Cmd::FtAggregate { .. } => {
Ok(Protocol::err("FT.AGGREGATE not implemented yet"))
}
// LanceDB commands
Cmd::LanceCreate { name, dim } => {
if !server.has_write_permission() {
return Ok(Protocol::err("ERR write permission denied"));
}
match server.lance_store()?.create_dataset(&name, dim).await {
Ok(()) => Ok(Protocol::SimpleString("OK".to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
Cmd::LanceEmbeddingConfigSet { name, provider, model, params } => {
if !server.has_write_permission() {
return Ok(Protocol::err("ERR write permission denied"));
}
// Map provider string to enum
let p_lc = provider.to_lowercase();
let prov = match p_lc.as_str() {
"test-hash" | "testhash" => EmbeddingProvider::TestHash,
"fastembed" | "lancefastembed" => EmbeddingProvider::LanceFastEmbed,
"openai" | "lanceopenai" => EmbeddingProvider::LanceOpenAI,
other => EmbeddingProvider::LanceOther(other.to_string()),
};
let cfg = EmbeddingConfig {
provider: prov,
model,
params: params.into_iter().collect(),
};
match server.set_dataset_embedding_config(&name, &cfg) {
Ok(()) => Ok(Protocol::SimpleString("OK".to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
Cmd::LanceEmbeddingConfigGet { name } => {
match server.get_dataset_embedding_config(&name) {
Ok(cfg) => {
let mut arr = Vec::new();
arr.push(Protocol::BulkString("provider".to_string()));
arr.push(Protocol::BulkString(match cfg.provider {
EmbeddingProvider::TestHash => "test-hash".to_string(),
EmbeddingProvider::LanceFastEmbed => "lancefastembed".to_string(),
EmbeddingProvider::LanceOpenAI => "lanceopenai".to_string(),
EmbeddingProvider::LanceOther(ref s) => s.clone(),
}));
arr.push(Protocol::BulkString("model".to_string()));
arr.push(Protocol::BulkString(cfg.model.clone()));
arr.push(Protocol::BulkString("params".to_string()));
arr.push(Protocol::BulkString(serde_json::to_string(&cfg.params).unwrap_or_else(|_| "{}".to_string())));
Ok(Protocol::Array(arr))
}
Err(e) => Ok(Protocol::err(&e.0)),
}
}
Cmd::LanceStoreText { name, id, text, meta } => {
if !server.has_write_permission() {
return Ok(Protocol::err("ERR write permission denied"));
}
// Resolve embedder and embed text
let embedder = server.get_embedder_for(&name)?;
let vector = embedder.embed(&text)?;
let meta_map: std::collections::HashMap<String, String> = meta.into_iter().collect();
match server.lance_store()?.store_vector(&name, &id, vector, meta_map, Some(text)).await {
Ok(()) => Ok(Protocol::SimpleString("OK".to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
Cmd::LanceSearchText { name, text, k, filter, return_fields } => {
// Resolve embedder and embed query text
let embedder = server.get_embedder_for(&name)?;
let qv = embedder.embed(&text)?;
match server.lance_store()?.search_vectors(&name, qv, k, filter, return_fields).await {
Ok(results) => {
// Encode as array of [id, score, [k1, v1, k2, v2, ...]]
let mut arr = Vec::new();
for (id, score, meta) in results {
let mut meta_arr: Vec<Protocol> = Vec::new();
for (k, v) in meta {
meta_arr.push(Protocol::BulkString(k));
meta_arr.push(Protocol::BulkString(v));
}
arr.push(Protocol::Array(vec![
Protocol::BulkString(id),
Protocol::BulkString(score.to_string()),
Protocol::Array(meta_arr),
]));
}
Ok(Protocol::Array(arr))
}
Err(e) => Ok(Protocol::err(&e.0)),
}
}
Cmd::LanceCreateIndex { name, index_type, params } => {
if !server.has_write_permission() {
return Ok(Protocol::err("ERR write permission denied"));
}
let params_map: std::collections::HashMap<String, String> = params.into_iter().collect();
match server.lance_store()?.create_index(&name, &index_type, params_map).await {
Ok(()) => Ok(Protocol::SimpleString("OK".to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
Cmd::LanceList => {
match server.lance_store()?.list_datasets().await {
Ok(list) => Ok(Protocol::Array(list.into_iter().map(Protocol::BulkString).collect())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
Cmd::LanceInfo { name } => {
match server.lance_store()?.get_dataset_info(&name).await {
Ok(info) => {
let mut arr = Vec::new();
for (k, v) in info {
arr.push(Protocol::BulkString(k));
arr.push(Protocol::BulkString(v));
}
Ok(Protocol::Array(arr))
}
Err(e) => Ok(Protocol::err(&e.0)),
}
}
Cmd::LanceDel { name, id } => {
if !server.has_write_permission() {
return Ok(Protocol::err("ERR write permission denied"));
}
match server.lance_store()?.delete_by_id(&name, &id).await {
Ok(b) => Ok(Protocol::SimpleString(if b { "1" } else { "0" }.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
Cmd::LanceDrop { name } => {
if !server.has_write_permission() {
return Ok(Protocol::err("ERR write permission denied"));
}
match server.lance_store()?.drop_dataset(&name).await {
Ok(_b) => Ok(Protocol::SimpleString("OK".to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
Cmd::Unknow(s) => Ok(Protocol::err(&format!("ERR unknown command `{}`", s))),
}
}
pub fn to_protocol(self) -> Protocol {
match self {
Cmd::Select(db, key) => {
let mut arr = vec![Protocol::BulkString("select".to_string()), Protocol::BulkString(db.to_string())];
if let Some(k) = key {
arr.push(Protocol::BulkString("key".to_string()));
arr.push(Protocol::BulkString(k));
}
Protocol::Array(arr)
}
Cmd::Ping => Protocol::Array(vec![Protocol::BulkString("ping".to_string())]),
Cmd::Echo(s) => Protocol::Array(vec![Protocol::BulkString("echo".to_string()), Protocol::BulkString(s)]),
Cmd::Get(k) => Protocol::Array(vec![Protocol::BulkString("get".to_string()), Protocol::BulkString(k)]),
Cmd::Set(k, v) => Protocol::Array(vec![Protocol::BulkString("set".to_string()), Protocol::BulkString(k), Protocol::BulkString(v)]),
_ => Protocol::SimpleString("...".to_string())
}
}
}
async fn flushdb_cmd(server: &mut Server) -> Result<Protocol, DBError> {
match server.current_storage()?.flushdb() {
Ok(_) => Ok(Protocol::SimpleString("OK".to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn select_cmd(server: &mut Server, db: u64, key: Option<String>) -> Result<Protocol, DBError> {
// Authorization and existence checks via admin DB 0
// DB 0: require KEY admin-secret
if db == 0 {
match key {
Some(k) if k == server.option.admin_secret => {
server.selected_db = 0;
server.current_permissions = Some(crate::rpc::Permissions::ReadWrite);
// Will create encrypted 0.db if missing
match server.current_storage() {
Ok(_) => return Ok(Protocol::SimpleString("OK".to_string())),
Err(e) => return Ok(Protocol::err(&e.0)),
}
}
_ => {
return Ok(Protocol::err("ERR invalid access key"));
}
}
}
// DB > 0: must exist in admin:dbs
let exists = match crate::admin_meta::db_exists(
&server.option.dir,
server.option.backend.clone(),
&server.option.admin_secret,
db,
) {
Ok(b) => b,
Err(e) => return Ok(Protocol::err(&e.0)),
};
if !exists {
return Ok(Protocol::err(&format!(
"Cannot open database instance {}, as that database instance does not exist.",
db
)));
}
// Verify permissions (public => RW; private => use key)
let perms_opt = match crate::admin_meta::verify_access(
&server.option.dir,
server.option.backend.clone(),
&server.option.admin_secret,
db,
key.as_deref(),
) {
Ok(p) => p,
Err(e) => return Ok(Protocol::err(&e.0)),
};
let perms = match perms_opt {
Some(p) => p,
None => return Ok(Protocol::err("ERR invalid access key")),
};
// Set selected database and permissions, then open storage (skip for Tantivy backend)
server.selected_db = db;
server.current_permissions = Some(perms);
// Resolve effective backend for this db_id from admin meta
let eff_backend = crate::admin_meta::get_database_backend(
&server.option.dir,
server.option.backend.clone(),
&server.option.admin_secret,
db,
)
.ok()
.flatten();
if matches!(eff_backend, Some(crate::options::BackendType::Tantivy) | Some(crate::options::BackendType::Lance)) {
// Search-only DBs (Tantivy/Lance) have no KV storage; allow SELECT to succeed
Ok(Protocol::SimpleString("OK".to_string()))
} else {
match server.current_storage() {
Ok(_) => Ok(Protocol::SimpleString("OK".to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
}
async fn lindex_cmd(server: &Server, key: &str, index: i64) -> Result<Protocol, DBError> {
match server.current_storage()?.lindex(key, index) {
Ok(Some(element)) => Ok(Protocol::BulkString(element)),
Ok(None) => Ok(Protocol::Null),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn lrange_cmd(server: &Server, key: &str, start: i64, stop: i64) -> Result<Protocol, DBError> {
match server.current_storage()?.lrange(key, start, stop) {
Ok(elements) => Ok(Protocol::Array(elements.into_iter().map(Protocol::BulkString).collect())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn ltrim_cmd(server: &Server, key: &str, start: i64, stop: i64) -> Result<Protocol, DBError> {
match server.current_storage()?.ltrim(key, start, stop) {
Ok(_) => Ok(Protocol::SimpleString("OK".to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn lrem_cmd(server: &Server, key: &str, count: i64, element: &str) -> Result<Protocol, DBError> {
match server.current_storage()?.lrem(key, count, element) {
Ok(removed_count) => Ok(Protocol::SimpleString(removed_count.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn llen_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
match server.current_storage()?.llen(key) {
Ok(len) => Ok(Protocol::SimpleString(len.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn lpop_cmd(server: &Server, key: &str, count: &Option<u64>) -> Result<Protocol, DBError> {
let count_val = count.unwrap_or(1);
match server.current_storage()?.lpop(key, count_val) {
Ok(elements) => {
if elements.is_empty() {
if count.is_some() {
Ok(Protocol::Array(vec![]))
} else {
Ok(Protocol::Null)
}
} else if count.is_some() {
Ok(Protocol::Array(elements.into_iter().map(Protocol::BulkString).collect()))
} else {
Ok(Protocol::BulkString(elements[0].clone()))
}
},
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn rpop_cmd(server: &Server, key: &str, count: &Option<u64>) -> Result<Protocol, DBError> {
let count_val = count.unwrap_or(1);
match server.current_storage()?.rpop(key, count_val) {
Ok(elements) => {
if elements.is_empty() {
if count.is_some() {
Ok(Protocol::Array(vec![]))
} else {
Ok(Protocol::Null)
}
} else if count.is_some() {
Ok(Protocol::Array(elements.into_iter().map(Protocol::BulkString).collect()))
} else {
Ok(Protocol::BulkString(elements[0].clone()))
}
},
Err(e) => Ok(Protocol::err(&e.0)),
}
}
// BLPOP implementation
async fn blpop_cmd(server: &Server, keys: &[String], timeout_secs: f64) -> Result<Protocol, DBError> {
// Immediate, non-blocking attempt in key order
for k in keys {
let elems = server.current_storage()?.lpop(k, 1)?;
if !elems.is_empty() {
return Ok(Protocol::Array(vec![
Protocol::BulkString(k.clone()),
Protocol::BulkString(elems[0].clone()),
]));
}
}
// If timeout is zero, return immediately with Null
if timeout_secs <= 0.0 {
return Ok(Protocol::Null);
}
// Register waiters for each key
let db_index = server.selected_db;
let mut ids: Vec<u64> = Vec::with_capacity(keys.len());
let mut names: Vec<String> = Vec::with_capacity(keys.len());
let mut rxs: Vec<tokio::sync::oneshot::Receiver<(String, String)>> = Vec::with_capacity(keys.len());
for k in keys {
let (id, rx) = server.register_waiter(db_index, k, crate::server::PopSide::Left).await;
ids.push(id);
names.push(k.clone());
rxs.push(rx);
}
// Wait for the first delivery or timeout
let wait_fut = async move {
let mut futures_vec = rxs;
loop {
if futures_vec.is_empty() {
return None;
}
let (res, idx, remaining) = select_all(futures_vec).await;
match res {
Ok((k, elem)) => {
return Some((k, elem, idx, remaining));
}
Err(_canceled) => {
// That waiter was canceled; continue with the rest
futures_vec = remaining;
continue;
}
}
}
};
match timeout(Duration::from_secs_f64(timeout_secs), wait_fut).await {
Ok(Some((k, elem, idx, _remaining))) => {
// Unregister other waiters
for (i, key_name) in names.iter().enumerate() {
if i != idx {
server.unregister_waiter(db_index, key_name, ids[i]).await;
}
}
Ok(Protocol::Array(vec![
Protocol::BulkString(k),
Protocol::BulkString(elem),
]))
}
Ok(None) => {
// No futures left; unregister all waiters
for (i, key_name) in names.iter().enumerate() {
server.unregister_waiter(db_index, key_name, ids[i]).await;
}
Ok(Protocol::Null)
}
Err(_elapsed) => {
// Timeout: unregister all waiters
for (i, key_name) in names.iter().enumerate() {
server.unregister_waiter(db_index, key_name, ids[i]).await;
}
Ok(Protocol::Null)
}
}
}
// BRPOP implementation (mirror of BLPOP, popping from the right)
async fn brpop_cmd(server: &Server, keys: &[String], timeout_secs: f64) -> Result<Protocol, DBError> {
// Immediate, non-blocking attempt in key order using RPOP
for k in keys {
let elems = server.current_storage()?.rpop(k, 1)?;
if !elems.is_empty() {
return Ok(Protocol::Array(vec![
Protocol::BulkString(k.clone()),
Protocol::BulkString(elems[0].clone()),
]));
}
}
// If timeout is zero, return immediately with Null
if timeout_secs <= 0.0 {
return Ok(Protocol::Null);
}
// Register waiters for each key (Right side)
let db_index = server.selected_db;
let mut ids: Vec<u64> = Vec::with_capacity(keys.len());
let mut names: Vec<String> = Vec::with_capacity(keys.len());
let mut rxs: Vec<tokio::sync::oneshot::Receiver<(String, String)>> = Vec::with_capacity(keys.len());
for k in keys {
let (id, rx) = server.register_waiter(db_index, k, crate::server::PopSide::Right).await;
ids.push(id);
names.push(k.clone());
rxs.push(rx);
}
// Wait for the first delivery or timeout
let wait_fut = async move {
let mut futures_vec = rxs;
loop {
if futures_vec.is_empty() {
return None;
}
let (res, idx, remaining) = select_all(futures_vec).await;
match res {
Ok((k, elem)) => {
return Some((k, elem, idx, remaining));
}
Err(_canceled) => {
// That waiter was canceled; continue with the rest
futures_vec = remaining;
continue;
}
}
}
};
match timeout(Duration::from_secs_f64(timeout_secs), wait_fut).await {
Ok(Some((k, elem, idx, _remaining))) => {
// Unregister other waiters
for (i, key_name) in names.iter().enumerate() {
if i != idx {
server.unregister_waiter(db_index, key_name, ids[i]).await;
}
}
Ok(Protocol::Array(vec![
Protocol::BulkString(k),
Protocol::BulkString(elem),
]))
}
Ok(None) => {
// No futures left; unregister all waiters
for (i, key_name) in names.iter().enumerate() {
server.unregister_waiter(db_index, key_name, ids[i]).await;
}
Ok(Protocol::Null)
}
Err(_elapsed) => {
// Timeout: unregister all waiters
for (i, key_name) in names.iter().enumerate() {
server.unregister_waiter(db_index, key_name, ids[i]).await;
}
Ok(Protocol::Null)
}
}
}
async fn lpush_cmd(server: &Server, key: &str, elements: &[String]) -> Result<Protocol, DBError> {
if !server.has_write_permission() {
return Ok(Protocol::err("ERR write permission denied"));
}
match server.current_storage()?.lpush(key, elements.to_vec()) {
Ok(len) => {
// Attempt to deliver to any blocked BLPOP waiters
let _ = server.drain_waiters_after_push(key).await;
Ok(Protocol::SimpleString(len.to_string()))
}
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn rpush_cmd(server: &Server, key: &str, elements: &[String]) -> Result<Protocol, DBError> {
match server.current_storage()?.rpush(key, elements.to_vec()) {
Ok(len) => {
// Attempt to deliver to any blocked BLPOP waiters
let _ = server.drain_waiters_after_push(key).await;
Ok(Protocol::SimpleString(len.to_string()))
}
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn exec_cmd(server: &mut Server) -> Result<Protocol, DBError> {
// Move the queued commands out of `server` so we drop the borrow immediately.
let cmds = if let Some(cmds) = server.queued_cmd.take() {
cmds
} else {
return Ok(Protocol::err("ERR EXEC without MULTI"));
};
let mut out = Vec::new();
for (cmd, _) in cmds {
// Use Box::pin to handle recursion in async function
let res = Box::pin(cmd.run(server)).await?;
out.push(res);
}
Ok(Protocol::Array(out))
}
async fn incr_cmd(server: &Server, key: &String) -> Result<Protocol, DBError> {
let storage = server.current_storage()?;
let current_value = storage.get(key)?;
let new_value = match current_value {
Some(v) => {
match v.parse::<i64>() {
Ok(num) => num + 1,
Err(_) => return Ok(Protocol::err("ERR value is not an integer or out of range")),
}
}
None => 1,
};
storage.set(key.clone(), new_value.to_string())?;
Ok(Protocol::SimpleString(new_value.to_string()))
}
fn config_get_cmd(name: &String, server: &Server) -> Result<Protocol, DBError> {
let value = match name.as_str() {
"dir" => Some(server.option.dir.display().to_string()),
"dbfilename" => Some(format!("{}.db", server.selected_db)),
"databases" => Some("16".to_string()), // Hardcoded as per original logic
_ => None,
};
if let Some(val) = value {
Ok(Protocol::Array(vec![
Protocol::BulkString(name.clone()),
Protocol::BulkString(val),
]))
} else {
// Return an empty array for unknown config options, which is standard Redis behavior
Ok(Protocol::Array(vec![]))
}
}
async fn keys_cmd(server: &Server) -> Result<Protocol, DBError> {
let keys = server.current_storage()?.keys("*")?;
Ok(Protocol::Array(
keys.into_iter().map(Protocol::BulkString).collect(),
))
}
async fn dbsize_cmd(server: &Server) -> Result<Protocol, DBError> {
match server.current_storage()?.dbsize() {
Ok(n) => Ok(Protocol::SimpleString(n.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn info_cmd(server: &Server, section: &Option<String>) -> Result<Protocol, DBError> {
// For Tantivy or Lance backend, there is no KV storage; synthesize minimal info.
// Determine effective backend for the currently selected db.
let is_search_only_db = crate::admin_meta::get_database_backend(
&server.option.dir,
server.option.backend.clone(),
&server.option.admin_secret,
server.selected_db,
)
.ok()
.flatten()
.map(|b| matches!(b, crate::options::BackendType::Tantivy | crate::options::BackendType::Lance))
.unwrap_or(false);
let storage_info: Vec<(String, String)> = if is_search_only_db {
vec![
("db_size".to_string(), "0".to_string()),
("is_encrypted".to_string(), "false".to_string()),
]
} else {
server.current_storage()?.info()?
};
let mut info_map: std::collections::HashMap<String, String> = storage_info.into_iter().collect();
info_map.insert("redis_version".to_string(), "7.0.0".to_string());
info_map.insert("selected_db".to_string(), server.selected_db.to_string());
info_map.insert("backend".to_string(), format!("{:?}", server.option.backend));
let mut info_string = String::new();
info_string.push_str("# Server\n");
info_string.push_str(&format!("redis_version:{}\n", info_map.get("redis_version").unwrap()));
info_string.push_str(&format!("backend:{}\n", info_map.get("backend").unwrap()));
info_string.push_str(&format!("encrypted:{}\n", info_map.get("is_encrypted").unwrap()));
info_string.push_str("# Keyspace\n");
info_string.push_str(&format!("db{}:keys={},expires=0,avg_ttl=0\n", info_map.get("selected_db").unwrap(), info_map.get("db_size").unwrap()));
match section {
Some(s) => {
let sl = s.to_lowercase();
if sl == "replication" {
Ok(Protocol::BulkString(
"role:master\nmaster_replid:8371b4fb1155b71f4a04d3e1bc3e18c4a990aeea\nmaster_repl_offset:0\n".to_string()
))
} else {
// Return general info for unknown sections (e.g., SERVER)
Ok(Protocol::BulkString(info_string))
}
}
None => Ok(Protocol::BulkString(info_string)),
}
}
async fn type_cmd(server: &Server, k: &String) -> Result<Protocol, DBError> {
match server.current_storage()?.get_key_type(k)? {
Some(type_str) => Ok(Protocol::SimpleString(type_str)),
None => Ok(Protocol::SimpleString("none".to_string())),
}
}
async fn del_cmd(server: &Server, k: &str) -> Result<Protocol, DBError> {
if !server.has_write_permission() {
return Ok(Protocol::err("ERR write permission denied"));
}
let storage = server.current_storage()?;
if storage.exists(k)? {
storage.del(k.to_string())?;
Ok(Protocol::SimpleString("1".to_string()))
} else {
Ok(Protocol::SimpleString("0".to_string()))
}
}
async fn set_ex_cmd(
server: &Server,
k: &str,
v: &str,
x: &u128,
) -> Result<Protocol, DBError> {
server.current_storage()?.setx(k.to_string(), v.to_string(), *x * 1000)?;
Ok(Protocol::SimpleString("OK".to_string()))
}
async fn set_px_cmd(
server: &Server,
k: &str,
v: &str,
x: &u128,
) -> Result<Protocol, DBError> {
server.current_storage()?.setx(k.to_string(), v.to_string(), *x)?;
Ok(Protocol::SimpleString("OK".to_string()))
}
async fn set_cmd(server: &Server, k: &str, v: &str) -> Result<Protocol, DBError> {
if !server.has_write_permission() {
return Ok(Protocol::err("ERR write permission denied"));
}
server.current_storage()?.set(k.to_string(), v.to_string())?;
Ok(Protocol::SimpleString("OK".to_string()))
}
// Advanced SET with options: EX/PX/NX/XX/GET
async fn set_with_opts_cmd(
server: &Server,
key: &str,
value: &str,
ex_ms: Option<u128>,
nx: bool,
xx: bool,
get_old: bool,
) -> Result<Protocol, DBError> {
let storage = server.current_storage()?;
// Determine existence (for NX/XX)
let exists = storage.exists(key)?;
// If both NX and XX, condition can never be satisfied -> no-op
let mut should_set = true;
if nx && exists {
should_set = false;
}
if xx && !exists {
should_set = false;
}
// Fetch old value if needed for GET
let old_val = if get_old {
storage.get(key)?
} else {
None
};
if should_set {
if let Some(ms) = ex_ms {
storage.setx(key.to_string(), value.to_string(), ms)?;
} else {
storage.set(key.to_string(), value.to_string())?;
}
}
if get_old {
// Return previous value (or Null), regardless of NX/XX outcome only if set executed?
// We follow Redis semantics: return old value if set executed, else Null
if should_set {
Ok(old_val.map_or(Protocol::Null, Protocol::BulkString))
} else {
Ok(Protocol::Null)
}
} else {
if should_set {
Ok(Protocol::SimpleString("OK".to_string()))
} else {
Ok(Protocol::Null)
}
}
}
// MGET: return array of bulk strings or Null for missing
async fn mget_cmd(server: &Server, keys: &[String]) -> Result<Protocol, DBError> {
let mut out: Vec<Protocol> = Vec::with_capacity(keys.len());
let storage = server.current_storage()?;
for k in keys {
match storage.get(k)? {
Some(v) => out.push(Protocol::BulkString(v)),
None => out.push(Protocol::Null),
}
}
Ok(Protocol::Array(out))
}
// MSET: set multiple key/value pairs, return OK
async fn mset_cmd(server: &Server, pairs: &[(String, String)]) -> Result<Protocol, DBError> {
let storage = server.current_storage()?;
for (k, v) in pairs {
storage.set(k.clone(), v.clone())?;
}
Ok(Protocol::SimpleString("OK".to_string()))
}
// DEL with multiple keys: return count of keys actually deleted
async fn del_multi_cmd(server: &Server, keys: &[String]) -> Result<Protocol, DBError> {
if !server.has_write_permission() {
return Ok(Protocol::err("ERR write permission denied"));
}
let storage = server.current_storage()?;
let mut deleted = 0i64;
for k in keys {
if storage.exists(k)? {
storage.del(k.clone())?;
deleted += 1;
}
}
Ok(Protocol::SimpleString(deleted.to_string()))
}
// EXISTS with multiple keys: return count existing
async fn exists_multi_cmd(server: &Server, keys: &[String]) -> Result<Protocol, DBError> {
let storage = server.current_storage()?;
let mut count = 0i64;
for k in keys {
if storage.exists(k)? {
count += 1;
}
}
Ok(Protocol::SimpleString(count.to_string()))
}
async fn get_cmd(server: &Server, k: &str) -> Result<Protocol, DBError> {
let v = server.current_storage()?.get(k)?;
Ok(v.map_or(Protocol::Null, Protocol::BulkString))
}
// Hash command implementations
async fn hset_cmd(server: &Server, key: &str, pairs: &[(String, String)]) -> Result<Protocol, DBError> {
if !server.has_write_permission() {
return Ok(Protocol::err("ERR write permission denied"));
}
let new_fields = server.current_storage()?.hset(key, pairs.to_vec())?;
Ok(Protocol::SimpleString(new_fields.to_string()))
}
async fn hget_cmd(server: &Server, key: &str, field: &str) -> Result<Protocol, DBError> {
match server.current_storage()?.hget(key, field) {
Ok(Some(value)) => Ok(Protocol::BulkString(value)),
Ok(None) => Ok(Protocol::Null),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn hgetall_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
match server.current_storage()?.hgetall(key) {
Ok(pairs) => {
let mut result = Vec::new();
for (field, value) in pairs {
result.push(Protocol::BulkString(field));
result.push(Protocol::BulkString(value));
}
Ok(Protocol::Array(result))
}
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn hdel_cmd(server: &Server, key: &str, fields: &[String]) -> Result<Protocol, DBError> {
match server.current_storage()?.hdel(key, fields.to_vec()) {
Ok(deleted) => Ok(Protocol::SimpleString(deleted.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn hexists_cmd(server: &Server, key: &str, field: &str) -> Result<Protocol, DBError> {
match server.current_storage()?.hexists(key, field) {
Ok(exists) => Ok(Protocol::SimpleString(if exists { "1" } else { "0" }.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn hkeys_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
match server.current_storage()?.hkeys(key) {
Ok(keys) => Ok(Protocol::Array(
keys.into_iter().map(Protocol::BulkString).collect(),
)),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn hvals_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
match server.current_storage()?.hvals(key) {
Ok(values) => Ok(Protocol::Array(
values.into_iter().map(Protocol::BulkString).collect(),
)),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn hlen_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
match server.current_storage()?.hlen(key) {
Ok(len) => Ok(Protocol::SimpleString(len.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn hmget_cmd(server: &Server, key: &str, fields: &[String]) -> Result<Protocol, DBError> {
match server.current_storage()?.hmget(key, fields.to_vec()) {
Ok(values) => {
let result: Vec<Protocol> = values
.into_iter()
.map(|v| v.map_or(Protocol::Null, Protocol::BulkString))
.collect();
Ok(Protocol::Array(result))
}
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn hsetnx_cmd(server: &Server, key: &str, field: &str, value: &str) -> Result<Protocol, DBError> {
match server.current_storage()?.hsetnx(key, field, value) {
Ok(was_set) => Ok(Protocol::SimpleString(if was_set { "1" } else { "0" }.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn hincrby_cmd(server: &Server, key: &str, field: &str, delta: i64) -> Result<Protocol, DBError> {
let storage = server.current_storage()?;
let current = storage.hget(key, field)?;
let base: i64 = match current {
Some(v) => v.parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?,
None => 0,
};
let new_val = base.checked_add(delta).ok_or_else(|| DBError("ERR increment or decrement would overflow".to_string()))?;
// Update the field
storage.hset(key, vec![(field.to_string(), new_val.to_string())])?;
Ok(Protocol::SimpleString(new_val.to_string()))
}
async fn hincrbyfloat_cmd(server: &Server, key: &str, field: &str, delta: f64) -> Result<Protocol, DBError> {
let storage = server.current_storage()?;
let current = storage.hget(key, field)?;
let base: f64 = match current {
Some(v) => v.parse::<f64>().map_err(|_| DBError("ERR value is not a valid float".to_string()))?,
None => 0.0,
};
let new_val = base + delta;
// Update the field
storage.hset(key, vec![(field.to_string(), new_val.to_string())])?;
Ok(Protocol::SimpleString(new_val.to_string()))
}
async fn scan_cmd(
server: &Server,
cursor: &u64,
pattern: Option<&str>,
count: &Option<u64>
) -> Result<Protocol, DBError> {
match server.current_storage()?.scan(*cursor, pattern, *count) {
Ok((next_cursor, key_value_pairs)) => {
let mut result = Vec::new();
result.push(Protocol::BulkString(next_cursor.to_string()));
// For SCAN, we only return the keys, not the values
let keys: Vec<Protocol> = key_value_pairs.into_iter().map(|(key, _)| Protocol::BulkString(key)).collect();
result.push(Protocol::Array(keys));
Ok(Protocol::Array(result))
}
Err(e) => Ok(Protocol::err(&format!("ERR {}", e.0))),
}
}
async fn hscan_cmd(
server: &Server,
key: &str,
cursor: &u64,
pattern: Option<&str>,
count: &Option<u64>
) -> Result<Protocol, DBError> {
match server.current_storage()?.hscan(key, *cursor, pattern, *count) {
Ok((next_cursor, field_value_pairs)) => {
let mut result = Vec::new();
result.push(Protocol::BulkString(next_cursor.to_string()));
// For HSCAN, we return field-value pairs flattened
let mut fields_and_values = Vec::new();
for (field, value) in field_value_pairs {
fields_and_values.push(Protocol::BulkString(field));
fields_and_values.push(Protocol::BulkString(value));
}
result.push(Protocol::Array(fields_and_values));
Ok(Protocol::Array(result))
}
Err(e) => Ok(Protocol::err(&format!("ERR {}", e.0))),
}
}
async fn ttl_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
match server.current_storage()?.ttl(key) {
Ok(ttl) => Ok(Protocol::SimpleString(ttl.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn exists_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
match server.current_storage()?.exists(key) {
Ok(exists) => Ok(Protocol::SimpleString(if exists { "1" } else { "0" }.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
// EXPIRE key seconds -> 1 if timeout set, 0 otherwise
async fn expire_cmd(server: &Server, key: &str, secs: i64) -> Result<Protocol, DBError> {
if secs < 0 {
return Ok(Protocol::SimpleString("0".to_string()));
}
match server.current_storage()?.expire_seconds(key, secs as u64) {
Ok(applied) => Ok(Protocol::SimpleString(if applied { "1" } else { "0" }.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
// PEXPIRE key milliseconds -> 1 if timeout set, 0 otherwise
async fn pexpire_cmd(server: &Server, key: &str, ms: i64) -> Result<Protocol, DBError> {
if ms < 0 {
return Ok(Protocol::SimpleString("0".to_string()));
}
match server.current_storage()?.pexpire_millis(key, ms as u128) {
Ok(applied) => Ok(Protocol::SimpleString(if applied { "1" } else { "0" }.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
// PERSIST key -> 1 if timeout removed, 0 otherwise
async fn persist_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
match server.current_storage()?.persist(key) {
Ok(removed) => Ok(Protocol::SimpleString(if removed { "1" } else { "0" }.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
// EXPIREAT key timestamp-seconds -> 1 if timeout set, 0 otherwise
async fn expireat_cmd(server: &Server, key: &str, ts_secs: i64) -> Result<Protocol, DBError> {
match server.current_storage()?.expire_at_seconds(key, ts_secs) {
Ok(applied) => Ok(Protocol::SimpleString(if applied { "1" } else { "0" }.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
// PEXPIREAT key timestamp-milliseconds -> 1 if timeout set, 0 otherwise
async fn pexpireat_cmd(server: &Server, key: &str, ts_ms: i64) -> Result<Protocol, DBError> {
match server.current_storage()?.pexpire_at_millis(key, ts_ms) {
Ok(applied) => Ok(Protocol::SimpleString(if applied { "1" } else { "0" }.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn client_setname_cmd(server: &mut Server, name: &str) -> Result<Protocol, DBError> {
server.client_name = Some(name.to_string());
Ok(Protocol::SimpleString("OK".to_string()))
}
async fn client_getname_cmd(server: &Server) -> Result<Protocol, DBError> {
match &server.client_name {
Some(name) => Ok(Protocol::BulkString(name.clone())),
None => Ok(Protocol::Null),
}
}
// Minimal COMMAND subcommands stub to satisfy redis-cli probes.
// - COMMAND DOCS ... => return empty array
// - COMMAND INFO ... => return empty array
// - Any other => empty array
fn command_cmd(args: &[String]) -> Result<Protocol, DBError> {
if args.is_empty() {
return Ok(Protocol::Array(vec![]));
}
let sub = args[0].to_lowercase();
match sub.as_str() {
"docs" => Ok(Protocol::Array(vec![])),
"info" => Ok(Protocol::Array(vec![])),
_ => Ok(Protocol::Array(vec![])),
}
}