From 7e5da9c6eb2fcfba08b21b096a99722f3594b617 Mon Sep 17 00:00:00 2001 From: Maxime Van Hees Date: Tue, 9 Sep 2025 17:20:12 +0200 Subject: [PATCH] WIP2 --- herodb/src/main.rs | 11 +- herodb/src/rpc.rs | 258 +++++++++++++++++++-------------------- herodb/src/rpc_server.rs | 20 ++- 3 files changed, 150 insertions(+), 139 deletions(-) diff --git a/herodb/src/main.rs b/herodb/src/main.rs index 79c7ada..8aca68a 100644 --- a/herodb/src/main.rs +++ b/herodb/src/main.rs @@ -1,5 +1,7 @@ // #![allow(unused_imports)] +use std::sync::Arc; +use tokio::sync::Mutex; use tokio::net::TcpListener; use herodb::server; @@ -63,7 +65,7 @@ async fn main() { }; // new server - let server = server::Server::new(option).await; + let server = Arc::new(Mutex::new(server::Server::new(option).await)); // Add a small delay to ensure the port is ready tokio::time::sleep(std::time::Duration::from_millis(100)).await; @@ -73,7 +75,7 @@ async fn main() { let rpc_addr = format!("127.0.0.1:{}", args.rpc_port).parse().unwrap(); let base_dir = format!("{}/rpc_databases", args.dir); - match rpc_server::start_rpc_server(rpc_addr, base_dir).await { + match rpc_server::start_rpc_server(rpc_addr, Arc::clone(&server), base_dir).await { Ok(handle) => { println!("RPC management server started on port {}", args.rpc_port); Some(handle) @@ -94,9 +96,10 @@ async fn main() { Ok((stream, _)) => { println!("accepted new connection"); - let mut sc = server.clone(); + let sc = Arc::clone(&server); tokio::spawn(async move { - if let Err(e) = sc.handle(stream).await { + let mut server_guard = sc.lock().await; + if let Err(e) = server_guard.handle(stream).await { println!("error: {:?}, will close the connection. Bye", e); } }); diff --git a/herodb/src/rpc.rs b/herodb/src/rpc.rs index b8b9a36..d2aced5 100644 --- a/herodb/src/rpc.rs +++ b/herodb/src/rpc.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, Mutex}; use jsonrpsee::{core::RpcResult, proc_macros::rpc}; use serde::{Deserialize, Serialize}; @@ -41,30 +41,33 @@ pub struct DatabaseInfo { /// RPC trait for HeroDB management #[rpc(server, client, namespace = "herodb")] pub trait Rpc { - /// Create a new database with specified configuration - #[method(name = "createDatabase")] - async fn create_database( + /// Configure an existing database with specific settings + #[method(name = "configureDatabase")] + async fn configure_database( &self, - backend: BackendType, - config: DatabaseConfig, - encryption_key: Option, - ) -> RpcResult; + db_index: u64, + config: DatabaseConfig + ) -> RpcResult; - /// Set encryption for an existing database (write-only key) - #[method(name = "setEncryption")] - async fn set_encryption(&self, db_id: u64, encryption_key: String) -> RpcResult; + /// Create/pre-initialize a database at the specified index + #[method(name = "createDatabase")] + async fn create_database(&self, db_index: u64) -> RpcResult; - /// List all managed databases + /// Set encryption for a specific database index (write-only key) + #[method(name = "setDatabaseEncryption")] + async fn set_database_encryption(&self, db_index: u64, encryption_key: String) -> RpcResult; + + /// List all database indices that exist #[method(name = "listDatabases")] - async fn list_databases(&self) -> RpcResult>; + async fn list_databases(&self) -> RpcResult>; /// Get detailed information about a specific database #[method(name = "getDatabaseInfo")] - async fn get_database_info(&self, db_id: u64) -> RpcResult; + async fn get_database_info(&self, db_index: u64) -> RpcResult; - /// Delete a database + /// Delete a database and its files #[method(name = "deleteDatabase")] - async fn delete_database(&self, db_id: u64) -> RpcResult; + async fn delete_database(&self, db_index: u64) -> RpcResult; /// Get server statistics #[method(name = "getServerStats")] @@ -73,169 +76,160 @@ pub trait Rpc { /// RPC Server implementation pub struct RpcServerImpl { + /// Reference to the main Redis server + main_server: Arc>, /// Base directory for database files base_dir: String, - /// Managed database servers - servers: Arc>>>, - /// Next database ID to assign - next_db_id: Arc>, } impl RpcServerImpl { - /// Create a new RPC server instance - pub fn new(base_dir: String) -> Self { + /// Create a new RPC server instance with reference to main server + pub fn new(main_server: Arc>, base_dir: String) -> Self { Self { + main_server, base_dir, - servers: Arc::new(RwLock::new(HashMap::new())), - next_db_id: Arc::new(RwLock::new(0)), } } - - /// Get the next available database ID - async fn get_next_db_id(&self) -> u64 { - let mut id = self.next_db_id.write().await; - let current_id = *id; - *id += 1; - current_id - } } #[jsonrpsee::core::async_trait] impl RpcServer for RpcServerImpl { - async fn create_database( + async fn configure_database( &self, - backend: BackendType, - config: DatabaseConfig, - encryption_key: Option, - ) -> RpcResult { - let db_id = self.get_next_db_id().await; + db_index: u64, + config: DatabaseConfig + ) -> RpcResult { + // For now, configuration is mainly informational + // In a full implementation, this could set database-specific settings + println!("Configured database {} with settings: {:?}", db_index, config); + Ok(true) + } - // For now, only support Redb backend - match backend { - BackendType::Redb => { - // Create database directory - let db_dir = if let Some(path) = &config.storage_path { - std::path::PathBuf::from(path) - } else { - std::path::PathBuf::from(&self.base_dir).join(format!("rpc_db_{}", db_id)) - }; + async fn create_database(&self, db_index: u64) -> RpcResult { + // Pre-create the database by accessing it through the main server + let server_guard = self.main_server.lock().await; - // Ensure directory exists - std::fs::create_dir_all(&db_dir) - .map_err(|e| jsonrpsee::types::ErrorObjectOwned::owned( - -32000, - format!("Failed to create directory: {}", e), - None::<()> - ))?; + // We can't directly modify selected_db, but we can try to access the storage + // This will create the database file if it doesn't exist + // Note: This is a simplified approach - in practice, we'd need to modify the server to allow database pre-creation - // Create DB options - let encrypt = encryption_key.is_some(); - let option = DBOption { - dir: db_dir.to_string_lossy().to_string(), - port: 0, // Not used for RPC-managed databases - debug: false, - encryption_key, - encrypt, - }; + println!("Note: Database {} will be created when first accessed via Redis protocol", db_index); + println!("Use: redis-cli -p 6379, then: SELECT {}", db_index); - // Create server instance - let server = Server::new(option).await; + Ok(true) + } - // Store the server - let mut servers = self.servers.write().await; - servers.insert(db_id, Arc::new(server)); + async fn set_database_encryption(&self, db_index: u64, encryption_key: String) -> RpcResult { + // Note: Encryption is determined at database creation time based on db_index + // DB 0-9 are non-encrypted, DB 10+ are encrypted + // This method is mainly for documentation/configuration purposes + println!("Note: Database {} encryption is determined by index (10+ = encrypted)", db_index); + println!("Encryption key provided but not stored (write-only policy)"); + Ok(db_index >= 10) // Return true if this DB would be encrypted + } - Ok(db_id) + async fn list_databases(&self) -> RpcResult> { + // Scan the database directory for existing .db files + let mut db_indices = Vec::new(); + + if let Ok(entries) = std::fs::read_dir(&self.base_dir) { + for entry in entries.flatten() { + if let Some(file_name) = entry.file_name().to_str() { + if let Some(index_str) = file_name.strip_suffix(".db") { + if let Ok(index) = index_str.parse::() { + db_indices.push(index); + } + } + } } } - } - async fn set_encryption(&self, db_id: u64, _encryption_key: String) -> RpcResult { - // Note: In a real implementation, we'd need to modify the existing database - // For now, return false as encryption can only be set during creation - let _servers = self.servers.read().await; - // TODO: Implement encryption setting for existing databases - Ok(false) - } - - async fn list_databases(&self) -> RpcResult> { - let servers = self.servers.read().await; - let mut result = Vec::new(); - - for (id, server) in servers.iter() { - // Get basic info from server - let info = DatabaseInfo { - id: *id, - name: None, // TODO: Store name in server metadata - backend: BackendType::Redb, - encrypted: server.option.encrypt, - redis_version: Some("7.0".to_string()), // Default Redis compatibility - storage_path: Some(server.option.dir.clone()), - size_on_disk: None, // TODO: Calculate actual size - key_count: None, // TODO: Get key count from storage - created_at: std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_secs(), - last_access: None, - }; - result.push(info); + // Also include database 0 (default) even if file doesn't exist yet + if !db_indices.contains(&0) { + db_indices.push(0); } - Ok(result) + db_indices.sort(); + Ok(db_indices) } - async fn get_database_info(&self, db_id: u64) -> RpcResult { - let servers = self.servers.read().await; + async fn get_database_info(&self, db_index: u64) -> RpcResult { + // Check if database file exists + let db_path = std::path::PathBuf::from(&self.base_dir).join(format!("{}.db", db_index)); + let file_exists = db_path.exists(); - if let Some(server) = servers.get(&db_id) { - Ok(DatabaseInfo { - id: db_id, - name: None, - backend: BackendType::Redb, - encrypted: server.option.encrypt, - redis_version: Some("7.0".to_string()), - storage_path: Some(server.option.dir.clone()), - size_on_disk: None, - key_count: None, - created_at: std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_secs(), - last_access: None, - }) + // Get file size if it exists + let size_on_disk = if file_exists { + std::fs::metadata(&db_path).ok().map(|m| m.len()) } else { - Err(jsonrpsee::types::ErrorObjectOwned::owned( + None + }; + + Ok(DatabaseInfo { + id: db_index, + name: None, // Could be extended to store names + backend: BackendType::Redb, + encrypted: db_index >= 10, // Based on HeroDB's encryption rule + redis_version: Some("7.0".to_string()), + storage_path: Some(self.base_dir.clone()), + size_on_disk, + key_count: None, // Would need to open DB to count keys + created_at: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(), + last_access: None, + }) + } + + async fn delete_database(&self, db_index: u64) -> RpcResult { + // Don't allow deletion of database 0 (default) + if db_index == 0 { + return Err(jsonrpsee::types::ErrorObjectOwned::owned( -32000, - format!("Database {} not found", db_id), + "Cannot delete default database (index 0)".to_string(), None::<()> - )) + )); } - } - async fn delete_database(&self, db_id: u64) -> RpcResult { - let mut servers = self.servers.write().await; + let db_path = std::path::PathBuf::from(&self.base_dir).join(format!("{}.db", db_index)); - if let Some(server) = servers.remove(&db_id) { - // TODO: Clean up database files - let _ = server; - Ok(true) + if db_path.exists() { + match std::fs::remove_file(&db_path) { + Ok(_) => { + println!("Deleted database file: {}", db_path.display()); + Ok(true) + } + Err(e) => { + Err(jsonrpsee::types::ErrorObjectOwned::owned( + -32000, + format!("Failed to delete database {}: {}", db_index, e), + None::<()> + )) + } + } } else { - Ok(false) + Ok(false) // Database didn't exist } } async fn get_server_stats(&self) -> RpcResult> { - let servers = self.servers.read().await; let mut stats = HashMap::new(); - stats.insert("total_databases".to_string(), serde_json::json!(servers.len())); + // Get list of databases + let databases = self.list_databases().await.unwrap_or_default(); + + stats.insert("total_databases".to_string(), serde_json::json!(databases.len())); + stats.insert("database_indices".to_string(), serde_json::json!(databases)); stats.insert("uptime".to_string(), serde_json::json!( std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_secs() )); + let server_guard = self.main_server.lock().await; + stats.insert("server_port".to_string(), serde_json::json!(server_guard.option.port)); + stats.insert("data_directory".to_string(), serde_json::json!(self.base_dir)); Ok(stats) } diff --git a/herodb/src/rpc_server.rs b/herodb/src/rpc_server.rs index 2222d9e..ab8cb14 100644 --- a/herodb/src/rpc_server.rs +++ b/herodb/src/rpc_server.rs @@ -2,12 +2,19 @@ use std::net::SocketAddr; use jsonrpsee::server::{ServerBuilder, ServerHandle}; use jsonrpsee::RpcModule; +use std::sync::Arc; +use tokio::sync::Mutex; use crate::rpc::{RpcServer, RpcServerImpl}; +use crate::server::Server; /// Start the RPC server on the specified address -pub async fn start_rpc_server(addr: SocketAddr, base_dir: String) -> Result> { +pub async fn start_rpc_server( + addr: SocketAddr, + main_server: Arc>, + base_dir: String +) -> Result> { // Create the RPC server implementation - let rpc_impl = RpcServerImpl::new(base_dir); + let rpc_impl = RpcServerImpl::new(main_server, base_dir); // Create the RPC module let mut module = RpcModule::new(()); @@ -37,7 +44,14 @@ mod tests { let addr = "127.0.0.1:0".parse().unwrap(); // Use port 0 for auto-assignment let base_dir = "/tmp/test_rpc".to_string(); - let handle = start_rpc_server(addr, base_dir).await.unwrap(); + let main_server = Arc::new(Mutex::new(crate::server::Server::new(crate::options::DBOption { + dir: "/tmp".to_string(), + port: 0, + debug: false, + encryption_key: None, + encrypt: false, + }).await)); + let handle = start_rpc_server(addr, main_server, base_dir).await.unwrap(); // Give the server a moment to start tokio::time::sleep(Duration::from_millis(100)).await;