This commit is contained in:
Maxime Van Hees
2025-09-09 17:20:12 +02:00
parent bd77a7db48
commit 7e5da9c6eb
3 changed files with 150 additions and 139 deletions

View File

@@ -1,5 +1,7 @@
// #![allow(unused_imports)] // #![allow(unused_imports)]
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use herodb::server; use herodb::server;
@@ -63,7 +65,7 @@ async fn main() {
}; };
// new server // new server
let server = server::Server::new(option).await; let server = Arc::new(Mutex::new(server::Server::new(option).await));
// Add a small delay to ensure the port is ready // Add a small delay to ensure the port is ready
tokio::time::sleep(std::time::Duration::from_millis(100)).await; tokio::time::sleep(std::time::Duration::from_millis(100)).await;
@@ -73,7 +75,7 @@ async fn main() {
let rpc_addr = format!("127.0.0.1:{}", args.rpc_port).parse().unwrap(); let rpc_addr = format!("127.0.0.1:{}", args.rpc_port).parse().unwrap();
let base_dir = format!("{}/rpc_databases", args.dir); let base_dir = format!("{}/rpc_databases", args.dir);
match rpc_server::start_rpc_server(rpc_addr, base_dir).await { match rpc_server::start_rpc_server(rpc_addr, Arc::clone(&server), base_dir).await {
Ok(handle) => { Ok(handle) => {
println!("RPC management server started on port {}", args.rpc_port); println!("RPC management server started on port {}", args.rpc_port);
Some(handle) Some(handle)
@@ -94,9 +96,10 @@ async fn main() {
Ok((stream, _)) => { Ok((stream, _)) => {
println!("accepted new connection"); println!("accepted new connection");
let mut sc = server.clone(); let sc = Arc::clone(&server);
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = sc.handle(stream).await { let mut server_guard = sc.lock().await;
if let Err(e) = server_guard.handle(stream).await {
println!("error: {:?}, will close the connection. Bye", e); println!("error: {:?}, will close the connection. Bye", e);
} }
}); });

View File

@@ -1,6 +1,6 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::RwLock; use tokio::sync::{RwLock, Mutex};
use jsonrpsee::{core::RpcResult, proc_macros::rpc}; use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -41,30 +41,33 @@ pub struct DatabaseInfo {
/// RPC trait for HeroDB management /// RPC trait for HeroDB management
#[rpc(server, client, namespace = "herodb")] #[rpc(server, client, namespace = "herodb")]
pub trait Rpc { pub trait Rpc {
/// Create a new database with specified configuration /// Configure an existing database with specific settings
#[method(name = "createDatabase")] #[method(name = "configureDatabase")]
async fn create_database( async fn configure_database(
&self, &self,
backend: BackendType, db_index: u64,
config: DatabaseConfig, config: DatabaseConfig
encryption_key: Option<String>, ) -> RpcResult<bool>;
) -> RpcResult<u64>;
/// Set encryption for an existing database (write-only key) /// Create/pre-initialize a database at the specified index
#[method(name = "setEncryption")] #[method(name = "createDatabase")]
async fn set_encryption(&self, db_id: u64, encryption_key: String) -> RpcResult<bool>; async fn create_database(&self, db_index: u64) -> RpcResult<bool>;
/// List all managed databases /// Set encryption for a specific database index (write-only key)
#[method(name = "setDatabaseEncryption")]
async fn set_database_encryption(&self, db_index: u64, encryption_key: String) -> RpcResult<bool>;
/// List all database indices that exist
#[method(name = "listDatabases")] #[method(name = "listDatabases")]
async fn list_databases(&self) -> RpcResult<Vec<DatabaseInfo>>; async fn list_databases(&self) -> RpcResult<Vec<u64>>;
/// Get detailed information about a specific database /// Get detailed information about a specific database
#[method(name = "getDatabaseInfo")] #[method(name = "getDatabaseInfo")]
async fn get_database_info(&self, db_id: u64) -> RpcResult<DatabaseInfo>; async fn get_database_info(&self, db_index: u64) -> RpcResult<DatabaseInfo>;
/// Delete a database /// Delete a database and its files
#[method(name = "deleteDatabase")] #[method(name = "deleteDatabase")]
async fn delete_database(&self, db_id: u64) -> RpcResult<bool>; async fn delete_database(&self, db_index: u64) -> RpcResult<bool>;
/// Get server statistics /// Get server statistics
#[method(name = "getServerStats")] #[method(name = "getServerStats")]
@@ -73,169 +76,160 @@ pub trait Rpc {
/// RPC Server implementation /// RPC Server implementation
pub struct RpcServerImpl { pub struct RpcServerImpl {
/// Reference to the main Redis server
main_server: Arc<Mutex<Server>>,
/// Base directory for database files /// Base directory for database files
base_dir: String, base_dir: String,
/// Managed database servers
servers: Arc<RwLock<HashMap<u64, Arc<Server>>>>,
/// Next database ID to assign
next_db_id: Arc<RwLock<u64>>,
} }
impl RpcServerImpl { impl RpcServerImpl {
/// Create a new RPC server instance /// Create a new RPC server instance with reference to main server
pub fn new(base_dir: String) -> Self { pub fn new(main_server: Arc<Mutex<Server>>, base_dir: String) -> Self {
Self { Self {
main_server,
base_dir, base_dir,
servers: Arc::new(RwLock::new(HashMap::new())),
next_db_id: Arc::new(RwLock::new(0)),
} }
} }
/// Get the next available database ID
async fn get_next_db_id(&self) -> u64 {
let mut id = self.next_db_id.write().await;
let current_id = *id;
*id += 1;
current_id
}
} }
#[jsonrpsee::core::async_trait] #[jsonrpsee::core::async_trait]
impl RpcServer for RpcServerImpl { impl RpcServer for RpcServerImpl {
async fn create_database( async fn configure_database(
&self, &self,
backend: BackendType, db_index: u64,
config: DatabaseConfig, config: DatabaseConfig
encryption_key: Option<String>, ) -> RpcResult<bool> {
) -> RpcResult<u64> { // For now, configuration is mainly informational
let db_id = self.get_next_db_id().await; // In a full implementation, this could set database-specific settings
println!("Configured database {} with settings: {:?}", db_index, config);
Ok(true)
}
// For now, only support Redb backend async fn create_database(&self, db_index: u64) -> RpcResult<bool> {
match backend { // Pre-create the database by accessing it through the main server
BackendType::Redb => { let server_guard = self.main_server.lock().await;
// Create database directory
let db_dir = if let Some(path) = &config.storage_path {
std::path::PathBuf::from(path)
} else {
std::path::PathBuf::from(&self.base_dir).join(format!("rpc_db_{}", db_id))
};
// Ensure directory exists // We can't directly modify selected_db, but we can try to access the storage
std::fs::create_dir_all(&db_dir) // This will create the database file if it doesn't exist
.map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned( // Note: This is a simplified approach - in practice, we'd need to modify the server to allow database pre-creation
-32000,
format!("Failed to create directory: {}", e),
None::<()>
))?;
// Create DB options println!("Note: Database {} will be created when first accessed via Redis protocol", db_index);
let encrypt = encryption_key.is_some(); println!("Use: redis-cli -p 6379, then: SELECT {}", db_index);
let option = DBOption {
dir: db_dir.to_string_lossy().to_string(),
port: 0, // Not used for RPC-managed databases
debug: false,
encryption_key,
encrypt,
};
// Create server instance Ok(true)
let server = Server::new(option).await; }
// Store the server async fn set_database_encryption(&self, db_index: u64, encryption_key: String) -> RpcResult<bool> {
let mut servers = self.servers.write().await; // Note: Encryption is determined at database creation time based on db_index
servers.insert(db_id, Arc::new(server)); // DB 0-9 are non-encrypted, DB 10+ are encrypted
// This method is mainly for documentation/configuration purposes
println!("Note: Database {} encryption is determined by index (10+ = encrypted)", db_index);
println!("Encryption key provided but not stored (write-only policy)");
Ok(db_index >= 10) // Return true if this DB would be encrypted
}
Ok(db_id) async fn list_databases(&self) -> RpcResult<Vec<u64>> {
// Scan the database directory for existing .db files
let mut db_indices = Vec::new();
if let Ok(entries) = std::fs::read_dir(&self.base_dir) {
for entry in entries.flatten() {
if let Some(file_name) = entry.file_name().to_str() {
if let Some(index_str) = file_name.strip_suffix(".db") {
if let Ok(index) = index_str.parse::<u64>() {
db_indices.push(index);
}
}
}
} }
} }
}
async fn set_encryption(&self, db_id: u64, _encryption_key: String) -> RpcResult<bool> { // Also include database 0 (default) even if file doesn't exist yet
// Note: In a real implementation, we'd need to modify the existing database if !db_indices.contains(&0) {
// For now, return false as encryption can only be set during creation db_indices.push(0);
let _servers = self.servers.read().await;
// TODO: Implement encryption setting for existing databases
Ok(false)
}
async fn list_databases(&self) -> RpcResult<Vec<DatabaseInfo>> {
let servers = self.servers.read().await;
let mut result = Vec::new();
for (id, server) in servers.iter() {
// Get basic info from server
let info = DatabaseInfo {
id: *id,
name: None, // TODO: Store name in server metadata
backend: BackendType::Redb,
encrypted: server.option.encrypt,
redis_version: Some("7.0".to_string()), // Default Redis compatibility
storage_path: Some(server.option.dir.clone()),
size_on_disk: None, // TODO: Calculate actual size
key_count: None, // TODO: Get key count from storage
created_at: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
last_access: None,
};
result.push(info);
} }
Ok(result) db_indices.sort();
Ok(db_indices)
} }
async fn get_database_info(&self, db_id: u64) -> RpcResult<DatabaseInfo> { async fn get_database_info(&self, db_index: u64) -> RpcResult<DatabaseInfo> {
let servers = self.servers.read().await; // Check if database file exists
let db_path = std::path::PathBuf::from(&self.base_dir).join(format!("{}.db", db_index));
let file_exists = db_path.exists();
if let Some(server) = servers.get(&db_id) { // Get file size if it exists
Ok(DatabaseInfo { let size_on_disk = if file_exists {
id: db_id, std::fs::metadata(&db_path).ok().map(|m| m.len())
name: None,
backend: BackendType::Redb,
encrypted: server.option.encrypt,
redis_version: Some("7.0".to_string()),
storage_path: Some(server.option.dir.clone()),
size_on_disk: None,
key_count: None,
created_at: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
last_access: None,
})
} else { } else {
Err(jsonrpsee::types::ErrorObjectOwned::owned( None
};
Ok(DatabaseInfo {
id: db_index,
name: None, // Could be extended to store names
backend: BackendType::Redb,
encrypted: db_index >= 10, // Based on HeroDB's encryption rule
redis_version: Some("7.0".to_string()),
storage_path: Some(self.base_dir.clone()),
size_on_disk,
key_count: None, // Would need to open DB to count keys
created_at: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
last_access: None,
})
}
async fn delete_database(&self, db_index: u64) -> RpcResult<bool> {
// Don't allow deletion of database 0 (default)
if db_index == 0 {
return Err(jsonrpsee::types::ErrorObjectOwned::owned(
-32000, -32000,
format!("Database {} not found", db_id), "Cannot delete default database (index 0)".to_string(),
None::<()> None::<()>
)) ));
} }
}
async fn delete_database(&self, db_id: u64) -> RpcResult<bool> { let db_path = std::path::PathBuf::from(&self.base_dir).join(format!("{}.db", db_index));
let mut servers = self.servers.write().await;
if let Some(server) = servers.remove(&db_id) { if db_path.exists() {
// TODO: Clean up database files match std::fs::remove_file(&db_path) {
let _ = server; Ok(_) => {
Ok(true) println!("Deleted database file: {}", db_path.display());
Ok(true)
}
Err(e) => {
Err(jsonrpsee::types::ErrorObjectOwned::owned(
-32000,
format!("Failed to delete database {}: {}", db_index, e),
None::<()>
))
}
}
} else { } else {
Ok(false) Ok(false) // Database didn't exist
} }
} }
async fn get_server_stats(&self) -> RpcResult<HashMap<String, serde_json::Value>> { async fn get_server_stats(&self) -> RpcResult<HashMap<String, serde_json::Value>> {
let servers = self.servers.read().await;
let mut stats = HashMap::new(); let mut stats = HashMap::new();
stats.insert("total_databases".to_string(), serde_json::json!(servers.len())); // Get list of databases
let databases = self.list_databases().await.unwrap_or_default();
stats.insert("total_databases".to_string(), serde_json::json!(databases.len()));
stats.insert("database_indices".to_string(), serde_json::json!(databases));
stats.insert("uptime".to_string(), serde_json::json!( stats.insert("uptime".to_string(), serde_json::json!(
std::time::SystemTime::now() std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH) .duration_since(std::time::UNIX_EPOCH)
.unwrap() .unwrap()
.as_secs() .as_secs()
)); ));
let server_guard = self.main_server.lock().await;
stats.insert("server_port".to_string(), serde_json::json!(server_guard.option.port));
stats.insert("data_directory".to_string(), serde_json::json!(self.base_dir));
Ok(stats) Ok(stats)
} }

View File

@@ -2,12 +2,19 @@ use std::net::SocketAddr;
use jsonrpsee::server::{ServerBuilder, ServerHandle}; use jsonrpsee::server::{ServerBuilder, ServerHandle};
use jsonrpsee::RpcModule; use jsonrpsee::RpcModule;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::rpc::{RpcServer, RpcServerImpl}; use crate::rpc::{RpcServer, RpcServerImpl};
use crate::server::Server;
/// Start the RPC server on the specified address /// Start the RPC server on the specified address
pub async fn start_rpc_server(addr: SocketAddr, base_dir: String) -> Result<ServerHandle, Box<dyn std::error::Error + Send + Sync>> { pub async fn start_rpc_server(
addr: SocketAddr,
main_server: Arc<Mutex<Server>>,
base_dir: String
) -> Result<ServerHandle, Box<dyn std::error::Error + Send + Sync>> {
// Create the RPC server implementation // Create the RPC server implementation
let rpc_impl = RpcServerImpl::new(base_dir); let rpc_impl = RpcServerImpl::new(main_server, base_dir);
// Create the RPC module // Create the RPC module
let mut module = RpcModule::new(()); let mut module = RpcModule::new(());
@@ -37,7 +44,14 @@ mod tests {
let addr = "127.0.0.1:0".parse().unwrap(); // Use port 0 for auto-assignment let addr = "127.0.0.1:0".parse().unwrap(); // Use port 0 for auto-assignment
let base_dir = "/tmp/test_rpc".to_string(); let base_dir = "/tmp/test_rpc".to_string();
let handle = start_rpc_server(addr, base_dir).await.unwrap(); let main_server = Arc::new(Mutex::new(crate::server::Server::new(crate::options::DBOption {
dir: "/tmp".to_string(),
port: 0,
debug: false,
encryption_key: None,
encrypt: false,
}).await));
let handle = start_rpc_server(addr, main_server, base_dir).await.unwrap();
// Give the server a moment to start // Give the server a moment to start
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;