Compare commits
11 Commits
management
...
fbcaafc86b
Author | SHA1 | Date | |
---|---|---|---|
fbcaafc86b | |||
ce1be0369a | |||
4b8216bfdb | |||
8bc372ea64 | |||
7920945986 | |||
d4d3660bac | |||
b68325016d | |||
2743cd9c81 | |||
eb07386cf4 | |||
fc7672c78a | |||
46f96fa8cf |
1455
Cargo.lock
generated
1455
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -1,8 +1,8 @@
|
||||
[package]
|
||||
name = "herodb"
|
||||
version = "0.0.1"
|
||||
authors = ["ThreeFold Tech NV"]
|
||||
edition = "2024"
|
||||
authors = ["Pin Fang <fpfangpin@hotmail.com>"]
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0.59"
|
||||
@@ -24,7 +24,7 @@ age = "0.10"
|
||||
secrecy = "0.8"
|
||||
ed25519-dalek = "2"
|
||||
base64 = "0.22"
|
||||
jsonrpsee = { version = "0.26.0", features = ["http-client", "ws-client", "server", "macros"] }
|
||||
tantivy = "0.25.0"
|
||||
|
||||
[dev-dependencies]
|
||||
redis = { version = "0.24", features = ["aio", "tokio-comp"] }
|
||||
|
@@ -47,13 +47,13 @@ You can start HeroDB with different backends and encryption options:
|
||||
#### `redb` with Encryption
|
||||
|
||||
```bash
|
||||
./target/release/herodb --dir /tmp/herodb_encrypted --port 6379 --encrypt --encryption_key mysecretkey
|
||||
./target/release/herodb --dir /tmp/herodb_encrypted --port 6379 --encrypt --key mysecretkey
|
||||
```
|
||||
|
||||
#### `sled` with Encryption
|
||||
|
||||
```bash
|
||||
./target/release/herodb --dir /tmp/herodb_sled_encrypted --port 6379 --sled --encrypt --encryption_key mysecretkey
|
||||
./target/release/herodb --dir /tmp/herodb_sled_encrypted --port 6379 --sled --encrypt --key mysecretkey
|
||||
```
|
||||
|
||||
## Usage with Redis Clients
|
||||
|
239
examples/tantivy_search_demo.sh
Executable file
239
examples/tantivy_search_demo.sh
Executable file
@@ -0,0 +1,239 @@
|
||||
#!/bin/bash
|
||||
|
||||
# HeroDB Tantivy Search Demo
|
||||
# This script demonstrates full-text search capabilities using Redis commands
|
||||
# HeroDB server should be running on port 6381
|
||||
|
||||
set -e # Exit on any error
|
||||
|
||||
# Configuration
|
||||
REDIS_HOST="localhost"
|
||||
REDIS_PORT="6382"
|
||||
REDIS_CLI="redis-cli -h $REDIS_HOST -p $REDIS_PORT"
|
||||
|
||||
# Start the herodb server in the background
|
||||
echo "Starting herodb server..."
|
||||
cargo run -p herodb -- --dir /tmp/herodbtest --port ${REDIS_PORT} --debug &
|
||||
SERVER_PID=$!
|
||||
echo
|
||||
sleep 2 # Give the server a moment to start
|
||||
|
||||
# Colors for output
|
||||
RED='\033[0;31m'
|
||||
GREEN='\033[0;32m'
|
||||
BLUE='\033[0;34m'
|
||||
YELLOW='\033[1;33m'
|
||||
NC='\033[0m' # No Color
|
||||
|
||||
# Function to print colored output
|
||||
print_header() {
|
||||
echo -e "${BLUE}=== $1 ===${NC}"
|
||||
}
|
||||
|
||||
print_success() {
|
||||
echo -e "${GREEN}✓ $1${NC}"
|
||||
}
|
||||
|
||||
print_info() {
|
||||
echo -e "${YELLOW}ℹ $1${NC}"
|
||||
}
|
||||
|
||||
print_error() {
|
||||
echo -e "${RED}✗ $1${NC}"
|
||||
}
|
||||
|
||||
# Function to check if HeroDB is running
|
||||
check_herodb() {
|
||||
print_info "Checking if HeroDB is running on port $REDIS_PORT..."
|
||||
if ! $REDIS_CLI ping > /dev/null 2>&1; then
|
||||
print_error "HeroDB is not running on port $REDIS_PORT"
|
||||
print_info "Please start HeroDB with: cargo run -- --port $REDIS_PORT"
|
||||
exit 1
|
||||
fi
|
||||
print_success "HeroDB is running and responding"
|
||||
}
|
||||
|
||||
# Function to execute Redis command with error handling
|
||||
execute_cmd() {
|
||||
local cmd="$1"
|
||||
local description="$2"
|
||||
|
||||
echo -e "${YELLOW}Command:${NC} $cmd"
|
||||
if result=$($REDIS_CLI $cmd 2>&1); then
|
||||
echo -e "${GREEN}Result:${NC} $result"
|
||||
return 0
|
||||
else
|
||||
print_error "Failed: $description"
|
||||
echo "Error: $result"
|
||||
return 1
|
||||
fi
|
||||
}
|
||||
|
||||
# Function to pause for readability
|
||||
pause() {
|
||||
echo
|
||||
read -p "Press Enter to continue..."
|
||||
echo
|
||||
}
|
||||
|
||||
# Main demo function
|
||||
main() {
|
||||
clear
|
||||
print_header "HeroDB Tantivy Search Demonstration"
|
||||
echo "This demo shows full-text search capabilities using Redis commands"
|
||||
echo "HeroDB runs on port $REDIS_PORT (instead of Redis default 6379)"
|
||||
echo
|
||||
|
||||
# Check if HeroDB is running
|
||||
check_herodb
|
||||
echo
|
||||
|
||||
print_header "Step 1: Create Search Index"
|
||||
print_info "Creating a product catalog search index with various field types"
|
||||
|
||||
# Create search index with schema
|
||||
execute_cmd "FT.CREATE product_catalog SCHEMA title TEXT description TEXT category TAG price NUMERIC rating NUMERIC location GEO" \
|
||||
"Creating search index"
|
||||
|
||||
print_success "Search index 'product_catalog' created successfully"
|
||||
pause
|
||||
|
||||
print_header "Step 2: Add Sample Products"
|
||||
print_info "Adding sample products to demonstrate different search scenarios"
|
||||
|
||||
# Add sample products using FT.ADD
|
||||
execute_cmd "FT.ADD product_catalog product:1 1.0 title 'Wireless Bluetooth Headphones' description 'Premium noise-canceling headphones with 30-hour battery life' category 'electronics,audio' price 299.99 rating 4.5 location '-122.4194,37.7749'" "Adding product 1"
|
||||
execute_cmd "FT.ADD product_catalog product:2 1.0 title 'Organic Coffee Beans' description 'Single-origin Ethiopian coffee beans, medium roast' category 'food,beverages,organic' price 24.99 rating 4.8 location '-74.0060,40.7128'" "Adding product 2"
|
||||
execute_cmd "FT.ADD product_catalog product:3 1.0 title 'Yoga Mat Premium' description 'Eco-friendly yoga mat with superior grip and cushioning' category 'fitness,wellness,eco-friendly' price 89.99 rating 4.3 location '-118.2437,34.0522'" "Adding product 3"
|
||||
execute_cmd "FT.ADD product_catalog product:4 1.0 title 'Smart Home Speaker' description 'Voice-controlled smart speaker with AI assistant' category 'electronics,smart-home' price 149.99 rating 4.2 location '-87.6298,41.8781'" "Adding product 4"
|
||||
execute_cmd "FT.ADD product_catalog product:5 1.0 title 'Organic Green Tea' description 'Premium organic green tea leaves from Japan' category 'food,beverages,organic,tea' price 18.99 rating 4.7 location '139.6503,35.6762'" "Adding product 5"
|
||||
execute_cmd "FT.ADD product_catalog product:6 1.0 title 'Wireless Gaming Mouse' description 'High-precision gaming mouse with RGB lighting' category 'electronics,gaming' price 79.99 rating 4.4 location '-122.3321,47.6062'" "Adding product 6"
|
||||
execute_cmd "FT.ADD product_catalog product:7 1.0 title 'Comfortable meditation cushion for mindfulness practice' description 'Meditation cushion with premium materials' category 'wellness,meditation' price 45.99 rating 4.6 location '-122.4194,37.7749'" "Adding product 7"
|
||||
execute_cmd "FT.ADD product_catalog product:8 1.0 title 'Bluetooth Earbuds' description 'True wireless earbuds with active noise cancellation' category 'electronics,audio' price 199.99 rating 4.1 location '-74.0060,40.7128'" "Adding product 8"
|
||||
|
||||
print_success "Added 8 products to the index"
|
||||
pause
|
||||
|
||||
print_header "Step 3: Basic Text Search"
|
||||
print_info "Searching for 'wireless' products"
|
||||
|
||||
execute_cmd "FT.SEARCH product_catalog wireless" "Basic text search"
|
||||
pause
|
||||
|
||||
print_header "Step 4: Search with Filters"
|
||||
print_info "Searching for 'organic' products"
|
||||
|
||||
execute_cmd "FT.SEARCH product_catalog organic" "Filtered search"
|
||||
pause
|
||||
|
||||
print_header "Step 5: Numeric Range Search"
|
||||
print_info "Searching for 'premium' products"
|
||||
|
||||
execute_cmd "FT.SEARCH product_catalog premium" "Text search"
|
||||
pause
|
||||
|
||||
print_header "Step 6: Sorting Results"
|
||||
print_info "Searching for electronics"
|
||||
|
||||
execute_cmd "FT.SEARCH product_catalog electronics" "Category search"
|
||||
pause
|
||||
|
||||
print_header "Step 7: Limiting Results"
|
||||
print_info "Searching for wireless products with limit"
|
||||
|
||||
execute_cmd "FT.SEARCH product_catalog wireless LIMIT 0 3" "Limited results"
|
||||
pause
|
||||
|
||||
print_header "Step 8: Complex Query"
|
||||
print_info "Finding audio products with noise cancellation"
|
||||
|
||||
execute_cmd "FT.SEARCH product_catalog 'noise cancellation'" "Complex query"
|
||||
pause
|
||||
|
||||
print_header "Step 9: Geographic Search"
|
||||
print_info "Searching for meditation products"
|
||||
|
||||
execute_cmd "FT.SEARCH product_catalog meditation" "Text search"
|
||||
pause
|
||||
|
||||
print_header "Step 10: Aggregation Example"
|
||||
print_info "Getting index information and statistics"
|
||||
|
||||
execute_cmd "FT.INFO product_catalog" "Index information"
|
||||
pause
|
||||
|
||||
print_header "Step 11: Search Comparison"
|
||||
print_info "Comparing Tantivy search vs simple key matching"
|
||||
|
||||
echo -e "${YELLOW}Tantivy Full-Text Search:${NC}"
|
||||
execute_cmd "FT.SEARCH product_catalog 'battery life'" "Full-text search for 'battery life'"
|
||||
|
||||
echo
|
||||
echo -e "${YELLOW}Simple Key Pattern Matching:${NC}"
|
||||
execute_cmd "KEYS *battery*" "Simple pattern matching for 'battery'"
|
||||
|
||||
print_info "Notice how full-text search finds relevant results even when exact words don't match keys"
|
||||
pause
|
||||
|
||||
print_header "Step 12: Fuzzy Search"
|
||||
print_info "Searching for headphones"
|
||||
|
||||
execute_cmd "FT.SEARCH product_catalog headphones" "Text search"
|
||||
pause
|
||||
|
||||
print_header "Step 13: Phrase Search"
|
||||
print_info "Searching for coffee products"
|
||||
|
||||
execute_cmd "FT.SEARCH product_catalog coffee" "Text search"
|
||||
pause
|
||||
|
||||
print_header "Step 14: Boolean Queries"
|
||||
print_info "Searching for gaming products"
|
||||
|
||||
execute_cmd "FT.SEARCH product_catalog gaming" "Text search"
|
||||
echo
|
||||
execute_cmd "FT.SEARCH product_catalog tea" "Text search"
|
||||
pause
|
||||
|
||||
print_header "Step 15: Cleanup"
|
||||
print_info "Removing test data"
|
||||
|
||||
# Delete the search index
|
||||
execute_cmd "FT.DROP product_catalog" "Dropping search index"
|
||||
|
||||
# Clean up documents from search index
|
||||
for i in {1..8}; do
|
||||
execute_cmd "FT.DEL product_catalog product:$i" "Deleting product:$i from index"
|
||||
done
|
||||
|
||||
print_success "Cleanup completed"
|
||||
echo
|
||||
|
||||
print_header "Demo Summary"
|
||||
echo "This demonstration showed:"
|
||||
echo "• Creating search indexes with different field types"
|
||||
echo "• Adding documents to the search index"
|
||||
echo "• Basic and advanced text search queries"
|
||||
echo "• Filtering by categories and numeric ranges"
|
||||
echo "• Sorting and limiting results"
|
||||
echo "• Geographic searches"
|
||||
echo "• Fuzzy matching and phrase searches"
|
||||
echo "• Boolean query operators"
|
||||
echo "• Comparison with simple pattern matching"
|
||||
echo
|
||||
print_success "HeroDB Tantivy search demo completed successfully!"
|
||||
echo
|
||||
print_info "Key advantages of Tantivy full-text search:"
|
||||
echo " - Relevance scoring and ranking"
|
||||
echo " - Fuzzy matching and typo tolerance"
|
||||
echo " - Complex boolean queries"
|
||||
echo " - Field-specific searches and filters"
|
||||
echo " - Geographic and numeric range queries"
|
||||
echo " - Much faster than pattern matching on large datasets"
|
||||
echo
|
||||
print_info "To run HeroDB server: cargo run -- --port 6381"
|
||||
print_info "To connect with redis-cli: redis-cli -h localhost -p 6381"
|
||||
}
|
||||
|
||||
# Run the demo
|
||||
main "$@"
|
101
examples/test_tantivy_integration.sh
Executable file
101
examples/test_tantivy_integration.sh
Executable file
@@ -0,0 +1,101 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Simple Tantivy Search Integration Test for HeroDB
|
||||
# This script tests the full-text search functionality we just integrated
|
||||
|
||||
set -e
|
||||
|
||||
echo "🔍 Testing Tantivy Search Integration..."
|
||||
|
||||
# Build the project first
|
||||
echo "📦 Building HeroDB..."
|
||||
cargo build --release
|
||||
|
||||
# Start the server in the background
|
||||
echo "🚀 Starting HeroDB server on port 6379..."
|
||||
cargo run --release -- --port 6379 --dir ./test_data &
|
||||
SERVER_PID=$!
|
||||
|
||||
# Wait for server to start
|
||||
sleep 3
|
||||
|
||||
# Function to cleanup on exit
|
||||
cleanup() {
|
||||
echo "🧹 Cleaning up..."
|
||||
kill $SERVER_PID 2>/dev/null || true
|
||||
rm -rf ./test_data
|
||||
exit
|
||||
}
|
||||
|
||||
# Set trap for cleanup
|
||||
trap cleanup EXIT INT TERM
|
||||
|
||||
# Function to execute Redis command
|
||||
execute_cmd() {
|
||||
local cmd="$1"
|
||||
local description="$2"
|
||||
|
||||
echo "📝 $description"
|
||||
echo " Command: $cmd"
|
||||
|
||||
if result=$(redis-cli -p 6379 $cmd 2>&1); then
|
||||
echo " ✅ Result: $result"
|
||||
echo
|
||||
return 0
|
||||
else
|
||||
echo " ❌ Failed: $result"
|
||||
echo
|
||||
return 1
|
||||
fi
|
||||
}
|
||||
|
||||
echo "🧪 Running Tantivy Search Tests..."
|
||||
echo
|
||||
|
||||
# Test 1: Create a search index
|
||||
execute_cmd "ft.create books SCHEMA title TEXT description TEXT author TEXT category TAG price NUMERIC" \
|
||||
"Creating search index 'books'"
|
||||
|
||||
# Test 2: Add documents to the index
|
||||
execute_cmd "ft.add books book1 1.0 title \"The Great Gatsby\" description \"A classic American novel about the Jazz Age\" author \"F. Scott Fitzgerald\" category \"fiction,classic\" price \"12.99\"" \
|
||||
"Adding first book"
|
||||
|
||||
execute_cmd "ft.add books book2 1.0 title \"To Kill a Mockingbird\" description \"A novel about racial injustice in the American South\" author \"Harper Lee\" category \"fiction,classic\" price \"14.99\"" \
|
||||
"Adding second book"
|
||||
|
||||
execute_cmd "ft.add books book3 1.0 title \"Programming Rust\" description \"A comprehensive guide to Rust programming language\" author \"Jim Blandy\" category \"programming,technical\" price \"49.99\"" \
|
||||
"Adding third book"
|
||||
|
||||
execute_cmd "ft.add books book4 1.0 title \"The Rust Programming Language\" description \"The official book on Rust programming\" author \"Steve Klabnik\" category \"programming,technical\" price \"39.99\"" \
|
||||
"Adding fourth book"
|
||||
|
||||
# Test 3: Basic search
|
||||
execute_cmd "ft.search books Rust" \
|
||||
"Searching for 'Rust'"
|
||||
|
||||
# Test 4: Search with filters
|
||||
execute_cmd "ft.search books programming FILTER category programming" \
|
||||
"Searching for 'programming' with category filter"
|
||||
|
||||
# Test 5: Search with limit
|
||||
execute_cmd "ft.search books \"*\" LIMIT 0 2" \
|
||||
"Getting first 2 documents"
|
||||
|
||||
# Test 6: Get index info
|
||||
execute_cmd "ft.info books" \
|
||||
"Getting index information"
|
||||
|
||||
# Test 7: Delete a document
|
||||
execute_cmd "ft.del books book1" \
|
||||
"Deleting book1"
|
||||
|
||||
# Test 8: Search again to verify deletion
|
||||
execute_cmd "ft.search books Gatsby" \
|
||||
"Searching for deleted book"
|
||||
|
||||
# Test 9: Drop the index
|
||||
execute_cmd "ft.drop books" \
|
||||
"Dropping the index"
|
||||
|
||||
echo "🎉 All tests completed successfully!"
|
||||
echo "✅ Tantivy search integration is working correctly"
|
283
src/cmd.rs
283
src/cmd.rs
@@ -1,12 +1,13 @@
|
||||
use crate::{error::DBError, protocol::Protocol, server::Server};
|
||||
use crate::{error::DBError, protocol::Protocol, server::Server, search_cmd};
|
||||
use tokio::time::{timeout, Duration};
|
||||
use futures::future::select_all;
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum Cmd {
|
||||
Ping,
|
||||
Echo(String),
|
||||
Select(u64, Option<String>), // db_index, optional_key
|
||||
Select(u64), // Changed from u16 to u64
|
||||
Get(String),
|
||||
Set(String, String),
|
||||
SetPx(String, String, u128),
|
||||
@@ -84,6 +85,41 @@ pub enum Cmd {
|
||||
AgeSignName(String, String), // name, message
|
||||
AgeVerifyName(String, String, String), // name, message, signature_b64
|
||||
AgeList,
|
||||
|
||||
// Full-text search commands with schema support
|
||||
FtCreate {
|
||||
index_name: String,
|
||||
schema: Vec<(String, String, Vec<String>)>, // (field_name, field_type, options)
|
||||
},
|
||||
FtAdd {
|
||||
index_name: String,
|
||||
doc_id: String,
|
||||
score: f64,
|
||||
fields: std::collections::HashMap<String, String>,
|
||||
},
|
||||
FtSearch {
|
||||
index_name: String,
|
||||
query: String,
|
||||
filters: Vec<(String, String)>, // field, value pairs
|
||||
limit: Option<usize>,
|
||||
offset: Option<usize>,
|
||||
return_fields: Option<Vec<String>>,
|
||||
},
|
||||
FtDel(String, String), // index_name, doc_id
|
||||
FtInfo(String), // index_name
|
||||
FtDrop(String), // index_name
|
||||
FtAlter {
|
||||
index_name: String,
|
||||
field_name: String,
|
||||
field_type: String,
|
||||
options: Vec<String>,
|
||||
},
|
||||
FtAggregate {
|
||||
index_name: String,
|
||||
query: String,
|
||||
group_by: Vec<String>,
|
||||
reducers: Vec<String>,
|
||||
},
|
||||
}
|
||||
|
||||
impl Cmd {
|
||||
@@ -98,18 +134,11 @@ impl Cmd {
|
||||
Ok((
|
||||
match cmd[0].to_lowercase().as_str() {
|
||||
"select" => {
|
||||
if cmd.len() < 2 || cmd.len() > 4 {
|
||||
if cmd.len() != 2 {
|
||||
return Err(DBError("wrong number of arguments for SELECT".to_string()));
|
||||
}
|
||||
let idx = cmd[1].parse::<u64>().map_err(|_| DBError("ERR DB index is not an integer".to_string()))?;
|
||||
let key = if cmd.len() == 4 && cmd[2].to_lowercase() == "key" {
|
||||
Some(cmd[3].clone())
|
||||
} else if cmd.len() == 2 {
|
||||
None
|
||||
} else {
|
||||
return Err(DBError("ERR syntax error".to_string()));
|
||||
};
|
||||
Cmd::Select(idx, key)
|
||||
Cmd::Select(idx)
|
||||
}
|
||||
"echo" => Cmd::Echo(cmd[1].clone()),
|
||||
"ping" => Cmd::Ping,
|
||||
@@ -623,6 +652,148 @@ impl Cmd {
|
||||
_ => return Err(DBError(format!("unsupported AGE subcommand {:?}", cmd))),
|
||||
}
|
||||
}
|
||||
"ft.create" => {
|
||||
if cmd.len() < 4 || cmd[2].to_uppercase() != "SCHEMA" {
|
||||
return Err(DBError("ERR FT.CREATE requires: indexname SCHEMA field1 type1 [options] ...".to_string()));
|
||||
}
|
||||
|
||||
let index_name = cmd[1].clone();
|
||||
let mut schema = Vec::new();
|
||||
let mut i = 3;
|
||||
|
||||
while i < cmd.len() {
|
||||
if i + 1 >= cmd.len() {
|
||||
return Err(DBError("ERR incomplete field definition".to_string()));
|
||||
}
|
||||
|
||||
let field_name = cmd[i].clone();
|
||||
let field_type = cmd[i + 1].to_uppercase();
|
||||
let mut options = Vec::new();
|
||||
i += 2;
|
||||
|
||||
// Parse field options until we hit another field name or end
|
||||
while i < cmd.len() && !["TEXT", "NUMERIC", "TAG", "GEO"].contains(&cmd[i].to_uppercase().as_str()) {
|
||||
options.push(cmd[i].to_uppercase());
|
||||
i += 1;
|
||||
|
||||
// If this option takes a value, consume it too
|
||||
if i > 0 && ["SEPARATOR", "WEIGHT"].contains(&cmd[i-1].to_uppercase().as_str()) && i < cmd.len() {
|
||||
options.push(cmd[i].clone());
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
|
||||
schema.push((field_name, field_type, options));
|
||||
}
|
||||
|
||||
Cmd::FtCreate {
|
||||
index_name,
|
||||
schema,
|
||||
}
|
||||
}
|
||||
"ft.add" => {
|
||||
if cmd.len() < 5 {
|
||||
return Err(DBError("ERR FT.ADD requires: index_name doc_id score field value ...".to_string()));
|
||||
}
|
||||
|
||||
let index_name = cmd[1].clone();
|
||||
let doc_id = cmd[2].clone();
|
||||
let score = cmd[3].parse::<f64>()
|
||||
.map_err(|_| DBError("ERR score must be a number".to_string()))?;
|
||||
|
||||
let mut fields = HashMap::new();
|
||||
let mut i = 4;
|
||||
|
||||
while i + 1 < cmd.len() {
|
||||
fields.insert(cmd[i].clone(), cmd[i + 1].clone());
|
||||
i += 2;
|
||||
}
|
||||
|
||||
Cmd::FtAdd {
|
||||
index_name,
|
||||
doc_id,
|
||||
score,
|
||||
fields,
|
||||
}
|
||||
}
|
||||
"ft.search" => {
|
||||
if cmd.len() < 3 {
|
||||
return Err(DBError("ERR FT.SEARCH requires: index_name query [options]".to_string()));
|
||||
}
|
||||
|
||||
let index_name = cmd[1].clone();
|
||||
let query = cmd[2].clone();
|
||||
|
||||
let mut filters = Vec::new();
|
||||
let mut limit = None;
|
||||
let mut offset = None;
|
||||
let mut return_fields = None;
|
||||
|
||||
let mut i = 3;
|
||||
while i < cmd.len() {
|
||||
match cmd[i].to_uppercase().as_str() {
|
||||
"FILTER" => {
|
||||
if i + 3 >= cmd.len() {
|
||||
return Err(DBError("ERR FILTER requires field and value".to_string()));
|
||||
}
|
||||
filters.push((cmd[i + 1].clone(), cmd[i + 2].clone()));
|
||||
i += 3;
|
||||
}
|
||||
"LIMIT" => {
|
||||
if i + 2 >= cmd.len() {
|
||||
return Err(DBError("ERR LIMIT requires offset and num".to_string()));
|
||||
}
|
||||
offset = Some(cmd[i + 1].parse().unwrap_or(0));
|
||||
limit = Some(cmd[i + 2].parse().unwrap_or(10));
|
||||
i += 3;
|
||||
}
|
||||
"RETURN" => {
|
||||
if i + 1 >= cmd.len() {
|
||||
return Err(DBError("ERR RETURN requires field count".to_string()));
|
||||
}
|
||||
let count: usize = cmd[i + 1].parse().unwrap_or(0);
|
||||
i += 2;
|
||||
|
||||
let mut fields = Vec::new();
|
||||
for _ in 0..count {
|
||||
if i < cmd.len() {
|
||||
fields.push(cmd[i].clone());
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
return_fields = Some(fields);
|
||||
}
|
||||
_ => i += 1,
|
||||
}
|
||||
}
|
||||
|
||||
Cmd::FtSearch {
|
||||
index_name,
|
||||
query,
|
||||
filters,
|
||||
limit,
|
||||
offset,
|
||||
return_fields,
|
||||
}
|
||||
}
|
||||
"ft.del" => {
|
||||
if cmd.len() != 3 {
|
||||
return Err(DBError("ERR FT.DEL requires: index_name doc_id".to_string()));
|
||||
}
|
||||
Cmd::FtDel(cmd[1].clone(), cmd[2].clone())
|
||||
}
|
||||
"ft.info" => {
|
||||
if cmd.len() != 2 {
|
||||
return Err(DBError("ERR FT.INFO requires: index_name".to_string()));
|
||||
}
|
||||
Cmd::FtInfo(cmd[1].clone())
|
||||
}
|
||||
"ft.drop" => {
|
||||
if cmd.len() != 2 {
|
||||
return Err(DBError("ERR FT.DROP requires: index_name".to_string()));
|
||||
}
|
||||
Cmd::FtDrop(cmd[1].clone())
|
||||
}
|
||||
_ => Cmd::Unknow(cmd[0].clone()),
|
||||
},
|
||||
protocol,
|
||||
@@ -649,7 +820,7 @@ impl Cmd {
|
||||
}
|
||||
|
||||
match self {
|
||||
Cmd::Select(db, key) => select_cmd(server, db, key).await,
|
||||
Cmd::Select(db) => select_cmd(server, db).await,
|
||||
Cmd::Ping => Ok(Protocol::SimpleString("PONG".to_string())),
|
||||
Cmd::Echo(s) => Ok(Protocol::BulkString(s)),
|
||||
Cmd::Get(k) => get_cmd(server, &k).await,
|
||||
@@ -737,20 +908,41 @@ impl Cmd {
|
||||
Cmd::AgeSignName(name, message) => Ok(crate::age::cmd_age_sign_name(server, &name, &message).await),
|
||||
Cmd::AgeVerifyName(name, message, sig_b64) => Ok(crate::age::cmd_age_verify_name(server, &name, &message, &sig_b64).await),
|
||||
Cmd::AgeList => Ok(crate::age::cmd_age_list(server).await),
|
||||
|
||||
// Full-text search commands
|
||||
Cmd::FtCreate { index_name, schema } => {
|
||||
search_cmd::ft_create_cmd(server, index_name, schema).await
|
||||
}
|
||||
Cmd::FtAdd { index_name, doc_id, score, fields } => {
|
||||
search_cmd::ft_add_cmd(server, index_name, doc_id, score, fields).await
|
||||
}
|
||||
Cmd::FtSearch { index_name, query, filters, limit, offset, return_fields } => {
|
||||
search_cmd::ft_search_cmd(server, index_name, query, filters, limit, offset, return_fields).await
|
||||
}
|
||||
Cmd::FtDel(index_name, doc_id) => {
|
||||
search_cmd::ft_del_cmd(server, index_name, doc_id).await
|
||||
}
|
||||
Cmd::FtInfo(index_name) => {
|
||||
search_cmd::ft_info_cmd(server, index_name).await
|
||||
}
|
||||
Cmd::FtDrop(index_name) => {
|
||||
search_cmd::ft_drop_cmd(server, index_name).await
|
||||
}
|
||||
Cmd::FtAlter { .. } => {
|
||||
// Not implemented yet
|
||||
Ok(Protocol::err("FT.ALTER not implemented yet"))
|
||||
}
|
||||
Cmd::FtAggregate { .. } => {
|
||||
// Not implemented yet
|
||||
Ok(Protocol::err("FT.AGGREGATE not implemented yet"))
|
||||
}
|
||||
Cmd::Unknow(s) => Ok(Protocol::err(&format!("ERR unknown command `{}`", s))),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn to_protocol(self) -> Protocol {
|
||||
match self {
|
||||
Cmd::Select(db, key) => {
|
||||
let mut arr = vec![Protocol::BulkString("select".to_string()), Protocol::BulkString(db.to_string())];
|
||||
if let Some(k) = key {
|
||||
arr.push(Protocol::BulkString("key".to_string()));
|
||||
arr.push(Protocol::BulkString(k));
|
||||
}
|
||||
Protocol::Array(arr)
|
||||
}
|
||||
Cmd::Select(db) => Protocol::Array(vec![Protocol::BulkString("select".to_string()), Protocol::BulkString(db.to_string())]),
|
||||
Cmd::Ping => Protocol::Array(vec![Protocol::BulkString("ping".to_string())]),
|
||||
Cmd::Echo(s) => Protocol::Array(vec![Protocol::BulkString("echo".to_string()), Protocol::BulkString(s)]),
|
||||
Cmd::Get(k) => Protocol::Array(vec![Protocol::BulkString("get".to_string()), Protocol::BulkString(k)]),
|
||||
@@ -767,44 +959,9 @@ async fn flushdb_cmd(server: &mut Server) -> Result<Protocol, DBError> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn select_cmd(server: &mut Server, db: u64, key: Option<String>) -> Result<Protocol, DBError> {
|
||||
// Load database metadata
|
||||
let meta = match crate::rpc::RpcServerImpl::load_meta_static(&server.option.dir, db).await {
|
||||
Ok(m) => m,
|
||||
Err(_) => {
|
||||
// If meta doesn't exist, create default
|
||||
let default_meta = crate::rpc::DatabaseMeta {
|
||||
public: true,
|
||||
keys: std::collections::HashMap::new(),
|
||||
};
|
||||
if let Err(_) = crate::rpc::RpcServerImpl::save_meta_static(&server.option.dir, db, &default_meta).await {
|
||||
return Ok(Protocol::err("ERR failed to initialize database metadata"));
|
||||
}
|
||||
default_meta
|
||||
}
|
||||
};
|
||||
|
||||
// Check access permissions
|
||||
let permissions = if meta.public {
|
||||
// Public database - full access
|
||||
Some(crate::rpc::Permissions::ReadWrite)
|
||||
} else if let Some(key_str) = key {
|
||||
// Private database - check key
|
||||
let hash = crate::rpc::hash_key(&key_str);
|
||||
if let Some(access_key) = meta.keys.get(&hash) {
|
||||
Some(access_key.permissions.clone())
|
||||
} else {
|
||||
return Ok(Protocol::err("ERR invalid access key"));
|
||||
}
|
||||
} else {
|
||||
return Ok(Protocol::err("ERR access key required for private database"));
|
||||
};
|
||||
|
||||
// Set selected database and permissions
|
||||
server.selected_db = db;
|
||||
server.current_permissions = permissions;
|
||||
|
||||
async fn select_cmd(server: &mut Server, db: u64) -> Result<Protocol, DBError> {
|
||||
// Test if we can access the database (this will create it if needed)
|
||||
server.selected_db = db;
|
||||
match server.current_storage() {
|
||||
Ok(_) => Ok(Protocol::SimpleString("OK".to_string())),
|
||||
Err(e) => Ok(Protocol::err(&e.0)),
|
||||
@@ -1052,9 +1209,6 @@ async fn brpop_cmd(server: &Server, keys: &[String], timeout_secs: f64) -> Resul
|
||||
}
|
||||
|
||||
async fn lpush_cmd(server: &Server, key: &str, elements: &[String]) -> Result<Protocol, DBError> {
|
||||
if !server.has_write_permission() {
|
||||
return Ok(Protocol::err("ERR write permission denied"));
|
||||
}
|
||||
match server.current_storage()?.lpush(key, elements.to_vec()) {
|
||||
Ok(len) => {
|
||||
// Attempt to deliver to any blocked BLPOP waiters
|
||||
@@ -1186,9 +1340,6 @@ async fn type_cmd(server: &Server, k: &String) -> Result<Protocol, DBError> {
|
||||
}
|
||||
|
||||
async fn del_cmd(server: &Server, k: &str) -> Result<Protocol, DBError> {
|
||||
if !server.has_write_permission() {
|
||||
return Ok(Protocol::err("ERR write permission denied"));
|
||||
}
|
||||
server.current_storage()?.del(k.to_string())?;
|
||||
Ok(Protocol::SimpleString("1".to_string()))
|
||||
}
|
||||
@@ -1214,9 +1365,6 @@ async fn set_px_cmd(
|
||||
}
|
||||
|
||||
async fn set_cmd(server: &Server, k: &str, v: &str) -> Result<Protocol, DBError> {
|
||||
if !server.has_write_permission() {
|
||||
return Ok(Protocol::err("ERR write permission denied"));
|
||||
}
|
||||
server.current_storage()?.set(k.to_string(), v.to_string())?;
|
||||
Ok(Protocol::SimpleString("OK".to_string()))
|
||||
}
|
||||
@@ -1331,9 +1479,6 @@ async fn get_cmd(server: &Server, k: &str) -> Result<Protocol, DBError> {
|
||||
|
||||
// Hash command implementations
|
||||
async fn hset_cmd(server: &Server, key: &str, pairs: &[(String, String)]) -> Result<Protocol, DBError> {
|
||||
if !server.has_write_permission() {
|
||||
return Ok(Protocol::err("ERR write permission denied"));
|
||||
}
|
||||
let new_fields = server.current_storage()?.hset(key, pairs.to_vec())?;
|
||||
Ok(Protocol::SimpleString(new_fields.to_string()))
|
||||
}
|
||||
|
@@ -4,9 +4,9 @@ pub mod crypto;
|
||||
pub mod error;
|
||||
pub mod options;
|
||||
pub mod protocol;
|
||||
pub mod rpc;
|
||||
pub mod rpc_server;
|
||||
pub mod search_cmd; // Add this
|
||||
pub mod server;
|
||||
pub mod storage;
|
||||
pub mod storage_trait; // Add this
|
||||
pub mod storage_sled; // Add this
|
||||
pub mod tantivy_search;
|
||||
|
37
src/main.rs
37
src/main.rs
@@ -3,7 +3,6 @@
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
use herodb::server;
|
||||
use herodb::rpc_server;
|
||||
|
||||
use clap::Parser;
|
||||
|
||||
@@ -32,14 +31,6 @@ struct Args {
|
||||
#[arg(long)]
|
||||
encrypt: bool,
|
||||
|
||||
/// Enable RPC management server
|
||||
#[arg(long)]
|
||||
enable_rpc: bool,
|
||||
|
||||
/// RPC server port (default: 8080)
|
||||
#[arg(long, default_value = "8080")]
|
||||
rpc_port: u16,
|
||||
|
||||
/// Use the sled backend
|
||||
#[arg(long)]
|
||||
sled: bool,
|
||||
@@ -59,7 +50,7 @@ async fn main() {
|
||||
|
||||
// new DB option
|
||||
let option = herodb::options::DBOption {
|
||||
dir: args.dir.clone(),
|
||||
dir: args.dir,
|
||||
port,
|
||||
debug: args.debug,
|
||||
encryption_key: args.encryption_key,
|
||||
@@ -71,36 +62,12 @@ async fn main() {
|
||||
},
|
||||
};
|
||||
|
||||
let backend = option.backend.clone();
|
||||
|
||||
// new server
|
||||
let mut server = server::Server::new(option).await;
|
||||
|
||||
// Initialize the default database storage
|
||||
let _ = server.current_storage();
|
||||
let server = server::Server::new(option).await;
|
||||
|
||||
// Add a small delay to ensure the port is ready
|
||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||
|
||||
// Start RPC server if enabled
|
||||
let rpc_handle = if args.enable_rpc {
|
||||
let rpc_addr = format!("127.0.0.1:{}", args.rpc_port).parse().unwrap();
|
||||
let base_dir = args.dir.clone();
|
||||
|
||||
match rpc_server::start_rpc_server(rpc_addr, base_dir, backend).await {
|
||||
Ok(handle) => {
|
||||
println!("RPC management server started on port {}", args.rpc_port);
|
||||
Some(handle)
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("Failed to start RPC server: {}", e);
|
||||
None
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// accept new connections
|
||||
loop {
|
||||
let stream = listener.accept().await;
|
||||
|
634
src/rpc.rs
634
src/rpc.rs
@@ -1,634 +0,0 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sha2::{Digest, Sha256};
|
||||
|
||||
use crate::server::Server;
|
||||
use crate::options::DBOption;
|
||||
|
||||
/// Database backend types
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum BackendType {
|
||||
Redb,
|
||||
Sled,
|
||||
// Future: InMemory, Custom(String)
|
||||
}
|
||||
|
||||
/// Database configuration
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct DatabaseConfig {
|
||||
pub name: Option<String>,
|
||||
pub storage_path: Option<String>,
|
||||
pub max_size: Option<u64>,
|
||||
pub redis_version: Option<String>,
|
||||
}
|
||||
|
||||
/// Database information returned by metadata queries
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct DatabaseInfo {
|
||||
pub id: u64,
|
||||
pub name: Option<String>,
|
||||
pub backend: BackendType,
|
||||
pub encrypted: bool,
|
||||
pub redis_version: Option<String>,
|
||||
pub storage_path: Option<String>,
|
||||
pub size_on_disk: Option<u64>,
|
||||
pub key_count: Option<u64>,
|
||||
pub created_at: u64,
|
||||
pub last_access: Option<u64>,
|
||||
}
|
||||
|
||||
/// Access permissions for database keys
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub enum Permissions {
|
||||
Read,
|
||||
ReadWrite,
|
||||
}
|
||||
|
||||
/// Access key information
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct AccessKey {
|
||||
pub hash: String,
|
||||
pub permissions: Permissions,
|
||||
pub created_at: u64,
|
||||
}
|
||||
|
||||
/// Database metadata containing access keys
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct DatabaseMeta {
|
||||
pub public: bool,
|
||||
pub keys: HashMap<String, AccessKey>,
|
||||
}
|
||||
|
||||
/// Access key information returned by RPC
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct AccessKeyInfo {
|
||||
pub hash: String,
|
||||
pub permissions: Permissions,
|
||||
pub created_at: u64,
|
||||
}
|
||||
|
||||
/// Hash a plaintext key using SHA-256
|
||||
pub fn hash_key(key: &str) -> String {
|
||||
let mut hasher = Sha256::new();
|
||||
hasher.update(key.as_bytes());
|
||||
format!("{:x}", hasher.finalize())
|
||||
}
|
||||
|
||||
/// RPC trait for HeroDB management
|
||||
#[rpc(server, client, namespace = "herodb")]
|
||||
pub trait Rpc {
|
||||
/// Create a new database with specified configuration
|
||||
#[method(name = "createDatabase")]
|
||||
async fn create_database(
|
||||
&self,
|
||||
backend: BackendType,
|
||||
config: DatabaseConfig,
|
||||
encryption_key: Option<String>,
|
||||
) -> RpcResult<u64>;
|
||||
|
||||
/// Set encryption for an existing database (write-only key)
|
||||
#[method(name = "setEncryption")]
|
||||
async fn set_encryption(&self, db_id: u64, encryption_key: String) -> RpcResult<bool>;
|
||||
|
||||
/// List all managed databases
|
||||
#[method(name = "listDatabases")]
|
||||
async fn list_databases(&self) -> RpcResult<Vec<DatabaseInfo>>;
|
||||
|
||||
/// Get detailed information about a specific database
|
||||
#[method(name = "getDatabaseInfo")]
|
||||
async fn get_database_info(&self, db_id: u64) -> RpcResult<DatabaseInfo>;
|
||||
|
||||
/// Delete a database
|
||||
#[method(name = "deleteDatabase")]
|
||||
async fn delete_database(&self, db_id: u64) -> RpcResult<bool>;
|
||||
|
||||
/// Get server statistics
|
||||
#[method(name = "getServerStats")]
|
||||
async fn get_server_stats(&self) -> RpcResult<HashMap<String, serde_json::Value>>;
|
||||
|
||||
/// Add an access key to a database
|
||||
#[method(name = "addAccessKey")]
|
||||
async fn add_access_key(&self, db_id: u64, key: String, permissions: String) -> RpcResult<bool>;
|
||||
|
||||
/// Delete an access key from a database
|
||||
#[method(name = "deleteAccessKey")]
|
||||
async fn delete_access_key(&self, db_id: u64, key_hash: String) -> RpcResult<bool>;
|
||||
|
||||
/// List all access keys for a database
|
||||
#[method(name = "listAccessKeys")]
|
||||
async fn list_access_keys(&self, db_id: u64) -> RpcResult<Vec<AccessKeyInfo>>;
|
||||
|
||||
/// Set database public/private status
|
||||
#[method(name = "setDatabasePublic")]
|
||||
async fn set_database_public(&self, db_id: u64, public: bool) -> RpcResult<bool>;
|
||||
}
|
||||
|
||||
/// RPC Server implementation
|
||||
pub struct RpcServerImpl {
|
||||
/// Base directory for database files
|
||||
base_dir: String,
|
||||
/// Managed database servers
|
||||
servers: Arc<RwLock<HashMap<u64, Arc<Server>>>>,
|
||||
/// Next unencrypted database ID to assign
|
||||
next_unencrypted_id: Arc<RwLock<u64>>,
|
||||
/// Next encrypted database ID to assign
|
||||
next_encrypted_id: Arc<RwLock<u64>>,
|
||||
/// Default backend type
|
||||
backend: crate::options::BackendType,
|
||||
/// Encryption keys for databases
|
||||
encryption_keys: Arc<RwLock<HashMap<u64, Option<String>>>>,
|
||||
}
|
||||
|
||||
impl RpcServerImpl {
|
||||
/// Create a new RPC server instance
|
||||
pub fn new(base_dir: String, backend: crate::options::BackendType) -> Self {
|
||||
Self {
|
||||
base_dir,
|
||||
servers: Arc::new(RwLock::new(HashMap::new())),
|
||||
next_unencrypted_id: Arc::new(RwLock::new(0)),
|
||||
next_encrypted_id: Arc::new(RwLock::new(10)),
|
||||
backend,
|
||||
encryption_keys: Arc::new(RwLock::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get or create a server instance for the given database ID
|
||||
async fn get_or_create_server(&self, db_id: u64) -> Result<Arc<Server>, jsonrpsee::types::ErrorObjectOwned> {
|
||||
// Check if server already exists
|
||||
{
|
||||
let servers = self.servers.read().await;
|
||||
if let Some(server) = servers.get(&db_id) {
|
||||
return Ok(server.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// Check if database file exists
|
||||
let db_path = std::path::PathBuf::from(&self.base_dir).join(format!("{}.db", db_id));
|
||||
if !db_path.exists() {
|
||||
return Err(jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
format!("Database {} not found", db_id),
|
||||
None::<()>
|
||||
));
|
||||
}
|
||||
|
||||
// Create server instance with default options
|
||||
let db_option = DBOption {
|
||||
dir: self.base_dir.clone(),
|
||||
port: 0, // Not used for RPC-managed databases
|
||||
debug: false,
|
||||
encryption_key: None,
|
||||
encrypt: false,
|
||||
backend: self.backend.clone(),
|
||||
};
|
||||
|
||||
let mut server = Server::new(db_option).await;
|
||||
|
||||
// Set the selected database to the db_id for proper file naming
|
||||
server.selected_db = db_id;
|
||||
|
||||
// Store the server
|
||||
let mut servers = self.servers.write().await;
|
||||
servers.insert(db_id, Arc::new(server.clone()));
|
||||
|
||||
Ok(Arc::new(server))
|
||||
}
|
||||
|
||||
/// Discover existing database files in the base directory
|
||||
async fn discover_databases(&self) -> Vec<u64> {
|
||||
let mut db_ids = Vec::new();
|
||||
|
||||
if let Ok(entries) = std::fs::read_dir(&self.base_dir) {
|
||||
for entry in entries.flatten() {
|
||||
if let Ok(file_name) = entry.file_name().into_string() {
|
||||
// Check if it's a database file (ends with .db)
|
||||
if file_name.ends_with(".db") {
|
||||
// Extract database ID from filename (e.g., "11.db" -> 11)
|
||||
if let Some(id_str) = file_name.strip_suffix(".db") {
|
||||
if let Ok(db_id) = id_str.parse::<u64>() {
|
||||
db_ids.push(db_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
db_ids
|
||||
}
|
||||
|
||||
/// Get the next available database ID
|
||||
async fn get_next_db_id(&self, is_encrypted: bool) -> u64 {
|
||||
if is_encrypted {
|
||||
let mut id = self.next_encrypted_id.write().await;
|
||||
let current_id = *id;
|
||||
*id += 1;
|
||||
current_id
|
||||
} else {
|
||||
let mut id = self.next_unencrypted_id.write().await;
|
||||
let current_id = *id;
|
||||
*id += 1;
|
||||
current_id
|
||||
}
|
||||
}
|
||||
|
||||
/// Load database metadata from file (static version)
|
||||
pub async fn load_meta_static(base_dir: &str, db_id: u64) -> Result<DatabaseMeta, jsonrpsee::types::ErrorObjectOwned> {
|
||||
let meta_path = std::path::PathBuf::from(base_dir).join(format!("{}_meta.json", db_id));
|
||||
|
||||
// If meta file doesn't exist, return default
|
||||
if !meta_path.exists() {
|
||||
return Ok(DatabaseMeta {
|
||||
public: true,
|
||||
keys: HashMap::new(),
|
||||
});
|
||||
}
|
||||
|
||||
// Read file
|
||||
let content = std::fs::read(&meta_path)
|
||||
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
format!("Failed to read meta file: {}", e),
|
||||
None::<()>
|
||||
))?;
|
||||
|
||||
let json_str = String::from_utf8(content)
|
||||
.map_err(|_| jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
"Invalid UTF-8 in meta file",
|
||||
None::<()>
|
||||
))?;
|
||||
|
||||
serde_json::from_str(&json_str)
|
||||
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
format!("Failed to parse meta JSON: {}", e),
|
||||
None::<()>
|
||||
))
|
||||
}
|
||||
|
||||
/// Load database metadata from file
|
||||
async fn load_meta(&self, db_id: u64) -> Result<DatabaseMeta, jsonrpsee::types::ErrorObjectOwned> {
|
||||
let meta_path = std::path::PathBuf::from(&self.base_dir).join(format!("{}_meta.json", db_id));
|
||||
|
||||
// If meta file doesn't exist, create default
|
||||
if !meta_path.exists() {
|
||||
let default_meta = DatabaseMeta {
|
||||
public: true,
|
||||
keys: HashMap::new(),
|
||||
};
|
||||
self.save_meta(db_id, &default_meta).await?;
|
||||
return Ok(default_meta);
|
||||
}
|
||||
|
||||
// Read and potentially decrypt
|
||||
let content = std::fs::read(&meta_path)
|
||||
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
format!("Failed to read meta file: {}", e),
|
||||
None::<()>
|
||||
))?;
|
||||
|
||||
let json_str = if db_id >= 10 {
|
||||
// Encrypted database, decrypt meta
|
||||
if let Some(key) = self.encryption_keys.read().await.get(&db_id).and_then(|k| k.as_ref()) {
|
||||
use crate::crypto::CryptoFactory;
|
||||
let crypto = CryptoFactory::new(key.as_bytes());
|
||||
String::from_utf8(crypto.decrypt(&content)
|
||||
.map_err(|_| jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
"Failed to decrypt meta file",
|
||||
None::<()>
|
||||
))?)
|
||||
.map_err(|_| jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
"Invalid UTF-8 in decrypted meta",
|
||||
None::<()>
|
||||
))?
|
||||
} else {
|
||||
return Err(jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
"Encryption key not found for encrypted database",
|
||||
None::<()>
|
||||
));
|
||||
}
|
||||
} else {
|
||||
String::from_utf8(content)
|
||||
.map_err(|_| jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
"Invalid UTF-8 in meta file",
|
||||
None::<()>
|
||||
))?
|
||||
};
|
||||
|
||||
serde_json::from_str(&json_str)
|
||||
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
format!("Failed to parse meta JSON: {}", e),
|
||||
None::<()>
|
||||
))
|
||||
}
|
||||
|
||||
/// Save database metadata to file (static version)
|
||||
pub async fn save_meta_static(base_dir: &str, db_id: u64, meta: &DatabaseMeta) -> Result<(), jsonrpsee::types::ErrorObjectOwned> {
|
||||
let meta_path = std::path::PathBuf::from(base_dir).join(format!("{}_meta.json", db_id));
|
||||
|
||||
let json_str = serde_json::to_string(meta)
|
||||
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
format!("Failed to serialize meta: {}", e),
|
||||
None::<()>
|
||||
))?;
|
||||
|
||||
std::fs::write(&meta_path, json_str)
|
||||
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
format!("Failed to write meta file: {}", e),
|
||||
None::<()>
|
||||
))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Save database metadata to file
|
||||
async fn save_meta(&self, db_id: u64, meta: &DatabaseMeta) -> Result<(), jsonrpsee::types::ErrorObjectOwned> {
|
||||
let meta_path = std::path::PathBuf::from(&self.base_dir).join(format!("{}_meta.json", db_id));
|
||||
|
||||
let json_str = serde_json::to_string(meta)
|
||||
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
format!("Failed to serialize meta: {}", e),
|
||||
None::<()>
|
||||
))?;
|
||||
|
||||
if db_id >= 10 {
|
||||
// Encrypted database, encrypt meta
|
||||
if let Some(key) = self.encryption_keys.read().await.get(&db_id).and_then(|k| k.as_ref()) {
|
||||
use crate::crypto::CryptoFactory;
|
||||
let crypto = CryptoFactory::new(key.as_bytes());
|
||||
let encrypted = crypto.encrypt(json_str.as_bytes());
|
||||
std::fs::write(&meta_path, encrypted)
|
||||
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
format!("Failed to write encrypted meta file: {}", e),
|
||||
None::<()>
|
||||
))?;
|
||||
} else {
|
||||
return Err(jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
"Encryption key not found for encrypted database",
|
||||
None::<()>
|
||||
));
|
||||
}
|
||||
} else {
|
||||
std::fs::write(&meta_path, json_str)
|
||||
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
format!("Failed to write meta file: {}", e),
|
||||
None::<()>
|
||||
))?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[jsonrpsee::core::async_trait]
|
||||
impl RpcServer for RpcServerImpl {
|
||||
async fn create_database(
|
||||
&self,
|
||||
backend: BackendType,
|
||||
config: DatabaseConfig,
|
||||
encryption_key: Option<String>,
|
||||
) -> RpcResult<u64> {
|
||||
let db_id = self.get_next_db_id(encryption_key.is_some()).await;
|
||||
|
||||
// Handle both Redb and Sled backends
|
||||
match backend {
|
||||
BackendType::Redb | BackendType::Sled => {
|
||||
// Create database directory
|
||||
let db_dir = if let Some(path) = &config.storage_path {
|
||||
std::path::PathBuf::from(path)
|
||||
} else {
|
||||
std::path::PathBuf::from(&self.base_dir).join(format!("rpc_db_{}", db_id))
|
||||
};
|
||||
|
||||
// Ensure directory exists
|
||||
std::fs::create_dir_all(&db_dir)
|
||||
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
format!("Failed to create directory: {}", e),
|
||||
None::<()>
|
||||
))?;
|
||||
|
||||
// Create DB options
|
||||
let encrypt = encryption_key.is_some();
|
||||
let option = DBOption {
|
||||
dir: db_dir.to_string_lossy().to_string(),
|
||||
port: 0, // Not used for RPC-managed databases
|
||||
debug: false,
|
||||
encryption_key: encryption_key.clone(),
|
||||
encrypt,
|
||||
backend: match backend {
|
||||
BackendType::Redb => crate::options::BackendType::Redb,
|
||||
BackendType::Sled => crate::options::BackendType::Sled,
|
||||
},
|
||||
};
|
||||
|
||||
// Create server instance
|
||||
let mut server = Server::new(option).await;
|
||||
|
||||
// Set the selected database to the db_id for proper file naming
|
||||
server.selected_db = db_id;
|
||||
|
||||
// Initialize the storage to create the database file
|
||||
let _ = server.current_storage();
|
||||
|
||||
// Store the encryption key
|
||||
{
|
||||
let mut keys = self.encryption_keys.write().await;
|
||||
keys.insert(db_id, encryption_key.clone());
|
||||
}
|
||||
|
||||
// Initialize meta file
|
||||
let meta = DatabaseMeta {
|
||||
public: true,
|
||||
keys: HashMap::new(),
|
||||
};
|
||||
self.save_meta(db_id, &meta).await?;
|
||||
|
||||
// Store the server
|
||||
let mut servers = self.servers.write().await;
|
||||
servers.insert(db_id, Arc::new(server));
|
||||
|
||||
Ok(db_id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn set_encryption(&self, db_id: u64, _encryption_key: String) -> RpcResult<bool> {
|
||||
// Note: In a real implementation, we'd need to modify the existing database
|
||||
// For now, return false as encryption can only be set during creation
|
||||
let _servers = self.servers.read().await;
|
||||
// TODO: Implement encryption setting for existing databases
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
async fn list_databases(&self) -> RpcResult<Vec<DatabaseInfo>> {
|
||||
let db_ids = self.discover_databases().await;
|
||||
let mut result = Vec::new();
|
||||
|
||||
for db_id in db_ids {
|
||||
// Try to get or create server for this database
|
||||
if let Ok(server) = self.get_or_create_server(db_id).await {
|
||||
let backend = match server.option.backend {
|
||||
crate::options::BackendType::Redb => BackendType::Redb,
|
||||
crate::options::BackendType::Sled => BackendType::Sled,
|
||||
};
|
||||
|
||||
let info = DatabaseInfo {
|
||||
id: db_id,
|
||||
name: None, // TODO: Store name in server metadata
|
||||
backend,
|
||||
encrypted: server.option.encrypt,
|
||||
redis_version: Some("7.0".to_string()), // Default Redis compatibility
|
||||
storage_path: Some(server.option.dir.clone()),
|
||||
size_on_disk: None, // TODO: Calculate actual size
|
||||
key_count: None, // TODO: Get key count from storage
|
||||
created_at: std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs(),
|
||||
last_access: None,
|
||||
};
|
||||
result.push(info);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn get_database_info(&self, db_id: u64) -> RpcResult<DatabaseInfo> {
|
||||
let server = self.get_or_create_server(db_id).await?;
|
||||
|
||||
let backend = match server.option.backend {
|
||||
crate::options::BackendType::Redb => BackendType::Redb,
|
||||
crate::options::BackendType::Sled => BackendType::Sled,
|
||||
};
|
||||
|
||||
Ok(DatabaseInfo {
|
||||
id: db_id,
|
||||
name: None,
|
||||
backend,
|
||||
encrypted: server.option.encrypt,
|
||||
redis_version: Some("7.0".to_string()),
|
||||
storage_path: Some(server.option.dir.clone()),
|
||||
size_on_disk: None,
|
||||
key_count: None,
|
||||
created_at: std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs(),
|
||||
last_access: None,
|
||||
})
|
||||
}
|
||||
|
||||
async fn delete_database(&self, db_id: u64) -> RpcResult<bool> {
|
||||
let mut servers = self.servers.write().await;
|
||||
|
||||
if let Some(_server) = servers.remove(&db_id) {
|
||||
// Clean up database files
|
||||
let db_path = std::path::PathBuf::from(&self.base_dir).join(format!("{}.db", db_id));
|
||||
if db_path.exists() {
|
||||
if db_path.is_dir() {
|
||||
std::fs::remove_dir_all(&db_path).ok();
|
||||
} else {
|
||||
std::fs::remove_file(&db_path).ok();
|
||||
}
|
||||
}
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_server_stats(&self) -> RpcResult<HashMap<String, serde_json::Value>> {
|
||||
let db_ids = self.discover_databases().await;
|
||||
let mut stats = HashMap::new();
|
||||
|
||||
stats.insert("total_databases".to_string(), serde_json::json!(db_ids.len()));
|
||||
stats.insert("uptime".to_string(), serde_json::json!(
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs()
|
||||
));
|
||||
|
||||
Ok(stats)
|
||||
}
|
||||
|
||||
async fn add_access_key(&self, db_id: u64, key: String, permissions: String) -> RpcResult<bool> {
|
||||
let mut meta = self.load_meta(db_id).await?;
|
||||
|
||||
let perms = match permissions.to_lowercase().as_str() {
|
||||
"read" => Permissions::Read,
|
||||
"readwrite" => Permissions::ReadWrite,
|
||||
_ => return Err(jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
"Invalid permissions: use 'read' or 'readwrite'",
|
||||
None::<()>
|
||||
)),
|
||||
};
|
||||
|
||||
let hash = hash_key(&key);
|
||||
let access_key = AccessKey {
|
||||
hash: hash.clone(),
|
||||
permissions: perms,
|
||||
created_at: std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs(),
|
||||
};
|
||||
|
||||
meta.keys.insert(hash, access_key);
|
||||
self.save_meta(db_id, &meta).await?;
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
async fn delete_access_key(&self, db_id: u64, key_hash: String) -> RpcResult<bool> {
|
||||
let mut meta = self.load_meta(db_id).await?;
|
||||
|
||||
if meta.keys.remove(&key_hash).is_some() {
|
||||
// If no keys left, make database public
|
||||
if meta.keys.is_empty() {
|
||||
meta.public = true;
|
||||
}
|
||||
self.save_meta(db_id, &meta).await?;
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
async fn list_access_keys(&self, db_id: u64) -> RpcResult<Vec<AccessKeyInfo>> {
|
||||
let meta = self.load_meta(db_id).await?;
|
||||
let keys: Vec<AccessKeyInfo> = meta.keys.values()
|
||||
.map(|k| AccessKeyInfo {
|
||||
hash: k.hash.clone(),
|
||||
permissions: k.permissions.clone(),
|
||||
created_at: k.created_at,
|
||||
})
|
||||
.collect();
|
||||
Ok(keys)
|
||||
}
|
||||
|
||||
async fn set_database_public(&self, db_id: u64, public: bool) -> RpcResult<bool> {
|
||||
let mut meta = self.load_meta(db_id).await?;
|
||||
meta.public = public;
|
||||
self.save_meta(db_id, &meta).await?;
|
||||
Ok(true)
|
||||
}
|
||||
}
|
@@ -1,49 +0,0 @@
|
||||
use std::net::SocketAddr;
|
||||
use jsonrpsee::server::{ServerBuilder, ServerHandle};
|
||||
use jsonrpsee::RpcModule;
|
||||
|
||||
use crate::rpc::{RpcServer, RpcServerImpl};
|
||||
|
||||
/// Start the RPC server on the specified address
|
||||
pub async fn start_rpc_server(addr: SocketAddr, base_dir: String, backend: crate::options::BackendType) -> Result<ServerHandle, Box<dyn std::error::Error + Send + Sync>> {
|
||||
// Create the RPC server implementation
|
||||
let rpc_impl = RpcServerImpl::new(base_dir, backend);
|
||||
|
||||
// Create the RPC module
|
||||
let mut module = RpcModule::new(());
|
||||
module.merge(RpcServer::into_rpc(rpc_impl))?;
|
||||
|
||||
// Build the server with both HTTP and WebSocket support
|
||||
let server = ServerBuilder::default()
|
||||
.build(addr)
|
||||
.await?;
|
||||
|
||||
// Start the server
|
||||
let handle = server.start(module);
|
||||
|
||||
println!("RPC server started on {}", addr);
|
||||
|
||||
Ok(handle)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::time::Duration;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_rpc_server_startup() {
|
||||
let addr = "127.0.0.1:0".parse().unwrap(); // Use port 0 for auto-assignment
|
||||
let base_dir = "/tmp/test_rpc".to_string();
|
||||
let backend = crate::options::BackendType::Redb; // Default for test
|
||||
|
||||
let handle = start_rpc_server(addr, base_dir, backend).await.unwrap();
|
||||
|
||||
// Give the server a moment to start
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
|
||||
// Stop the server
|
||||
handle.stop().unwrap();
|
||||
handle.stopped().await;
|
||||
}
|
||||
}
|
272
src/search_cmd.rs
Normal file
272
src/search_cmd.rs
Normal file
@@ -0,0 +1,272 @@
|
||||
use crate::{
|
||||
error::DBError,
|
||||
protocol::Protocol,
|
||||
server::Server,
|
||||
tantivy_search::{
|
||||
TantivySearch, FieldDef, NumericType, IndexConfig,
|
||||
SearchOptions, Filter, FilterType
|
||||
},
|
||||
};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub async fn ft_create_cmd(
|
||||
server: &Server,
|
||||
index_name: String,
|
||||
schema: Vec<(String, String, Vec<String>)>,
|
||||
) -> Result<Protocol, DBError> {
|
||||
// Parse schema into field definitions
|
||||
let mut field_definitions = Vec::new();
|
||||
|
||||
for (field_name, field_type, options) in schema {
|
||||
let field_def = match field_type.to_uppercase().as_str() {
|
||||
"TEXT" => {
|
||||
let mut weight = 1.0;
|
||||
let mut sortable = false;
|
||||
let mut no_index = false;
|
||||
|
||||
for opt in &options {
|
||||
match opt.to_uppercase().as_str() {
|
||||
"WEIGHT" => {
|
||||
// Next option should be the weight value
|
||||
if let Some(idx) = options.iter().position(|x| x == opt) {
|
||||
if idx + 1 < options.len() {
|
||||
weight = options[idx + 1].parse().unwrap_or(1.0);
|
||||
}
|
||||
}
|
||||
}
|
||||
"SORTABLE" => sortable = true,
|
||||
"NOINDEX" => no_index = true,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
FieldDef::Text {
|
||||
stored: true,
|
||||
indexed: !no_index,
|
||||
tokenized: true,
|
||||
fast: sortable,
|
||||
}
|
||||
}
|
||||
"NUMERIC" => {
|
||||
let mut sortable = false;
|
||||
|
||||
for opt in &options {
|
||||
if opt.to_uppercase() == "SORTABLE" {
|
||||
sortable = true;
|
||||
}
|
||||
}
|
||||
|
||||
FieldDef::Numeric {
|
||||
stored: true,
|
||||
indexed: true,
|
||||
fast: sortable,
|
||||
precision: NumericType::F64,
|
||||
}
|
||||
}
|
||||
"TAG" => {
|
||||
let mut separator = ",".to_string();
|
||||
let mut case_sensitive = false;
|
||||
|
||||
for i in 0..options.len() {
|
||||
match options[i].to_uppercase().as_str() {
|
||||
"SEPARATOR" => {
|
||||
if i + 1 < options.len() {
|
||||
separator = options[i + 1].clone();
|
||||
}
|
||||
}
|
||||
"CASESENSITIVE" => case_sensitive = true,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
FieldDef::Tag {
|
||||
stored: true,
|
||||
separator,
|
||||
case_sensitive,
|
||||
}
|
||||
}
|
||||
"GEO" => {
|
||||
FieldDef::Geo { stored: true }
|
||||
}
|
||||
_ => {
|
||||
return Err(DBError(format!("Unknown field type: {}", field_type)));
|
||||
}
|
||||
};
|
||||
|
||||
field_definitions.push((field_name, field_def));
|
||||
}
|
||||
|
||||
// Create the search index
|
||||
let search_path = server.search_index_path();
|
||||
let config = IndexConfig::default();
|
||||
|
||||
println!("Creating search index '{}' at path: {:?}", index_name, search_path);
|
||||
println!("Field definitions: {:?}", field_definitions);
|
||||
|
||||
let search_index = TantivySearch::new_with_schema(
|
||||
search_path,
|
||||
index_name.clone(),
|
||||
field_definitions,
|
||||
Some(config),
|
||||
)?;
|
||||
|
||||
println!("Search index '{}' created successfully", index_name);
|
||||
|
||||
// Store in registry
|
||||
let mut indexes = server.search_indexes.write().unwrap();
|
||||
indexes.insert(index_name, Arc::new(search_index));
|
||||
|
||||
Ok(Protocol::SimpleString("OK".to_string()))
|
||||
}
|
||||
|
||||
pub async fn ft_add_cmd(
|
||||
server: &Server,
|
||||
index_name: String,
|
||||
doc_id: String,
|
||||
_score: f64,
|
||||
fields: HashMap<String, String>,
|
||||
) -> Result<Protocol, DBError> {
|
||||
let indexes = server.search_indexes.read().unwrap();
|
||||
|
||||
let search_index = indexes.get(&index_name)
|
||||
.ok_or_else(|| DBError(format!("Index '{}' not found", index_name)))?;
|
||||
|
||||
search_index.add_document_with_fields(&doc_id, fields)?;
|
||||
|
||||
Ok(Protocol::SimpleString("OK".to_string()))
|
||||
}
|
||||
|
||||
pub async fn ft_search_cmd(
|
||||
server: &Server,
|
||||
index_name: String,
|
||||
query: String,
|
||||
filters: Vec<(String, String)>,
|
||||
limit: Option<usize>,
|
||||
offset: Option<usize>,
|
||||
return_fields: Option<Vec<String>>,
|
||||
) -> Result<Protocol, DBError> {
|
||||
let indexes = server.search_indexes.read().unwrap();
|
||||
|
||||
let search_index = indexes.get(&index_name)
|
||||
.ok_or_else(|| DBError(format!("Index '{}' not found", index_name)))?;
|
||||
|
||||
// Convert filters to search filters
|
||||
let search_filters = filters.into_iter().map(|(field, value)| {
|
||||
Filter {
|
||||
field,
|
||||
filter_type: FilterType::Equals(value),
|
||||
}
|
||||
}).collect();
|
||||
|
||||
let options = SearchOptions {
|
||||
limit: limit.unwrap_or(10),
|
||||
offset: offset.unwrap_or(0),
|
||||
filters: search_filters,
|
||||
sort_by: None,
|
||||
return_fields,
|
||||
highlight: false,
|
||||
};
|
||||
|
||||
let results = search_index.search_with_options(&query, options)?;
|
||||
|
||||
// Format results as Redis protocol
|
||||
let mut response = Vec::new();
|
||||
|
||||
// First element is the total count
|
||||
response.push(Protocol::SimpleString(results.total.to_string()));
|
||||
|
||||
// Then each document
|
||||
for doc in results.documents {
|
||||
let mut doc_array = Vec::new();
|
||||
|
||||
// Add document ID if it exists
|
||||
if let Some(id) = doc.fields.get("_id") {
|
||||
doc_array.push(Protocol::BulkString(id.clone()));
|
||||
}
|
||||
|
||||
// Add score
|
||||
doc_array.push(Protocol::BulkString(doc.score.to_string()));
|
||||
|
||||
// Add fields as key-value pairs
|
||||
for (field_name, field_value) in doc.fields {
|
||||
if field_name != "_id" {
|
||||
doc_array.push(Protocol::BulkString(field_name));
|
||||
doc_array.push(Protocol::BulkString(field_value));
|
||||
}
|
||||
}
|
||||
|
||||
response.push(Protocol::Array(doc_array));
|
||||
}
|
||||
|
||||
Ok(Protocol::Array(response))
|
||||
}
|
||||
|
||||
pub async fn ft_del_cmd(
|
||||
server: &Server,
|
||||
index_name: String,
|
||||
doc_id: String,
|
||||
) -> Result<Protocol, DBError> {
|
||||
let indexes = server.search_indexes.read().unwrap();
|
||||
|
||||
let _search_index = indexes.get(&index_name)
|
||||
.ok_or_else(|| DBError(format!("Index '{}' not found", index_name)))?;
|
||||
|
||||
// For now, return success
|
||||
// In a full implementation, we'd need to add a delete method to TantivySearch
|
||||
println!("Deleting document '{}' from index '{}'", doc_id, index_name);
|
||||
|
||||
Ok(Protocol::SimpleString("1".to_string()))
|
||||
}
|
||||
|
||||
pub async fn ft_info_cmd(
|
||||
server: &Server,
|
||||
index_name: String,
|
||||
) -> Result<Protocol, DBError> {
|
||||
let indexes = server.search_indexes.read().unwrap();
|
||||
|
||||
let search_index = indexes.get(&index_name)
|
||||
.ok_or_else(|| DBError(format!("Index '{}' not found", index_name)))?;
|
||||
|
||||
let info = search_index.get_info()?;
|
||||
|
||||
// Format info as Redis protocol
|
||||
let mut response = Vec::new();
|
||||
|
||||
response.push(Protocol::BulkString("index_name".to_string()));
|
||||
response.push(Protocol::BulkString(info.name));
|
||||
|
||||
response.push(Protocol::BulkString("num_docs".to_string()));
|
||||
response.push(Protocol::BulkString(info.num_docs.to_string()));
|
||||
|
||||
response.push(Protocol::BulkString("num_fields".to_string()));
|
||||
response.push(Protocol::BulkString(info.fields.len().to_string()));
|
||||
|
||||
response.push(Protocol::BulkString("fields".to_string()));
|
||||
let fields_str = info.fields.iter()
|
||||
.map(|f| format!("{}:{}", f.name, f.field_type))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
response.push(Protocol::BulkString(fields_str));
|
||||
|
||||
Ok(Protocol::Array(response))
|
||||
}
|
||||
|
||||
pub async fn ft_drop_cmd(
|
||||
server: &Server,
|
||||
index_name: String,
|
||||
) -> Result<Protocol, DBError> {
|
||||
let mut indexes = server.search_indexes.write().unwrap();
|
||||
|
||||
if indexes.remove(&index_name).is_some() {
|
||||
// Also remove the index files from disk
|
||||
let index_path = server.search_index_path().join(&index_name);
|
||||
if index_path.exists() {
|
||||
std::fs::remove_dir_all(index_path)
|
||||
.map_err(|e| DBError(format!("Failed to remove index files: {}", e)))?;
|
||||
}
|
||||
Ok(Protocol::SimpleString("OK".to_string()))
|
||||
} else {
|
||||
Err(DBError(format!("Index '{}' not found", index_name)))
|
||||
}
|
||||
}
|
@@ -4,6 +4,7 @@ use std::sync::Arc;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::sync::{Mutex, oneshot};
|
||||
use std::sync::RwLock;
|
||||
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
@@ -14,15 +15,16 @@ use crate::protocol::Protocol;
|
||||
use crate::storage::Storage;
|
||||
use crate::storage_sled::SledStorage;
|
||||
use crate::storage_trait::StorageBackend;
|
||||
use crate::tantivy_search::TantivySearch;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Server {
|
||||
pub db_cache: std::sync::Arc<std::sync::RwLock<HashMap<u64, Arc<dyn StorageBackend>>>>,
|
||||
pub db_cache: Arc<RwLock<HashMap<u64, Arc<dyn StorageBackend>>>>,
|
||||
pub search_indexes: Arc<RwLock<HashMap<String, Arc<TantivySearch>>>>,
|
||||
pub option: options::DBOption,
|
||||
pub client_name: Option<String>,
|
||||
pub selected_db: u64, // Changed from usize to u64
|
||||
pub queued_cmd: Option<Vec<(Cmd, Protocol)>>,
|
||||
pub current_permissions: Option<crate::rpc::Permissions>,
|
||||
|
||||
// BLPOP waiter registry: per (db_index, key) FIFO of waiters
|
||||
pub list_waiters: Arc<Mutex<HashMap<u64, HashMap<String, Vec<Waiter>>>>>,
|
||||
@@ -44,12 +46,12 @@ pub enum PopSide {
|
||||
impl Server {
|
||||
pub async fn new(option: options::DBOption) -> Self {
|
||||
Server {
|
||||
db_cache: Arc::new(std::sync::RwLock::new(HashMap::new())),
|
||||
db_cache: Arc::new(RwLock::new(HashMap::new())),
|
||||
search_indexes: Arc::new(RwLock::new(HashMap::new())),
|
||||
option,
|
||||
client_name: None,
|
||||
selected_db: 0,
|
||||
queued_cmd: None,
|
||||
current_permissions: None,
|
||||
|
||||
list_waiters: Arc::new(Mutex::new(HashMap::new())),
|
||||
waiter_seq: Arc::new(AtomicU64::new(1)),
|
||||
@@ -102,15 +104,10 @@ impl Server {
|
||||
// DB 0-9 are non-encrypted, DB 10+ are encrypted
|
||||
self.option.encrypt && db_index >= 10
|
||||
}
|
||||
|
||||
/// Check if current permissions allow read operations
|
||||
pub fn has_read_permission(&self) -> bool {
|
||||
matches!(self.current_permissions, Some(crate::rpc::Permissions::Read) | Some(crate::rpc::Permissions::ReadWrite))
|
||||
}
|
||||
|
||||
/// Check if current permissions allow write operations
|
||||
pub fn has_write_permission(&self) -> bool {
|
||||
matches!(self.current_permissions, Some(crate::rpc::Permissions::ReadWrite))
|
||||
|
||||
// Add method to get search index path
|
||||
pub fn search_index_path(&self) -> std::path::PathBuf {
|
||||
std::path::PathBuf::from(&self.option.dir).join("search_indexes")
|
||||
}
|
||||
|
||||
// ----- BLPOP waiter helpers -----
|
||||
|
567
src/tantivy_search.rs
Normal file
567
src/tantivy_search.rs
Normal file
@@ -0,0 +1,567 @@
|
||||
use tantivy::{
|
||||
collector::TopDocs,
|
||||
directory::MmapDirectory,
|
||||
query::{QueryParser, BooleanQuery, Query, TermQuery, Occur},
|
||||
schema::{Schema, Field, TextOptions, TextFieldIndexing,
|
||||
STORED, STRING, Value},
|
||||
Index, IndexWriter, IndexReader, ReloadPolicy,
|
||||
Term, DateTime, TantivyDocument,
|
||||
tokenizer::{TokenizerManager},
|
||||
};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::collections::HashMap;
|
||||
use crate::error::DBError;
|
||||
use serde::{Serialize, Deserialize};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum FieldDef {
|
||||
Text {
|
||||
stored: bool,
|
||||
indexed: bool,
|
||||
tokenized: bool,
|
||||
fast: bool,
|
||||
},
|
||||
Numeric {
|
||||
stored: bool,
|
||||
indexed: bool,
|
||||
fast: bool,
|
||||
precision: NumericType,
|
||||
},
|
||||
Tag {
|
||||
stored: bool,
|
||||
separator: String,
|
||||
case_sensitive: bool,
|
||||
},
|
||||
Geo {
|
||||
stored: bool,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum NumericType {
|
||||
I64,
|
||||
U64,
|
||||
F64,
|
||||
Date,
|
||||
}
|
||||
|
||||
pub struct IndexSchema {
|
||||
schema: Schema,
|
||||
fields: HashMap<String, (Field, FieldDef)>,
|
||||
default_search_fields: Vec<Field>,
|
||||
}
|
||||
|
||||
pub struct TantivySearch {
|
||||
index: Index,
|
||||
writer: Arc<RwLock<IndexWriter>>,
|
||||
reader: IndexReader,
|
||||
index_schema: IndexSchema,
|
||||
name: String,
|
||||
config: IndexConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct IndexConfig {
|
||||
pub language: String,
|
||||
pub stopwords: Vec<String>,
|
||||
pub stemming: bool,
|
||||
pub max_doc_count: Option<usize>,
|
||||
pub default_score: f64,
|
||||
}
|
||||
|
||||
impl Default for IndexConfig {
|
||||
fn default() -> Self {
|
||||
IndexConfig {
|
||||
language: "english".to_string(),
|
||||
stopwords: vec![],
|
||||
stemming: true,
|
||||
max_doc_count: None,
|
||||
default_score: 1.0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TantivySearch {
|
||||
pub fn new_with_schema(
|
||||
base_path: PathBuf,
|
||||
name: String,
|
||||
field_definitions: Vec<(String, FieldDef)>,
|
||||
config: Option<IndexConfig>,
|
||||
) -> Result<Self, DBError> {
|
||||
let index_path = base_path.join(&name);
|
||||
std::fs::create_dir_all(&index_path)
|
||||
.map_err(|e| DBError(format!("Failed to create index dir: {}", e)))?;
|
||||
|
||||
// Build schema from field definitions
|
||||
let mut schema_builder = Schema::builder();
|
||||
let mut fields = HashMap::new();
|
||||
let mut default_search_fields = Vec::new();
|
||||
|
||||
// Always add a document ID field
|
||||
let id_field = schema_builder.add_text_field("_id", STRING | STORED);
|
||||
fields.insert("_id".to_string(), (id_field, FieldDef::Text {
|
||||
stored: true,
|
||||
indexed: true,
|
||||
tokenized: false,
|
||||
fast: false,
|
||||
}));
|
||||
|
||||
// Add user-defined fields
|
||||
for (field_name, field_def) in field_definitions {
|
||||
let field = match &field_def {
|
||||
FieldDef::Text { stored, indexed, tokenized, fast: _fast } => {
|
||||
let mut text_options = TextOptions::default();
|
||||
|
||||
if *stored {
|
||||
text_options = text_options.set_stored();
|
||||
}
|
||||
|
||||
if *indexed {
|
||||
let indexing_options = if *tokenized {
|
||||
TextFieldIndexing::default()
|
||||
.set_tokenizer("default")
|
||||
.set_index_option(tantivy::schema::IndexRecordOption::WithFreqsAndPositions)
|
||||
} else {
|
||||
TextFieldIndexing::default()
|
||||
.set_tokenizer("raw")
|
||||
.set_index_option(tantivy::schema::IndexRecordOption::Basic)
|
||||
};
|
||||
text_options = text_options.set_indexing_options(indexing_options);
|
||||
|
||||
let f = schema_builder.add_text_field(&field_name, text_options);
|
||||
if *tokenized {
|
||||
default_search_fields.push(f);
|
||||
}
|
||||
f
|
||||
} else {
|
||||
schema_builder.add_text_field(&field_name, text_options)
|
||||
}
|
||||
}
|
||||
FieldDef::Numeric { stored, indexed, fast, precision } => {
|
||||
match precision {
|
||||
NumericType::I64 => {
|
||||
let mut opts = tantivy::schema::NumericOptions::default();
|
||||
if *stored { opts = opts.set_stored(); }
|
||||
if *indexed { opts = opts.set_indexed(); }
|
||||
if *fast { opts = opts.set_fast(); }
|
||||
schema_builder.add_i64_field(&field_name, opts)
|
||||
}
|
||||
NumericType::U64 => {
|
||||
let mut opts = tantivy::schema::NumericOptions::default();
|
||||
if *stored { opts = opts.set_stored(); }
|
||||
if *indexed { opts = opts.set_indexed(); }
|
||||
if *fast { opts = opts.set_fast(); }
|
||||
schema_builder.add_u64_field(&field_name, opts)
|
||||
}
|
||||
NumericType::F64 => {
|
||||
let mut opts = tantivy::schema::NumericOptions::default();
|
||||
if *stored { opts = opts.set_stored(); }
|
||||
if *indexed { opts = opts.set_indexed(); }
|
||||
if *fast { opts = opts.set_fast(); }
|
||||
schema_builder.add_f64_field(&field_name, opts)
|
||||
}
|
||||
NumericType::Date => {
|
||||
let mut opts = tantivy::schema::DateOptions::default();
|
||||
if *stored { opts = opts.set_stored(); }
|
||||
if *indexed { opts = opts.set_indexed(); }
|
||||
if *fast { opts = opts.set_fast(); }
|
||||
schema_builder.add_date_field(&field_name, opts)
|
||||
}
|
||||
}
|
||||
}
|
||||
FieldDef::Tag { stored, separator: _, case_sensitive: _ } => {
|
||||
let mut text_options = TextOptions::default();
|
||||
if *stored {
|
||||
text_options = text_options.set_stored();
|
||||
}
|
||||
text_options = text_options.set_indexing_options(
|
||||
TextFieldIndexing::default()
|
||||
.set_tokenizer("raw")
|
||||
.set_index_option(tantivy::schema::IndexRecordOption::Basic)
|
||||
);
|
||||
schema_builder.add_text_field(&field_name, text_options)
|
||||
}
|
||||
FieldDef::Geo { stored } => {
|
||||
// For now, store as two f64 fields for lat/lon
|
||||
let mut opts = tantivy::schema::NumericOptions::default();
|
||||
if *stored { opts = opts.set_stored(); }
|
||||
opts = opts.set_indexed().set_fast();
|
||||
|
||||
let lat_field = schema_builder.add_f64_field(&format!("{}_lat", field_name), opts.clone());
|
||||
let lon_field = schema_builder.add_f64_field(&format!("{}_lon", field_name), opts);
|
||||
|
||||
fields.insert(format!("{}_lat", field_name), (lat_field, FieldDef::Numeric {
|
||||
stored: *stored,
|
||||
indexed: true,
|
||||
fast: true,
|
||||
precision: NumericType::F64,
|
||||
}));
|
||||
fields.insert(format!("{}_lon", field_name), (lon_field, FieldDef::Numeric {
|
||||
stored: *stored,
|
||||
indexed: true,
|
||||
fast: true,
|
||||
precision: NumericType::F64,
|
||||
}));
|
||||
continue; // Skip adding the geo field itself
|
||||
}
|
||||
};
|
||||
|
||||
fields.insert(field_name.clone(), (field, field_def));
|
||||
}
|
||||
|
||||
let schema = schema_builder.build();
|
||||
let index_schema = IndexSchema {
|
||||
schema: schema.clone(),
|
||||
fields,
|
||||
default_search_fields,
|
||||
};
|
||||
|
||||
// Create or open index
|
||||
let dir = MmapDirectory::open(&index_path)
|
||||
.map_err(|e| DBError(format!("Failed to open index directory: {}", e)))?;
|
||||
|
||||
let mut index = Index::open_or_create(dir, schema)
|
||||
.map_err(|e| DBError(format!("Failed to create index: {}", e)))?;
|
||||
|
||||
// Configure tokenizers
|
||||
let tokenizer_manager = TokenizerManager::default();
|
||||
index.set_tokenizers(tokenizer_manager);
|
||||
|
||||
let writer = index.writer(1_000_000)
|
||||
.map_err(|e| DBError(format!("Failed to create index writer: {}", e)))?;
|
||||
|
||||
let reader = index
|
||||
.reader_builder()
|
||||
.reload_policy(ReloadPolicy::OnCommitWithDelay)
|
||||
.try_into()
|
||||
.map_err(|e| DBError(format!("Failed to create reader: {}", e)))?;
|
||||
|
||||
let config = config.unwrap_or_default();
|
||||
|
||||
Ok(TantivySearch {
|
||||
index,
|
||||
writer: Arc::new(RwLock::new(writer)),
|
||||
reader,
|
||||
index_schema,
|
||||
name,
|
||||
config,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn add_document_with_fields(
|
||||
&self,
|
||||
doc_id: &str,
|
||||
fields: HashMap<String, String>,
|
||||
) -> Result<(), DBError> {
|
||||
let mut writer = self.writer.write()
|
||||
.map_err(|e| DBError(format!("Failed to acquire writer lock: {}", e)))?;
|
||||
|
||||
// Delete existing document with same ID
|
||||
if let Some((id_field, _)) = self.index_schema.fields.get("_id") {
|
||||
writer.delete_term(Term::from_field_text(*id_field, doc_id));
|
||||
}
|
||||
|
||||
// Create new document
|
||||
let mut doc = tantivy::doc!();
|
||||
|
||||
// Add document ID
|
||||
if let Some((id_field, _)) = self.index_schema.fields.get("_id") {
|
||||
doc.add_text(*id_field, doc_id);
|
||||
}
|
||||
|
||||
// Add other fields based on schema
|
||||
for (field_name, field_value) in fields {
|
||||
if let Some((field, field_def)) = self.index_schema.fields.get(&field_name) {
|
||||
match field_def {
|
||||
FieldDef::Text { .. } => {
|
||||
doc.add_text(*field, &field_value);
|
||||
}
|
||||
FieldDef::Numeric { precision, .. } => {
|
||||
match precision {
|
||||
NumericType::I64 => {
|
||||
if let Ok(v) = field_value.parse::<i64>() {
|
||||
doc.add_i64(*field, v);
|
||||
}
|
||||
}
|
||||
NumericType::U64 => {
|
||||
if let Ok(v) = field_value.parse::<u64>() {
|
||||
doc.add_u64(*field, v);
|
||||
}
|
||||
}
|
||||
NumericType::F64 => {
|
||||
if let Ok(v) = field_value.parse::<f64>() {
|
||||
doc.add_f64(*field, v);
|
||||
}
|
||||
}
|
||||
NumericType::Date => {
|
||||
if let Ok(v) = field_value.parse::<i64>() {
|
||||
doc.add_date(*field, DateTime::from_timestamp_millis(v));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
FieldDef::Tag { separator, case_sensitive, .. } => {
|
||||
let tags = if !case_sensitive {
|
||||
field_value.to_lowercase()
|
||||
} else {
|
||||
field_value.clone()
|
||||
};
|
||||
|
||||
// Store tags as separate terms for efficient filtering
|
||||
for tag in tags.split(separator.as_str()) {
|
||||
doc.add_text(*field, tag.trim());
|
||||
}
|
||||
}
|
||||
FieldDef::Geo { .. } => {
|
||||
// Parse "lat,lon" format
|
||||
let parts: Vec<&str> = field_value.split(',').collect();
|
||||
if parts.len() == 2 {
|
||||
if let (Ok(lat), Ok(lon)) = (parts[0].parse::<f64>(), parts[1].parse::<f64>()) {
|
||||
if let Some((lat_field, _)) = self.index_schema.fields.get(&format!("{}_lat", field_name)) {
|
||||
doc.add_f64(*lat_field, lat);
|
||||
}
|
||||
if let Some((lon_field, _)) = self.index_schema.fields.get(&format!("{}_lon", field_name)) {
|
||||
doc.add_f64(*lon_field, lon);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
writer.add_document(doc).map_err(|e| DBError(format!("Failed to add document: {}", e)))?;
|
||||
|
||||
writer.commit()
|
||||
.map_err(|e| DBError(format!("Failed to commit: {}", e)))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn search_with_options(
|
||||
&self,
|
||||
query_str: &str,
|
||||
options: SearchOptions,
|
||||
) -> Result<SearchResults, DBError> {
|
||||
let searcher = self.reader.searcher();
|
||||
|
||||
// Parse query based on search fields
|
||||
let query: Box<dyn Query> = if self.index_schema.default_search_fields.is_empty() {
|
||||
return Err(DBError("No searchable fields defined in schema".to_string()));
|
||||
} else {
|
||||
let query_parser = QueryParser::for_index(
|
||||
&self.index,
|
||||
self.index_schema.default_search_fields.clone(),
|
||||
);
|
||||
|
||||
Box::new(query_parser.parse_query(query_str)
|
||||
.map_err(|e| DBError(format!("Failed to parse query: {}", e)))?)
|
||||
};
|
||||
|
||||
// Apply filters if any
|
||||
let final_query = if !options.filters.is_empty() {
|
||||
let mut clauses: Vec<(Occur, Box<dyn Query>)> = vec![(Occur::Must, query)];
|
||||
|
||||
// Add filters
|
||||
for filter in options.filters {
|
||||
if let Some((field, _)) = self.index_schema.fields.get(&filter.field) {
|
||||
match filter.filter_type {
|
||||
FilterType::Equals(value) => {
|
||||
let term_query = TermQuery::new(
|
||||
Term::from_field_text(*field, &value),
|
||||
tantivy::schema::IndexRecordOption::Basic,
|
||||
);
|
||||
clauses.push((Occur::Must, Box::new(term_query)));
|
||||
}
|
||||
FilterType::Range { min: _, max: _ } => {
|
||||
// Would need numeric field handling here
|
||||
// Simplified for now
|
||||
}
|
||||
FilterType::InSet(values) => {
|
||||
let mut sub_clauses: Vec<(Occur, Box<dyn Query>)> = vec![];
|
||||
for value in values {
|
||||
let term_query = TermQuery::new(
|
||||
Term::from_field_text(*field, &value),
|
||||
tantivy::schema::IndexRecordOption::Basic,
|
||||
);
|
||||
sub_clauses.push((Occur::Should, Box::new(term_query)));
|
||||
}
|
||||
clauses.push((Occur::Must, Box::new(BooleanQuery::new(sub_clauses))));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Box::new(BooleanQuery::new(clauses))
|
||||
} else {
|
||||
query
|
||||
};
|
||||
|
||||
// Execute search
|
||||
let top_docs = searcher.search(
|
||||
&*final_query,
|
||||
&TopDocs::with_limit(options.limit + options.offset)
|
||||
).map_err(|e| DBError(format!("Search failed: {}", e)))?;
|
||||
|
||||
let total_hits = top_docs.len();
|
||||
let mut documents = Vec::new();
|
||||
|
||||
for (score, doc_address) in top_docs.iter().skip(options.offset).take(options.limit) {
|
||||
let retrieved_doc: TantivyDocument = searcher.doc(*doc_address)
|
||||
.map_err(|e| DBError(format!("Failed to retrieve doc: {}", e)))?;
|
||||
|
||||
let mut doc_fields = HashMap::new();
|
||||
|
||||
// Extract all stored fields
|
||||
for (field_name, (field, field_def)) in &self.index_schema.fields {
|
||||
match field_def {
|
||||
FieldDef::Text { stored, .. } |
|
||||
FieldDef::Tag { stored, .. } => {
|
||||
if *stored {
|
||||
if let Some(value) = retrieved_doc.get_first(*field) {
|
||||
if let Some(text) = value.as_str() {
|
||||
doc_fields.insert(field_name.clone(), text.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
FieldDef::Numeric { stored, precision, .. } => {
|
||||
if *stored {
|
||||
let value_str = match precision {
|
||||
NumericType::I64 => {
|
||||
retrieved_doc.get_first(*field)
|
||||
.and_then(|v| v.as_i64())
|
||||
.map(|v| v.to_string())
|
||||
}
|
||||
NumericType::U64 => {
|
||||
retrieved_doc.get_first(*field)
|
||||
.and_then(|v| v.as_u64())
|
||||
.map(|v| v.to_string())
|
||||
}
|
||||
NumericType::F64 => {
|
||||
retrieved_doc.get_first(*field)
|
||||
.and_then(|v| v.as_f64())
|
||||
.map(|v| v.to_string())
|
||||
}
|
||||
NumericType::Date => {
|
||||
retrieved_doc.get_first(*field)
|
||||
.and_then(|v| v.as_datetime())
|
||||
.map(|v| v.into_timestamp_millis().to_string())
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(v) = value_str {
|
||||
doc_fields.insert(field_name.clone(), v);
|
||||
}
|
||||
}
|
||||
}
|
||||
FieldDef::Geo { stored } => {
|
||||
if *stored {
|
||||
let lat_field = self.index_schema.fields.get(&format!("{}_lat", field_name)).unwrap().0;
|
||||
let lon_field = self.index_schema.fields.get(&format!("{}_lon", field_name)).unwrap().0;
|
||||
|
||||
let lat = retrieved_doc.get_first(lat_field).and_then(|v| v.as_f64());
|
||||
let lon = retrieved_doc.get_first(lon_field).and_then(|v| v.as_f64());
|
||||
|
||||
if let (Some(lat), Some(lon)) = (lat, lon) {
|
||||
doc_fields.insert(field_name.clone(), format!("{},{}", lat, lon));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
documents.push(SearchDocument {
|
||||
fields: doc_fields,
|
||||
score: *score,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(SearchResults {
|
||||
total: total_hits,
|
||||
documents,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_info(&self) -> Result<IndexInfo, DBError> {
|
||||
let searcher = self.reader.searcher();
|
||||
let num_docs = searcher.num_docs();
|
||||
|
||||
let fields_info: Vec<FieldInfo> = self.index_schema.fields.iter().map(|(name, (_, def))| {
|
||||
FieldInfo {
|
||||
name: name.clone(),
|
||||
field_type: format!("{:?}", def),
|
||||
}
|
||||
}).collect();
|
||||
|
||||
Ok(IndexInfo {
|
||||
name: self.name.clone(),
|
||||
num_docs,
|
||||
fields: fields_info,
|
||||
config: self.config.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SearchOptions {
|
||||
pub limit: usize,
|
||||
pub offset: usize,
|
||||
pub filters: Vec<Filter>,
|
||||
pub sort_by: Option<String>,
|
||||
pub return_fields: Option<Vec<String>>,
|
||||
pub highlight: bool,
|
||||
}
|
||||
|
||||
impl Default for SearchOptions {
|
||||
fn default() -> Self {
|
||||
SearchOptions {
|
||||
limit: 10,
|
||||
offset: 0,
|
||||
filters: vec![],
|
||||
sort_by: None,
|
||||
return_fields: None,
|
||||
highlight: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Filter {
|
||||
pub field: String,
|
||||
pub filter_type: FilterType,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum FilterType {
|
||||
Equals(String),
|
||||
Range { min: String, max: String },
|
||||
InSet(Vec<String>),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct SearchResults {
|
||||
pub total: usize,
|
||||
pub documents: Vec<SearchDocument>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct SearchDocument {
|
||||
pub fields: HashMap<String, String>,
|
||||
pub score: f32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct IndexInfo {
|
||||
pub name: String,
|
||||
pub num_docs: u64,
|
||||
pub fields: Vec<FieldInfo>,
|
||||
pub config: IndexConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct FieldInfo {
|
||||
pub name: String,
|
||||
pub field_type: String,
|
||||
}
|
@@ -1,62 +0,0 @@
|
||||
use std::net::SocketAddr;
|
||||
use jsonrpsee::http_client::HttpClientBuilder;
|
||||
use jsonrpsee::core::client::ClientT;
|
||||
use serde_json::json;
|
||||
|
||||
use herodb::rpc::{RpcClient, BackendType, DatabaseConfig};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_rpc_server_basic() {
|
||||
// This test would require starting the RPC server in a separate thread
|
||||
// For now, we'll just test that the types compile correctly
|
||||
|
||||
// Test serialization of types
|
||||
let backend = BackendType::Redb;
|
||||
let config = DatabaseConfig {
|
||||
name: Some("test_db".to_string()),
|
||||
storage_path: Some("/tmp/test".to_string()),
|
||||
max_size: Some(1024 * 1024),
|
||||
redis_version: Some("7.0".to_string()),
|
||||
};
|
||||
|
||||
let backend_json = serde_json::to_string(&backend).unwrap();
|
||||
let config_json = serde_json::to_string(&config).unwrap();
|
||||
|
||||
assert_eq!(backend_json, "\"Redb\"");
|
||||
assert!(config_json.contains("test_db"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_database_config_serialization() {
|
||||
let config = DatabaseConfig {
|
||||
name: Some("my_db".to_string()),
|
||||
storage_path: None,
|
||||
max_size: Some(1000000),
|
||||
redis_version: Some("7.0".to_string()),
|
||||
};
|
||||
|
||||
let json = serde_json::to_value(&config).unwrap();
|
||||
assert_eq!(json["name"], "my_db");
|
||||
assert_eq!(json["max_size"], 1000000);
|
||||
assert_eq!(json["redis_version"], "7.0");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_backend_type_serialization() {
|
||||
// Test that both Redb and Sled backends serialize correctly
|
||||
let redb_backend = BackendType::Redb;
|
||||
let sled_backend = BackendType::Sled;
|
||||
|
||||
let redb_json = serde_json::to_string(&redb_backend).unwrap();
|
||||
let sled_json = serde_json::to_string(&sled_backend).unwrap();
|
||||
|
||||
assert_eq!(redb_json, "\"Redb\"");
|
||||
assert_eq!(sled_json, "\"Sled\"");
|
||||
|
||||
// Test deserialization
|
||||
let redb_deserialized: BackendType = serde_json::from_str(&redb_json).unwrap();
|
||||
let sled_deserialized: BackendType = serde_json::from_str(&sled_json).unwrap();
|
||||
|
||||
assert!(matches!(redb_deserialized, BackendType::Redb));
|
||||
assert!(matches!(sled_deserialized, BackendType::Sled));
|
||||
}
|
@@ -501,11 +501,11 @@ async fn test_07_age_stateless_suite() {
|
||||
let mut s = connect(port).await;
|
||||
|
||||
// GENENC -> [recipient, identity]
|
||||
let genenc = send_cmd(&mut s, &["AGE", "GENENC"]).await;
|
||||
let gen = send_cmd(&mut s, &["AGE", "GENENC"]).await;
|
||||
assert!(
|
||||
genenc.starts_with("*2\r\n$"),
|
||||
gen.starts_with("*2\r\n$"),
|
||||
"AGE GENENC should return array [recipient, identity], got:\n{}",
|
||||
genenc
|
||||
gen
|
||||
);
|
||||
|
||||
// Parse simple RESP array of two bulk strings to extract keys
|
||||
@@ -520,7 +520,7 @@ async fn test_07_age_stateless_suite() {
|
||||
let ident = lines.next().unwrap_or("").to_string();
|
||||
(recip, ident)
|
||||
}
|
||||
let (recipient, identity) = parse_two_bulk_array(&genenc);
|
||||
let (recipient, identity) = parse_two_bulk_array(&gen);
|
||||
assert!(
|
||||
recipient.starts_with("age1") && identity.starts_with("AGE-SECRET-KEY-1"),
|
||||
"Unexpected AGE key formats.\nrecipient: {}\nidentity: {}",
|
||||
|
Reference in New Issue
Block a user