fixed key-based access control for Tantivy backends
This commit is contained in:
@@ -432,20 +432,26 @@ pub fn verify_access(
|
|||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Public?
|
let is_public = load_public(&admin, id)?;
|
||||||
if load_public(&admin, id)? {
|
|
||||||
return Ok(Some(Permissions::ReadWrite));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Private: require key and verify
|
// If a key is explicitly provided, enforce its validity strictly.
|
||||||
|
// Do NOT fall back to public when an invalid key is supplied.
|
||||||
if let Some(k) = key_opt {
|
if let Some(k) = key_opt {
|
||||||
let hash = crate::rpc::hash_key(k);
|
let hash = crate::rpc::hash_key(k);
|
||||||
if let Some(v) = admin.hget(&k_meta_db_keys(id), &hash)? {
|
if let Some(v) = admin.hget(&k_meta_db_keys(id), &hash)? {
|
||||||
let (perm, _ts) = parse_perm_value(&v);
|
let (perm, _ts) = parse_perm_value(&v);
|
||||||
return Ok(Some(perm));
|
return Ok(Some(perm));
|
||||||
}
|
}
|
||||||
|
// Invalid key
|
||||||
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// No key provided: allow access if DB is public, otherwise deny
|
||||||
|
if is_public {
|
||||||
|
Ok(Some(Permissions::ReadWrite))
|
||||||
|
} else {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enumerate all db ids
|
// Enumerate all db ids
|
||||||
|
52
src/rpc.rs
52
src/rpc.rs
@@ -505,10 +505,15 @@ impl RpcServer for RpcServerImpl {
|
|||||||
if !matches!(server.option.backend, crate::options::BackendType::Tantivy) {
|
if !matches!(server.option.backend, crate::options::BackendType::Tantivy) {
|
||||||
return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Tantivy", None::<()>));
|
return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Tantivy", None::<()>));
|
||||||
}
|
}
|
||||||
crate::search_cmd::ft_create_cmd(&*server, index_name, schema)
|
let proto = crate::search_cmd::ft_create_cmd(&*server, index_name, schema)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?;
|
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?;
|
||||||
Ok(true)
|
match proto {
|
||||||
|
crate::protocol::Protocol::Error(msg) => {
|
||||||
|
Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, msg, None::<()>))
|
||||||
|
}
|
||||||
|
_ => Ok(true),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn ft_add(
|
async fn ft_add(
|
||||||
@@ -526,10 +531,15 @@ impl RpcServer for RpcServerImpl {
|
|||||||
if !matches!(server.option.backend, crate::options::BackendType::Tantivy) {
|
if !matches!(server.option.backend, crate::options::BackendType::Tantivy) {
|
||||||
return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Tantivy", None::<()>));
|
return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Tantivy", None::<()>));
|
||||||
}
|
}
|
||||||
crate::search_cmd::ft_add_cmd(&*server, index_name, doc_id, score, fields)
|
let proto = crate::search_cmd::ft_add_cmd(&*server, index_name, doc_id, score, fields)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?;
|
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?;
|
||||||
Ok(true)
|
match proto {
|
||||||
|
crate::protocol::Protocol::Error(msg) => {
|
||||||
|
Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, msg, None::<()>))
|
||||||
|
}
|
||||||
|
_ => Ok(true),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn ft_search(
|
async fn ft_search(
|
||||||
@@ -560,7 +570,12 @@ impl RpcServer for RpcServerImpl {
|
|||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?;
|
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?;
|
||||||
Ok(serde_json::json!({ "resp": proto.encode() }))
|
match proto {
|
||||||
|
crate::protocol::Protocol::Error(msg) => {
|
||||||
|
Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, msg, None::<()>))
|
||||||
|
}
|
||||||
|
_ => Ok(serde_json::json!({ "resp": proto.encode() })),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn ft_del(&self, db_id: u64, index_name: String, doc_id: String) -> RpcResult<bool> {
|
async fn ft_del(&self, db_id: u64, index_name: String, doc_id: String) -> RpcResult<bool> {
|
||||||
@@ -571,10 +586,16 @@ impl RpcServer for RpcServerImpl {
|
|||||||
if !matches!(server.option.backend, crate::options::BackendType::Tantivy) {
|
if !matches!(server.option.backend, crate::options::BackendType::Tantivy) {
|
||||||
return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Tantivy", None::<()>));
|
return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Tantivy", None::<()>));
|
||||||
}
|
}
|
||||||
crate::search_cmd::ft_del_cmd(&*server, index_name, doc_id)
|
let proto = crate::search_cmd::ft_del_cmd(&*server, index_name, doc_id)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?;
|
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?;
|
||||||
Ok(true)
|
match proto {
|
||||||
|
crate::protocol::Protocol::Error(msg) => {
|
||||||
|
Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, msg, None::<()>))
|
||||||
|
}
|
||||||
|
crate::protocol::Protocol::SimpleString(s) => Ok(s == "1"),
|
||||||
|
_ => Ok(false),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn ft_info(&self, db_id: u64, index_name: String) -> RpcResult<serde_json::Value> {
|
async fn ft_info(&self, db_id: u64, index_name: String) -> RpcResult<serde_json::Value> {
|
||||||
@@ -588,7 +609,12 @@ impl RpcServer for RpcServerImpl {
|
|||||||
let proto = crate::search_cmd::ft_info_cmd(&*server, index_name)
|
let proto = crate::search_cmd::ft_info_cmd(&*server, index_name)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?;
|
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?;
|
||||||
Ok(serde_json::json!({ "resp": proto.encode() }))
|
match proto {
|
||||||
|
crate::protocol::Protocol::Error(msg) => {
|
||||||
|
Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, msg, None::<()>))
|
||||||
|
}
|
||||||
|
_ => Ok(serde_json::json!({ "resp": proto.encode() })),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn ft_drop(&self, db_id: u64, index_name: String) -> RpcResult<bool> {
|
async fn ft_drop(&self, db_id: u64, index_name: String) -> RpcResult<bool> {
|
||||||
@@ -599,10 +625,16 @@ impl RpcServer for RpcServerImpl {
|
|||||||
if !matches!(server.option.backend, crate::options::BackendType::Tantivy) {
|
if !matches!(server.option.backend, crate::options::BackendType::Tantivy) {
|
||||||
return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Tantivy", None::<()>));
|
return Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, "DB backend is not Tantivy", None::<()>));
|
||||||
}
|
}
|
||||||
crate::search_cmd::ft_drop_cmd(&*server, index_name)
|
let proto = crate::search_cmd::ft_drop_cmd(&*server, index_name)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?;
|
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(-32000, e.0, None::<()>))?;
|
||||||
Ok(true)
|
match proto {
|
||||||
|
crate::protocol::Protocol::Error(msg) => {
|
||||||
|
Err(jsonrpsee::types::ErrorObjectOwned::owned(-32000, msg, None::<()>))
|
||||||
|
}
|
||||||
|
crate::protocol::Protocol::SimpleString(s) => Ok(s.eq_ignore_ascii_case("OK")),
|
||||||
|
_ => Ok(false),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn add_access_key(&self, db_id: u64, key: String, permissions: String) -> RpcResult<bool> {
|
async fn add_access_key(&self, db_id: u64, key: String, permissions: String) -> RpcResult<bool> {
|
||||||
|
@@ -162,8 +162,8 @@ pub async fn ft_add_cmd(
|
|||||||
if !is_tantivy {
|
if !is_tantivy {
|
||||||
return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed"));
|
return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed"));
|
||||||
}
|
}
|
||||||
if !server.has_read_permission() {
|
if !server.has_write_permission() {
|
||||||
return Ok(Protocol::err("ERR read permission denied"));
|
return Ok(Protocol::err("ERR write permission denied"));
|
||||||
}
|
}
|
||||||
let indexes = server.search_indexes.read().unwrap();
|
let indexes = server.search_indexes.read().unwrap();
|
||||||
let search_index = indexes
|
let search_index = indexes
|
||||||
@@ -199,8 +199,8 @@ pub async fn ft_search_cmd(
|
|||||||
if !is_tantivy {
|
if !is_tantivy {
|
||||||
return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed"));
|
return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed"));
|
||||||
}
|
}
|
||||||
if !server.has_write_permission() {
|
if !server.has_read_permission() {
|
||||||
return Ok(Protocol::err("ERR write permission denied"));
|
return Ok(Protocol::err("ERR read permission denied"));
|
||||||
}
|
}
|
||||||
let indexes = server.search_indexes.read().unwrap();
|
let indexes = server.search_indexes.read().unwrap();
|
||||||
let search_index = indexes
|
let search_index = indexes
|
||||||
@@ -226,27 +226,26 @@ pub async fn ft_search_cmd(
|
|||||||
|
|
||||||
let results = search_index.search_with_options(&query, options)?;
|
let results = search_index.search_with_options(&query, options)?;
|
||||||
|
|
||||||
// Format results as Redis protocol
|
// Format results as a flattened Redis protocol array to match client expectations:
|
||||||
|
// [ total, doc_id, score, field, value, field, value, ... , doc_id, score, ... ]
|
||||||
let mut response = Vec::new();
|
let mut response = Vec::new();
|
||||||
// First element is the total count
|
// First element is the total count
|
||||||
response.push(Protocol::SimpleString(results.total.to_string()));
|
response.push(Protocol::BulkString(results.total.to_string()));
|
||||||
// Then each document
|
// Then each document flattened
|
||||||
for doc in results.documents {
|
for mut doc in results.documents {
|
||||||
let mut doc_array = Vec::new();
|
|
||||||
// Add document ID if it exists
|
// Add document ID if it exists
|
||||||
if let Some(id) = doc.fields.get("_id") {
|
if let Some(id) = doc.fields.get("_id") {
|
||||||
doc_array.push(Protocol::BulkString(id.clone()));
|
response.push(Protocol::BulkString(id.clone()));
|
||||||
}
|
}
|
||||||
// Add score
|
// Add score
|
||||||
doc_array.push(Protocol::BulkString(doc.score.to_string()));
|
response.push(Protocol::BulkString(doc.score.to_string()));
|
||||||
// Add fields as key-value pairs
|
// Add fields as key-value pairs
|
||||||
for (field_name, field_value) in doc.fields {
|
for (field_name, field_value) in std::mem::take(&mut doc.fields) {
|
||||||
if field_name != "_id" {
|
if field_name != "_id" {
|
||||||
doc_array.push(Protocol::BulkString(field_name));
|
response.push(Protocol::BulkString(field_name));
|
||||||
doc_array.push(Protocol::BulkString(field_value));
|
response.push(Protocol::BulkString(field_value));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
response.push(Protocol::Array(doc_array));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Protocol::Array(response))
|
Ok(Protocol::Array(response))
|
||||||
@@ -278,12 +277,11 @@ pub async fn ft_del_cmd(
|
|||||||
return Ok(Protocol::err("ERR write permission denied"));
|
return Ok(Protocol::err("ERR write permission denied"));
|
||||||
}
|
}
|
||||||
let indexes = server.search_indexes.read().unwrap();
|
let indexes = server.search_indexes.read().unwrap();
|
||||||
let _search_index = indexes
|
let search_index = indexes
|
||||||
.get(&index_name)
|
.get(&index_name)
|
||||||
.ok_or_else(|| DBError(format!("Index '{}' not found", index_name)))?;
|
.ok_or_else(|| DBError(format!("Index '{}' not found", index_name)))?;
|
||||||
// Not fully implemented yet: Tantivy delete by term would require a writer session and commit coordination.
|
let existed = search_index.delete_document_by_id(&doc_id)?;
|
||||||
println!("Deleting document '{}' from index '{}'", doc_id, index_name);
|
Ok(Protocol::SimpleString(if existed { "1".to_string() } else { "0".to_string() }))
|
||||||
Ok(Protocol::SimpleString("1".to_string()))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn ft_info_cmd(server: &Server, index_name: String) -> Result<Protocol, DBError> {
|
pub async fn ft_info_cmd(server: &Server, index_name: String) -> Result<Protocol, DBError> {
|
||||||
@@ -355,10 +353,13 @@ pub async fn ft_drop_cmd(server: &Server, index_name: String) -> Result<Protocol
|
|||||||
return Ok(Protocol::err("ERR write permission denied"));
|
return Ok(Protocol::err("ERR write permission denied"));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove from registry
|
// Remove from registry and files; report error if nothing to drop
|
||||||
|
let mut existed = false;
|
||||||
{
|
{
|
||||||
let mut indexes = server.search_indexes.write().unwrap();
|
let mut indexes = server.search_indexes.write().unwrap();
|
||||||
indexes.remove(&index_name);
|
if indexes.remove(&index_name).is_some() {
|
||||||
|
existed = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove the index files from disk
|
// Remove the index files from disk
|
||||||
@@ -366,6 +367,11 @@ pub async fn ft_drop_cmd(server: &Server, index_name: String) -> Result<Protocol
|
|||||||
if index_path.exists() {
|
if index_path.exists() {
|
||||||
std::fs::remove_dir_all(&index_path)
|
std::fs::remove_dir_all(&index_path)
|
||||||
.map_err(|e| DBError(format!("Failed to remove index files: {}", e)))?;
|
.map_err(|e| DBError(format!("Failed to remove index files: {}", e)))?;
|
||||||
|
existed = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if !existed {
|
||||||
|
return Ok(Protocol::err(&format!("ERR Index '{}' not found", index_name)));
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Protocol::SimpleString("OK".to_string()))
|
Ok(Protocol::SimpleString("OK".to_string()))
|
||||||
|
@@ -103,12 +103,40 @@ impl Server {
|
|||||||
|
|
||||||
/// Check if current permissions allow read operations
|
/// Check if current permissions allow read operations
|
||||||
pub fn has_read_permission(&self) -> bool {
|
pub fn has_read_permission(&self) -> bool {
|
||||||
matches!(self.current_permissions, Some(crate::rpc::Permissions::Read) | Some(crate::rpc::Permissions::ReadWrite))
|
// If an explicit permission is set for this connection, honor it.
|
||||||
|
if let Some(perms) = self.current_permissions.as_ref() {
|
||||||
|
return matches!(*perms, crate::rpc::Permissions::Read | crate::rpc::Permissions::ReadWrite);
|
||||||
|
}
|
||||||
|
// Fallback ONLY when no explicit permission context (e.g., JSON-RPC flows without SELECT).
|
||||||
|
match crate::admin_meta::verify_access(
|
||||||
|
&self.option.dir,
|
||||||
|
self.option.backend.clone(),
|
||||||
|
&self.option.admin_secret,
|
||||||
|
self.selected_db,
|
||||||
|
None,
|
||||||
|
) {
|
||||||
|
Ok(Some(crate::rpc::Permissions::Read)) | Ok(Some(crate::rpc::Permissions::ReadWrite)) => true,
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if current permissions allow write operations
|
/// Check if current permissions allow write operations
|
||||||
pub fn has_write_permission(&self) -> bool {
|
pub fn has_write_permission(&self) -> bool {
|
||||||
matches!(self.current_permissions, Some(crate::rpc::Permissions::ReadWrite))
|
// If an explicit permission is set for this connection, honor it.
|
||||||
|
if let Some(perms) = self.current_permissions.as_ref() {
|
||||||
|
return matches!(*perms, crate::rpc::Permissions::ReadWrite);
|
||||||
|
}
|
||||||
|
// Fallback ONLY when no explicit permission context (e.g., JSON-RPC flows without SELECT).
|
||||||
|
match crate::admin_meta::verify_access(
|
||||||
|
&self.option.dir,
|
||||||
|
self.option.backend.clone(),
|
||||||
|
&self.option.admin_secret,
|
||||||
|
self.selected_db,
|
||||||
|
None,
|
||||||
|
) {
|
||||||
|
Ok(Some(crate::rpc::Permissions::ReadWrite)) => true,
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ----- BLPOP waiter helpers -----
|
// ----- BLPOP waiter helpers -----
|
||||||
|
@@ -394,6 +394,10 @@ impl TantivySearch {
|
|||||||
writer
|
writer
|
||||||
.commit()
|
.commit()
|
||||||
.map_err(|e| DBError(format!("Failed to commit: {}", e)))?;
|
.map_err(|e| DBError(format!("Failed to commit: {}", e)))?;
|
||||||
|
// Make new documents visible to searches
|
||||||
|
self.reader
|
||||||
|
.reload()
|
||||||
|
.map_err(|e| DBError(format!("Failed to reload reader: {}", e)))?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -402,6 +406,10 @@ impl TantivySearch {
|
|||||||
query_str: &str,
|
query_str: &str,
|
||||||
options: SearchOptions,
|
options: SearchOptions,
|
||||||
) -> Result<SearchResults, DBError> {
|
) -> Result<SearchResults, DBError> {
|
||||||
|
// Ensure reader is up to date with latest commits
|
||||||
|
self.reader
|
||||||
|
.reload()
|
||||||
|
.map_err(|e| DBError(format!("Failed to reload reader: {}", e)))?;
|
||||||
let searcher = self.reader.searcher();
|
let searcher = self.reader.searcher();
|
||||||
|
|
||||||
// Ensure we have searchable fields
|
// Ensure we have searchable fields
|
||||||
@@ -602,6 +610,40 @@ impl TantivySearch {
|
|||||||
config: self.config.clone(),
|
config: self.config.clone(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Delete a document by its _id term. Returns true if the document existed before deletion.
|
||||||
|
pub fn delete_document_by_id(&self, doc_id: &str) -> Result<bool, DBError> {
|
||||||
|
// Determine existence by running a tiny term query
|
||||||
|
let existed = if let Some((id_field, _)) = self.index_schema.fields.get("_id") {
|
||||||
|
let term = Term::from_field_text(*id_field, doc_id);
|
||||||
|
let searcher = self.reader.searcher();
|
||||||
|
let tq = TermQuery::new(term.clone(), IndexRecordOption::Basic);
|
||||||
|
let hits = searcher
|
||||||
|
.search(&tq, &TopDocs::with_limit(1))
|
||||||
|
.map_err(|e| DBError(format!("Failed to search for existing doc: {}", e)))?;
|
||||||
|
!hits.is_empty()
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
};
|
||||||
|
|
||||||
|
// Perform deletion and commit
|
||||||
|
let mut writer = self
|
||||||
|
.writer
|
||||||
|
.write()
|
||||||
|
.map_err(|e| DBError(format!("Failed to acquire writer lock: {}", e)))?;
|
||||||
|
if let Some((id_field, _)) = self.index_schema.fields.get("_id") {
|
||||||
|
writer.delete_term(Term::from_field_text(*id_field, doc_id));
|
||||||
|
}
|
||||||
|
writer
|
||||||
|
.commit()
|
||||||
|
.map_err(|e| DBError(format!("Failed to commit delete: {}", e)))?;
|
||||||
|
// Refresh reader to observe deletion
|
||||||
|
self.reader
|
||||||
|
.reload()
|
||||||
|
.map_err(|e| DBError(format!("Failed to reload reader: {}", e)))?;
|
||||||
|
|
||||||
|
Ok(existed)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
|
@@ -87,6 +87,7 @@ async fn setup_server() -> (ServerProcessGuard, u16, Connection, HttpClient) {
|
|||||||
&port.to_string(),
|
&port.to_string(),
|
||||||
"--rpc-port",
|
"--rpc-port",
|
||||||
&(port + 1).to_string(),
|
&(port + 1).to_string(),
|
||||||
|
"--enable-rpc",
|
||||||
"--debug",
|
"--debug",
|
||||||
"--admin-secret",
|
"--admin-secret",
|
||||||
"test-admin",
|
"test-admin",
|
||||||
|
Reference in New Issue
Block a user