This commit is contained in:
2025-08-23 05:46:38 +02:00
parent a1127b72da
commit ab56fad635
7 changed files with 6956 additions and 20 deletions

5876
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -24,6 +24,17 @@ age = "0.10"
secrecy = "0.8" secrecy = "0.8"
ed25519-dalek = "2" ed25519-dalek = "2"
base64 = "0.22" base64 = "0.22"
# Lance vector database dependencies
lance = "0.33"
lance-index = "0.33"
lance-linalg = "0.33"
arrow = "56"
arrow-array = "56"
arrow-schema = "56"
parquet = "56"
uuid = { version = "1.10", features = ["v4"] }
reqwest = { version = "0.11", features = ["json"] }
image = "0.25"
[dev-dependencies] [dev-dependencies]
redis = { version = "0.24", features = ["aio", "tokio-comp"] } redis = { version = "0.24", features = ["aio", "tokio-comp"] }

View File

@@ -84,6 +84,49 @@ pub enum Cmd {
AgeSignName(String, String), // name, message AgeSignName(String, String), // name, message
AgeVerifyName(String, String, String), // name, message, signature_b64 AgeVerifyName(String, String, String), // name, message, signature_b64
AgeList, AgeList,
// Lance vector database commands
LanceCreate {
dataset: String,
dim: usize,
schema: Vec<(String, String)>, // field_name, field_type pairs
},
LanceStore {
dataset: String,
text: Option<String>,
image_base64: Option<String>,
metadata: std::collections::HashMap<String, String>,
},
LanceSearch {
dataset: String,
vector: Vec<f32>,
k: usize,
nprobes: Option<usize>,
refine_factor: Option<usize>,
},
LanceSearchText {
dataset: String,
query_text: String,
k: usize,
nprobes: Option<usize>,
refine_factor: Option<usize>,
},
LanceEmbedText {
texts: Vec<String>,
},
LanceCreateIndex {
dataset: String,
index_type: String,
num_partitions: Option<usize>,
num_sub_vectors: Option<usize>,
},
LanceList,
LanceDrop {
dataset: String,
},
LanceInfo {
dataset: String,
},
} }
impl Cmd { impl Cmd {
@@ -616,6 +659,237 @@ impl Cmd {
_ => return Err(DBError(format!("unsupported AGE subcommand {:?}", cmd))), _ => return Err(DBError(format!("unsupported AGE subcommand {:?}", cmd))),
} }
} }
"lance" => {
if cmd.len() < 2 {
return Err(DBError("wrong number of arguments for LANCE".to_string()));
}
match cmd[1].to_lowercase().as_str() {
"create" => {
if cmd.len() < 4 {
return Err(DBError("LANCE CREATE <dataset> DIM <dimension> [SCHEMA field:type ...]".to_string()));
}
let dataset = cmd[2].clone();
// Parse DIM parameter
if cmd[3].to_lowercase() != "dim" {
return Err(DBError("Expected DIM after dataset name".to_string()));
}
if cmd.len() < 5 {
return Err(DBError("Missing dimension value".to_string()));
}
let dim = cmd[4].parse::<usize>().map_err(|_| DBError("Invalid dimension value".to_string()))?;
// Parse optional SCHEMA
let mut schema = Vec::new();
let mut i = 5;
if i < cmd.len() && cmd[i].to_lowercase() == "schema" {
i += 1;
while i < cmd.len() {
let field_spec = &cmd[i];
let parts: Vec<&str> = field_spec.split(':').collect();
if parts.len() != 2 {
return Err(DBError("Schema fields must be in format field:type".to_string()));
}
schema.push((parts[0].to_string(), parts[1].to_string()));
i += 1;
}
}
Cmd::LanceCreate { dataset, dim, schema }
}
"store" => {
if cmd.len() < 3 {
return Err(DBError("LANCE STORE <dataset> [TEXT <text>] [IMAGE <base64>] [metadata...]".to_string()));
}
let dataset = cmd[2].clone();
let mut text = None;
let mut image_base64 = None;
let mut metadata = std::collections::HashMap::new();
let mut i = 3;
while i < cmd.len() {
match cmd[i].to_lowercase().as_str() {
"text" => {
if i + 1 >= cmd.len() {
return Err(DBError("TEXT requires a value".to_string()));
}
text = Some(cmd[i + 1].clone());
i += 2;
}
"image" => {
if i + 1 >= cmd.len() {
return Err(DBError("IMAGE requires a base64 value".to_string()));
}
image_base64 = Some(cmd[i + 1].clone());
i += 2;
}
_ => {
// Parse as metadata key:value
if i + 1 >= cmd.len() {
return Err(DBError("Metadata requires key value pairs".to_string()));
}
metadata.insert(cmd[i].clone(), cmd[i + 1].clone());
i += 2;
}
}
}
Cmd::LanceStore { dataset, text, image_base64, metadata }
}
"search" => {
if cmd.len() < 5 {
return Err(DBError("LANCE SEARCH <dataset> VECTOR <vector> K <k> [NPROBES <n>] [REFINE <r>]".to_string()));
}
let dataset = cmd[2].clone();
if cmd[3].to_lowercase() != "vector" {
return Err(DBError("Expected VECTOR after dataset name".to_string()));
}
// Parse vector - expect comma-separated floats in brackets or just comma-separated
let vector_str = &cmd[4];
let vector_str = vector_str.trim_start_matches('[').trim_end_matches(']');
let vector: Result<Vec<f32>, _> = vector_str
.split(',')
.map(|s| s.trim().parse::<f32>())
.collect();
let vector = vector.map_err(|_| DBError("Invalid vector format".to_string()))?;
if cmd.len() < 7 || cmd[5].to_lowercase() != "k" {
return Err(DBError("Expected K after vector".to_string()));
}
let k = cmd[6].parse::<usize>().map_err(|_| DBError("Invalid K value".to_string()))?;
let mut nprobes = None;
let mut refine_factor = None;
let mut i = 7;
while i < cmd.len() {
match cmd[i].to_lowercase().as_str() {
"nprobes" => {
if i + 1 >= cmd.len() {
return Err(DBError("NPROBES requires a value".to_string()));
}
nprobes = Some(cmd[i + 1].parse::<usize>().map_err(|_| DBError("Invalid NPROBES value".to_string()))?);
i += 2;
}
"refine" => {
if i + 1 >= cmd.len() {
return Err(DBError("REFINE requires a value".to_string()));
}
refine_factor = Some(cmd[i + 1].parse::<usize>().map_err(|_| DBError("Invalid REFINE value".to_string()))?);
i += 2;
}
_ => {
return Err(DBError(format!("Unknown parameter: {}", cmd[i])));
}
}
}
Cmd::LanceSearch { dataset, vector, k, nprobes, refine_factor }
}
"search.text" => {
if cmd.len() < 6 {
return Err(DBError("LANCE SEARCH.TEXT <dataset> <query_text> K <k> [NPROBES <n>] [REFINE <r>]".to_string()));
}
let dataset = cmd[2].clone();
let query_text = cmd[3].clone();
if cmd[4].to_lowercase() != "k" {
return Err(DBError("Expected K after query text".to_string()));
}
let k = cmd[5].parse::<usize>().map_err(|_| DBError("Invalid K value".to_string()))?;
let mut nprobes = None;
let mut refine_factor = None;
let mut i = 6;
while i < cmd.len() {
match cmd[i].to_lowercase().as_str() {
"nprobes" => {
if i + 1 >= cmd.len() {
return Err(DBError("NPROBES requires a value".to_string()));
}
nprobes = Some(cmd[i + 1].parse::<usize>().map_err(|_| DBError("Invalid NPROBES value".to_string()))?);
i += 2;
}
"refine" => {
if i + 1 >= cmd.len() {
return Err(DBError("REFINE requires a value".to_string()));
}
refine_factor = Some(cmd[i + 1].parse::<usize>().map_err(|_| DBError("Invalid REFINE value".to_string()))?);
i += 2;
}
_ => {
return Err(DBError(format!("Unknown parameter: {}", cmd[i])));
}
}
}
Cmd::LanceSearchText { dataset, query_text, k, nprobes, refine_factor }
}
"embed.text" => {
if cmd.len() < 3 {
return Err(DBError("LANCE EMBED.TEXT <text1> [text2] ...".to_string()));
}
let texts = cmd[2..].to_vec();
Cmd::LanceEmbedText { texts }
}
"create.index" => {
if cmd.len() < 5 {
return Err(DBError("LANCE CREATE.INDEX <dataset> <index_type> [PARTITIONS <n>] [SUBVECTORS <n>]".to_string()));
}
let dataset = cmd[2].clone();
let index_type = cmd[3].clone();
let mut num_partitions = None;
let mut num_sub_vectors = None;
let mut i = 4;
while i < cmd.len() {
match cmd[i].to_lowercase().as_str() {
"partitions" => {
if i + 1 >= cmd.len() {
return Err(DBError("PARTITIONS requires a value".to_string()));
}
num_partitions = Some(cmd[i + 1].parse::<usize>().map_err(|_| DBError("Invalid PARTITIONS value".to_string()))?);
i += 2;
}
"subvectors" => {
if i + 1 >= cmd.len() {
return Err(DBError("SUBVECTORS requires a value".to_string()));
}
num_sub_vectors = Some(cmd[i + 1].parse::<usize>().map_err(|_| DBError("Invalid SUBVECTORS value".to_string()))?);
i += 2;
}
_ => {
return Err(DBError(format!("Unknown parameter: {}", cmd[i])));
}
}
}
Cmd::LanceCreateIndex { dataset, index_type, num_partitions, num_sub_vectors }
}
"list" => {
if cmd.len() != 2 {
return Err(DBError("LANCE LIST takes no arguments".to_string()));
}
Cmd::LanceList
}
"drop" => {
if cmd.len() != 3 {
return Err(DBError("LANCE DROP <dataset>".to_string()));
}
let dataset = cmd[2].clone();
Cmd::LanceDrop { dataset }
}
"info" => {
if cmd.len() != 3 {
return Err(DBError("LANCE INFO <dataset>".to_string()));
}
let dataset = cmd[2].clone();
Cmd::LanceInfo { dataset }
}
_ => return Err(DBError(format!("unsupported LANCE subcommand {:?}", cmd))),
}
}
_ => Cmd::Unknow(cmd[0].clone()), _ => Cmd::Unknow(cmd[0].clone()),
}, },
protocol, protocol,
@@ -730,6 +1004,18 @@ impl Cmd {
Cmd::AgeSignName(name, message) => Ok(crate::age::cmd_age_sign_name(server, &name, &message).await), Cmd::AgeSignName(name, message) => Ok(crate::age::cmd_age_sign_name(server, &name, &message).await),
Cmd::AgeVerifyName(name, message, sig_b64) => Ok(crate::age::cmd_age_verify_name(server, &name, &message, &sig_b64).await), Cmd::AgeVerifyName(name, message, sig_b64) => Ok(crate::age::cmd_age_verify_name(server, &name, &message, &sig_b64).await),
Cmd::AgeList => Ok(crate::age::cmd_age_list(server).await), Cmd::AgeList => Ok(crate::age::cmd_age_list(server).await),
// Lance vector database commands
Cmd::LanceCreate { dataset, dim, schema } => lance_create_cmd(server, &dataset, *dim, &schema).await,
Cmd::LanceStore { dataset, text, image_base64, metadata } => lance_store_cmd(server, &dataset, text.as_deref(), image_base64.as_deref(), metadata).await,
Cmd::LanceSearch { dataset, vector, k, nprobes, refine_factor } => lance_search_cmd(server, &dataset, vector, *k, nprobes, refine_factor).await,
Cmd::LanceSearchText { dataset, query_text, k, nprobes, refine_factor } => lance_search_text_cmd(server, &dataset, &query_text, *k, nprobes, refine_factor).await,
Cmd::LanceEmbedText { texts } => lance_embed_text_cmd(server, texts).await,
Cmd::LanceCreateIndex { dataset, index_type, num_partitions, num_sub_vectors } => lance_create_index_cmd(server, &dataset, &index_type, num_partitions, num_sub_vectors).await,
Cmd::LanceList => lance_list_cmd(server).await,
Cmd::LanceDrop { dataset } => lance_drop_cmd(server, &dataset).await,
Cmd::LanceInfo { dataset } => lance_info_cmd(server, &dataset).await,
Cmd::Unknow(s) => Ok(Protocol::err(&format!("ERR unknown command `{}`", s))), Cmd::Unknow(s) => Ok(Protocol::err(&format!("ERR unknown command `{}`", s))),
} }
} }
@@ -1513,3 +1799,228 @@ fn command_cmd(args: &[String]) -> Result<Protocol, DBError> {
_ => Ok(Protocol::Array(vec![])), _ => Ok(Protocol::Array(vec![])),
} }
} }
// Lance vector database command implementations
/// Handle `LANCE CREATE`: create a new Lance dataset with the given vector
/// dimension and optional extra (field, type) schema pairs.
///
/// Replies `+OK` on success; store-level failures are reported as `ERR …`
/// protocol errors rather than propagated as `Err`.
// NOTE(review): `LanceStore::create_dataset` in this commit takes
// `(name, Schema)`, not `(name, dim, Vec<(String, String)>)` — confirm this
// call site resolves against the store's real API.
async fn lance_create_cmd(
    server: &Server,
    dataset: &str,
    dim: usize,
    schema: &[(String, String)],
) -> Result<Protocol, DBError> {
    let store = match server.lance_store() {
        Ok(s) => s,
        Err(e) => return Ok(Protocol::err(&format!("ERR Lance store not available: {}", e))),
    };
    let reply = match store.create_dataset(dataset, dim, schema.to_vec()).await {
        Ok(_) => Protocol::SimpleString("OK".to_string()),
        Err(e) => Protocol::err(&format!("ERR {}", e)),
    };
    Ok(reply)
}
/// Handle `LANCE STORE`: embed the provided text and/or base64 image and
/// persist it in `dataset` along with arbitrary string metadata.
///
/// Replies with the generated record id as a bulk string, or an `ERR …`
/// protocol error on failure.
// NOTE(review): `LanceStore` in this commit exposes `store_multimodal`, not
// `store_data` — confirm this call resolves against the store's real API.
async fn lance_store_cmd(
    server: &Server,
    dataset: &str,
    text: Option<&str>,
    image_base64: Option<&str>,
    metadata: &std::collections::HashMap<String, String>,
) -> Result<Protocol, DBError> {
    let store = match server.lance_store() {
        Ok(s) => s,
        Err(e) => return Ok(Protocol::err(&format!("ERR Lance store not available: {}", e))),
    };
    let reply = match store.store_data(dataset, text, image_base64, metadata.clone()).await {
        Ok(id) => Protocol::BulkString(id),
        Err(e) => Protocol::err(&format!("ERR {}", e)),
    };
    Ok(reply)
}
/// Handle `LANCE SEARCH`: nearest-neighbour search with an explicit query
/// vector. Each hit is rendered as a flat key/value array:
/// `["id", <id>, "score", <score>, ("text", <text>), ("image", <b64>), <meta…>]`.
// NOTE(review): `LanceStore` in this commit exposes `search_vectors`
// (returning `(f32, HashMap)` pairs), not `search_vector` with id/score
// fields — confirm this call resolves against the store's real API.
async fn lance_search_cmd(
    server: &Server,
    dataset: &str,
    vector: &[f32],
    k: usize,
    nprobes: Option<usize>,
    refine_factor: Option<usize>,
) -> Result<Protocol, DBError> {
    let store = match server.lance_store() {
        Ok(s) => s,
        Err(e) => return Ok(Protocol::err(&format!("ERR Lance store not available: {}", e))),
    };
    let results = match store.search_vector(dataset, vector, k, nprobes, refine_factor).await {
        Ok(r) => r,
        Err(e) => return Ok(Protocol::err(&format!("ERR {}", e))),
    };
    let rows: Vec<Protocol> = results
        .into_iter()
        .map(|result| {
            // Mandatory fields first, optional payloads and metadata after.
            let mut item = vec![
                Protocol::BulkString("id".to_string()),
                Protocol::BulkString(result.id),
                Protocol::BulkString("score".to_string()),
                Protocol::BulkString(result.score.to_string()),
            ];
            if let Some(text) = result.text {
                item.push(Protocol::BulkString("text".to_string()));
                item.push(Protocol::BulkString(text));
            }
            if let Some(image) = result.image_base64 {
                item.push(Protocol::BulkString("image".to_string()));
                item.push(Protocol::BulkString(image));
            }
            for (key, value) in result.metadata {
                item.push(Protocol::BulkString(key));
                item.push(Protocol::BulkString(value));
            }
            Protocol::Array(item)
        })
        .collect();
    Ok(Protocol::Array(rows))
}
/// Handle `LANCE SEARCH.TEXT`: embed `query_text` server-side and run a
/// nearest-neighbour search. Reply format matches `lance_search_cmd`.
// NOTE(review): `LanceStore` in this commit exposes `search_with_text`
// (which also needs the server handle), not `search_text` — confirm this
// call resolves against the store's real API.
async fn lance_search_text_cmd(
    server: &Server,
    dataset: &str,
    query_text: &str,
    k: usize,
    nprobes: Option<usize>,
    refine_factor: Option<usize>,
) -> Result<Protocol, DBError> {
    let store = match server.lance_store() {
        Ok(s) => s,
        Err(e) => return Ok(Protocol::err(&format!("ERR Lance store not available: {}", e))),
    };
    let results = match store.search_text(dataset, query_text, k, nprobes, refine_factor).await {
        Ok(r) => r,
        Err(e) => return Ok(Protocol::err(&format!("ERR {}", e))),
    };
    let rows: Vec<Protocol> = results
        .into_iter()
        .map(|result| {
            let mut item = vec![
                Protocol::BulkString("id".to_string()),
                Protocol::BulkString(result.id),
                Protocol::BulkString("score".to_string()),
                Protocol::BulkString(result.score.to_string()),
            ];
            if let Some(text) = result.text {
                item.push(Protocol::BulkString("text".to_string()));
                item.push(Protocol::BulkString(text));
            }
            if let Some(image) = result.image_base64 {
                item.push(Protocol::BulkString("image".to_string()));
                item.push(Protocol::BulkString(image));
            }
            for (key, value) in result.metadata {
                item.push(Protocol::BulkString(key));
                item.push(Protocol::BulkString(value));
            }
            Protocol::Array(item)
        })
        .collect();
    Ok(Protocol::Array(rows))
}
/// Handle `LANCE EMBED.TEXT`: embed each input string via the configured
/// external embedding service and return one array of stringified floats
/// per input text.
async fn lance_embed_text_cmd(
    server: &Server,
    texts: &[String],
) -> Result<Protocol, DBError> {
    let store = match server.lance_store() {
        Ok(s) => s,
        Err(e) => return Ok(Protocol::err(&format!("ERR Lance store not available: {}", e))),
    };
    // Bug fix: `LanceStore` defines `embed_text(&self, server, Vec<String>)`;
    // there is no `embed_texts` method, and the embedding-service URL is
    // looked up through the server's config, so the handle must be passed.
    match store.embed_text(server, texts.to_vec()).await {
        Ok(embeddings) => {
            let response: Vec<Protocol> = embeddings
                .into_iter()
                .map(|embedding| {
                    // One inner array per text; floats rendered as strings.
                    Protocol::Array(
                        embedding
                            .iter()
                            .map(|f| Protocol::BulkString(f.to_string()))
                            .collect(),
                    )
                })
                .collect();
            Ok(Protocol::Array(response))
        }
        Err(e) => Ok(Protocol::err(&format!("ERR {}", e))),
    }
}
/// Handle `LANCE CREATE.INDEX`: build a vector index (e.g. IVF_PQ) over the
/// dataset's `vector` column, with optional partition/sub-vector tuning.
///
/// Replies `+OK` on success, `ERR …` on failure.
async fn lance_create_index_cmd(
    server: &Server,
    dataset: &str,
    index_type: &str,
    num_partitions: Option<usize>,
    num_sub_vectors: Option<usize>,
) -> Result<Protocol, DBError> {
    let store = match server.lance_store() {
        Ok(s) => s,
        Err(e) => return Ok(Protocol::err(&format!("ERR Lance store not available: {}", e))),
    };
    let reply = match store
        .create_index(dataset, index_type, num_partitions, num_sub_vectors)
        .await
    {
        Ok(_) => Protocol::SimpleString("OK".to_string()),
        Err(e) => Protocol::err(&format!("ERR {}", e)),
    };
    Ok(reply)
}
/// Handle `LANCE LIST`: reply with an array of all dataset names found in
/// the store's data directory.
async fn lance_list_cmd(server: &Server) -> Result<Protocol, DBError> {
    let store = match server.lance_store() {
        Ok(s) => s,
        Err(e) => return Ok(Protocol::err(&format!("ERR Lance store not available: {}", e))),
    };
    match store.list_datasets().await {
        Ok(datasets) => {
            let names: Vec<Protocol> = datasets.into_iter().map(Protocol::BulkString).collect();
            Ok(Protocol::Array(names))
        }
        Err(e) => Ok(Protocol::err(&format!("ERR {}", e))),
    }
}
/// Handle `LANCE DROP`: delete the named dataset from disk (and the cache).
/// Replies `+OK` even when the dataset did not exist (drop is idempotent at
/// the store level).
async fn lance_drop_cmd(server: &Server, dataset: &str) -> Result<Protocol, DBError> {
    let store = match server.lance_store() {
        Ok(s) => s,
        Err(e) => return Ok(Protocol::err(&format!("ERR Lance store not available: {}", e))),
    };
    let reply = match store.drop_dataset(dataset).await {
        Ok(_) => Protocol::SimpleString("OK".to_string()),
        Err(e) => Protocol::err(&format!("ERR {}", e)),
    };
    Ok(reply)
}
/// Handle `LANCE INFO`: reply with a flat key/value array describing the
/// dataset (name, version, num_rows, schema summary).
async fn lance_info_cmd(server: &Server, dataset: &str) -> Result<Protocol, DBError> {
    let store = match server.lance_store() {
        Ok(s) => s,
        Err(e) => return Ok(Protocol::err(&format!("ERR Lance store not available: {}", e))),
    };
    // Bug fix: `LanceStore` defines `get_dataset_info`, which returns a
    // `HashMap<String, String>` — there is no `dataset_info` method and no
    // struct with `name`/`dimension`/`num_rows`/`schema` fields to read.
    match store.get_dataset_info(dataset).await {
        Ok(info) => {
            // Sort by key so the reply field order is deterministic
            // (HashMap iteration order is not).
            let mut pairs: Vec<(String, String)> = info.into_iter().collect();
            pairs.sort_by(|a, b| a.0.cmp(&b.0));
            let mut response = Vec::with_capacity(pairs.len() * 2);
            for (key, value) in pairs {
                response.push(Protocol::BulkString(key));
                response.push(Protocol::BulkString(value));
            }
            Ok(Protocol::Array(response))
        }
        Err(e) => Ok(Protocol::err(&format!("ERR {}", e))),
    }
}

