378 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
			
		
		
	
	
			378 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
use crate::{
 | 
						|
    error::DBError,
 | 
						|
    protocol::Protocol,
 | 
						|
    server::Server,
 | 
						|
    tantivy_search::{
 | 
						|
        FieldDef, Filter, FilterType, IndexConfig, NumericType, SearchOptions, TantivySearch,
 | 
						|
    },
 | 
						|
};
 | 
						|
use std::collections::HashMap;
 | 
						|
use std::sync::Arc;
 | 
						|
 | 
						|
pub async fn ft_create_cmd(
 | 
						|
    server: &Server,
 | 
						|
    index_name: String,
 | 
						|
    schema: Vec<(String, String, Vec<String>)>,
 | 
						|
) -> Result<Protocol, DBError> {
 | 
						|
    if server.selected_db == 0 {
 | 
						|
        return Ok(Protocol::err("FT commands are not allowed on DB 0"));
 | 
						|
    }
 | 
						|
    // Enforce Tantivy backend for selected DB
 | 
						|
    let is_tantivy = crate::admin_meta::get_database_backend(
 | 
						|
        &server.option.dir,
 | 
						|
        server.option.backend.clone(),
 | 
						|
        &server.option.admin_secret,
 | 
						|
        server.selected_db,
 | 
						|
    )
 | 
						|
    .ok()
 | 
						|
    .flatten()
 | 
						|
    .map(|b| matches!(b, crate::options::BackendType::Tantivy))
 | 
						|
    .unwrap_or(false);
 | 
						|
    if !is_tantivy {
 | 
						|
        return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed"));
 | 
						|
    }
 | 
						|
 | 
						|
    if !server.has_write_permission() {
 | 
						|
        return Ok(Protocol::err("ERR write permission denied"));
 | 
						|
    }
 | 
						|
 | 
						|
    // Parse schema into field definitions
 | 
						|
    let mut field_definitions = Vec::new();
 | 
						|
    for (field_name, field_type, options) in schema {
 | 
						|
        let field_def = match field_type.to_uppercase().as_str() {
 | 
						|
            "TEXT" => {
 | 
						|
                let mut sortable = false;
 | 
						|
                let mut no_index = false;
 | 
						|
                // Weight is not used in current implementation
 | 
						|
                let mut _weight = 1.0f32;
 | 
						|
                let mut i = 0;
 | 
						|
                while i < options.len() {
 | 
						|
                    match options[i].to_uppercase().as_str() {
 | 
						|
                        "WEIGHT" => {
 | 
						|
                            if i + 1 < options.len() {
 | 
						|
                                _weight = options[i + 1].parse::<f32>().unwrap_or(1.0);
 | 
						|
                                i += 2;
 | 
						|
                                continue;
 | 
						|
                            }
 | 
						|
                        }
 | 
						|
                        "SORTABLE" => {
 | 
						|
                            sortable = true;
 | 
						|
                        }
 | 
						|
                        "NOINDEX" => {
 | 
						|
                            no_index = true;
 | 
						|
                        }
 | 
						|
                        _ => {}
 | 
						|
                    }
 | 
						|
                    i += 1;
 | 
						|
                }
 | 
						|
                FieldDef::Text {
 | 
						|
                    stored: true,
 | 
						|
                    indexed: !no_index,
 | 
						|
                    tokenized: true,
 | 
						|
                    fast: sortable,
 | 
						|
                }
 | 
						|
            }
 | 
						|
            "NUMERIC" => {
 | 
						|
                // default to F64
 | 
						|
                let mut sortable = false;
 | 
						|
                for opt in &options {
 | 
						|
                    if opt.to_uppercase() == "SORTABLE" {
 | 
						|
                        sortable = true;
 | 
						|
                    }
 | 
						|
                }
 | 
						|
                FieldDef::Numeric {
 | 
						|
                    stored: true,
 | 
						|
                    indexed: true,
 | 
						|
                    fast: sortable,
 | 
						|
                    precision: NumericType::F64,
 | 
						|
                }
 | 
						|
            }
 | 
						|
            "TAG" => {
 | 
						|
                let mut separator = ",".to_string();
 | 
						|
                let mut case_sensitive = false;
 | 
						|
                let mut i = 0;
 | 
						|
                while i < options.len() {
 | 
						|
                    match options[i].to_uppercase().as_str() {
 | 
						|
                        "SEPARATOR" => {
 | 
						|
                            if i + 1 < options.len() {
 | 
						|
                                separator = options[i + 1].clone();
 | 
						|
                                i += 2;
 | 
						|
                                continue;
 | 
						|
                            }
 | 
						|
                        }
 | 
						|
                        "CASESENSITIVE" => {
 | 
						|
                            case_sensitive = true;
 | 
						|
                        }
 | 
						|
                        _ => {}
 | 
						|
                    }
 | 
						|
                    i += 1;
 | 
						|
                }
 | 
						|
                FieldDef::Tag {
 | 
						|
                    stored: true,
 | 
						|
                    separator,
 | 
						|
                    case_sensitive,
 | 
						|
                }
 | 
						|
            }
 | 
						|
            "GEO" => FieldDef::Geo { stored: true },
 | 
						|
            _ => {
 | 
						|
                return Err(DBError(format!("Unknown field type: {}", field_type)));
 | 
						|
            }
 | 
						|
        };
 | 
						|
        field_definitions.push((field_name, field_def));
 | 
						|
    }
 | 
						|
 | 
						|
    // Create the search index
 | 
						|
    let search_path = server.search_index_path();
 | 
						|
    let config = IndexConfig::default();
 | 
						|
    let search_index = TantivySearch::new_with_schema(
 | 
						|
        search_path,
 | 
						|
        index_name.clone(),
 | 
						|
        field_definitions,
 | 
						|
        Some(config),
 | 
						|
    )?;
 | 
						|
 | 
						|
    // Store in registry
 | 
						|
    let mut indexes = server.search_indexes.write().unwrap();
 | 
						|
    indexes.insert(index_name, Arc::new(search_index));
 | 
						|
 | 
						|
    Ok(Protocol::SimpleString("OK".to_string()))
 | 
						|
}
 | 
						|
 | 
						|
