8 Commits

Author SHA1 Message Date
9410176684 ... 2025-08-25 06:00:08 +02:00
ab56fad635 ... 2025-08-23 05:46:38 +02:00
a1127b72da ... 2025-08-23 05:20:42 +02:00
3850df89be ... 2025-08-23 05:15:45 +02:00
45195d403e ... 2025-08-23 05:12:17 +02:00
f17b441ca1 ... 2025-08-23 05:07:45 +02:00
ff4ea1d844 ... 2025-08-23 05:04:37 +02:00
c9e1dcdb6c ... 2025-08-23 04:57:47 +02:00
53 changed files with 17588 additions and 84 deletions

5723
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,9 +1,41 @@
[workspace]
members = ["herodb"]
resolver = "2"
[package]
name = "herodb"
version = "0.0.1"
authors = ["Pin Fang <fpfangpin@hotmail.com>"]
edition = "2021"
# You can define shared profiles for all workspace members here
[profile.release]
lto = true
codegen-units = 1
strip = true
[dependencies]
anyhow = "1.0.59"
bytes = "1.3.0"
thiserror = "1.0.32"
tokio = { version = "1.23.0", features = ["full"] }
clap = { version = "4.5.20", features = ["derive"] }
byteorder = "1.4.3"
futures = "0.3"
sled = "0.34"
redb = "2.1.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
bincode = "1.3"
chacha20poly1305 = "0.10.1"
rand = "0.8"
sha2 = "0.10"
age = "0.10"
secrecy = "0.8"
ed25519-dalek = "2"
base64 = "0.22"
# Lance vector database dependencies
lance = "0.33"
lance-index = "0.33"
lance-linalg = "0.33"
# Use Arrow version compatible with Lance 0.33
arrow = "55.2"
arrow-array = "55.2"
arrow-schema = "55.2"
parquet = "55.2"
uuid = { version = "1.10", features = ["v4"] }
reqwest = { version = "0.11", features = ["json"] }
image = "0.25"
[dev-dependencies]
redis = { version = "0.24", features = ["aio", "tokio-comp"] }

View File

@@ -70,6 +70,15 @@ MULTI/EXEC/DISCARD | ✅ | ❌ | Only supported in redb |
**Encryption** | | | |
Data-at-rest encryption | ✅ | ✅ | Both support [age](age.tech) encryption |
AGE commands | ✅ | ✅ | Both support AGE crypto commands |
**Full-Text Search** | | | |
FT.CREATE | ✅ | ✅ | Create search index with schema |
FT.ADD | ✅ | ✅ | Add document to search index |
FT.SEARCH | ✅ | ✅ | Search documents with query |
FT.DEL | ✅ | ✅ | Delete document from index |
FT.INFO | ✅ | ✅ | Get index information |
FT.DROP | ✅ | ✅ | Drop search index |
FT.ALTER | ✅ | ✅ | Alter index schema |
FT.AGGREGATE | ✅ | ✅ | Aggregate search results |
### Performance Considerations

454
docs/lance_vector_db.md Normal file
View File

@@ -0,0 +1,454 @@
# Lance Vector Database Operations
HeroDB includes a powerful vector database integration using Lance, enabling high-performance vector storage, search, and multimodal data management. By default, it uses Ollama for local text embeddings, with support for custom external embedding services.
## Overview
The Lance vector database integration provides:
- **High-performance vector storage** using Lance's columnar format
- **Local Ollama integration** for text embeddings (default, no external dependencies)
- **Custom embedding service support** for advanced use cases
- **Text embedding support** (images via custom services)
- **Vector similarity search** with configurable parameters
- **Scalable indexing** with IVF_PQ (Inverted File with Product Quantization)
- **Redis-compatible command interface**
## Architecture
```
┌─────────────────┐ ┌──────────────────┐ ┌─────────────────┐
│ HeroDB │ │ External │ │ Lance │
│ Redis Server │◄──►│ Embedding │ │ Vector Store │
│ │ │ Service │ │ │
└─────────────────┘ └──────────────────┘ └─────────────────┘
│ │ │
│ │ │
Redis Protocol HTTP API Arrow/Parquet
Commands JSON Requests Columnar Storage
```
### Key Components
1. **Lance Store**: High-performance columnar vector storage
2. **Ollama Integration**: Local embedding service (default)
3. **Custom Embedding Service**: Optional HTTP API for advanced use cases
4. **Redis Command Interface**: Familiar Redis-style commands
5. **Arrow Schema**: Flexible schema definition for metadata
## Configuration
### Default Setup (Ollama)
HeroDB uses Ollama by default for text embeddings. No configuration is required if Ollama is running locally:
```bash
# Install Ollama (if not already installed)
# Visit: https://ollama.ai
# Pull the embedding model
ollama pull nomic-embed-text
# Ollama automatically runs on localhost:11434
# HeroDB will use this by default
```
**Default Configuration:**
- **URL**: `http://localhost:11434`
- **Model**: `nomic-embed-text`
- **Dimensions**: 768 (for nomic-embed-text)
### Custom Embedding Service (Optional)
To use a custom embedding service instead of Ollama:
```bash
# Set custom embedding service URL
redis-cli HSET config:core:aiembed url "http://your-embedding-service:8080/embed"
# Optional: Set authentication if required
redis-cli HSET config:core:aiembed token "your-api-token"
```
### Embedding Service API Contracts
#### Ollama API (Default)
HeroDB calls Ollama using this format:
```bash
POST http://localhost:11434/api/embeddings
Content-Type: application/json
{
"model": "nomic-embed-text",
"prompt": "Your text to embed"
}
```
Response:
```json
{
"embedding": [0.1, 0.2, 0.3, ...]
}
```
#### Custom Service API
Your custom embedding service should accept POST requests with this JSON format:
```json
{
"texts": ["text1", "text2"], // Optional: array of texts
"images": ["base64_image1", "base64_image2"], // Optional: base64 encoded images
"model": "your-model-name" // Optional: model specification
}
```
And return responses in this format:
```json
{
"embeddings": [[0.1, 0.2, ...], [0.3, 0.4, ...]], // Array of embedding vectors
"model": "model-name", // Model used
"usage": { // Optional usage stats
"tokens": 100,
"requests": 2
}
}
```
## Commands Reference
### Dataset Management
#### LANCE CREATE
Create a new vector dataset with specified dimensions and optional schema.
```bash
LANCE CREATE <dataset> DIM <dimension> [SCHEMA field:type ...]
```
**Parameters:**
- `dataset`: Name of the dataset
- `dimension`: Vector dimension (e.g., 384, 768, 1536)
- `field:type`: Optional metadata fields (string, int, float, bool)
**Examples:**
```bash
# Create a simple dataset for 384-dimensional vectors
LANCE CREATE documents DIM 384
# Create dataset with metadata schema
LANCE CREATE products DIM 768 SCHEMA category:string price:float available:bool
```
#### LANCE LIST
List all available datasets.
```bash
LANCE LIST
```
**Returns:** Array of dataset names
#### LANCE INFO
Get information about a specific dataset.
```bash
LANCE INFO <dataset>
```
**Returns:** Dataset metadata including name, version, row count, and schema
#### LANCE DROP
Delete a dataset and all its data.
```bash
LANCE DROP <dataset>
```
### Data Operations
#### LANCE STORE
Store multimodal data (text/images) with automatic embedding generation.
```bash
LANCE STORE <dataset> [TEXT <text>] [IMAGE <base64>] [key value ...]
```
**Parameters:**
- `dataset`: Target dataset name
- `TEXT`: Text content to embed
- `IMAGE`: Base64-encoded image to embed
- `key value`: Metadata key-value pairs
**Examples:**
```bash
# Store text with metadata
LANCE STORE documents TEXT "Machine learning is transforming industries" category "AI" author "John Doe"
# Store image with metadata
LANCE STORE images IMAGE "iVBORw0KGgoAAAANSUhEUgAA..." category "nature" tags "landscape,mountains"
# Store both text and image
LANCE STORE multimodal TEXT "Beautiful sunset" IMAGE "base64data..." location "California"
```
**Returns:** Unique ID of the stored item
### Search Operations
#### LANCE SEARCH
Search using a raw vector.
```bash
LANCE SEARCH <dataset> VECTOR <vector> K <k> [NPROBES <n>] [REFINE <r>]
```
**Parameters:**
- `dataset`: Dataset to search
- `vector`: Comma-separated vector values (e.g., "0.1,0.2,0.3")
- `k`: Number of results to return
- `NPROBES`: Number of partitions to search (optional)
- `REFINE`: Refine factor for better accuracy (optional)
**Example:**
```bash
LANCE SEARCH documents VECTOR "0.1,0.2,0.3,0.4" K 5 NPROBES 10
```
#### LANCE SEARCH.TEXT
Search using text query (automatically embedded).
```bash
LANCE SEARCH.TEXT <dataset> <query_text> K <k> [NPROBES <n>] [REFINE <r>]
```
**Parameters:**
- `dataset`: Dataset to search
- `query_text`: Text query to search for
- `k`: Number of results to return
- `NPROBES`: Number of partitions to search (optional)
- `REFINE`: Refine factor for better accuracy (optional)
**Example:**
```bash
LANCE SEARCH.TEXT documents "artificial intelligence applications" K 10 NPROBES 20
```
**Returns:** Array of results with distance scores and metadata
### Embedding Operations
#### LANCE EMBED.TEXT
Generate embeddings for text without storing.
```bash
LANCE EMBED.TEXT <text1> [text2] [text3] ...
```
**Example:**
```bash
LANCE EMBED.TEXT "Hello world" "Machine learning" "Vector database"
```
**Returns:** Array of embedding vectors
### Index Management
#### LANCE CREATE.INDEX
Create a vector index for faster search performance.
```bash
LANCE CREATE.INDEX <dataset> <index_type> [PARTITIONS <n>] [SUBVECTORS <n>]
```
**Parameters:**
- `dataset`: Dataset to index
- `index_type`: Index type (currently supports "IVF_PQ")
- `PARTITIONS`: Number of partitions (default: 256)
- `SUBVECTORS`: Number of sub-vectors for PQ (default: 16)
**Example:**
```bash
LANCE CREATE.INDEX documents IVF_PQ PARTITIONS 512 SUBVECTORS 32
```
## Usage Patterns
### 1. Document Search System
```bash
# Setup
LANCE CREATE documents DIM 384 SCHEMA title:string content:string category:string
# Store documents
LANCE STORE documents TEXT "Introduction to machine learning algorithms" title "ML Basics" category "education"
LANCE STORE documents TEXT "Deep learning neural networks explained" title "Deep Learning" category "education"
LANCE STORE documents TEXT "Building scalable web applications" title "Web Dev" category "programming"
# Create index for better performance
LANCE CREATE.INDEX documents IVF_PQ PARTITIONS 256
# Search
LANCE SEARCH.TEXT documents "neural networks" K 5
```
### 2. Image Similarity Search
```bash
# Setup
LANCE CREATE images DIM 512 SCHEMA filename:string tags:string
# Store images (base64 encoded)
LANCE STORE images IMAGE "iVBORw0KGgoAAAANSUhEUgAA..." filename "sunset.jpg" tags "nature,landscape"
LANCE STORE images IMAGE "iVBORw0KGgoAAAANSUhEUgBB..." filename "city.jpg" tags "urban,architecture"
# Search by image
LANCE STORE temp_search IMAGE "query_image_base64..."
# Then use the returned ID to get embedding and search
```
### 3. Multimodal Content Management
```bash
# Setup
LANCE CREATE content DIM 768 SCHEMA type:string source:string
# Store mixed content
LANCE STORE content TEXT "Product description for smartphone" type "product" source "catalog"
LANCE STORE content IMAGE "product_image_base64..." type "product_image" source "catalog"
# Search across all content types
LANCE SEARCH.TEXT content "smartphone features" K 10
```
## Performance Considerations
### Vector Dimensions
- **384**: Good for general text (e.g., sentence-transformers)
- **768**: Standard for BERT-like models
- **1536**: OpenAI text-embedding-ada-002
- **Higher dimensions**: Better accuracy but slower search
### Index Configuration
- **More partitions**: Better for larger datasets (>100K vectors)
- **More sub-vectors**: Better compression but slower search
- **NPROBES**: Higher values = better accuracy, slower search
### Best Practices
1. **Create indexes** for datasets with >1000 vectors
2. **Use appropriate dimensions** based on your embedding model
3. **Configure NPROBES** based on accuracy vs speed requirements
4. **Batch operations** when possible for better performance
5. **Monitor embedding service** response times and rate limits
## Error Handling
Common error scenarios and solutions:
### Embedding Service Errors
```bash
# Error: Embedding service not configured
ERR Embedding service URL not configured. Set it with: HSET config:core:aiembed url <YOUR_EMBEDDING_SERVICE_URL>
# Error: Service unavailable
ERR Embedding service returned error 404 Not Found
```
**Solution:** Ensure embedding service is running and URL is correct.
### Dataset Errors
```bash
# Error: Dataset doesn't exist
ERR Dataset 'mydata' does not exist
# Error: Dimension mismatch
ERR Vector dimension mismatch: expected 384, got 768
```
**Solution:** Create dataset first or check vector dimensions.
### Search Errors
```bash
# Error: Invalid vector format
ERR Invalid vector format
# Error: No index available
ERR No index available for fast search
```
**Solution:** Check vector format or create an index.
## Integration Examples
### With Python
```python
import redis
import json
r = redis.Redis(host='localhost', port=6379)
# Create dataset
r.execute_command('LANCE', 'CREATE', 'docs', 'DIM', '384')
# Store document
result = r.execute_command('LANCE', 'STORE', 'docs',
'TEXT', 'Machine learning tutorial',
'category', 'education')
print(f"Stored with ID: {result}")
# Search
results = r.execute_command('LANCE', 'SEARCH.TEXT', 'docs',
'machine learning', 'K', '5')
print(f"Search results: {results}")
```
### With Node.js
```javascript
const redis = require('redis');
const client = redis.createClient();
// Create dataset
await client.sendCommand(['LANCE', 'CREATE', 'docs', 'DIM', '384']);
// Store document
const id = await client.sendCommand(['LANCE', 'STORE', 'docs',
'TEXT', 'Deep learning guide',
'category', 'AI']);
// Search
const results = await client.sendCommand(['LANCE', 'SEARCH.TEXT', 'docs',
'deep learning', 'K', '10']);
```
## Monitoring and Maintenance
### Health Checks
```bash
# Check if Lance store is available
LANCE LIST
# Check dataset health
LANCE INFO mydataset
# Test embedding service
LANCE EMBED.TEXT "test"
```
### Maintenance Operations
```bash
# Backup: Use standard Redis backup procedures
# The Lance data is stored separately in the data directory
# Cleanup: Remove unused datasets
LANCE DROP old_dataset
# Reindex: Drop and recreate indexes if needed
LANCE DROP dataset_name
LANCE CREATE dataset_name DIM 384
# Re-import data
LANCE CREATE.INDEX dataset_name IVF_PQ
```
This integration provides a powerful foundation for building AI-powered applications with vector search capabilities while maintaining the familiar Redis interface.