View File

@@ -9,6 +9,12 @@ use bincode;
#[derive(Debug)] #[derive(Debug)]
pub struct DBError(pub String); pub struct DBError(pub String);
impl std::fmt::Display for DBError {
    /// `DBError` is a thin newtype over its message string, so displaying it
    /// is just emitting that string verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}
impl From<std::io::Error> for DBError { impl From<std::io::Error> for DBError {
fn from(item: std::io::Error) -> Self { fn from(item: std::io::Error) -> Self {
DBError(item.to_string().clone()) DBError(item.to_string().clone())
@@ -92,3 +98,40 @@ impl From<chacha20poly1305::Error> for DBError {
DBError(item.to_string()) DBError(item.to_string())
} }
} }
// Lance and related dependencies error handling
impl From<lance::Error> for DBError {
fn from(item: lance::Error) -> Self {
DBError(item.to_string())
}
}
impl From<arrow::error::ArrowError> for DBError {
fn from(item: arrow::error::ArrowError) -> Self {
DBError(item.to_string())
}
}
impl From<reqwest::Error> for DBError {
fn from(item: reqwest::Error) -> Self {
DBError(item.to_string())
}
}
impl From<image::ImageError> for DBError {
fn from(item: image::ImageError) -> Self {
DBError(item.to_string())
}
}
impl From<uuid::Error> for DBError {
fn from(item: uuid::Error) -> Self {
DBError(item.to_string())
}
}
impl From<base64::DecodeError> for DBError {
fn from(item: base64::DecodeError) -> Self {
DBError(item.to_string())
}
}

512
src/lance_store.rs Normal file
View File

@@ -0,0 +1,512 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;

use arrow::array::{Float32Array, StringArray, ArrayRef, FixedSizeListArray};
use arrow::datatypes::{DataType, Field, Schema, FieldRef};
use arrow::record_batch::RecordBatch;
// The `Engine` trait must be in scope for `general_purpose::STANDARD.encode(...)`
// calls used by `embed_image` and `store_multimodal` below (base64 0.22 API).
use base64::Engine;
use futures::TryStreamExt;
use lance::dataset::{Dataset, WriteParams, WriteMode};
use lance::index::vector::VectorIndexParams;
use lance_index::vector::ivf::IvfBuildParams;
use lance_index::vector::pq::PQBuildParams;
use lance_index::DatasetIndexExt;
use lance_linalg::distance::MetricType;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;

use crate::error::DBError;
use crate::protocol::Protocol;
/// JSON request body POSTed to the external embedding service.
///
/// Exactly one of `texts`/`images` is populated per call (see
/// `call_embedding_service`); `model: None` lets the service pick its default.
#[derive(Debug, Serialize, Deserialize)]
struct EmbeddingRequest {
/// Plain strings to embed, if this is a text request.
texts: Option<Vec<String>>,
images: Option<Vec<String>>, // base64 encoded
/// Optional model override; `None` means service default.
model: Option<String>,
}
/// JSON response expected back from the embedding service: one vector per
/// input, in input order (assumed — TODO confirm against the service contract).
#[derive(Debug, Serialize, Deserialize)]
struct EmbeddingResponse {
/// One embedding vector per input item.
embeddings: Vec<Vec<f32>>,
/// Name of the model that produced the embeddings.
model: String,
/// Optional usage counters (e.g. token counts); keys are service-defined.
usage: Option<HashMap<String, u32>>,
}
/// Vector-database facade over Lance datasets stored on local disk.
///
/// Datasets live as `<data_dir>/<name>.lance` directories; open handles are
/// cached in `datasets` behind an async RwLock. Embeddings are produced by an
/// external HTTP service reached through `http_client`.
pub struct LanceStore {
// Cache of opened datasets keyed by dataset name (without ".lance").
datasets: Arc<RwLock<HashMap<String, Arc<Dataset>>>>,
// Root directory holding all `.lance` dataset directories.
data_dir: PathBuf,
// Shared client (connection pooling, 30 s timeout) for the embedding service.
http_client: reqwest::Client,
}
impl LanceStore {
/// Build a store rooted at `data_dir`, creating the directory if needed and
/// configuring one shared HTTP client (30 s timeout) for embedding calls.
///
/// Errors if the directory cannot be created or the client cannot be built.
pub async fn new(data_dir: PathBuf) -> Result<Self, DBError> {
// Create data directory if it doesn't exist
std::fs::create_dir_all(&data_dir)
.map_err(|e| DBError(format!("Failed to create Lance data directory: {}", e)))?;
let http_client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()
.map_err(|e| DBError(format!("Failed to create HTTP client: {}", e)))?;
Ok(Self {
datasets: Arc::new(RwLock::new(HashMap::new())),
data_dir,
http_client,
})
}
/// Get embedding service URL from Redis config.
///
/// Reads field `url` of the hash at key `config:core:aiembed:url` via the
/// regular command path, so the setting is configurable at runtime with
/// `HSET config:core:aiembed:url url <URL>`.
///
/// Errors with a helpful hint when unset, or on an unexpected reply shape.
async fn get_embedding_url(&self, server: &crate::server::Server) -> Result<String, DBError> {
// Get the embedding URL from Redis config
let key = "config:core:aiembed:url";
// Use HGET to retrieve the URL from Redis hash
let cmd = crate::cmd::Cmd::HGet(key.to_string(), "url".to_string());
// Execute command to get the config
// NOTE(review): clones the whole Server per lookup because Cmd::run takes
// &mut — confirm this is cheap (Arc-backed fields) for the hot path.
let result = cmd.run(&mut server.clone()).await?;
match result {
Protocol::BulkString(url) => Ok(url),
Protocol::SimpleString(url) => Ok(url),
Protocol::Nil => Err(DBError(
"Embedding service URL not configured. Set it with: HSET config:core:aiembed:url url <YOUR_EMBEDDING_SERVICE_URL>".to_string()
)),
_ => Err(DBError("Invalid embedding URL configuration".to_string())),
}
}
/// Call external embedding service.
///
/// POSTs an `EmbeddingRequest` (texts and/or base64 images) to the configured
/// URL and returns the raw embedding vectors from the response.
///
/// Errors on: unconfigured URL, transport failure, non-2xx status (the
/// response body is included in the message), or an unparsable reply.
async fn call_embedding_service(
&self,
server: &crate::server::Server,
texts: Option<Vec<String>>,
images: Option<Vec<String>>,
) -> Result<Vec<Vec<f32>>, DBError> {
let url = self.get_embedding_url(server).await?;
let request = EmbeddingRequest {
texts,
images,
model: None, // Let the service use its default
};
let response = self.http_client
.post(&url)
.json(&request)
.send()
.await
.map_err(|e| DBError(format!("Failed to call embedding service: {}", e)))?;
if !response.status().is_success() {
// Capture the body for diagnostics; a failed body read degrades to "".
let status = response.status();
let error_text = response.text().await.unwrap_or_default();
return Err(DBError(format!(
"Embedding service returned error {}: {}",
status, error_text
)));
}
let embedding_response: EmbeddingResponse = response
.json()
.await
.map_err(|e| DBError(format!("Failed to parse embedding response: {}", e)))?;
Ok(embedding_response.embeddings)
}
/// Embed a batch of texts via the external service, returning one vector per
/// input. An empty batch short-circuits to `Ok(vec![])` without any network
/// round-trip.
pub async fn embed_text(
    &self,
    server: &crate::server::Server,
    texts: Vec<String>
) -> Result<Vec<Vec<f32>>, DBError> {
    if texts.is_empty() {
        Ok(Vec::new())
    } else {
        self.call_embedding_service(server, Some(texts), None).await
    }
}
/// Embed a single image (raw bytes) via the external service.
///
/// The bytes are base64-encoded because the service accepts images as
/// base64 strings. Errors if the service returns no embedding.
pub async fn embed_image(
    &self,
    server: &crate::server::Server,
    image_bytes: Vec<u8>
) -> Result<Vec<f32>, DBError> {
    let encoded = base64::engine::general_purpose::STANDARD.encode(&image_bytes);
    let mut embeddings = self
        .call_embedding_service(server, None, Some(vec![encoded]))
        .await?;
    if embeddings.is_empty() {
        Err(DBError("No embedding returned for image".to_string()))
    } else {
        Ok(embeddings.remove(0))
    }
}
/// Create a new, empty Lance dataset at `<data_dir>/<name>.lance` with the
/// given Arrow schema, and cache the open handle.
///
/// Uses `WriteMode::Create`; behavior when the dataset already exists is
/// whatever `Dataset::write` does in Create mode — presumably an error;
/// TODO confirm against the lance API.
// NOTE(review): the cmd.rs handler calls this as
// `create_dataset(dataset, dim, schema_pairs)` — a different signature than
// the `(name, Schema)` defined here; one side needs reconciling.
pub async fn create_dataset(
&self,
name: &str,
schema: Schema,
) -> Result<(), DBError> {
let dataset_path = self.data_dir.join(format!("{}.lance", name));
// Create empty dataset with schema
let write_params = WriteParams {
mode: WriteMode::Create,
..Default::default()
};
// Create an empty RecordBatch with the schema
let empty_batch = RecordBatch::new_empty(Arc::new(schema));
let batches = vec![empty_batch];
let dataset = Dataset::write(
batches,
dataset_path.to_str().unwrap(),
Some(write_params)
).await
.map_err(|e| DBError(format!("Failed to create dataset: {}", e)))?;
// Cache the freshly created handle for subsequent reads/writes.
let mut datasets = self.datasets.write().await;
datasets.insert(name.to_string(), Arc::new(dataset));
Ok(())
}
/// Append a batch of vectors (plus optional per-row string metadata columns)
/// to an existing dataset, returning how many rows were written.
///
/// `metadata` maps column name -> one value per vector; each column's length
/// must equal `vectors.len()`. Assumes every inner vector has the same
/// length as the first one — rows of a different length would mis-shape the
/// FixedSizeList column; TODO confirm this is validated upstream.
///
/// The write goes through `Dataset::write` in Append mode and the cached
/// handle is evicted afterwards so the next read reopens the new version.
pub async fn write_vectors(
&self,
dataset_name: &str,
vectors: Vec<Vec<f32>>,
metadata: Option<HashMap<String, Vec<String>>>,
) -> Result<usize, DBError> {
let dataset_path = self.data_dir.join(format!("{}.lance", dataset_name));
// Open or get cached dataset
// (the handle itself is unused below; this call doubles as an existence check)
let dataset = self.get_or_open_dataset(dataset_name).await?;
// Build RecordBatch
let num_vectors = vectors.len();
if num_vectors == 0 {
return Ok(0);
}
// Row width taken from the first vector.
let dim = vectors.first()
.ok_or_else(|| DBError("Empty vectors".to_string()))?
.len();
// Flatten vectors
let flat_vectors: Vec<f32> = vectors.into_iter().flatten().collect();
let vector_array = Float32Array::from(flat_vectors);
// Reshape the flat values into a FixedSizeList<f32>[dim] column.
let vector_array = arrow::array::FixedSizeListArray::try_new_from_values(
vector_array,
dim as i32
).map_err(|e| DBError(format!("Failed to create vector array: {}", e)))?;
let mut arrays: Vec<ArrayRef> = vec![Arc::new(vector_array)];
let mut fields = vec![Field::new(
"vector",
DataType::FixedSizeList(
Arc::new(Field::new("item", DataType::Float32, true)),
dim as i32
),
false
)];
// Add metadata columns if provided
// NOTE(review): HashMap iteration order is unspecified, so metadata column
// order varies between calls — confirm Append mode matches columns by name,
// not by position.
if let Some(metadata) = metadata {
for (key, values) in metadata {
if values.len() != num_vectors {
return Err(DBError(format!(
"Metadata field '{}' has {} values but expected {}",
key, values.len(), num_vectors
)));
}
let array = StringArray::from(values);
arrays.push(Arc::new(array));
fields.push(Field::new(&key, DataType::Utf8, true));
}
}
let schema = Arc::new(Schema::new(fields));
let batch = RecordBatch::try_new(schema, arrays)
.map_err(|e| DBError(format!("Failed to create RecordBatch: {}", e)))?;
// Append to dataset
let write_params = WriteParams {
mode: WriteMode::Append,
..Default::default()
};
Dataset::write(
vec![batch],
dataset_path.to_str().unwrap(),
Some(write_params)
).await
.map_err(|e| DBError(format!("Failed to write to dataset: {}", e)))?;
// Refresh cached dataset
// (drop the stale handle; next access reopens the appended version)
let mut datasets = self.datasets.write().await;
datasets.remove(dataset_name);
Ok(num_vectors)
}
/// K-nearest-neighbour search over the dataset's `vector` column.
///
/// Returns `(distance, metadata)` pairs, one per hit, where `metadata`
/// collects every non-vector, non-distance Utf8 column that is non-null for
/// that row (non-string columns are silently skipped).
///
/// `nprobes` / `refine_factor` tune ANN search when an index exists.
pub async fn search_vectors(
&self,
dataset_name: &str,
query_vector: Vec<f32>,
k: usize,
nprobes: Option<usize>,
refine_factor: Option<usize>,
) -> Result<Vec<(f32, HashMap<String, String>)>, DBError> {
let dataset = self.get_or_open_dataset(dataset_name).await?;
// Build query
let mut query = dataset.scan();
query = query.nearest(
"vector",
&query_vector,
k,
).map_err(|e| DBError(format!("Failed to build search query: {}", e)))?;
if let Some(nprobes) = nprobes {
query = query.nprobes(nprobes);
}
if let Some(refine) = refine_factor {
query = query.refine_factor(refine);
}
// Execute search
let results = query
.try_into_stream()
.await
.map_err(|e| DBError(format!("Failed to execute search: {}", e)))?
.try_collect::<Vec<_>>()
.await
.map_err(|e| DBError(format!("Failed to collect results: {}", e)))?;
// Process results
let mut output = Vec::new();
for batch in results {
// Get distances
// Lance appends a `_distance` column to nearest-neighbour results.
let distances = batch
.column_by_name("_distance")
.ok_or_else(|| DBError("No distance column".to_string()))?
.as_any()
.downcast_ref::<Float32Array>()
.ok_or_else(|| DBError("Invalid distance type".to_string()))?;
// Get metadata
for i in 0..batch.num_rows() {
let distance = distances.value(i);
let mut metadata = HashMap::new();
for field in batch.schema().fields() {
if field.name() != "vector" && field.name() != "_distance" {
if let Some(col) = batch.column_by_name(field.name()) {
// Only Utf8 columns are surfaced; other types are ignored.
if let Some(str_array) = col.as_any().downcast_ref::<StringArray>() {
if !str_array.is_null(i) {
metadata.insert(
field.name().to_string(),
str_array.value(i).to_string()
);
}
}
}
}
}
output.push((distance, metadata));
}
}
Ok(output)
}
/// Embed a text and/or image record and append it to `dataset_name`,
/// returning the generated UUID of the row.
///
/// Exactly one embedding is produced: text wins when both are given
/// (the image is then only stored as base64 metadata, not embedded).
/// The id, original text, and base64 image are folded into the metadata
/// columns alongside the caller-supplied pairs.
///
/// Errors if neither text nor image is provided, or if embedding/writing fails.
pub async fn store_multimodal(
&self,
server: &crate::server::Server,
dataset_name: &str,
text: Option<String>,
image_bytes: Option<Vec<u8>>,
metadata: HashMap<String, String>,
) -> Result<String, DBError> {
// Generate ID
let id = uuid::Uuid::new_v4().to_string();
// Generate embeddings using external service
let embedding = if let Some(text) = text.as_ref() {
self.embed_text(server, vec![text.clone()]).await?
.into_iter()
.next()
.ok_or_else(|| DBError("No embedding returned".to_string()))?
} else if let Some(img) = image_bytes.as_ref() {
self.embed_image(server, img.clone()).await?
} else {
return Err(DBError("No text or image provided".to_string()));
};
// Prepare metadata
let mut full_metadata = metadata;
full_metadata.insert("id".to_string(), id.clone());
if let Some(text) = text {
full_metadata.insert("text".to_string(), text);
}
if let Some(img) = image_bytes {
full_metadata.insert("image_base64".to_string(), base64::engine::general_purpose::STANDARD.encode(img));
}
// Convert metadata to column vectors
// (single-row write: each column holds exactly one value)
let mut metadata_cols = HashMap::new();
for (key, value) in full_metadata {
metadata_cols.insert(key, vec![value]);
}
// Write to dataset
self.write_vectors(dataset_name, vec![embedding], Some(metadata_cols)).await?;
Ok(id)
}
/// Text-query convenience wrapper: embed `query_text` via the external
/// service, then delegate to [`search_vectors`] with the resulting vector.
pub async fn search_with_text(
    &self,
    server: &crate::server::Server,
    dataset_name: &str,
    query_text: String,
    k: usize,
    nprobes: Option<usize>,
    refine_factor: Option<usize>,
) -> Result<Vec<(f32, HashMap<String, String>)>, DBError> {
    let query_vector = self
        .embed_text(server, vec![query_text])
        .await?
        .into_iter()
        .next()
        .ok_or_else(|| DBError("No embedding returned for query".to_string()))?;
    self.search_vectors(dataset_name, query_vector, k, nprobes, refine_factor)
        .await
}
/// Build a vector index over the dataset's `vector` column.
///
/// Only `IVF_PQ` is supported (case-insensitive); defaults are 256 IVF
/// partitions and 16 PQ sub-vectors when not specified. The final `true`
/// argument asks lance to replace any existing index.
pub async fn create_index(
&self,
dataset_name: &str,
index_type: &str,
num_partitions: Option<usize>,
num_sub_vectors: Option<usize>,
) -> Result<(), DBError> {
let dataset = self.get_or_open_dataset(dataset_name).await?;
let mut params = VectorIndexParams::default();
match index_type.to_uppercase().as_str() {
"IVF_PQ" => {
params.ivf = IvfBuildParams {
num_partitions: num_partitions.unwrap_or(256),
..Default::default()
};
params.pq = PQBuildParams {
num_sub_vectors: num_sub_vectors.unwrap_or(16),
..Default::default()
};
}
_ => return Err(DBError(format!("Unsupported index type: {}", index_type))),
}
// NOTE(review): called on an `Arc<Dataset>` — confirm DatasetIndexExt's
// create_index accepts a shared reference in lance 0.33.
dataset.create_index(
&["vector"],
lance::index::IndexType::Vector,
None,
&params,
true
).await
.map_err(|e| DBError(format!("Failed to create index: {}", e)))?;
Ok(())
}
/// Return a cached handle for `name`, opening and caching the on-disk
/// dataset on a miss. Errors if the dataset directory does not exist.
async fn get_or_open_dataset(&self, name: &str) -> Result<Arc<Dataset>, DBError> {
    // Take the write lock up front: on a miss we insert into the same guard,
    // so there is no read->write upgrade window.
    let mut cache = self.datasets.write().await;
    if let Some(ds) = cache.get(name) {
        return Ok(Arc::clone(ds));
    }
    let dataset_path = self.data_dir.join(format!("{}.lance", name));
    if !dataset_path.exists() {
        return Err(DBError(format!("Dataset '{}' does not exist", name)));
    }
    let opened = Arc::new(
        Dataset::open(dataset_path.to_str().unwrap())
            .await
            .map_err(|e| DBError(format!("Failed to open dataset: {}", e)))?,
    );
    cache.insert(name.to_string(), Arc::clone(&opened));
    Ok(opened)
}
/// List the names of all datasets in the data directory.
///
/// A dataset is any directory named `<name>.lance`; the returned names have
/// the extension removed.
pub async fn list_datasets(&self) -> Result<Vec<String>, DBError> {
    let entries = std::fs::read_dir(&self.data_dir)
        .map_err(|e| DBError(format!("Failed to read data directory: {}", e)))?;
    let mut datasets = Vec::new();
    for entry in entries {
        let entry = entry.map_err(|e| DBError(format!("Failed to read entry: {}", e)))?;
        let path = entry.path();
        if !path.is_dir() {
            continue;
        }
        // Bug fix: `trim_end_matches(".lance")` strips the suffix repeatedly
        // ("a.lance.lance" -> "a"), so such a name would not round-trip back
        // to its directory. `strip_suffix` removes exactly one occurrence.
        if let Some(name) = path
            .file_name()
            .and_then(|n| n.to_str())
            .and_then(|n| n.strip_suffix(".lance"))
        {
            datasets.push(name.to_string());
        }
    }
    Ok(datasets)
}
/// Delete a dataset: evict any cached handle, then remove its directory.
/// Idempotent — succeeds even when the dataset does not exist on disk.
pub async fn drop_dataset(&self, name: &str) -> Result<(), DBError> {
    // Remove the cached handle first so later opens don't see a stale Arc.
    self.datasets.write().await.remove(name);
    let dataset_path = self.data_dir.join(format!("{}.lance", name));
    if dataset_path.exists() {
        std::fs::remove_dir_all(&dataset_path)
            .map_err(|e| DBError(format!("Failed to delete dataset: {}", e)))?;
    }
    Ok(())
}
/// Summarize a dataset as string key/value pairs:
/// `name`, `version`, `num_rows`, and a comma-joined `schema` description
/// of `field:type` entries.
pub async fn get_dataset_info(&self, name: &str) -> Result<HashMap<String, String>, DBError> {
    let dataset = self.get_or_open_dataset(name).await?;
    // Render the schema as "field:type, field:type, …".
    let schema_desc = dataset
        .schema()
        .fields()
        .iter()
        .map(|f| format!("{}:{}", f.name(), f.data_type()))
        .collect::<Vec<_>>()
        .join(", ");
    let mut info = HashMap::new();
    info.insert("name".to_string(), name.to_string());
    info.insert("version".to_string(), dataset.version().to_string());
    info.insert("num_rows".to_string(), dataset.count_rows().await?.to_string());
    info.insert("schema".to_string(), schema_desc);
    Ok(info)
}
}

View File

@@ -2,6 +2,7 @@ pub mod age; // NEW
pub mod cmd; pub mod cmd;
pub mod crypto; pub mod crypto;
pub mod error; pub mod error;
pub mod lance_store; // Add Lance store module
pub mod options; pub mod options;
pub mod protocol; pub mod protocol;
pub mod server; pub mod server;

View File

@@ -9,6 +9,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
use crate::cmd::Cmd; use crate::cmd::Cmd;
use crate::error::DBError; use crate::error::DBError;
use crate::lance_store::LanceStore;
use crate::options; use crate::options;
use crate::protocol::Protocol; use crate::protocol::Protocol;
use crate::storage::Storage; use crate::storage::Storage;
@@ -26,6 +27,9 @@ pub struct Server {
// BLPOP waiter registry: per (db_index, key) FIFO of waiters // BLPOP waiter registry: per (db_index, key) FIFO of waiters
pub list_waiters: Arc<Mutex<HashMap<u64, HashMap<String, Vec<Waiter>>>>>, pub list_waiters: Arc<Mutex<HashMap<u64, HashMap<String, Vec<Waiter>>>>>,
pub waiter_seq: Arc<AtomicU64>, pub waiter_seq: Arc<AtomicU64>,
// Lance vector store
pub lance_store: Option<Arc<LanceStore>>,
} }
pub struct Waiter { pub struct Waiter {
@@ -42,6 +46,16 @@ pub enum PopSide {
impl Server { impl Server {
pub async fn new(option: options::DBOption) -> Self { pub async fn new(option: options::DBOption) -> Self {
// Initialize Lance store
let lance_data_dir = std::path::PathBuf::from(&option.dir).join("lance");
let lance_store = match LanceStore::new(lance_data_dir).await {
Ok(store) => Some(Arc::new(store)),
Err(e) => {
eprintln!("Warning: Failed to initialize Lance store: {}", e.0);
None
}
};
Server { Server {
db_cache: Arc::new(std::sync::RwLock::new(HashMap::new())), db_cache: Arc::new(std::sync::RwLock::new(HashMap::new())),
option, option,
@@ -51,9 +65,17 @@ impl Server {
list_waiters: Arc::new(Mutex::new(HashMap::new())), list_waiters: Arc::new(Mutex::new(HashMap::new())),
waiter_seq: Arc::new(AtomicU64::new(1)), waiter_seq: Arc::new(AtomicU64::new(1)),
lance_store,
} }
} }
pub fn lance_store(&self) -> Result<Arc<LanceStore>, DBError> {
self.lance_store
.as_ref()
.cloned()
.ok_or_else(|| DBError("Lance store not initialized".to_string()))
}
pub fn current_storage(&self) -> Result<Arc<dyn StorageBackend>, DBError> { pub fn current_storage(&self) -> Result<Arc<dyn StorageBackend>, DBError> {
let mut cache = self.db_cache.write().unwrap(); let mut cache = self.db_cache.write().unwrap();