pub async fn ft_add_cmd(
 | 
						|
    server: &Server,
 | 
						|
    index_name: String,
 | 
						|
    doc_id: String,
 | 
						|
    _score: f64,
 | 
						|
    fields: HashMap<String, String>,
 | 
						|
) -> Result<Protocol, DBError> {
 | 
						|
    if server.selected_db == 0 {
 | 
						|
        return Ok(Protocol::err("FT commands are not allowed on DB 0"));
 | 
						|
    }
 | 
						|
    // Enforce Tantivy backend for selected DB
 | 
						|
    let is_tantivy = crate::admin_meta::get_database_backend(
 | 
						|
        &server.option.dir,
 | 
						|
        server.option.backend.clone(),
 | 
						|
        &server.option.admin_secret,
 | 
						|
        server.selected_db,
 | 
						|
    )
 | 
						|
    .ok()
 | 
						|
    .flatten()
 | 
						|
    .map(|b| matches!(b, crate::options::BackendType::Tantivy))
 | 
						|
    .unwrap_or(false);
 | 
						|
    if !is_tantivy {
 | 
						|
        return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed"));
 | 
						|
    }
 | 
						|
    if !server.has_write_permission() {
 | 
						|
        return Ok(Protocol::err("ERR write permission denied"));
 | 
						|
    }
 | 
						|
    let indexes = server.search_indexes.read().unwrap();
 | 
						|
    let search_index = indexes
 | 
						|
        .get(&index_name)
 | 
						|
        .ok_or_else(|| DBError(format!("Index '{}' not found", index_name)))?;
 | 
						|
    search_index.add_document_with_fields(&doc_id, fields)?;
 | 
						|
    Ok(Protocol::SimpleString("OK".to_string()))
 | 
						|
}
 | 
						|
 | 
						|
