This commit is contained in:
Maxime Van Hees
2025-09-09 16:31:06 +02:00
parent d931770e90
commit bd77a7db48
7 changed files with 1160 additions and 3 deletions

797
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -23,6 +23,7 @@ age = "0.10"
secrecy = "0.8" secrecy = "0.8"
ed25519-dalek = "2" ed25519-dalek = "2"
base64 = "0.22" base64 = "0.22"
jsonrpsee = { version = "0.24", features = ["http-client", "ws-client", "server", "macros"] }
[dev-dependencies] [dev-dependencies]
redis = { version = "0.24", features = ["aio", "tokio-comp"] } redis = { version = "0.24", features = ["aio", "tokio-comp"] }

View File

@@ -4,5 +4,7 @@ pub mod crypto;
pub mod error; pub mod error;
pub mod options; pub mod options;
pub mod protocol; pub mod protocol;
pub mod rpc;
pub mod rpc_server;
pub mod server; pub mod server;
pub mod storage; pub mod storage;

View File

@@ -3,6 +3,7 @@
use tokio::net::TcpListener; use tokio::net::TcpListener;
use herodb::server; use herodb::server;
use herodb::rpc_server;
use clap::Parser; use clap::Parser;
@@ -30,6 +31,14 @@ struct Args {
/// Encrypt the database /// Encrypt the database
#[arg(long)] #[arg(long)]
encrypt: bool, encrypt: bool,
/// Enable RPC management server
#[arg(long)]
enable_rpc: bool,
/// RPC server port (default: 8080)
#[arg(long, default_value = "8080")]
rpc_port: u16,
} }
#[tokio::main] #[tokio::main]
@@ -46,7 +55,7 @@ async fn main() {
// new DB option // new DB option
let option = herodb::options::DBOption { let option = herodb::options::DBOption {
dir: args.dir, dir: args.dir.clone(),
port, port,
debug: args.debug, debug: args.debug,
encryption_key: args.encryption_key, encryption_key: args.encryption_key,
@@ -59,6 +68,25 @@ async fn main() {
// Add a small delay to ensure the port is ready // Add a small delay to ensure the port is ready
tokio::time::sleep(std::time::Duration::from_millis(100)).await; tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// Start RPC server if enabled
let rpc_handle = if args.enable_rpc {
let rpc_addr = format!("127.0.0.1:{}", args.rpc_port).parse().unwrap();
let base_dir = format!("{}/rpc_databases", args.dir);
match rpc_server::start_rpc_server(rpc_addr, base_dir).await {
Ok(handle) => {
println!("RPC management server started on port {}", args.rpc_port);
Some(handle)
}
Err(e) => {
eprintln!("Failed to start RPC server: {}", e);
None
}
}
} else {
None
};
// accept new connections // accept new connections
loop { loop {
let stream = listener.accept().await; let stream = listener.accept().await;

242
herodb/src/rpc.rs Normal file
View File

@@ -0,0 +1,242 @@
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use serde::{Deserialize, Serialize};
use crate::server::Server;
use crate::options::DBOption;
/// Database backend types
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BackendType {
Redb,
// Future: InMemory, Custom(String)
}
/// Database configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseConfig {
pub name: Option<String>,
pub storage_path: Option<String>,
pub max_size: Option<u64>,
pub redis_version: Option<String>,
}
/// Database information returned by metadata queries
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseInfo {
pub id: u64,
pub name: Option<String>,
pub backend: BackendType,
pub encrypted: bool,
pub redis_version: Option<String>,
pub storage_path: Option<String>,
pub size_on_disk: Option<u64>,
pub key_count: Option<u64>,
pub created_at: u64,
pub last_access: Option<u64>,
}
/// RPC trait for HeroDB management
#[rpc(server, client, namespace = "herodb")]
pub trait Rpc {
/// Create a new database with specified configuration
#[method(name = "createDatabase")]
async fn create_database(
&self,
backend: BackendType,
config: DatabaseConfig,
encryption_key: Option<String>,
) -> RpcResult<u64>;
/// Set encryption for an existing database (write-only key)
#[method(name = "setEncryption")]
async fn set_encryption(&self, db_id: u64, encryption_key: String) -> RpcResult<bool>;
/// List all managed databases
#[method(name = "listDatabases")]
async fn list_databases(&self) -> RpcResult<Vec<DatabaseInfo>>;
/// Get detailed information about a specific database
#[method(name = "getDatabaseInfo")]
async fn get_database_info(&self, db_id: u64) -> RpcResult<DatabaseInfo>;
/// Delete a database
#[method(name = "deleteDatabase")]
async fn delete_database(&self, db_id: u64) -> RpcResult<bool>;
/// Get server statistics
#[method(name = "getServerStats")]
async fn get_server_stats(&self) -> RpcResult<HashMap<String, serde_json::Value>>;
}
/// RPC Server implementation
pub struct RpcServerImpl {
/// Base directory for database files
base_dir: String,
/// Managed database servers
servers: Arc<RwLock<HashMap<u64, Arc<Server>>>>,
/// Next database ID to assign
next_db_id: Arc<RwLock<u64>>,
}
impl RpcServerImpl {
/// Create a new RPC server instance
pub fn new(base_dir: String) -> Self {
Self {
base_dir,
servers: Arc::new(RwLock::new(HashMap::new())),
next_db_id: Arc::new(RwLock::new(0)),
}
}
/// Get the next available database ID
async fn get_next_db_id(&self) -> u64 {
let mut id = self.next_db_id.write().await;
let current_id = *id;
*id += 1;
current_id
}
}
#[jsonrpsee::core::async_trait]
impl RpcServer for RpcServerImpl {
async fn create_database(
&self,
backend: BackendType,
config: DatabaseConfig,
encryption_key: Option<String>,
) -> RpcResult<u64> {
let db_id = self.get_next_db_id().await;
// For now, only support Redb backend
match backend {
BackendType::Redb => {
// Create database directory
let db_dir = if let Some(path) = &config.storage_path {
std::path::PathBuf::from(path)
} else {
std::path::PathBuf::from(&self.base_dir).join(format!("rpc_db_{}", db_id))
};
// Ensure directory exists
std::fs::create_dir_all(&db_dir)
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned(
-32000,
format!("Failed to create directory: {}", e),
None::<()>
))?;
// Create DB options
let encrypt = encryption_key.is_some();
let option = DBOption {
dir: db_dir.to_string_lossy().to_string(),
port: 0, // Not used for RPC-managed databases
debug: false,
encryption_key,
encrypt,
};
// Create server instance
let server = Server::new(option).await;
// Store the server
let mut servers = self.servers.write().await;
servers.insert(db_id, Arc::new(server));
Ok(db_id)
}
}
}
async fn set_encryption(&self, db_id: u64, _encryption_key: String) -> RpcResult<bool> {
// Note: In a real implementation, we'd need to modify the existing database
// For now, return false as encryption can only be set during creation
let _servers = self.servers.read().await;
// TODO: Implement encryption setting for existing databases
Ok(false)
}
async fn list_databases(&self) -> RpcResult<Vec<DatabaseInfo>> {
let servers = self.servers.read().await;
let mut result = Vec::new();
for (id, server) in servers.iter() {
// Get basic info from server
let info = DatabaseInfo {
id: *id,
name: None, // TODO: Store name in server metadata
backend: BackendType::Redb,
encrypted: server.option.encrypt,
redis_version: Some("7.0".to_string()), // Default Redis compatibility
storage_path: Some(server.option.dir.clone()),
size_on_disk: None, // TODO: Calculate actual size
key_count: None, // TODO: Get key count from storage
created_at: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
last_access: None,
};
result.push(info);
}
Ok(result)
}
async fn get_database_info(&self, db_id: u64) -> RpcResult<DatabaseInfo> {
let servers = self.servers.read().await;
if let Some(server) = servers.get(&db_id) {
Ok(DatabaseInfo {
id: db_id,
name: None,
backend: BackendType::Redb,
encrypted: server.option.encrypt,
redis_version: Some("7.0".to_string()),
storage_path: Some(server.option.dir.clone()),
size_on_disk: None,
key_count: None,
created_at: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
last_access: None,
})
} else {
Err(jsonrpsee::types::ErrorObjectOwned::owned(
-32000,
format!("Database {} not found", db_id),
None::<()>
))
}
}
async fn delete_database(&self, db_id: u64) -> RpcResult<bool> {
let mut servers = self.servers.write().await;
if let Some(server) = servers.remove(&db_id) {
// TODO: Clean up database files
let _ = server;
Ok(true)
} else {
Ok(false)
}
}
async fn get_server_stats(&self) -> RpcResult<HashMap<String, serde_json::Value>> {
let servers = self.servers.read().await;
let mut stats = HashMap::new();
stats.insert("total_databases".to_string(), serde_json::json!(servers.len()));
stats.insert("uptime".to_string(), serde_json::json!(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs()
));
Ok(stats)
}
}

49
herodb/src/rpc_server.rs Normal file
View File

@@ -0,0 +1,49 @@
use std::net::SocketAddr;
use jsonrpsee::server::{ServerBuilder, ServerHandle};
use jsonrpsee::RpcModule;
use crate::rpc::{RpcServer, RpcServerImpl};
/// Start the RPC server on the specified address
pub async fn start_rpc_server(addr: SocketAddr, base_dir: String) -> Result<ServerHandle, Box<dyn std::error::Error + Send + Sync>> {
// Create the RPC server implementation
let rpc_impl = RpcServerImpl::new(base_dir);
// Create the RPC module
let mut module = RpcModule::new(());
module.merge(RpcServer::into_rpc(rpc_impl))?;
// Build the server with both HTTP and WebSocket support
let server = ServerBuilder::default()
.http_only() // Start with HTTP only, can be extended to WS
.build(addr)
.await?;
// Start the server
let handle = server.start(module);
println!("RPC server started on {}", addr);
Ok(handle)
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
#[tokio::test]
async fn test_rpc_server_startup() {
let addr = "127.0.0.1:0".parse().unwrap(); // Use port 0 for auto-assignment
let base_dir = "/tmp/test_rpc".to_string();
let handle = start_rpc_server(addr, base_dir).await.unwrap();
// Give the server a moment to start
tokio::time::sleep(Duration::from_millis(100)).await;
// Stop the server
handle.stop().unwrap();
handle.stopped().await;
}
}

42
herodb/tests/rpc_tests.rs Normal file
View File

@@ -0,0 +1,42 @@
use std::net::SocketAddr;
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::core::client::ClientT;
use serde_json::json;
use herodb::rpc::{RpcClient, BackendType, DatabaseConfig};
#[tokio::test]
async fn test_rpc_server_basic() {
// This test would require starting the RPC server in a separate thread
// For now, we'll just test that the types compile correctly
// Test serialization of types
let backend = BackendType::Redb;
let config = DatabaseConfig {
name: Some("test_db".to_string()),
storage_path: Some("/tmp/test".to_string()),
max_size: Some(1024 * 1024),
redis_version: Some("7.0".to_string()),
};
let backend_json = serde_json::to_string(&backend).unwrap();
let config_json = serde_json::to_string(&config).unwrap();
assert_eq!(backend_json, "\"Redb\"");
assert!(config_json.contains("test_db"));
}
#[tokio::test]
async fn test_database_config_serialization() {
let config = DatabaseConfig {
name: Some("my_db".to_string()),
storage_path: None,
max_size: Some(1000000),
redis_version: Some("7.0".to_string()),
};
let json = serde_json::to_value(&config).unwrap();
assert_eq!(json["name"], "my_db");
assert_eq!(json["max_size"], 1000000);
assert_eq!(json["redis_version"], "7.0");
}