397
docs/search.md Normal file
View File

@@ -0,0 +1,397 @@
# Full-Text Search with Tantivy
HeroDB includes powerful full-text search capabilities powered by [Tantivy](https://github.com/quickwit-oss/tantivy), a fast full-text search engine library written in Rust. This provides Redis-compatible search commands similar to RediSearch.
## Overview
The search functionality allows you to:
- Create search indexes with custom schemas
- Index documents with multiple field types
- Perform complex queries with filters
- Support for text, numeric, date, and geographic data
- Real-time search with high performance
## Search Commands
### FT.CREATE - Create Search Index
Create a new search index with a defined schema.
```bash
FT.CREATE index_name SCHEMA field_name field_type [options] [field_name field_type [options] ...]
```
**Field Types:**
- `TEXT` - Full-text searchable text fields
- `NUMERIC` - Numeric fields (integers, floats)
- `TAG` - Tag fields for exact matching
- `GEO` - Geographic coordinates (lat,lon)
- `DATE` - Date/timestamp fields
**Field Options:**
- `STORED` - Store field value for retrieval
- `INDEXED` - Make field searchable
- `TOKENIZED` - Enable tokenization for text fields
- `FAST` - Enable fast access for numeric fields
**Example:**
```bash
# Create a product search index
FT.CREATE products SCHEMA
title TEXT STORED INDEXED TOKENIZED
description TEXT STORED INDEXED TOKENIZED
price NUMERIC STORED INDEXED FAST
category TAG STORED
location GEO STORED
created_date DATE STORED INDEXED
```
### FT.ADD - Add Document to Index
Add a document to a search index.
```bash
FT.ADD index_name doc_id [SCORE score] FIELDS field_name field_value [field_name field_value ...]
```
**Example:**
```bash
# Add a product document
FT.ADD products product:1 SCORE 1.0 FIELDS
title "Wireless Headphones"
description "High-quality wireless headphones with noise cancellation"
price 199.99
category "electronics"
location "37.7749,-122.4194"
created_date 1640995200000
```
### FT.SEARCH - Search Documents
Search for documents in an index.
```bash
FT.SEARCH index_name query [LIMIT offset count] [FILTER field min max] [RETURN field [field ...]]
```
**Query Syntax:**
- Simple terms: `wireless headphones`
- Phrase queries: `"noise cancellation"`
- Field-specific: `title:wireless`
- Boolean operators: `wireless AND headphones`
- Wildcards: `head*`
**Examples:**
```bash
# Simple text search
FT.SEARCH products "wireless headphones"
# Search with filters
FT.SEARCH products "headphones" FILTER price 100 300 LIMIT 0 10
# Field-specific search
FT.SEARCH products "title:wireless AND category:electronics"
# Return specific fields only
FT.SEARCH products "*" RETURN title price
```
### FT.DEL - Delete Document
Remove a document from the search index.
```bash
FT.DEL index_name doc_id
```
**Example:**
```bash
FT.DEL products product:1
```
### FT.INFO - Get Index Information
Get information about a search index.
```bash
FT.INFO index_name
```
**Returns:**
- Index name and document count
- Field definitions and types
- Index configuration
**Example:**
```bash
FT.INFO products
```
### FT.DROP - Drop Index
Delete an entire search index.
```bash
FT.DROP index_name
```
**Example:**
```bash
FT.DROP products
```
### FT.ALTER - Alter Index Schema
Add new fields to an existing index.
```bash
FT.ALTER index_name SCHEMA ADD field_name field_type [options]
```
**Example:**
```bash
FT.ALTER products SCHEMA ADD brand TAG STORED
```
### FT.AGGREGATE - Aggregate Search Results
Perform aggregations on search results.
```bash
FT.AGGREGATE index_name query [GROUPBY field] [REDUCE function field AS alias]
```
**Example:**
```bash
# Group products by category and count
FT.AGGREGATE products "*" GROUPBY category REDUCE COUNT 0 AS count
```
## Field Types in Detail
### TEXT Fields
- **Purpose**: Full-text search on natural language content
- **Features**: Tokenization, stemming, stop-word removal
- **Options**: `STORED`, `INDEXED`, `TOKENIZED`
- **Example**: Product titles, descriptions, content
### NUMERIC Fields
- **Purpose**: Numeric data for range queries and sorting
- **Types**: I64, U64, F64
- **Options**: `STORED`, `INDEXED`, `FAST`
- **Example**: Prices, quantities, ratings
### TAG Fields
- **Purpose**: Exact-match categorical data
- **Features**: No tokenization, exact string matching
- **Options**: `STORED`, case sensitivity control
- **Example**: Categories, brands, status values
### GEO Fields
- **Purpose**: Geographic coordinates
- **Format**: "latitude,longitude" (e.g., "37.7749,-122.4194")
- **Features**: Geographic distance queries
- **Options**: `STORED`
### DATE Fields
- **Purpose**: Timestamp and date data
- **Format**: Unix timestamp in milliseconds
- **Features**: Range queries, temporal filtering
- **Options**: `STORED`, `INDEXED`, `FAST`
## Search Query Syntax
### Basic Queries
```bash
# Single term
FT.SEARCH products "wireless"
# Multiple terms (AND by default)
FT.SEARCH products "wireless headphones"
# Phrase query
FT.SEARCH products "\"noise cancellation\""
```
### Field-Specific Queries
```bash
# Search in specific field
FT.SEARCH products "title:wireless"
# Multiple field queries
FT.SEARCH products "title:wireless AND description:bluetooth"
```
### Boolean Operators
```bash
# AND operator
FT.SEARCH products "wireless AND headphones"
# OR operator
FT.SEARCH products "wireless OR bluetooth"
# NOT operator
FT.SEARCH products "headphones NOT wired"
```
### Wildcards and Fuzzy Search
```bash
# Wildcard search
FT.SEARCH products "head*"
# Fuzzy search (approximate matching)
FT.SEARCH products "%headphone%"
```
### Range Queries
```bash
# Numeric range in query
FT.SEARCH products "@price:[100 300]"
# Date range
FT.SEARCH products "@created_date:[1640995200000 1672531200000]"
```
## Filtering and Sorting
### FILTER Clause
```bash
# Numeric filter
FT.SEARCH products "headphones" FILTER price 100 300
# Multiple filters
FT.SEARCH products "*" FILTER price 100 500 FILTER rating 4 5
```
### LIMIT Clause
```bash
# Pagination
FT.SEARCH products "wireless" LIMIT 0 10 # First 10 results
FT.SEARCH products "wireless" LIMIT 10 10 # Next 10 results
```
### RETURN Clause
```bash
# Return specific fields
FT.SEARCH products "*" RETURN title price
# Return all stored fields (default)
FT.SEARCH products "*"
```
## Performance Considerations
### Indexing Strategy
- Only index fields you need to search on
- Use `FAST` option for frequently filtered numeric fields
- Consider storage vs. search performance trade-offs
### Query Optimization
- Use specific field queries when possible
- Combine filters with text queries for better performance
- Use pagination with LIMIT for large result sets
### Memory Usage
- Tantivy indexes are memory-mapped for performance
- Index size depends on document count and field configuration
- Monitor disk space for index storage
## Integration with Redis Commands
Search indexes work alongside regular Redis data:
```bash
# Store product data in Redis hash
HSET product:1 title "Wireless Headphones" price "199.99"
# Index the same data for search
FT.ADD products product:1 FIELDS title "Wireless Headphones" price 199.99
# Search returns document IDs that can be used with Redis commands
FT.SEARCH products "wireless"
# Returns: product:1
# Retrieve full data using Redis
HGETALL product:1
```
## Example Use Cases
### E-commerce Product Search
```bash
# Create product catalog index
FT.CREATE catalog SCHEMA
name TEXT STORED INDEXED TOKENIZED
description TEXT INDEXED TOKENIZED
price NUMERIC STORED INDEXED FAST
category TAG STORED
brand TAG STORED
rating NUMERIC STORED FAST
# Add products
FT.ADD catalog prod:1 FIELDS name "iPhone 14" price 999 category "phones" brand "apple" rating 4.5
FT.ADD catalog prod:2 FIELDS name "Samsung Galaxy" price 899 category "phones" brand "samsung" rating 4.3
# Search queries
FT.SEARCH catalog "iPhone"
FT.SEARCH catalog "phones" FILTER price 800 1000
FT.SEARCH catalog "@brand:apple"
```
### Content Management
```bash
# Create content index
FT.CREATE content SCHEMA
title TEXT STORED INDEXED TOKENIZED
body TEXT INDEXED TOKENIZED
author TAG STORED
published DATE STORED INDEXED
tags TAG STORED
# Search content
FT.SEARCH content "machine learning"
FT.SEARCH content "@author:john AND @tags:ai"
FT.SEARCH content "*" FILTER published 1640995200000 1672531200000
```
### Geographic Search
```bash
# Create location-based index
FT.CREATE places SCHEMA
name TEXT STORED INDEXED TOKENIZED
location GEO STORED
type TAG STORED
# Add locations
FT.ADD places place:1 FIELDS name "Golden Gate Bridge" location "37.8199,-122.4783" type "landmark"
# Geographic queries (future feature)
FT.SEARCH places "@location:[37.7749 -122.4194 10 km]"
```
## Error Handling
Common error responses:
- `ERR index not found` - Index doesn't exist
- `ERR field not found` - Field not defined in schema
- `ERR invalid query syntax` - Malformed query
- `ERR document not found` - Document ID doesn't exist
## Best Practices
1. **Schema Design**: Plan your schema carefully - changes require reindexing
2. **Field Selection**: Only store and index fields you actually need
3. **Batch Operations**: Add multiple documents efficiently
4. **Query Testing**: Test queries for performance with realistic data
5. **Monitoring**: Monitor index size and query performance
6. **Backup**: Include search indexes in backup strategies
## Future Enhancements
Planned features:
- Geographic distance queries
- Advanced aggregations and faceting
- Highlighting of search results
- Synonyms and custom analyzers
- Real-time suggestions and autocomplete
- Index replication and sharding

356
examples/README.md Normal file
View File

@@ -0,0 +1,356 @@
# HeroDB Examples
This directory contains examples demonstrating HeroDB's capabilities including full-text search powered by Tantivy and vector database operations using Lance.
## Available Examples
1. **[Tantivy Search Demo](#tantivy-search-demo-bash-script)** - Full-text search capabilities
2. **[Lance Vector Database Demo](#lance-vector-database-demo-bash-script)** - Vector database and AI operations
3. **[AGE Encryption Demo](age_bash_demo.sh)** - Cryptographic operations
4. **[Simple Demo](simple_demo.sh)** - Basic Redis operations
---
## Lance Vector Database Demo (Bash Script)
### Overview
The `lance_vector_demo.sh` script provides a comprehensive demonstration of HeroDB's vector database capabilities using Lance. It showcases vector storage, similarity search, multimodal data handling, and AI-powered operations with external embedding services.
### Prerequisites
1. **HeroDB Server**: The server must be running (default port 6379)
2. **Redis CLI**: The `redis-cli` tool must be installed and available in your PATH
3. **Embedding Service** (optional): For full functionality, set up an external embedding service
### Running the Demo
#### Step 1: Start HeroDB Server
```bash
# From the project root directory
cargo run -- --dir ./test_data --port 6379
```
#### Step 2: Run the Demo (in a new terminal)
```bash
# From the project root directory
./examples/lance_vector_demo.sh
```
### What the Demo Covers
The script demonstrates comprehensive vector database operations:
1. **Dataset Management**
- Creating vector datasets with custom dimensions
- Defining schemas with metadata fields
- Listing and inspecting datasets
- Dataset information and statistics
2. **Embedding Operations**
- Text embedding generation via external services
- Multimodal embedding support (text + images)
- Batch embedding operations
3. **Data Storage**
- Storing text documents with automatic embedding
- Storing images with metadata
- Multimodal content storage
- Rich metadata support
4. **Vector Search**
- Similarity search with raw vectors
- Text-based semantic search
- Configurable search parameters (K, NPROBES, REFINE)
- Cross-modal search capabilities
5. **Index Management**
- Creating IVF_PQ indexes for performance
- Custom index parameters
- Performance optimization
6. **Advanced Features**
- Error handling and recovery
- Performance testing concepts
- Monitoring and maintenance
- Cleanup operations
### Key Lance Commands Demonstrated
#### Dataset Management
```bash
# Create vector dataset
LANCE CREATE documents DIM 384
# Create dataset with schema
LANCE CREATE products DIM 768 SCHEMA category:string price:float available:bool
# List datasets
LANCE LIST
# Get dataset information
LANCE INFO documents
```
#### Data Operations
```bash
# Store text with metadata
LANCE STORE documents TEXT "Machine learning tutorial" category "education" author "John Doe"
# Store image with metadata
LANCE STORE images IMAGE "base64_encoded_image..." filename "photo.jpg" tags "nature,landscape"
# Store multimodal content
LANCE STORE content TEXT "Product description" IMAGE "base64_image..." type "product"
```
#### Search Operations
```bash
# Search with raw vector
LANCE SEARCH documents VECTOR "0.1,0.2,0.3,0.4" K 5
# Semantic text search
LANCE SEARCH.TEXT documents "artificial intelligence" K 10 NPROBES 20
# Generate embeddings
LANCE EMBED.TEXT "Hello world" "Machine learning"
```
#### Index Management
```bash
# Create performance index
LANCE CREATE.INDEX documents IVF_PQ PARTITIONS 256 SUBVECTORS 16
# Drop dataset
LANCE DROP old_dataset
```
### Configuration
#### Setting Up Embedding Service
```bash
# Configure embedding service URL
redis-cli HSET config:core:aiembed url "http://your-embedding-service:8080/embed"
# Optional: Set authentication token
redis-cli HSET config:core:aiembed token "your-api-token"
```
#### Embedding Service API
Your embedding service should accept POST requests:
```json
{
"texts": ["text1", "text2"],
"images": ["base64_image1", "base64_image2"],
"model": "your-model-name"
}
```
And return responses:
```json
{
"embeddings": [[0.1, 0.2, ...], [0.3, 0.4, ...]],
"model": "model-name",
"usage": {"tokens": 100, "requests": 2}
}
```
### Interactive Features
The demo script includes:
- **Colored output** for better readability
- **Step-by-step execution** with explanations
- **Error handling** demonstrations
- **Automatic cleanup** options
- **Performance testing** concepts
- **Real-world usage** examples
### Use Cases Demonstrated
1. **Document Search System**
- Semantic document retrieval
- Metadata filtering
- Relevance ranking
2. **Image Similarity Search**
- Visual content matching
- Tag-based filtering
- Multimodal queries
3. **Product Recommendations**
- Feature-based similarity
- Category filtering
- Price range queries
4. **Content Management**
- Mixed media storage
- Cross-modal search
- Rich metadata support
---
## Tantivy Search Demo (Bash Script)
### Overview
The `tantivy_search_demo.sh` script provides a comprehensive demonstration of HeroDB's search functionality using Redis commands. It showcases various search scenarios including basic text search, filtering, sorting, geographic queries, and more.
### Prerequisites
1. **HeroDB Server**: The server must be running on port 6381
2. **Redis CLI**: The `redis-cli` tool must be installed and available in your PATH
### Running the Demo
#### Step 1: Start HeroDB Server
```bash
# From the project root directory
cargo run -- --port 6381
```
#### Step 2: Run the Demo (in a new terminal)
```bash
# From the project root directory
./examples/tantivy_search_demo.sh
```
### What the Demo Covers
The script demonstrates 15 different search scenarios:
1. **Index Creation** - Creating a search index with various field types
2. **Data Insertion** - Adding sample products to the index
3. **Basic Text Search** - Simple keyword searches
4. **Filtered Search** - Combining text search with category filters
5. **Numeric Range Search** - Finding products within price ranges
6. **Sorting Results** - Ordering results by different fields
7. **Limited Results** - Pagination and result limiting
8. **Complex Queries** - Multi-field searches with sorting
9. **Geographic Search** - Location-based queries
10. **Index Information** - Getting statistics about the search index
11. **Search Comparison** - Tantivy vs simple pattern matching
12. **Fuzzy Search** - Typo tolerance and approximate matching
13. **Phrase Search** - Exact phrase matching
14. **Boolean Queries** - AND, OR, NOT operators
15. **Cleanup** - Removing test data
### Sample Data
The demo uses a product catalog with the following fields:
- **title** (TEXT) - Product name with higher search weight
- **description** (TEXT) - Detailed product description
- **category** (TAG) - Comma-separated categories
- **price** (NUMERIC) - Product price for range queries
- **rating** (NUMERIC) - Customer rating for sorting
- **location** (GEO) - Geographic coordinates for location searches
### Key Redis Commands Demonstrated
#### Index Management
```bash
# Create search index
FT.CREATE product_catalog ON HASH PREFIX 1 product: SCHEMA title TEXT WEIGHT 2.0 SORTABLE description TEXT category TAG SEPARATOR , price NUMERIC SORTABLE rating NUMERIC SORTABLE location GEO
# Get index information
FT.INFO product_catalog
# Drop index
FT.DROPINDEX product_catalog
```
#### Search Queries
```bash
# Basic text search
FT.SEARCH product_catalog wireless
# Filtered search
FT.SEARCH product_catalog 'organic @category:{food}'
# Numeric range
FT.SEARCH product_catalog '@price:[50 150]'
# Sorted results
FT.SEARCH product_catalog '@category:{electronics}' SORTBY price ASC
# Geographic search
FT.SEARCH product_catalog '@location:[37.7749 -122.4194 50 km]'
# Boolean queries
FT.SEARCH product_catalog 'wireless AND audio'
FT.SEARCH product_catalog 'coffee OR tea'
# Phrase search
FT.SEARCH product_catalog '"noise canceling"'
```
### Interactive Features
The demo script includes:
- **Colored output** for better readability
- **Pause between steps** to review results
- **Error handling** with clear error messages
- **Automatic cleanup** of test data
- **Progress indicators** showing what each step demonstrates
### Troubleshooting
#### HeroDB Not Running
```
✗ HeroDB is not running on port 6381
Please start HeroDB with: cargo run -- --port 6381
```
**Solution**: Start the HeroDB server in a separate terminal.
#### Redis CLI Not Found
```
redis-cli: command not found
```
**Solution**: Install Redis tools or use an alternative Redis client.
#### Connection Refused
```
Could not connect to Redis at localhost:6381: Connection refused
```
**Solution**: Ensure HeroDB is running and listening on the correct port.
### Manual Testing
You can also run individual commands manually:
```bash
# Connect to HeroDB
redis-cli -h localhost -p 6381
# Create a simple index
FT.CREATE myindex ON HASH SCHEMA title TEXT description TEXT
# Add a document
HSET doc:1 title "Hello World" description "This is a test document"
# Search
FT.SEARCH myindex hello
```
### Performance Notes
- **Indexing**: Documents are indexed in real-time as they're added
- **Search Speed**: Full-text search is much faster than pattern matching on large datasets
- **Memory Usage**: Tantivy indexes are memory-efficient and disk-backed
- **Scalability**: Supports millions of documents with sub-second search times
### Advanced Features
The demo showcases advanced Tantivy features:
- **Relevance Scoring** - Results ranked by relevance
- **Fuzzy Matching** - Handles typos and approximate matches
- **Field Weighting** - Title field has higher search weight
- **Multi-field Search** - Search across multiple fields simultaneously
- **Geographic Queries** - Distance-based location searches
- **Numeric Ranges** - Efficient range queries on numeric fields
- **Tag Filtering** - Fast categorical filtering
### Next Steps
After running the demo, explore:
1. **Custom Schemas** - Define your own field types and configurations
2. **Large Datasets** - Test with thousands or millions of documents
3. **Real Applications** - Integrate search into your applications
4. **Performance Tuning** - Optimize for your specific use case
For more information, see the [search documentation](../herodb/docs/search.md).

426
examples/lance_vector_demo.sh Executable file
View File

@@ -0,0 +1,426 @@
#!/bin/bash
# Lance Vector Database Demo Script
# This script demonstrates all Lance vector database operations in HeroDB
set -e # Exit on any error
# Configuration
REDIS_HOST="localhost"
REDIS_PORT="6379"
REDIS_CLI="redis-cli -h $REDIS_HOST -p $REDIS_PORT"
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Helper functions
log_info() {
echo -e "${BLUE}[INFO]${NC} $1"
}
log_success() {
echo -e "${GREEN}[SUCCESS]${NC} $1"
}
log_warning() {
echo -e "${YELLOW}[WARNING]${NC} $1"
}
log_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
execute_command() {
local cmd="$1"
local description="$2"
echo
log_info "Executing: $description"
echo "Command: $cmd"
if result=$($cmd 2>&1); then
log_success "Result: $result"
else
log_error "Failed: $result"
return 1
fi
}
# Check if HeroDB is running
check_herodb() {
log_info "Checking if HeroDB is running..."
if ! $REDIS_CLI ping > /dev/null 2>&1; then
log_error "HeroDB is not running. Please start it first:"
echo " cargo run -- --dir ./test_data --port $REDIS_PORT"
exit 1
fi
log_success "HeroDB is running"
}
# Setup embedding service configuration
setup_embedding_service() {
log_info "Setting up embedding service configuration..."
# Note: This is a mock URL for demonstration
# In production, replace with your actual embedding service
execute_command \
"$REDIS_CLI HSET config:core:aiembed url 'http://localhost:8080/embed'" \
"Configure embedding service URL"
# Optional: Set authentication token
# execute_command \
# "$REDIS_CLI HSET config:core:aiembed token 'your-api-token'" \
# "Configure embedding service token"
log_warning "Note: Embedding service at http://localhost:8080/embed is not running."
log_warning "Some operations will fail, but this demonstrates the command structure."
}
# Dataset Management Operations
demo_dataset_management() {
echo
echo "=========================================="
echo " DATASET MANAGEMENT DEMO"
echo "=========================================="
# List datasets (should be empty initially)
execute_command \
"$REDIS_CLI LANCE LIST" \
"List all datasets (initially empty)"
# Create a simple dataset
execute_command \
"$REDIS_CLI LANCE CREATE documents DIM 384" \
"Create a simple document dataset with 384 dimensions"
# Create a dataset with schema
execute_command \
"$REDIS_CLI LANCE CREATE products DIM 768 SCHEMA category:string price:float available:bool description:string" \
"Create products dataset with custom schema"
# Create an image dataset
execute_command \
"$REDIS_CLI LANCE CREATE images DIM 512 SCHEMA filename:string tags:string width:int height:int" \
"Create images dataset for multimodal content"
# List datasets again
execute_command \
"$REDIS_CLI LANCE LIST" \
"List all datasets (should show 3 datasets)"
# Get info about datasets
execute_command \
"$REDIS_CLI LANCE INFO documents" \
"Get information about documents dataset"
execute_command \
"$REDIS_CLI LANCE INFO products" \
"Get information about products dataset"
}
# Embedding Operations
demo_embedding_operations() {
echo
echo "=========================================="
echo " EMBEDDING OPERATIONS DEMO"
echo "=========================================="
log_warning "The following operations will fail because no embedding service is running."
log_warning "This demonstrates the command structure and error handling."
# Try to embed text (will fail without embedding service)
execute_command \
"$REDIS_CLI LANCE EMBED.TEXT 'Hello world'" \
"Generate embedding for single text" || true
# Try to embed multiple texts
execute_command \
"$REDIS_CLI LANCE EMBED.TEXT 'Machine learning' 'Artificial intelligence' 'Deep learning'" \
"Generate embeddings for multiple texts" || true
}
# Data Storage Operations
demo_data_storage() {
echo
echo "=========================================="
echo " DATA STORAGE DEMO"
echo "=========================================="
log_warning "Storage operations will fail without embedding service, but show command structure."
# Store text documents
execute_command \
"$REDIS_CLI LANCE STORE documents TEXT 'Introduction to machine learning algorithms and their applications in modern AI systems' category 'education' author 'John Doe' difficulty 'beginner'" \
"Store a document with text and metadata" || true
execute_command \
"$REDIS_CLI LANCE STORE documents TEXT 'Deep learning neural networks for computer vision tasks' category 'research' author 'Jane Smith' difficulty 'advanced'" \
"Store another document" || true
# Store product information
execute_command \
"$REDIS_CLI LANCE STORE products TEXT 'High-performance laptop with 16GB RAM and SSD storage' category 'electronics' price '1299.99' available 'true'" \
"Store product with text description" || true
# Store image with metadata (using placeholder base64)
execute_command \
"$REDIS_CLI LANCE STORE images IMAGE 'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8/5+hHgAHggJ/PchI7wAAAABJRU5ErkJggg==' filename 'sample.png' tags 'test,demo' width '1' height '1'" \
"Store image with metadata (1x1 pixel PNG)" || true
# Store multimodal content
execute_command \
"$REDIS_CLI LANCE STORE images TEXT 'Beautiful sunset over mountains' IMAGE 'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8/5+hHgAHggJ/PchI7wAAAABJRU5ErkJggg==' filename 'sunset.png' tags 'nature,landscape' location 'California'" \
"Store multimodal content (text + image)" || true
}
# Search Operations
demo_search_operations() {
echo
echo "=========================================="
echo " SEARCH OPERATIONS DEMO"
echo "=========================================="
log_warning "Search operations will fail without data, but show command structure."
# Search with raw vector
execute_command \
"$REDIS_CLI LANCE SEARCH documents VECTOR '0.1,0.2,0.3,0.4,0.5' K 5" \
"Search with raw vector (5 results)" || true
# Search with vector and parameters
execute_command \
"$REDIS_CLI LANCE SEARCH documents VECTOR '0.1,0.2,0.3,0.4,0.5' K 10 NPROBES 20 REFINE 2" \
"Search with vector and advanced parameters" || true
# Text-based search
execute_command \
"$REDIS_CLI LANCE SEARCH.TEXT documents 'machine learning algorithms' K 5" \
"Search using text query" || true
# Text search with parameters
execute_command \
"$REDIS_CLI LANCE SEARCH.TEXT products 'laptop computer' K 3 NPROBES 10" \
"Search products using text with parameters" || true
# Search in image dataset
execute_command \
"$REDIS_CLI LANCE SEARCH.TEXT images 'sunset landscape' K 5" \
"Search images using text description" || true
}
# Index Management Operations
demo_index_management() {
echo
echo "=========================================="
echo " INDEX MANAGEMENT DEMO"
echo "=========================================="
# Create indexes for better search performance
execute_command \
"$REDIS_CLI LANCE CREATE.INDEX documents IVF_PQ" \
"Create default IVF_PQ index for documents"
execute_command \
"$REDIS_CLI LANCE CREATE.INDEX products IVF_PQ PARTITIONS 512 SUBVECTORS 32" \
"Create IVF_PQ index with custom parameters for products"
execute_command \
"$REDIS_CLI LANCE CREATE.INDEX images IVF_PQ PARTITIONS 256 SUBVECTORS 16" \
"Create IVF_PQ index for images dataset"
log_success "Indexes created successfully"
}
# Advanced Usage Examples
demo_advanced_usage() {
echo
echo "=========================================="
echo " ADVANCED USAGE EXAMPLES"
echo "=========================================="
# Create a specialized dataset for semantic search
execute_command \
"$REDIS_CLI LANCE CREATE semantic_search DIM 1536 SCHEMA title:string content:string url:string timestamp:string source:string" \
"Create dataset for semantic search with rich metadata"
# Demonstrate batch operations concept
log_info "Batch operations example (would store multiple items):"
echo " for doc in documents:"
echo " LANCE STORE semantic_search TEXT \"\$doc_content\" title \"\$title\" url \"\$url\""
# Show monitoring commands
log_info "Monitoring and maintenance commands:"
execute_command \
"$REDIS_CLI LANCE LIST" \
"List all datasets for monitoring"
# Show dataset statistics
for dataset in documents products images semantic_search; do
execute_command \
"$REDIS_CLI LANCE INFO $dataset" \
"Get statistics for $dataset" || true
done
}
# Cleanup Operations
demo_cleanup() {
echo
echo "=========================================="
echo " CLEANUP OPERATIONS DEMO"
echo "=========================================="
log_info "Demonstrating cleanup operations..."
# Drop individual datasets
execute_command \
"$REDIS_CLI LANCE DROP semantic_search" \
"Drop semantic_search dataset"
# List remaining datasets
execute_command \
"$REDIS_CLI LANCE LIST" \
"List remaining datasets"
# Ask user if they want to clean up all test data
echo
read -p "Do you want to clean up all test datasets? (y/N): " -n 1 -r
echo
if [[ $REPLY =~ ^[Yy]$ ]]; then
execute_command \
"$REDIS_CLI LANCE DROP documents" \
"Drop documents dataset"
execute_command \
"$REDIS_CLI LANCE DROP products" \
"Drop products dataset"
execute_command \
"$REDIS_CLI LANCE DROP images" \
"Drop images dataset"
execute_command \
"$REDIS_CLI LANCE LIST" \
"Verify all datasets are cleaned up"
log_success "All test datasets cleaned up"
else
log_info "Keeping test datasets for further experimentation"
fi
}
# Error Handling Demo
demo_error_handling() {
echo
echo "=========================================="
echo " ERROR HANDLING DEMO"
echo "=========================================="
log_info "Demonstrating various error conditions..."
# Try to access non-existent dataset
execute_command \
"$REDIS_CLI LANCE INFO nonexistent_dataset" \
"Try to get info for non-existent dataset" || true
# Try to search non-existent dataset
execute_command \
"$REDIS_CLI LANCE SEARCH nonexistent_dataset VECTOR '0.1,0.2' K 5" \
"Try to search non-existent dataset" || true
# Try to drop non-existent dataset
execute_command \
"$REDIS_CLI LANCE DROP nonexistent_dataset" \
"Try to drop non-existent dataset" || true
# Try invalid vector format
execute_command \
"$REDIS_CLI LANCE SEARCH documents VECTOR 'invalid,vector,format' K 5" \
"Try search with invalid vector format" || true
log_info "Error handling demonstration complete"
}
# Performance Testing Demo
demo_performance_testing() {
echo
echo "=========================================="
echo " PERFORMANCE TESTING DEMO"
echo "=========================================="
log_info "Creating performance test dataset..."
execute_command \
"$REDIS_CLI LANCE CREATE perf_test DIM 128 SCHEMA batch_id:string item_id:string" \
"Create performance test dataset"
log_info "Performance testing would involve:"
echo " 1. Bulk loading thousands of vectors"
echo " 2. Creating indexes with different parameters"
echo " 3. Measuring search latency with various K values"
echo " 4. Testing different NPROBES settings"
echo " 5. Monitoring memory usage"
log_info "Example performance test commands:"
echo " # Test search speed with different parameters"
echo " time redis-cli LANCE SEARCH.TEXT perf_test 'query' K 10"
echo " time redis-cli LANCE SEARCH.TEXT perf_test 'query' K 10 NPROBES 50"
echo " time redis-cli LANCE SEARCH.TEXT perf_test 'query' K 100 NPROBES 100"
# Clean up performance test dataset
execute_command \
"$REDIS_CLI LANCE DROP perf_test" \
"Clean up performance test dataset"
}
# Main execution
main() {
echo "=========================================="
echo " LANCE VECTOR DATABASE DEMO SCRIPT"
echo "=========================================="
echo
echo "This script demonstrates all Lance vector database operations."
echo "Note: Some operations will fail without a running embedding service."
echo "This is expected and demonstrates error handling."
echo
# Check prerequisites
check_herodb
# Setup
setup_embedding_service
# Run demos
demo_dataset_management
demo_embedding_operations
demo_data_storage
demo_search_operations
demo_index_management
demo_advanced_usage
demo_error_handling
demo_performance_testing
# Cleanup
demo_cleanup
echo
echo "=========================================="
echo " DEMO COMPLETE"
echo "=========================================="
echo
log_success "Lance vector database demo completed successfully!"
echo
echo "Next steps:"
echo "1. Set up a real embedding service (OpenAI, Hugging Face, etc.)"
echo "2. Update the embedding service URL configuration"
echo "3. Try storing and searching real data"
echo "4. Experiment with different vector dimensions and index parameters"
echo "5. Build your AI-powered application!"
echo
echo "For more information, see docs/lance_vector_db.md"
}
# Run the demo
main "$@"

186
examples/simple_demo.sh Normal file
View File

@@ -0,0 +1,186 @@
#!/bin/bash
# Simple HeroDB Demo - Basic Redis Commands
# This script demonstrates basic Redis functionality that's currently implemented
set -e # Exit on any error
# Configuration
REDIS_HOST="localhost"
REDIS_PORT="6381"
REDIS_CLI="redis-cli -h $REDIS_HOST -p $REDIS_PORT"
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
BLUE='\033[0;34m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# Function to print colored output
print_header() {
echo -e "${BLUE}=== $1 ===${NC}"
}
print_success() {
echo -e "${GREEN}$1${NC}"
}
print_info() {
echo -e "${YELLOW} $1${NC}"
}
print_error() {
echo -e "${RED}$1${NC}"
}
# Function to check if HeroDB is running
check_herodb() {
print_info "Checking if HeroDB is running on port $REDIS_PORT..."
if ! $REDIS_CLI ping > /dev/null 2>&1; then
print_error "HeroDB is not running on port $REDIS_PORT"
print_info "Please start HeroDB with: cargo run -- --port $REDIS_PORT"
exit 1
fi
print_success "HeroDB is running and responding"
}
# Function to execute Redis command with error handling
execute_cmd() {
local cmd="$1"
local description="$2"
echo -e "${YELLOW}Command:${NC} $cmd"
if result=$($REDIS_CLI $cmd 2>&1); then
echo -e "${GREEN}Result:${NC} $result"
return 0
else
print_error "Failed: $description"
echo "Error: $result"
return 1
fi
}
# Main demo function
main() {
clear
print_header "HeroDB Basic Functionality Demo"
echo "This demo shows basic Redis commands that are currently implemented"
echo "HeroDB runs on port $REDIS_PORT (instead of Redis default 6379)"
echo
# Check if HeroDB is running
check_herodb
echo
print_header "Step 1: Basic Key-Value Operations"
execute_cmd "SET greeting 'Hello HeroDB!'" "Setting a simple key-value pair"
echo
execute_cmd "GET greeting" "Getting the value"
echo
execute_cmd "SET counter 42" "Setting a numeric value"
echo
execute_cmd "INCR counter" "Incrementing the counter"
echo
execute_cmd "GET counter" "Getting the incremented value"
echo
print_header "Step 2: Hash Operations"
execute_cmd "HSET user:1 name 'John Doe' email 'john@example.com' age 30" "Setting hash fields"
echo
execute_cmd "HGET user:1 name" "Getting a specific field"
echo
execute_cmd "HGETALL user:1" "Getting all fields"
echo
execute_cmd "HLEN user:1" "Getting hash length"
echo
print_header "Step 3: List Operations"
execute_cmd "LPUSH tasks 'Write code' 'Test code' 'Deploy code'" "Adding items to list"
echo
execute_cmd "LLEN tasks" "Getting list length"
echo
execute_cmd "LRANGE tasks 0 -1" "Getting all list items"
echo
execute_cmd "LPOP tasks" "Popping from left"
echo
execute_cmd "LRANGE tasks 0 -1" "Checking remaining items"
echo
print_header "Step 4: Key Management"
execute_cmd "KEYS *" "Listing all keys"
echo
execute_cmd "EXISTS greeting" "Checking if key exists"
echo
execute_cmd "TYPE user:1" "Getting key type"
echo
execute_cmd "DBSIZE" "Getting database size"
echo
print_header "Step 5: Expiration"
execute_cmd "SET temp_key 'temporary value'" "Setting temporary key"
echo
execute_cmd "EXPIRE temp_key 5" "Setting 5 second expiration"
echo
execute_cmd "TTL temp_key" "Checking time to live"
echo
print_info "Waiting 2 seconds..."
sleep 2
execute_cmd "TTL temp_key" "Checking TTL again"
echo
print_header "Step 6: Multiple Operations"
execute_cmd "MSET key1 'value1' key2 'value2' key3 'value3'" "Setting multiple keys"
echo
execute_cmd "MGET key1 key2 key3" "Getting multiple values"
echo
execute_cmd "DEL key1 key2" "Deleting multiple keys"
echo
execute_cmd "EXISTS key1 key2 key3" "Checking existence of multiple keys"
echo
print_header "Step 7: Search Commands (Placeholder)"
print_info "Testing FT.CREATE command (currently returns placeholder response)"
execute_cmd "FT.CREATE test_index SCHEMA title TEXT description TEXT" "Creating search index"
echo
print_header "Step 8: Server Information"
execute_cmd "INFO" "Getting server information"
echo
execute_cmd "CONFIG GET dir" "Getting configuration"
echo
print_header "Step 9: Cleanup"
execute_cmd "FLUSHDB" "Clearing database"
echo
execute_cmd "DBSIZE" "Confirming database is empty"
echo
print_header "Demo Summary"
echo "This demonstration showed:"
echo "• Basic key-value operations (GET, SET, INCR)"
echo "• Hash operations (HSET, HGET, HGETALL)"
echo "• List operations (LPUSH, LPOP, LRANGE)"
echo "• Key management (KEYS, EXISTS, TYPE, DEL)"
echo "• Expiration handling (EXPIRE, TTL)"
echo "• Multiple key operations (MSET, MGET)"
echo "• Server information commands"
echo
print_success "HeroDB basic functionality demo completed successfully!"
echo
print_info "Note: Full-text search (FT.*) commands are defined but not yet fully implemented"
print_info "To run HeroDB server: cargo run -- --port 6381"
print_info "To connect with redis-cli: redis-cli -h localhost -p 6381"
}
# Run the demo
main "$@"

View File

@@ -1,29 +0,0 @@
[package]
name = "herodb"
version = "0.0.1"
authors = ["Pin Fang <fpfangpin@hotmail.com>"]
edition = "2021"
[dependencies]
anyhow = "1.0.59"
bytes = "1.3.0"
thiserror = "1.0.32"
tokio = { version = "1.23.0", features = ["full"] }
clap = { version = "4.5.20", features = ["derive"] }
byteorder = "1.4.3"
futures = "0.3"
sled = "0.34"
redb = "2.1.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
bincode = "1.3"
chacha20poly1305 = "0.10.1"
rand = "0.8"
sha2 = "0.10"
age = "0.10"
secrecy = "0.8"
ed25519-dalek = "2"
base64 = "0.22"
[dev-dependencies]
redis = { version = "0.24", features = ["aio", "tokio-comp"] }

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,735 @@
Looking at your request to ensure embedding happens with an external embedding provider configured in Redis at `config:core:aiembed:url`, I'll modify the implementation to use an external embedding service instead of local models.
## Updated Implementation with External Embedding Provider
### 1. Update Dependencies in `Cargo.toml`
```toml
[dependencies]
# ... existing dependencies ...
lance = "0.18"
lance-index = "0.18"
lance-linalg = "0.18"
arrow = "52"
arrow-array = "52"
arrow-schema = "52"
parquet = "52"
uuid = { version = "1.10", features = ["v4"] }
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
base64 = "0.22"
image = "0.25"
```
### 2. Create Enhanced Lance Module with External Embedding
Create `src/lance_store.rs`:
```rust
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
use arrow::array::{Float32Array, StringArray, BinaryArray, ArrayRef};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use lance::dataset::{Dataset, WriteParams, WriteMode};
use lance::index::vector::VectorIndexParams;
use lance_index::vector::pq::PQBuildParams;
use lance_index::vector::ivf::IvfBuildParams;
use serde::{Deserialize, Serialize};
use crate::error::DBError;
use crate::cmd::Protocol;
#[derive(Debug, Serialize, Deserialize)]
struct EmbeddingRequest {
texts: Option<Vec<String>>,
images: Option<Vec<String>>, // base64 encoded
model: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
struct EmbeddingResponse {
embeddings: Vec<Vec<f32>>,
model: String,
usage: Option<HashMap<String, u32>>,
}
pub struct LanceStore {
datasets: Arc<RwLock<HashMap<String, Arc<Dataset>>>>,
data_dir: PathBuf,
http_client: reqwest::Client,
}
impl LanceStore {
pub async fn new(data_dir: PathBuf) -> Result<Self, DBError> {
// Create data directory if it doesn't exist
std::fs::create_dir_all(&data_dir)
.map_err(|e| DBError(format!("Failed to create Lance data directory: {}", e)))?;
let http_client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()
.map_err(|e| DBError(format!("Failed to create HTTP client: {}", e)))?;
Ok(Self {
datasets: Arc::new(RwLock::new(HashMap::new())),
data_dir,
http_client,
})
}
/// Get embedding service URL from Redis config
async fn get_embedding_url(&self, server: &crate::server::Server) -> Result<String, DBError> {
// Get the embedding URL from Redis config
let key = "config:core:aiembed:url";
// Use HGET to retrieve the URL from Redis hash
let cmd = crate::cmd::Cmd::HGet {
key: key.to_string(),
field: "url".to_string(),
};
// Execute command to get the config
let result = cmd.run(server).await?;
match result {
Protocol::BulkString(url) => Ok(url),
Protocol::SimpleString(url) => Ok(url),
Protocol::Nil => Err(DBError(
"Embedding service URL not configured. Set it with: HSET config:core:aiembed:url url <YOUR_EMBEDDING_SERVICE_URL>".to_string()
)),
_ => Err(DBError("Invalid embedding URL configuration".to_string())),
}
}
/// Call external embedding service
async fn call_embedding_service(
&self,
server: &crate::server::Server,
texts: Option<Vec<String>>,
images: Option<Vec<String>>,
) -> Result<Vec<Vec<f32>>, DBError> {
let url = self.get_embedding_url(server).await?;
let request = EmbeddingRequest {
texts,
images,
model: None, // Let the service use its default
};
let response = self.http_client
.post(&url)
.json(&request)
.send()
.await
.map_err(|e| DBError(format!("Failed to call embedding service: {}", e)))?;
if !response.status().is_success() {
let status = response.status();
let error_text = response.text().await.unwrap_or_default();
return Err(DBError(format!(
"Embedding service returned error {}: {}",
status, error_text
)));
}
let embedding_response: EmbeddingResponse = response
.json()
.await
.map_err(|e| DBError(format!("Failed to parse embedding response: {}", e)))?;
Ok(embedding_response.embeddings)
}
pub async fn embed_text(
&self,
server: &crate::server::Server,
texts: Vec<String>
) -> Result<Vec<Vec<f32>>, DBError> {
if texts.is_empty() {
return Ok(Vec::new());
}
self.call_embedding_service(server, Some(texts), None).await
}
pub async fn embed_image(
&self,
server: &crate::server::Server,
image_bytes: Vec<u8>
) -> Result<Vec<f32>, DBError> {
// Convert image bytes to base64
let base64_image = base64::encode(&image_bytes);
let embeddings = self.call_embedding_service(
server,
None,
Some(vec![base64_image])
).await?;
embeddings.into_iter()
.next()
.ok_or_else(|| DBError("No embedding returned for image".to_string()))
}
pub async fn create_dataset(
&self,
name: &str,
schema: Schema,
) -> Result<(), DBError> {
let dataset_path = self.data_dir.join(format!("{}.lance", name));
// Create empty dataset with schema
let write_params = WriteParams {
mode: WriteMode::Create,
..Default::default()
};
// Create an empty RecordBatch with the schema
let empty_batch = RecordBatch::new_empty(Arc::new(schema));
let batches = vec![empty_batch];
let dataset = Dataset::write(
batches,
dataset_path.to_str().unwrap(),
Some(write_params)
).await
.map_err(|e| DBError(format!("Failed to create dataset: {}", e)))?;
let mut datasets = self.datasets.write().await;
datasets.insert(name.to_string(), Arc::new(dataset));
Ok(())
}
pub async fn write_vectors(
&self,
dataset_name: &str,
vectors: Vec<Vec<f32>>,
metadata: Option<HashMap<String, Vec<String>>>,
) -> Result<usize, DBError> {
let dataset_path = self.data_dir.join(format!("{}.lance", dataset_name));
// Open or get cached dataset
let dataset = self.get_or_open_dataset(dataset_name).await?;
// Build RecordBatch
let num_vectors = vectors.len();
if num_vectors == 0 {
return Ok(0);
}
let dim = vectors.first()
.ok_or_else(|| DBError("Empty vectors".to_string()))?
.len();
// Flatten vectors
let flat_vectors: Vec<f32> = vectors.into_iter().flatten().collect();
let vector_array = Float32Array::from(flat_vectors);
let vector_array = arrow::array::FixedSizeListArray::try_new_from_values(
vector_array,
dim as i32
).map_err(|e| DBError(format!("Failed to create vector array: {}", e)))?;
let mut arrays: Vec<ArrayRef> = vec![Arc::new(vector_array)];
let mut fields = vec![Field::new(
"vector",
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
dim as i32
),
false
)];
// Add metadata columns if provided
if let Some(metadata) = metadata {
for (key, values) in metadata {
if values.len() != num_vectors {
return Err(DBError(format!(
"Metadata field '{}' has {} values but expected {}",
key, values.len(), num_vectors
)));
}
let array = StringArray::from(values);
arrays.push(Arc::new(array));
fields.push(Field::new(&key, DataType::Utf8, true));
}
}
let schema = Arc::new(Schema::new(fields));
let batch = RecordBatch::try_new(schema, arrays)
.map_err(|e| DBError(format!("Failed to create RecordBatch: {}", e)))?;
// Append to dataset
let write_params = WriteParams {
mode: WriteMode::Append,
..Default::default()
};
Dataset::write(
vec![batch],
dataset_path.to_str().unwrap(),
Some(write_params)
).await
.map_err(|e| DBError(format!("Failed to write to dataset: {}", e)))?;
// Refresh cached dataset
let mut datasets = self.datasets.write().await;
datasets.remove(dataset_name);
Ok(num_vectors)
}
pub async fn search_vectors(
&self,
dataset_name: &str,
query_vector: Vec<f32>,
k: usize,
nprobes: Option<usize>,
refine_factor: Option<usize>,
) -> Result<Vec<(f32, HashMap<String, String>)>, DBError> {
let dataset = self.get_or_open_dataset(dataset_name).await?;
// Build query
let mut query = dataset.scan();
query = query.nearest(
"vector",
&query_vector,
k,
).map_err(|e| DBError(format!("Failed to build search query: {}", e)))?;
if let Some(nprobes) = nprobes {
query = query.nprobes(nprobes);
}
if let Some(refine) = refine_factor {
query = query.refine_factor(refine);
}
// Execute search
let results = query
.try_into_stream()
.await
.map_err(|e| DBError(format!("Failed to execute search: {}", e)))?
.try_collect::<Vec<_>>()
.await
.map_err(|e| DBError(format!("Failed to collect results: {}", e)))?;
// Process results
let mut output = Vec::new();
for batch in results {
// Get distances
let distances = batch
.column_by_name("_distance")
.ok_or_else(|| DBError("No distance column".to_string()))?
.as_any()
.downcast_ref::<Float32Array>()
.ok_or_else(|| DBError("Invalid distance type".to_string()))?;
// Get metadata
for i in 0..batch.num_rows() {
let distance = distances.value(i);
let mut metadata = HashMap::new();
for field in batch.schema().fields() {
if field.name() != "vector" && field.name() != "_distance" {
if let Some(col) = batch.column_by_name(field.name()) {
if let Some(str_array) = col.as_any().downcast_ref::<StringArray>() {
if !str_array.is_null(i) {
metadata.insert(
field.name().to_string(),
str_array.value(i).to_string()
);
}
}
}
}
}
output.push((distance, metadata));
}
}
Ok(output)
}
pub async fn store_multimodal(
&self,
server: &crate::server::Server,
dataset_name: &str,
text: Option<String>,
image_bytes: Option<Vec<u8>>,
metadata: HashMap<String, String>,
) -> Result<String, DBError> {
// Generate ID
let id = uuid::Uuid::new_v4().to_string();
// Generate embeddings using external service
let embedding = if let Some(text) = text.as_ref() {
self.embed_text(server, vec![text.clone()]).await?
.into_iter()
.next()
.ok_or_else(|| DBError("No embedding returned".to_string()))?
} else if let Some(img) = image_bytes.as_ref() {
self.embed_image(server, img.clone()).await?
} else {
return Err(DBError("No text or image provided".to_string()));
};
// Prepare metadata
let mut full_metadata = metadata;
full_metadata.insert("id".to_string(), id.clone());
if let Some(text) = text {
full_metadata.insert("text".to_string(), text);
}
if let Some(img) = image_bytes {
full_metadata.insert("image_base64".to_string(), base64::encode(img));
}
// Convert metadata to column vectors
let mut metadata_cols = HashMap::new();
for (key, value) in full_metadata {
metadata_cols.insert(key, vec![value]);
}
// Write to dataset
self.write_vectors(dataset_name, vec![embedding], Some(metadata_cols)).await?;
Ok(id)
}
pub async fn search_with_text(
&self,
server: &crate::server::Server,
dataset_name: &str,
query_text: String,
k: usize,
nprobes: Option<usize>,
refine_factor: Option<usize>,
) -> Result<Vec<(f32, HashMap<String, String>)>, DBError> {
// Embed the query text using external service
let embeddings = self.embed_text(server, vec![query_text]).await?;
let query_vector = embeddings.into_iter()
.next()
.ok_or_else(|| DBError("No embedding returned for query".to_string()))?;
// Search with the embedding
self.search_vectors(dataset_name, query_vector, k, nprobes, refine_factor).await
}
pub async fn create_index(
&self,
dataset_name: &str,
index_type: &str,
num_partitions: Option<usize>,
num_sub_vectors: Option<usize>,
) -> Result<(), DBError> {
let dataset = self.get_or_open_dataset(dataset_name).await?;
let mut params = VectorIndexParams::default();
match index_type.to_uppercase().as_str() {
"IVF_PQ" => {
params.ivf = IvfBuildParams {
num_partitions: num_partitions.unwrap_or(256),
..Default::default()
};
params.pq = PQBuildParams {
num_sub_vectors: num_sub_vectors.unwrap_or(16),
..Default::default()
};
}
_ => return Err(DBError(format!("Unsupported index type: {}", index_type))),
}
dataset.create_index(
&["vector"],
lance::index::IndexType::Vector,
None,
&params,
true
).await
.map_err(|e| DBError(format!("Failed to create index: {}", e)))?;
Ok(())
}
async fn get_or_open_dataset(&self, name: &str) -> Result<Arc<Dataset>, DBError> {
let mut datasets = self.datasets.write().await;
if let Some(dataset) = datasets.get(name) {
return Ok(dataset.clone());
}
let dataset_path = self.data_dir.join(format!("{}.lance", name));
if !dataset_path.exists() {
return Err(DBError(format!("Dataset '{}' does not exist", name)));
}
let dataset = Dataset::open(dataset_path.to_str().unwrap())
.await
.map_err(|e| DBError(format!("Failed to open dataset: {}", e)))?;
let dataset = Arc::new(dataset);
datasets.insert(name.to_string(), dataset.clone());
Ok(dataset)
}
pub async fn list_datasets(&self) -> Result<Vec<String>, DBError> {
let mut datasets = Vec::new();
let entries = std::fs::read_dir(&self.data_dir)
.map_err(|e| DBError(format!("Failed to read data directory: {}", e)))?;
for entry in entries {
let entry = entry.map_err(|e| DBError(format!("Failed to read entry: {}", e)))?;
let path = entry.path();
if path.is_dir() {
if let Some(name) = path.file_name() {
if let Some(name_str) = name.to_str() {
if name_str.ends_with(".lance") {
let dataset_name = name_str.trim_end_matches(".lance");
datasets.push(dataset_name.to_string());
}
}
}
}
}
Ok(datasets)
}
pub async fn drop_dataset(&self, name: &str) -> Result<(), DBError> {
// Remove from cache
let mut datasets = self.datasets.write().await;
datasets.remove(name);
// Delete from disk
let dataset_path = self.data_dir.join(format!("{}.lance", name));
if dataset_path.exists() {
std::fs::remove_dir_all(dataset_path)
.map_err(|e| DBError(format!("Failed to delete dataset: {}", e)))?;
}
Ok(())
}
pub async fn get_dataset_info(&self, name: &str) -> Result<HashMap<String, String>, DBError> {
let dataset = self.get_or_open_dataset(name).await?;
let mut info = HashMap::new();
info.insert("name".to_string(), name.to_string());
info.insert("version".to_string(), dataset.version().to_string());
info.insert("num_rows".to_string(), dataset.count_rows().await?.to_string());
// Get schema info
let schema = dataset.schema();
let fields: Vec<String> = schema.fields()
.iter()
.map(|f| format!("{}:{}", f.name(), f.data_type()))
.collect();
info.insert("schema".to_string(), fields.join(", "));
Ok(info)
}
}
```
### 3. Update Command Implementations
Update the command implementations to pass the server reference for embedding service access:
```rust
// In cmd.rs, update the lance command implementations
async fn lance_store_cmd(
server: &Server,
dataset: &str,
text: Option<String>,
image_base64: Option<String>,
metadata: HashMap<String, String>,
) -> Result<Protocol, DBError> {
let lance_store = server.lance_store()?;
// Decode image if provided
let image_bytes = if let Some(b64) = image_base64 {
Some(base64::decode(b64).map_err(|e|
DBError(format!("Invalid base64 image: {}", e)))?)
} else {
None
};
// Pass server reference for embedding service access
let id = lance_store.store_multimodal(
server, // Pass server to access Redis config
dataset,
text,
image_bytes,
metadata,
).await?;
Ok(Protocol::BulkString(id))
}
async fn lance_embed_text_cmd(
server: &Server,
texts: &[String],
) -> Result<Protocol, DBError> {
let lance_store = server.lance_store()?;
// Pass server reference for embedding service access
let embeddings = lance_store.embed_text(server, texts.to_vec()).await?;
// Return as array of vectors
let mut output = Vec::new();
for embedding in embeddings {
let vector_str = format!("[{}]",
embedding.iter()
.map(|f| f.to_string())
.collect::<Vec<_>>()
.join(",")
);
output.push(Protocol::BulkString(vector_str));
}
Ok(Protocol::Array(output))
}
async fn lance_search_text_cmd(
server: &Server,
dataset: &str,
query_text: &str,
k: usize,
nprobes: Option<usize>,
refine_factor: Option<usize>,
) -> Result<Protocol, DBError> {
let lance_store = server.lance_store()?;
// Search using text query (will be embedded automatically)
let results = lance_store.search_with_text(
server,
dataset,
query_text.to_string(),
k,
nprobes,
refine_factor,
).await?;
// Format results
let mut output = Vec::new();
for (distance, metadata) in results {
let metadata_json = serde_json::to_string(&metadata)
.unwrap_or_else(|_| "{}".to_string());
output.push(Protocol::Array(vec![
Protocol::BulkString(distance.to_string()),
Protocol::BulkString(metadata_json),
]));
}
Ok(Protocol::Array(output))
}
// Add new command for text-based search
pub enum Cmd {
// ... existing commands ...
LanceSearchText {
dataset: String,
query_text: String,
k: usize,
nprobes: Option<usize>,
refine_factor: Option<usize>,
},
}
```
## Usage Examples
### 1. Configure the Embedding Service
First, users need to configure the embedding service URL:
```bash
# Configure the embedding service endpoint
redis-cli> HSET config:core:aiembed:url url "http://localhost:8000/embeddings"
OK
# Or use a cloud service
redis-cli> HSET config:core:aiembed:url url "https://api.openai.com/v1/embeddings"
OK
```
### 2. Use Lance Commands with Automatic External Embedding
```bash
# Create a dataset
redis-cli> LANCE.CREATE products DIM 1536 SCHEMA name:string price:float category:string
OK
# Store text with automatic embedding (calls external service)
redis-cli> LANCE.STORE products TEXT "Wireless noise-canceling headphones with 30-hour battery" name:AirPods price:299.99 category:Electronics
"uuid-123-456"
# Search using text query (automatically embeds the query)
redis-cli> LANCE.SEARCH.TEXT products "best headphones for travel" K 5
1) "0.92"
2) "{\"id\":\"uuid-123\",\"name\":\"AirPods\",\"price\":\"299.99\"}"
# Get embeddings directly
redis-cli> LANCE.EMBED.TEXT "This text will be embedded"
1) "[0.123, 0.456, 0.789, ...]"
```
## External Embedding Service API Specification
The external embedding service should accept POST requests with this format:
```json
// Request
{
"texts": ["text1", "text2"], // Optional
"images": ["base64_img1"], // Optional
"model": "text-embedding-ada-002" // Optional
}
// Response
{
"embeddings": [[0.1, 0.2, ...], [0.3, 0.4, ...]],
"model": "text-embedding-ada-002",
"usage": {
"prompt_tokens": 100,
"total_tokens": 100
}
}
```
## Error Handling
The implementation includes comprehensive error handling:
1. **Missing Configuration**: Clear error message if embedding URL not configured
2. **Service Failures**: Graceful handling of embedding service errors
3. **Timeout Protection**: 30-second timeout for embedding requests
4. **Retry Logic**: Could be added for resilience
## Benefits of This Approach
1. **Flexibility**: Supports any embedding service with compatible API
2. **Cost Control**: Use your preferred embedding provider
3. **Scalability**: Embedding service can be scaled independently
4. **Consistency**: All embeddings use the same configured service
5. **Security**: API keys and endpoints stored securely in Redis
This implementation ensures that all embedding operations go through the external service configured in Redis, providing a clean separation between the vector database functionality and the embedding generation.
TODO EXTRA:
- secret for the embedding service API key

View File

@@ -1,6 +1,8 @@
use crate::{error::DBError, protocol::Protocol, server::Server};
use tokio::time::{timeout, Duration};
use futures::future::select_all;
use std::sync::Arc;
use base64::Engine;
#[derive(Debug, Clone)]
pub enum Cmd {
@@ -84,6 +86,49 @@ pub enum Cmd {
AgeSignName(String, String), // name, message
AgeVerifyName(String, String, String), // name, message, signature_b64
AgeList,
// Lance vector database commands
LanceCreate {
dataset: String,
dim: usize,
schema: Vec<(String, String)>, // field_name, field_type pairs
},
LanceStore {
dataset: String,
text: Option<String>,
image_base64: Option<String>,
metadata: std::collections::HashMap<String, String>,
},
LanceSearch {
dataset: String,
vector: Vec<f32>,
k: usize,
nprobes: Option<usize>,
refine_factor: Option<usize>,
},
LanceSearchText {
dataset: String,
query_text: String,
k: usize,
nprobes: Option<usize>,
refine_factor: Option<usize>,
},
LanceEmbedText {
texts: Vec<String>,
},
LanceCreateIndex {
dataset: String,
index_type: String,
num_partitions: Option<usize>,
num_sub_vectors: Option<usize>,
},
LanceList,
LanceDrop {
dataset: String,
},
LanceInfo {
dataset: String,
},
}
impl Cmd {
@@ -616,6 +661,237 @@ impl Cmd {
_ => return Err(DBError(format!("unsupported AGE subcommand {:?}", cmd))),
}
}
"lance" => {
if cmd.len() < 2 {
return Err(DBError("wrong number of arguments for LANCE".to_string()));
}
match cmd[1].to_lowercase().as_str() {
"create" => {
if cmd.len() < 4 {
return Err(DBError("LANCE CREATE <dataset> DIM <dimension> [SCHEMA field:type ...]".to_string()));
}
let dataset = cmd[2].clone();
// Parse DIM parameter
if cmd[3].to_lowercase() != "dim" {
return Err(DBError("Expected DIM after dataset name".to_string()));
}
if cmd.len() < 5 {
return Err(DBError("Missing dimension value".to_string()));
}
let dim = cmd[4].parse::<usize>().map_err(|_| DBError("Invalid dimension value".to_string()))?;
// Parse optional SCHEMA
let mut schema = Vec::new();
let mut i = 5;
if i < cmd.len() && cmd[i].to_lowercase() == "schema" {
i += 1;
while i < cmd.len() {
let field_spec = &cmd[i];
let parts: Vec<&str> = field_spec.split(':').collect();
if parts.len() != 2 {
return Err(DBError("Schema fields must be in format field:type".to_string()));
}
schema.push((parts[0].to_string(), parts[1].to_string()));
i += 1;
}
}
Cmd::LanceCreate { dataset, dim, schema }
}
"store" => {
if cmd.len() < 3 {
return Err(DBError("LANCE STORE <dataset> [TEXT <text>] [IMAGE <base64>] [metadata...]".to_string()));
}
let dataset = cmd[2].clone();
let mut text = None;
let mut image_base64 = None;
let mut metadata = std::collections::HashMap::new();
let mut i = 3;
while i < cmd.len() {
match cmd[i].to_lowercase().as_str() {
"text" => {
if i + 1 >= cmd.len() {
return Err(DBError("TEXT requires a value".to_string()));
}
text = Some(cmd[i + 1].clone());
i += 2;
}
"image" => {
if i + 1 >= cmd.len() {
return Err(DBError("IMAGE requires a base64 value".to_string()));
}
image_base64 = Some(cmd[i + 1].clone());
i += 2;
}
_ => {
// Parse as metadata key:value
if i + 1 >= cmd.len() {
return Err(DBError("Metadata requires key value pairs".to_string()));
}
metadata.insert(cmd[i].clone(), cmd[i + 1].clone());
i += 2;
}
}
}
Cmd::LanceStore { dataset, text, image_base64, metadata }
}
"search" => {
if cmd.len() < 5 {
return Err(DBError("LANCE SEARCH <dataset> VECTOR <vector> K <k> [NPROBES <n>] [REFINE <r>]".to_string()));
}
let dataset = cmd[2].clone();
if cmd[3].to_lowercase() != "vector" {
return Err(DBError("Expected VECTOR after dataset name".to_string()));
}
// Parse vector - expect comma-separated floats in brackets or just comma-separated
let vector_str = &cmd[4];
let vector_str = vector_str.trim_start_matches('[').trim_end_matches(']');
let vector: Result<Vec<f32>, _> = vector_str
.split(',')
.map(|s| s.trim().parse::<f32>())
.collect();
let vector = vector.map_err(|_| DBError("Invalid vector format".to_string()))?;
if cmd.len() < 7 || cmd[5].to_lowercase() != "k" {
return Err(DBError("Expected K after vector".to_string()));
}
let k = cmd[6].parse::<usize>().map_err(|_| DBError("Invalid K value".to_string()))?;
let mut nprobes = None;
let mut refine_factor = None;
let mut i = 7;
while i < cmd.len() {
match cmd[i].to_lowercase().as_str() {
"nprobes" => {
if i + 1 >= cmd.len() {
return Err(DBError("NPROBES requires a value".to_string()));
}
nprobes = Some(cmd[i + 1].parse::<usize>().map_err(|_| DBError("Invalid NPROBES value".to_string()))?);
i += 2;
}
"refine" => {
if i + 1 >= cmd.len() {
return Err(DBError("REFINE requires a value".to_string()));
}
refine_factor = Some(cmd[i + 1].parse::<usize>().map_err(|_| DBError("Invalid REFINE value".to_string()))?);
i += 2;
}
_ => {
return Err(DBError(format!("Unknown parameter: {}", cmd[i])));
}
}
}
Cmd::LanceSearch { dataset, vector, k, nprobes, refine_factor }
}
"search.text" => {
if cmd.len() < 6 {
return Err(DBError("LANCE SEARCH.TEXT <dataset> <query_text> K <k> [NPROBES <n>] [REFINE <r>]".to_string()));
}
let dataset = cmd[2].clone();
let query_text = cmd[3].clone();
if cmd[4].to_lowercase() != "k" {
return Err(DBError("Expected K after query text".to_string()));
}
let k = cmd[5].parse::<usize>().map_err(|_| DBError("Invalid K value".to_string()))?;
let mut nprobes = None;
let mut refine_factor = None;
let mut i = 6;
while i < cmd.len() {
match cmd[i].to_lowercase().as_str() {
"nprobes" => {
if i + 1 >= cmd.len() {
return Err(DBError("NPROBES requires a value".to_string()));
}
nprobes = Some(cmd[i + 1].parse::<usize>().map_err(|_| DBError("Invalid NPROBES value".to_string()))?);
i += 2;
}
"refine" => {
if i + 1 >= cmd.len() {
return Err(DBError("REFINE requires a value".to_string()));
}
refine_factor = Some(cmd[i + 1].parse::<usize>().map_err(|_| DBError("Invalid REFINE value".to_string()))?);
i += 2;
}
_ => {
return Err(DBError(format!("Unknown parameter: {}", cmd[i])));
}
}
}
Cmd::LanceSearchText { dataset, query_text, k, nprobes, refine_factor }
}
"embed.text" => {
if cmd.len() < 3 {
return Err(DBError("LANCE EMBED.TEXT <text1> [text2] ...".to_string()));
}
let texts = cmd[2..].to_vec();
Cmd::LanceEmbedText { texts }
}
"create.index" => {
if cmd.len() < 5 {
return Err(DBError("LANCE CREATE.INDEX <dataset> <index_type> [PARTITIONS <n>] [SUBVECTORS <n>]".to_string()));
}
let dataset = cmd[2].clone();
let index_type = cmd[3].clone();
let mut num_partitions = None;
let mut num_sub_vectors = None;
let mut i = 4;
while i < cmd.len() {
match cmd[i].to_lowercase().as_str() {
"partitions" => {
if i + 1 >= cmd.len() {
return Err(DBError("PARTITIONS requires a value".to_string()));
}
num_partitions = Some(cmd[i + 1].parse::<usize>().map_err(|_| DBError("Invalid PARTITIONS value".to_string()))?);
i += 2;
}
"subvectors" => {
if i + 1 >= cmd.len() {
return Err(DBError("SUBVECTORS requires a value".to_string()));
}
num_sub_vectors = Some(cmd[i + 1].parse::<usize>().map_err(|_| DBError("Invalid SUBVECTORS value".to_string()))?);
i += 2;
}
_ => {
return Err(DBError(format!("Unknown parameter: {}", cmd[i])));
}
}
}
Cmd::LanceCreateIndex { dataset, index_type, num_partitions, num_sub_vectors }
}
"list" => {
if cmd.len() != 2 {
return Err(DBError("LANCE LIST takes no arguments".to_string()));
}
Cmd::LanceList
}
"drop" => {
if cmd.len() != 3 {
return Err(DBError("LANCE DROP <dataset>".to_string()));
}
let dataset = cmd[2].clone();
Cmd::LanceDrop { dataset }
}
"info" => {
if cmd.len() != 3 {
return Err(DBError("LANCE INFO <dataset>".to_string()));
}
let dataset = cmd[2].clone();
Cmd::LanceInfo { dataset }
}
_ => return Err(DBError(format!("unsupported LANCE subcommand {:?}", cmd))),
}
}
_ => Cmd::Unknow(cmd[0].clone()),
},
protocol,
@@ -730,6 +1006,18 @@ impl Cmd {
Cmd::AgeSignName(name, message) => Ok(crate::age::cmd_age_sign_name(server, &name, &message).await),
Cmd::AgeVerifyName(name, message, sig_b64) => Ok(crate::age::cmd_age_verify_name(server, &name, &message, &sig_b64).await),
Cmd::AgeList => Ok(crate::age::cmd_age_list(server).await),
// Lance vector database commands
Cmd::LanceCreate { dataset, dim, schema } => lance_create_cmd(server, &dataset, dim, &schema).await,
Cmd::LanceStore { dataset, text, image_base64, metadata } => lance_store_cmd(server, &dataset, text.as_deref(), image_base64.as_deref(), &metadata).await,
Cmd::LanceSearch { dataset, vector, k, nprobes, refine_factor } => lance_search_cmd(server, &dataset, &vector, k, nprobes, refine_factor).await,
Cmd::LanceSearchText { dataset, query_text, k, nprobes, refine_factor } => lance_search_text_cmd(server, &dataset, &query_text, k, nprobes, refine_factor).await,
Cmd::LanceEmbedText { texts } => lance_embed_text_cmd(server, &texts).await,
Cmd::LanceCreateIndex { dataset, index_type, num_partitions, num_sub_vectors } => lance_create_index_cmd(server, &dataset, &index_type, num_partitions, num_sub_vectors).await,
Cmd::LanceList => lance_list_cmd(server).await,
Cmd::LanceDrop { dataset } => lance_drop_cmd(server, &dataset).await,
Cmd::LanceInfo { dataset } => lance_info_cmd(server, &dataset).await,
Cmd::Unknow(s) => Ok(Protocol::err(&format!("ERR unknown command `{}`", s))),
}
}
@@ -1513,3 +1801,243 @@ fn command_cmd(args: &[String]) -> Result<Protocol, DBError> {
_ => Ok(Protocol::Array(vec![])),
}
}
// Helper function to create Arrow schema from field specifications
fn create_schema_from_fields(dim: usize, fields: &[(String, String)]) -> arrow::datatypes::Schema {
let mut schema_fields = Vec::new();
// Always add the vector field first
let vector_field = arrow::datatypes::Field::new(
"vector",
arrow::datatypes::DataType::FixedSizeList(
Arc::new(arrow::datatypes::Field::new("item", arrow::datatypes::DataType::Float32, true)),
dim as i32
),
false
);
schema_fields.push(vector_field);
// Add custom fields
for (name, field_type) in fields {
let data_type = match field_type.to_lowercase().as_str() {
"string" | "text" => arrow::datatypes::DataType::Utf8,
"int" | "integer" => arrow::datatypes::DataType::Int64,
"float" => arrow::datatypes::DataType::Float64,
"bool" | "boolean" => arrow::datatypes::DataType::Boolean,
_ => arrow::datatypes::DataType::Utf8, // Default to string
};
schema_fields.push(arrow::datatypes::Field::new(name, data_type, true));
}
arrow::datatypes::Schema::new(schema_fields)
}
// Lance vector database command implementations
async fn lance_create_cmd(
server: &Server,
dataset: &str,
dim: usize,
schema: &[(String, String)],
) -> Result<Protocol, DBError> {
match server.lance_store() {
Ok(lance_store) => {
match lance_store.create_dataset(dataset, create_schema_from_fields(dim, schema)).await {
Ok(_) => Ok(Protocol::SimpleString("OK".to_string())),
Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR {}", e)))),
}
}
Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR Lance store not available: {}", e)))),
}
}
async fn lance_store_cmd(
server: &Server,
dataset: &str,
text: Option<&str>,
image_base64: Option<&str>,
metadata: &std::collections::HashMap<String, String>,
) -> Result<Protocol, DBError> {
match server.lance_store() {
Ok(lance_store) => {
match lance_store.store_multimodal(server, dataset, text.map(|s| s.to_string()),
image_base64.and_then(|s| base64::engine::general_purpose::STANDARD.decode(s).ok()),
metadata.clone()).await {
Ok(id) => Ok(Protocol::BulkString(id)),
Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR {}", e)))),
}
}
Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR Lance store not available: {}", e)))),
}
}
async fn lance_search_cmd(
server: &Server,
dataset: &str,
vector: &[f32],
k: usize,
nprobes: Option<usize>,
refine_factor: Option<usize>,
) -> Result<Protocol, DBError> {
match server.lance_store() {
Ok(lance_store) => {
match lance_store.search_vectors(dataset, vector.to_vec(), k, nprobes, refine_factor).await {
Ok(results) => {
let mut response = Vec::new();
for (distance, metadata) in results {
let mut item = Vec::new();
item.push(Protocol::BulkString("distance".to_string()));
item.push(Protocol::BulkString(distance.to_string()));
for (key, value) in metadata {
item.push(Protocol::BulkString(key));
item.push(Protocol::BulkString(value));
}
response.push(Protocol::Array(item));
}
Ok(Protocol::Array(response))
}
Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR {}", e)))),
}
}
Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR Lance store not available: {}", e)))),
}
}
async fn lance_search_text_cmd(
server: &Server,
dataset: &str,
query_text: &str,
k: usize,
nprobes: Option<usize>,
refine_factor: Option<usize>,
) -> Result<Protocol, DBError> {
match server.lance_store() {
Ok(lance_store) => {
match lance_store.search_with_text(server, dataset, query_text.to_string(), k, nprobes, refine_factor).await {
Ok(results) => {
let mut response = Vec::new();
for (distance, metadata) in results {
let mut item = Vec::new();
item.push(Protocol::BulkString("distance".to_string()));
item.push(Protocol::BulkString(distance.to_string()));
for (key, value) in metadata {
item.push(Protocol::BulkString(key));
item.push(Protocol::BulkString(value));
}
response.push(Protocol::Array(item));
}
Ok(Protocol::Array(response))
}
Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR {}", e)))),
}
}
Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR Lance store not available: {}", e)))),
}
}
// Helper function to sanitize error messages for Redis protocol
fn sanitize_error_message(msg: &str) -> String {
// Remove newlines, carriage returns, and limit length
let sanitized = msg
.replace('\n', " ")
.replace('\r', " ")
.replace('\t', " ");
// Limit to 200 characters to avoid overly long error messages
if sanitized.len() > 200 {
format!("{}...", &sanitized[..197])
} else {
sanitized
}
}
async fn lance_embed_text_cmd(
server: &Server,
texts: &[String],
) -> Result<Protocol, DBError> {
match server.lance_store() {
Ok(lance_store) => {
match lance_store.embed_text(server, texts.to_vec()).await {
Ok(embeddings) => {
let mut response = Vec::new();
for embedding in embeddings {
let vector_strings: Vec<Protocol> = embedding
.iter()
.map(|f| Protocol::BulkString(f.to_string()))
.collect();
response.push(Protocol::Array(vector_strings));
}
Ok(Protocol::Array(response))
}
Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR {}", e)))),
}
}
Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR Lance store not available: {}", e)))),
}
}
async fn lance_create_index_cmd(
server: &Server,
dataset: &str,
index_type: &str,
num_partitions: Option<usize>,
num_sub_vectors: Option<usize>,
) -> Result<Protocol, DBError> {
match server.lance_store() {
Ok(lance_store) => {
match lance_store.create_index(dataset, index_type, num_partitions, num_sub_vectors).await {
Ok(_) => Ok(Protocol::SimpleString("OK".to_string())),
Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR {}", e)))),
}
}
Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR Lance store not available: {}", e)))),
}
}
async fn lance_list_cmd(server: &Server) -> Result<Protocol, DBError> {
match server.lance_store() {
Ok(lance_store) => {
match lance_store.list_datasets().await {
Ok(datasets) => {
let response: Vec<Protocol> = datasets
.into_iter()
.map(Protocol::BulkString)
.collect();
Ok(Protocol::Array(response))
}
Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR {}", e)))),
}
}
Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR Lance store not available: {}", e)))),
}
}
async fn lance_drop_cmd(server: &Server, dataset: &str) -> Result<Protocol, DBError> {
match server.lance_store() {
Ok(lance_store) => {
match lance_store.drop_dataset(dataset).await {
Ok(_) => Ok(Protocol::SimpleString("OK".to_string())),
Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR {}", e)))),
}
}
Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR Lance store not available: {}", e)))),
}
}
async fn lance_info_cmd(server: &Server, dataset: &str) -> Result<Protocol, DBError> {
match server.lance_store() {
Ok(lance_store) => {
match lance_store.get_dataset_info(dataset).await {
Ok(info) => {
let mut response = Vec::new();
for (key, value) in info {
response.push(Protocol::BulkString(key));
response.push(Protocol::BulkString(value));
}
Ok(Protocol::Array(response))
}
Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR {}", e)))),
}
}
Err(e) => Ok(Protocol::err(&sanitize_error_message(&format!("ERR Lance store not available: {}", e)))),
}
}

View File

@@ -9,6 +9,12 @@ use bincode;
#[derive(Debug)]
pub struct DBError(pub String);
impl std::fmt::Display for DBError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl From<std::io::Error> for DBError {
fn from(item: std::io::Error) -> Self {
DBError(item.to_string().clone())
@@ -92,3 +98,40 @@ impl From<chacha20poly1305::Error> for DBError {
DBError(item.to_string())
}
}
// Lance and related dependencies error handling
impl From<lance::Error> for DBError {
fn from(item: lance::Error) -> Self {
DBError(item.to_string())
}
}
impl From<arrow::error::ArrowError> for DBError {
fn from(item: arrow::error::ArrowError) -> Self {
DBError(item.to_string())
}
}
impl From<reqwest::Error> for DBError {
fn from(item: reqwest::Error) -> Self {
DBError(item.to_string())
}
}
impl From<image::ImageError> for DBError {
fn from(item: image::ImageError) -> Self {
DBError(item.to_string())
}
}
impl From<uuid::Error> for DBError {
fn from(item: uuid::Error) -> Self {
DBError(item.to_string())
}
}
impl From<base64::DecodeError> for DBError {
fn from(item: base64::DecodeError) -> Self {
DBError(item.to_string())
}
}

609
src/lance_store.rs Normal file
View File

@@ -0,0 +1,609 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
use arrow::array::{Float32Array, StringArray, ArrayRef, FixedSizeListArray, Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::{RecordBatch, RecordBatchReader};
use arrow::error::ArrowError;
use lance::dataset::{Dataset, WriteParams, WriteMode};
use lance::index::vector::VectorIndexParams;
use lance_index::vector::pq::PQBuildParams;
use lance_index::vector::ivf::IvfBuildParams;
use lance_index::DatasetIndexExt;
use lance_linalg::distance::MetricType;
use futures::TryStreamExt;
use base64::Engine;
use serde::{Deserialize, Serialize};
use crate::error::DBError;
// Simple RecordBatchReader implementation for Vec<RecordBatch>
struct VecRecordBatchReader {
batches: std::vec::IntoIter<Result<RecordBatch, ArrowError>>,
}
impl VecRecordBatchReader {
fn new(batches: Vec<RecordBatch>) -> Self {
let result_batches = batches.into_iter().map(Ok).collect::<Vec<_>>();
Self {
batches: result_batches.into_iter(),
}
}
}
impl Iterator for VecRecordBatchReader {
type Item = Result<RecordBatch, ArrowError>;
fn next(&mut self) -> Option<Self::Item> {
self.batches.next()
}
}
impl RecordBatchReader for VecRecordBatchReader {
fn schema(&self) -> SchemaRef {
// This is a simplified implementation - in practice you'd want to store the schema
Arc::new(Schema::empty())
}
}
#[derive(Debug, Serialize, Deserialize)]
struct EmbeddingRequest {
texts: Option<Vec<String>>,
images: Option<Vec<String>>, // base64 encoded
model: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
struct EmbeddingResponse {
embeddings: Vec<Vec<f32>>,
model: String,
usage: Option<HashMap<String, u32>>,
}
// Ollama-specific request/response structures
#[derive(Debug, Serialize, Deserialize)]
struct OllamaEmbeddingRequest {
model: String,
prompt: String,
}
#[derive(Debug, Serialize, Deserialize)]
struct OllamaEmbeddingResponse {
embedding: Vec<f32>,
}
pub struct LanceStore {
datasets: Arc<RwLock<HashMap<String, Arc<Dataset>>>>,
data_dir: PathBuf,
http_client: reqwest::Client,
}
impl LanceStore {
pub async fn new(data_dir: PathBuf) -> Result<Self, DBError> {
// Create data directory if it doesn't exist
std::fs::create_dir_all(&data_dir)
.map_err(|e| DBError(format!("Failed to create Lance data directory: {}", e)))?;
let http_client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()
.map_err(|e| DBError(format!("Failed to create HTTP client: {}", e)))?;
Ok(Self {
datasets: Arc::new(RwLock::new(HashMap::new())),
data_dir,
http_client,
})
}
/// Get embedding service URL from Redis config, default to local Ollama
async fn get_embedding_url(&self, server: &crate::server::Server) -> Result<String, DBError> {
// Get the embedding URL from Redis config directly from storage
let storage = server.current_storage()?;
match storage.hget("config:core:aiembed", "url")? {
Some(url) => Ok(url),
None => Ok("http://localhost:11434".to_string()), // Default to local Ollama
}
}
/// Check if we're using Ollama (default) or custom embedding service
async fn is_ollama_service(&self, server: &crate::server::Server) -> Result<bool, DBError> {
let url = self.get_embedding_url(server).await?;
Ok(url.contains("localhost:11434") || url.contains("127.0.0.1:11434"))
}
/// Call external embedding service (Ollama or custom)
async fn call_embedding_service(
&self,
server: &crate::server::Server,
texts: Option<Vec<String>>,
images: Option<Vec<String>>,
) -> Result<Vec<Vec<f32>>, DBError> {
let base_url = self.get_embedding_url(server).await?;
let is_ollama = self.is_ollama_service(server).await?;
if is_ollama {
// Use Ollama API format
if let Some(texts) = texts {
let mut embeddings = Vec::new();
for text in texts {
let url = format!("{}/api/embeddings", base_url);
let request = OllamaEmbeddingRequest {
model: "nomic-embed-text".to_string(),
prompt: text,
};
let response = self.http_client
.post(&url)
.json(&request)
.send()
.await
.map_err(|e| DBError(format!("Failed to call Ollama embedding service: {}", e)))?;
if !response.status().is_success() {
let status = response.status();
let error_text = response.text().await.unwrap_or_default();
return Err(DBError(format!(
"Ollama embedding service returned error {}: {}",
status, error_text
)));
}
let ollama_response: OllamaEmbeddingResponse = response
.json()
.await
.map_err(|e| DBError(format!("Failed to parse Ollama embedding response: {}", e)))?;
embeddings.push(ollama_response.embedding);
}
Ok(embeddings)
} else if let Some(_images) = images {
// Ollama doesn't support image embeddings with this API yet
Err(DBError("Image embeddings not supported with Ollama. Please configure a custom embedding service.".to_string()))
} else {
Err(DBError("No text or images provided for embedding".to_string()))
}
} else {
// Use custom embedding service API format
let request = EmbeddingRequest {
texts,
images,
model: None, // Let the service use its default
};
let response = self.http_client
.post(&base_url)
.json(&request)
.send()
.await
.map_err(|e| DBError(format!("Failed to call embedding service: {}", e)))?;
if !response.status().is_success() {
let status = response.status();
let error_text = response.text().await.unwrap_or_default();
return Err(DBError(format!(
"Embedding service returned error {}: {}",
status, error_text
)));
}
let embedding_response: EmbeddingResponse = response
.json()
.await
.map_err(|e| DBError(format!("Failed to parse embedding response: {}", e)))?;
Ok(embedding_response.embeddings)
}
}
pub async fn embed_text(
&self,
server: &crate::server::Server,
texts: Vec<String>
) -> Result<Vec<Vec<f32>>, DBError> {
if texts.is_empty() {
return Ok(Vec::new());
}
self.call_embedding_service(server, Some(texts), None).await
}
pub async fn embed_image(
&self,
server: &crate::server::Server,
image_bytes: Vec<u8>
) -> Result<Vec<f32>, DBError> {
// Convert image bytes to base64
let base64_image = base64::engine::general_purpose::STANDARD.encode(&image_bytes);
let embeddings = self.call_embedding_service(
server,
None,
Some(vec![base64_image])
).await?;
embeddings.into_iter()
.next()
.ok_or_else(|| DBError("No embedding returned for image".to_string()))
}
pub async fn create_dataset(
&self,
name: &str,
schema: Schema,
) -> Result<(), DBError> {
let dataset_path = self.data_dir.join(format!("{}.lance", name));
// Create empty dataset with schema
let write_params = WriteParams {
mode: WriteMode::Create,
..Default::default()
};
// Create an empty RecordBatch with the schema
let empty_batch = RecordBatch::new_empty(Arc::new(schema));
// Use RecordBatchReader for Lance 0.33
let reader = VecRecordBatchReader::new(vec![empty_batch]);
let dataset = Dataset::write(
reader,
dataset_path.to_str().unwrap(),
Some(write_params)
).await
.map_err(|e| DBError(format!("Failed to create dataset: {}", e)))?;
let mut datasets = self.datasets.write().await;
datasets.insert(name.to_string(), Arc::new(dataset));
Ok(())
}
pub async fn write_vectors(
&self,
dataset_name: &str,
vectors: Vec<Vec<f32>>,
metadata: Option<HashMap<String, Vec<String>>>,
) -> Result<usize, DBError> {
let dataset_path = self.data_dir.join(format!("{}.lance", dataset_name));
// Open or get cached dataset
let _dataset = self.get_or_open_dataset(dataset_name).await?;
// Build RecordBatch
let num_vectors = vectors.len();
if num_vectors == 0 {
return Ok(0);
}
let dim = vectors.first()
.ok_or_else(|| DBError("Empty vectors".to_string()))?
.len();
// Flatten vectors
let flat_vectors: Vec<f32> = vectors.into_iter().flatten().collect();
let values_array = Float32Array::from(flat_vectors);
let field = Arc::new(Field::new("item", DataType::Float32, true));
let vector_array = FixedSizeListArray::try_new(
field,
dim as i32,
Arc::new(values_array),
None
).map_err(|e| DBError(format!("Failed to create vector array: {}", e)))?;
let mut arrays: Vec<ArrayRef> = vec![Arc::new(vector_array)];
let mut fields = vec![Field::new(
"vector",
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
dim as i32
),
false
)];
// Add metadata columns if provided
if let Some(metadata) = metadata {
for (key, values) in metadata {
if values.len() != num_vectors {
return Err(DBError(format!(
"Metadata field '{}' has {} values but expected {}",
key, values.len(), num_vectors
)));
}
let array = StringArray::from(values);
arrays.push(Arc::new(array));
fields.push(Field::new(&key, DataType::Utf8, true));
}
}
let schema = Arc::new(Schema::new(fields));
let batch = RecordBatch::try_new(schema, arrays)
.map_err(|e| DBError(format!("Failed to create RecordBatch: {}", e)))?;
// Append to dataset
let write_params = WriteParams {
mode: WriteMode::Append,
..Default::default()
};
let reader = VecRecordBatchReader::new(vec![batch]);
Dataset::write(
reader,
dataset_path.to_str().unwrap(),
Some(write_params)
).await
.map_err(|e| DBError(format!("Failed to write to dataset: {}", e)))?;
// Refresh cached dataset
let mut datasets = self.datasets.write().await;
datasets.remove(dataset_name);
Ok(num_vectors)
}
pub async fn search_vectors(
&self,
dataset_name: &str,
query_vector: Vec<f32>,
k: usize,
nprobes: Option<usize>,
_refine_factor: Option<usize>,
) -> Result<Vec<(f32, HashMap<String, String>)>, DBError> {
let dataset = self.get_or_open_dataset(dataset_name).await?;
// Build query
let query_array = Float32Array::from(query_vector.clone());
let mut query = dataset.scan();
query.nearest(
"vector",
&query_array,
k,
).map_err(|e| DBError(format!("Failed to build search query: {}", e)))?;
if let Some(nprobes) = nprobes {
query.nprobs(nprobes);
}
// Note: refine_factor might not be available in this Lance version
// if let Some(refine) = refine_factor {
// query.refine_factor(refine);
// }
// Execute search
let results = query
.try_into_stream()
.await
.map_err(|e| DBError(format!("Failed to execute search: {}", e)))?
.try_collect::<Vec<_>>()
.await
.map_err(|e| DBError(format!("Failed to collect results: {}", e)))?;
// Process results
let mut output = Vec::new();
for batch in results {
// Get distances
let distances = batch
.column_by_name("_distance")
.ok_or_else(|| DBError("No distance column".to_string()))?
.as_any()
.downcast_ref::<Float32Array>()
.ok_or_else(|| DBError("Invalid distance type".to_string()))?;
// Get metadata
for i in 0..batch.num_rows() {
let distance = distances.value(i);
let mut metadata = HashMap::new();
for field in batch.schema().fields() {
if field.name() != "vector" && field.name() != "_distance" {
if let Some(col) = batch.column_by_name(field.name()) {
if let Some(str_array) = col.as_any().downcast_ref::<StringArray>() {
if !str_array.is_null(i) {
metadata.insert(
field.name().to_string(),
str_array.value(i).to_string()
);
}
}
}
}
}
output.push((distance, metadata));
}
}
Ok(output)
}
pub async fn store_multimodal(
&self,
server: &crate::server::Server,
dataset_name: &str,
text: Option<String>,
image_bytes: Option<Vec<u8>>,
metadata: HashMap<String, String>,
) -> Result<String, DBError> {
// Generate ID
let id = uuid::Uuid::new_v4().to_string();
// Generate embeddings using external service
let embedding = if let Some(text) = text.as_ref() {
self.embed_text(server, vec![text.clone()]).await?
.into_iter()
.next()
.ok_or_else(|| DBError("No embedding returned".to_string()))?
} else if let Some(img) = image_bytes.as_ref() {
self.embed_image(server, img.clone()).await?
} else {
return Err(DBError("No text or image provided".to_string()));
};
// Prepare metadata
let mut full_metadata = metadata;
full_metadata.insert("id".to_string(), id.clone());
if let Some(text) = text {
full_metadata.insert("text".to_string(), text);
}
if let Some(img) = image_bytes {
full_metadata.insert("image_base64".to_string(), base64::engine::general_purpose::STANDARD.encode(img));
}
// Convert metadata to column vectors
let mut metadata_cols = HashMap::new();
for (key, value) in full_metadata {
metadata_cols.insert(key, vec![value]);
}
// Write to dataset
self.write_vectors(dataset_name, vec![embedding], Some(metadata_cols)).await?;
Ok(id)
}
pub async fn search_with_text(
&self,
server: &crate::server::Server,
dataset_name: &str,
query_text: String,
k: usize,
nprobes: Option<usize>,
refine_factor: Option<usize>,
) -> Result<Vec<(f32, HashMap<String, String>)>, DBError> {
// Embed the query text using external service
let embeddings = self.embed_text(server, vec![query_text]).await?;
let query_vector = embeddings.into_iter()
.next()
.ok_or_else(|| DBError("No embedding returned for query".to_string()))?;
// Search with the embedding
self.search_vectors(dataset_name, query_vector, k, nprobes, refine_factor).await
}
pub async fn create_index(
&self,
dataset_name: &str,
index_type: &str,
num_partitions: Option<usize>,
num_sub_vectors: Option<usize>,
) -> Result<(), DBError> {
let _dataset = self.get_or_open_dataset(dataset_name).await?;
match index_type.to_uppercase().as_str() {
"IVF_PQ" => {
let ivf_params = IvfBuildParams {
num_partitions: num_partitions.unwrap_or(256),
..Default::default()
};
let pq_params = PQBuildParams {
num_sub_vectors: num_sub_vectors.unwrap_or(16),
..Default::default()
};
let params = VectorIndexParams::with_ivf_pq_params(
MetricType::L2,
ivf_params,
pq_params,
);
// Get a mutable reference to the dataset
let mut dataset_mut = Dataset::open(self.data_dir.join(format!("{}.lance", dataset_name)).to_str().unwrap())
.await
.map_err(|e| DBError(format!("Failed to open dataset for indexing: {}", e)))?;
dataset_mut.create_index(
&["vector"],
lance_index::IndexType::Vector,
None,
&params,
true
).await
.map_err(|e| DBError(format!("Failed to create index: {}", e)))?;
}
_ => return Err(DBError(format!("Unsupported index type: {}", index_type))),
}
Ok(())
}
async fn get_or_open_dataset(&self, name: &str) -> Result<Arc<Dataset>, DBError> {
let mut datasets = self.datasets.write().await;
if let Some(dataset) = datasets.get(name) {
return Ok(dataset.clone());
}
let dataset_path = self.data_dir.join(format!("{}.lance", name));
if !dataset_path.exists() {
return Err(DBError(format!("Dataset '{}' does not exist", name)));
}
let dataset = Dataset::open(dataset_path.to_str().unwrap())
.await
.map_err(|e| DBError(format!("Failed to open dataset: {}", e)))?;
let dataset = Arc::new(dataset);
datasets.insert(name.to_string(), dataset.clone());
Ok(dataset)
}
pub async fn list_datasets(&self) -> Result<Vec<String>, DBError> {
let mut datasets = Vec::new();
let entries = std::fs::read_dir(&self.data_dir)
.map_err(|e| DBError(format!("Failed to read data directory: {}", e)))?;
for entry in entries {
let entry = entry.map_err(|e| DBError(format!("Failed to read entry: {}", e)))?;
let path = entry.path();
if path.is_dir() {
if let Some(name) = path.file_name() {
if let Some(name_str) = name.to_str() {
if name_str.ends_with(".lance") {
let dataset_name = name_str.trim_end_matches(".lance");
datasets.push(dataset_name.to_string());
}
}
}
}
}
Ok(datasets)
}
pub async fn drop_dataset(&self, name: &str) -> Result<(), DBError> {
// Remove from cache
let mut datasets = self.datasets.write().await;
datasets.remove(name);
// Delete from disk
let dataset_path = self.data_dir.join(format!("{}.lance", name));
if dataset_path.exists() {
std::fs::remove_dir_all(dataset_path)
.map_err(|e| DBError(format!("Failed to delete dataset: {}", e)))?;
}
Ok(())
}
pub async fn get_dataset_info(&self, name: &str) -> Result<HashMap<String, String>, DBError> {
let dataset = self.get_or_open_dataset(name).await?;
let mut info = HashMap::new();
info.insert("name".to_string(), name.to_string());
info.insert("version".to_string(), dataset.version().version.to_string());
info.insert("num_rows".to_string(), dataset.count_rows(None).await?.to_string());
// Get schema info
let schema = dataset.schema();
let fields: Vec<String> = schema.fields
.iter()
.map(|f| format!("{}:{}", f.name, f.data_type()))
.collect();
info.insert("schema".to_string(), fields.join(", "));
Ok(info)
}
}

View File

@@ -2,6 +2,7 @@ pub mod age; // NEW
pub mod cmd;
pub mod crypto;
pub mod error;
pub mod lance_store; // Add Lance store module
pub mod options;
pub mod protocol;
pub mod server;

View File

@@ -51,6 +51,7 @@ async fn main() {
// new DB option
let option = herodb::options::DBOption {
dir: args.dir,
port,
debug: args.debug,
encryption_key: args.encryption_key,
encrypt: args.encrypt,

View File

@@ -7,6 +7,7 @@ pub enum BackendType {
#[derive(Debug, Clone)]
pub struct DBOption {
pub dir: String,
pub port: u16,
pub debug: bool,
pub encrypt: bool,
pub encryption_key: Option<String>,

View File

@@ -9,6 +9,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
use crate::cmd::Cmd;
use crate::error::DBError;
use crate::lance_store::LanceStore;
use crate::options;
use crate::protocol::Protocol;
use crate::storage::Storage;
@@ -26,6 +27,9 @@ pub struct Server {
// BLPOP waiter registry: per (db_index, key) FIFO of waiters
pub list_waiters: Arc<Mutex<HashMap<u64, HashMap<String, Vec<Waiter>>>>>,
pub waiter_seq: Arc<AtomicU64>,
// Lance vector store
pub lance_store: Option<Arc<LanceStore>>,
}
pub struct Waiter {
@@ -42,6 +46,16 @@ pub enum PopSide {
impl Server {
pub async fn new(option: options::DBOption) -> Self {
// Initialize Lance store
let lance_data_dir = std::path::PathBuf::from(&option.dir).join("lance");
let lance_store = match LanceStore::new(lance_data_dir).await {
Ok(store) => Some(Arc::new(store)),
Err(e) => {
eprintln!("Warning: Failed to initialize Lance store: {}", e.0);
None
}
};
Server {
db_cache: Arc::new(std::sync::RwLock::new(HashMap::new())),
option,
@@ -51,9 +65,17 @@ impl Server {
list_waiters: Arc::new(Mutex::new(HashMap::new())),
waiter_seq: Arc::new(AtomicU64::new(1)),
lance_store,
}
}
pub fn lance_store(&self) -> Result<Arc<LanceStore>, DBError> {
self.lance_store
.as_ref()
.cloned()
.ok_or_else(|| DBError("Lance store not initialized".to_string()))
}
pub fn current_storage(&self) -> Result<Arc<dyn StorageBackend>, DBError> {
let mut cache = self.db_cache.write().unwrap();

View File

@@ -27,6 +27,7 @@ async fn debug_hset_simple() {
debug: false,
encrypt: false,
encryption_key: None,
backend: herodb::options::BackendType::Redb,
};
let mut server = Server::new(option).await;

View File

@@ -18,6 +18,7 @@ async fn debug_hset_return_value() {
debug: false,
encrypt: false,
encryption_key: None,
backend: herodb::options::BackendType::Redb,
};
let mut server = Server::new(option).await;

View File

@@ -22,6 +22,7 @@ async fn start_test_server(test_name: &str) -> (Server, u16) {
debug: true,
encrypt: false,
encryption_key: None,
backend: herodb::options::BackendType::Redb,
};
let server = Server::new(option).await;

View File

@@ -24,6 +24,7 @@ async fn start_test_server(test_name: &str) -> (Server, u16) {
debug: true,
encrypt: false,
encryption_key: None,
backend: herodb::options::BackendType::Redb,
};
let server = Server::new(option).await;

View File

@@ -22,6 +22,7 @@ async fn start_test_server(test_name: &str) -> (Server, u16) {
debug: false,
encrypt: false,
encryption_key: None,
backend: herodb::options::BackendType::Redb,
};
let server = Server::new(option).await;

View File

@@ -22,6 +22,7 @@ async fn start_test_server(test_name: &str) -> (Server, u16) {
debug: false,
encrypt: false,
encryption_key: None,
backend: herodb::options::BackendType::Redb,
};
let server = Server::new(option).await;