pub async fn ft_search_cmd(
 | 
						|
    server: &Server,
 | 
						|
    index_name: String,
 | 
						|
    query: String,
 | 
						|
    filters: Vec<(String, String)>,
 | 
						|
    limit: Option<usize>,
 | 
						|
    offset: Option<usize>,
 | 
						|
    return_fields: Option<Vec<String>>,
 | 
						|
) -> Result<Protocol, DBError> {
 | 
						|
    if server.selected_db == 0 {
 | 
						|
        return Ok(Protocol::err("FT commands are not allowed on DB 0"));
 | 
						|
    }
 | 
						|
    // Enforce Tantivy backend for selected DB
 | 
						|
    let is_tantivy = crate::admin_meta::get_database_backend(
 | 
						|
        &server.option.dir,
 | 
						|
        server.option.backend.clone(),
 | 
						|
        &server.option.admin_secret,
 | 
						|
        server.selected_db,
 | 
						|
    )
 | 
						|
    .ok()
 | 
						|
    .flatten()
 | 
						|
    .map(|b| matches!(b, crate::options::BackendType::Tantivy))
 | 
						|
    .unwrap_or(false);
 | 
						|
    if !is_tantivy {
 | 
						|
        return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed"));
 | 
						|
    }
 | 
						|
    if !server.has_read_permission() {
 | 
						|
        return Ok(Protocol::err("ERR read permission denied"));
 | 
						|
    }
 | 
						|
    let indexes = server.search_indexes.read().unwrap();
 | 
						|
    let search_index = indexes
 | 
						|
        .get(&index_name)
 | 
						|
        .ok_or_else(|| DBError(format!("Index '{}' not found", index_name)))?;
 | 
						|
 | 
						|
    let search_filters = filters
 | 
						|
        .into_iter()
 | 
						|
        .map(|(field, value)| Filter {
 | 
						|
            field,
 | 
						|
            filter_type: FilterType::Equals(value),
 | 
						|
        })
 | 
						|
        .collect();
 | 
						|
 | 
						|
    let options = SearchOptions {
 | 
						|
        limit: limit.unwrap_or(10),
 | 
						|
        offset: offset.unwrap_or(0),
 | 
						|
        filters: search_filters,
 | 
						|
        sort_by: None,
 | 
						|
        return_fields,
 | 
						|
        highlight: false,
 | 
						|
    };
 | 
						|
 | 
						|
    let results = search_index.search_with_options(&query, options)?;
 | 
						|
 
 | 
						|
    // Format results as a flattened Redis protocol array to match client expectations:
 | 
						|
    // [ total, doc_id, score, field, value, field, value, ... , doc_id, score, ... ]
 | 
						|
    let mut response = Vec::new();
 | 
						|
    // First element is the total count
 | 
						|
    response.push(Protocol::BulkString(results.total.to_string()));
 | 
						|
    // Then each document flattened
 | 
						|
    for mut doc in results.documents {
 | 
						|
        // Add document ID if it exists
 | 
						|
        if let Some(id) = doc.fields.get("_id") {
 | 
						|
            response.push(Protocol::BulkString(id.clone()));
 | 
						|
        }
 | 
						|
        // Add score
 | 
						|
        response.push(Protocol::BulkString(doc.score.to_string()));
 | 
						|
        // Add fields as key-value pairs
 | 
						|
        for (field_name, field_value) in std::mem::take(&mut doc.fields) {
 | 
						|
            if field_name != "_id" {
 | 
						|
                response.push(Protocol::BulkString(field_name));
 | 
						|
                response.push(Protocol::BulkString(field_value));
 | 
						|
            }
 | 
						|
        }
 | 
						|
    }
 | 
						|
 
 | 
						|
    Ok(Protocol::Array(response))
 | 
						|
}
 | 
						|
 | 
						|
pub async fn ft_del_cmd(
 | 
						|
    server: &Server,
 | 
						|
    index_name: String,
 | 
						|
    doc_id: String,
 | 
						|
) -> Result<Protocol, DBError> {
 | 
						|
    if server.selected_db == 0 {
 | 
						|
        return Ok(Protocol::err("FT commands are not allowed on DB 0"));
 | 
						|
    }
 | 
						|
    // Enforce Tantivy backend for selected DB
 | 
						|
    let is_tantivy = crate::admin_meta::get_database_backend(
 | 
						|
        &server.option.dir,
 | 
						|
        server.option.backend.clone(),
 | 
						|
        &server.option.admin_secret,
 | 
						|
        server.selected_db,
 | 
						|
    )
 | 
						|
    .ok()
 | 
						|
    .flatten()
 | 
						|
    .map(|b| matches!(b, crate::options::BackendType::Tantivy))
 | 
						|
    .unwrap_or(false);
 | 
						|
    if !is_tantivy {
 | 
						|
        return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed"));
 | 
						|
    }
 | 
						|
    if !server.has_write_permission() {
 | 
						|
        return Ok(Protocol::err("ERR write permission denied"));
 | 
						|
    }
 | 
						|
    let indexes = server.search_indexes.read().unwrap();
 | 
						|
    let search_index = indexes
 | 
						|
        .get(&index_name)
 | 
						|
        .ok_or_else(|| DBError(format!("Index '{}' not found", index_name)))?;
 | 
						|
    let existed = search_index.delete_document_by_id(&doc_id)?;
 | 
						|
    Ok(Protocol::SimpleString(if existed { "1".to_string() } else { "0".to_string() }))
 | 
						|
}
 | 
						|
 | 
						|
pub async fn ft_info_cmd(server: &Server, index_name: String) -> Result<Protocol, DBError> {
 | 
						|
    if server.selected_db == 0 {
 | 
						|
        return Ok(Protocol::err("FT commands are not allowed on DB 0"));
 | 
						|
    }
 | 
						|
    // Enforce Tantivy backend for selected DB
 | 
						|
    let is_tantivy = crate::admin_meta::get_database_backend(
 | 
						|
        &server.option.dir,
 | 
						|
        server.option.backend.clone(),
 | 
						|
        &server.option.admin_secret,
 | 
						|
        server.selected_db,
 | 
						|
    )
 | 
						|
    .ok()
 | 
						|
    .flatten()
 | 
						|
    .map(|b| matches!(b, crate::options::BackendType::Tantivy))
 | 
						|
    .unwrap_or(false);
 | 
						|
    if !is_tantivy {
 | 
						|
        return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed"));
 | 
						|
    }
 | 
						|
    if !server.has_read_permission() {
 | 
						|
        return Ok(Protocol::err("ERR read permission denied"));
 | 
						|
    }
 | 
						|
    let indexes = server.search_indexes.read().unwrap();
 | 
						|
    let search_index = indexes
 | 
						|
        .get(&index_name)
 | 
						|
        .ok_or_else(|| DBError(format!("Index '{}' not found", index_name)))?;
 | 
						|
    let info = search_index.get_info()?;
 | 
						|
 | 
						|
    // Format info as Redis protocol
 | 
						|
    let mut response = Vec::new();
 | 
						|
    response.push(Protocol::BulkString("index_name".to_string()));
 | 
						|
    response.push(Protocol::BulkString(info.name));
 | 
						|
    response.push(Protocol::BulkString("num_docs".to_string()));
 | 
						|
    response.push(Protocol::BulkString(info.num_docs.to_string()));
 | 
						|
    response.push(Protocol::BulkString("num_fields".to_string()));
 | 
						|
    response.push(Protocol::BulkString(info.fields.len().to_string()));
 | 
						|
    response.push(Protocol::BulkString("fields".to_string()));
 | 
						|
    let fields_str = info
 | 
						|
        .fields
 | 
						|
        .iter()
 | 
						|
        .map(|f| format!("{}:{}", f.name, f.field_type))
 | 
						|
        .collect::<Vec<_>>()
 | 
						|
        .join(", ");
 | 
						|
    response.push(Protocol::BulkString(fields_str));
 | 
						|
    Ok(Protocol::Array(response))
 | 
						|
}
 | 
						|
 | 
						|
pub async fn ft_drop_cmd(server: &Server, index_name: String) -> Result<Protocol, DBError> {
 | 
						|
    if server.selected_db == 0 {
 | 
						|
        return Ok(Protocol::err("FT commands are not allowed on DB 0"));
 | 
						|
    }
 | 
						|
    // Enforce Tantivy backend for selected DB
 | 
						|
    let is_tantivy = crate::admin_meta::get_database_backend(
 | 
						|
        &server.option.dir,
 | 
						|
        server.option.backend.clone(),
 | 
						|
        &server.option.admin_secret,
 | 
						|
        server.selected_db,
 | 
						|
    )
 | 
						|
    .ok()
 | 
						|
    .flatten()
 | 
						|
    .map(|b| matches!(b, crate::options::BackendType::Tantivy))
 | 
						|
    .unwrap_or(false);
 | 
						|
    if !is_tantivy {
 | 
						|
        return Ok(Protocol::err("ERR DB backend is not Tantivy; FT.* commands are not allowed"));
 | 
						|
    }
 | 
						|
 | 
						|
    if !server.has_write_permission() {
 | 
						|
        return Ok(Protocol::err("ERR write permission denied"));
 | 
						|
    }
 | 
						|
 | 
						|
    // Remove from registry and files; report error if nothing to drop
 | 
						|
    let mut existed = false;
 | 
						|
    {
 | 
						|
        let mut indexes = server.search_indexes.write().unwrap();
 | 
						|
        if indexes.remove(&index_name).is_some() {
 | 
						|
            existed = true;
 | 
						|
        }
 | 
						|
    }
 | 
						|
 | 
						|
    // Remove the index files from disk
 | 
						|
    let index_path = server.search_index_path().join(&index_name);
 | 
						|
    if index_path.exists() {
 | 
						|
        std::fs::remove_dir_all(&index_path)
 | 
						|
            .map_err(|e| DBError(format!("Failed to remove index files: {}", e)))?;
 | 
						|
        existed = true;
 | 
						|
    }
 | 
						|
 | 
						|
    if !existed {
 | 
						|
        return Ok(Protocol::err(&format!("ERR Index '{}' not found", index_name)));
 | 
						|
    }
 | 
						|
 | 
						|
    Ok(Protocol::SimpleString("OK".to_string()))
 | 
						|
} |