management_rpc_server #13

Open
maximevanhees wants to merge 7 commits from management_rpc_server into main
18 changed files with 1874 additions and 120 deletions

926
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -24,6 +24,12 @@ cargo build --release
./target/release/herodb --dir /path/to/db --port 6379
```
## RPC Server
HeroDB includes an optional JSON-RPC 2.0 management server for database administration tasks. Enable it with the `--enable-rpc` flag and specify the port with `--rpc-port` (default: 8080).
For a complete list of available RPC commands and usage examples, see [RPC_COMMANDS.md](RPC_COMMANDS.md).
### Options
- `--dir`: Database directory (required)
@@ -31,6 +37,8 @@ cargo build --release
- `--debug`: Enable debug logging
- `--encrypt`: Enable database encryption
- `--encryption-key`: Master encryption key for encrypted databases
- `--enable-rpc`: Enable RPC management server
- `--rpc-port`: RPC server port (default: 8080)
### Examples

93
RPC_COMMANDS.md Normal file
View File

@@ -0,0 +1,93 @@
# HeroDB RPC Commands
HeroDB provides a JSON-RPC 2.0 interface for database management operations. The RPC server runs on a separate port (default 8080) and can be enabled with the `--enable-rpc` flag.
All RPC methods are prefixed with the namespace `herodb`. With the exception fo the `rpc.discover` call (using the `rpc` namespace), which returns the OpenRPC spec.
## Available Commands
### herodb_listDatabases
Lists all database indices that exist.
**Example:**
```bash
curl -X POST http://localhost:8080 \
-H "Content-Type: application/json" \
-d '{"jsonrpc": "2.0", "method": "herodb_listDatabases", "id": 1}'
```
### herodb_createDatabase
Creates a new database at the specified index.
**Parameters:**
- `db_index` (number): Database index to create
**Example:**
```bash
curl -X POST http://localhost:8080 \
-H "Content-Type: application/json" \
-d '{"jsonrpc": "2.0", "method": "herodb_createDatabase", "params": [1], "id": 1}'
```
### herodb_getDatabaseInfo
Retrieves detailed information about a specific database.
**Parameters:**
- `db_index` (number): Database index
**Example:**
```bash
curl -X POST http://localhost:8080 \
-H "Content-Type: application/json" \
-d '{"jsonrpc": "2.0", "method": "herodb_getDatabaseInfo", "params": [0], "id": 1}'
```
### herodb_configureDatabase
Configures an existing database with specific settings.
**Parameters:**
- `db_index` (number): Database index
- `config` (object): Configuration object
**Example:**
```bash
curl -X POST http://localhost:8080 \
-H "Content-Type: application/json" \
-d '{"jsonrpc": "2.0", "method": "herodb_configureDatabase", "params": [0, {"name": "test", "max_size": 1048576}], "id": 1}'
```
### herodb_setDatabaseEncryption
Sets encryption for a specific database index.
**Parameters:**
- `db_index` (number): Database index
- `encryption_key` (string): Encryption key
**Example:**
```bash
curl -X POST http://localhost:8080 \
-H "Content-Type: application/json" \
-d '{"jsonrpc": "2.0", "method": "herodb_setDatabaseEncryption", "params": [10, "my-secret-key"], "id": 1}'
```
### herodb_deleteDatabase
Deletes a database and its files.
**Parameters:**
- `db_index` (number): Database index to delete
**Example:**
```bash
curl -X POST http://localhost:8080 \
-H "Content-Type: application/json" \
-d '{"jsonrpc": "2.0", "method": "herodb_deleteDatabase", "params": [1], "id": 1}'
```
### herodb_getServerStats
Retrieves server statistics.
**Example:**
```bash
curl -X POST http://localhost:8080 \
-H "Content-Type: application/json" \
-d '{"jsonrpc": "2.0", "method": "herodb_getServerStats", "id": 1}'

View File

@@ -23,6 +23,7 @@ age = "0.10"
secrecy = "0.8"
ed25519-dalek = "2"
base64 = "0.22"
jsonrpsee = { version = "0.26", features = ["http-client", "ws-client", "server", "macros"] }
[dev-dependencies]
redis = { version = "0.24", features = ["aio", "tokio-comp"] }

290
herodb/docs/openrpc.json Normal file
View File

@@ -0,0 +1,290 @@
{
"openrpc": "1.2.6",
"info": {
"title": "HeroDB RPC API",
"version": "0.0.1",
"description": "Database management API for HeroDB"
},
"servers": [
{
"name": "HeroDB Server",
"url": "http://localhost:8080"
}
],
"methods": [
{
"name": "herodb_configureDatabase",
"summary": "Configure an existing database with specific settings",
"params": [
{
"name": "db_index",
"description": "Database index to configure",
"schema": {
"type": "integer",
"minimum": 0
},
"required": true
},
{
"name": "config",
"description": "Configuration object",
"schema": {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"storage_path": {
"type": "string"
},
"max_size": {
"type": "integer"
},
"redis_version": {
"type": "string"
}
}
},
"required": true
}
],
"result": {
"name": "success",
"schema": {
"type": "boolean"
}
}
},
{
"name": "herodb_createDatabase",
"summary": "Create/pre-initialize a database at the specified index",
"params": [
{
"name": "db_index",
"description": "Database index to create",
"schema": {
"type": "integer",
"minimum": 0
},
"required": true
}
],
"result": {
"name": "success",
"schema": {
"type": "boolean"
}
}
},
{
"name": "herodb_setDatabaseEncryption",
"summary": "Set encryption for a specific database index",
"params": [
{
"name": "db_index",
"description": "Database index",
"schema": {
"type": "integer",
"minimum": 0
},
"required": true
},
{
"name": "encryption_key",
"description": "Encryption key (write-only)",
"schema": {
"type": "string"
},
"required": true
}
],
"result": {
"name": "success",
"schema": {
"type": "boolean"
}
}
},
{
"name": "herodb_listDatabases",
"summary": "List all database indices that exist",
"params": [],
"result": {
"name": "database_indices",
"schema": {
"type": "array",
"items": {
"type": "integer"
}
}
}
},
{
"name": "herodb_getDatabaseInfo",
"summary": "Get detailed information about a specific database",
"params": [
{
"name": "db_index",
"description": "Database index",
"schema": {
"type": "integer",
"minimum": 0
},
"required": true
}
],
"result": {
"name": "database_info",
"schema": {
"type": "object",
"properties": {
"id": {
"type": "integer"
},
"name": {
"type": "string",
"nullable": true
},
"backend": {
"type": "string",
"enum": ["Redb"]
},
"encrypted": {
"type": "boolean"
},
"redis_version": {
"type": "string",
"nullable": true
},
"storage_path": {
"type": "string",
"nullable": true
},
"size_on_disk": {
"type": "integer",
"nullable": true
},
"key_count": {
"type": "integer",
"nullable": true
},
"created_at": {
"type": "integer"
},
"last_access": {
"type": "integer",
"nullable": true
}
}
}
}
},
{
"name": "herodb_deleteDatabase",
"summary": "Delete a database and its files",
"params": [
{
"name": "db_index",
"description": "Database index to delete",
"schema": {
"type": "integer",
"minimum": 0
},
"required": true
}
],
"result": {
"name": "success",
"schema": {
"type": "boolean"
}
}
},
{
"name": "herodb_getServerStats",
"summary": "Get server statistics",
"params": [],
"result": {
"name": "stats",
"schema": {
"type": "object",
"additionalProperties": {
"oneOf": [
{"type": "string"},
{"type": "integer"},
{"type": "boolean"},
{"type": "array"}
]
}
}
}
}
],
"components": {
"schemas": {
"DatabaseConfig": {
"type": "object",
"properties": {
"name": {
"type": "string",
"nullable": true
},
"storage_path": {
"type": "string",
"nullable": true
},
"max_size": {
"type": "integer",
"nullable": true
},
"redis_version": {
"type": "string",
"nullable": true
}
}
},
"DatabaseInfo": {
"type": "object",
"properties": {
"id": {
"type": "integer"
},
"name": {
"type": "string",
"nullable": true
},
"backend": {
"type": "string",
"enum": ["Redb", "InMemory", "Custom"]
},
"encrypted": {
"type": "boolean"
},
"redis_version": {
"type": "string",
"nullable": true
},
"storage_path": {
"type": "string",
"nullable": true
},
"size_on_disk": {
"type": "integer",
"nullable": true
},
"key_count": {
"type": "integer",
"nullable": true
},
"created_at": {
"type": "integer"
},
"last_access": {
"type": "integer",
"nullable": true
}
}
}
}
}
}

View File

@@ -1,8 +1,11 @@
pub mod age; // NEW
pub mod age;
pub mod cmd;
pub mod crypto;
pub mod error;
pub mod options;
pub mod protocol;
pub mod rpc;
pub mod rpc_server;
pub mod server;
pub mod storage;
pub mod openrpc_spec;

View File

@@ -1,8 +1,10 @@
// #![allow(unused_imports)]
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::net::TcpListener;
use herodb::server;
use herodb::server::Server;
use herodb::rpc_server;
use clap::Parser;
@@ -30,6 +32,14 @@ struct Args {
/// Encrypt the database
#[arg(long)]
encrypt: bool,
/// Enable RPC management server
#[arg(long)]
enable_rpc: bool,
/// RPC server port (default: 8080)
#[arg(long, default_value = "8080")]
rpc_port: u16,
}
#[tokio::main]
@@ -46,7 +56,7 @@ async fn main() {
// new DB option
let option = herodb::options::DBOption {
dir: args.dir,
dir: args.dir.clone(),
port,
debug: args.debug,
encryption_key: args.encryption_key,
@@ -54,11 +64,30 @@ async fn main() {
};
// new server
let server = server::Server::new(option).await;
let server = Arc::new(Mutex::new(server::Server::new(option).await));
// Add a small delay to ensure the port is ready
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// Start RPC server if enabled
let _rpc_handle = if args.enable_rpc {
let rpc_addr = format!("127.0.0.1:{}", args.rpc_port).parse().unwrap();
let base_dir = args.dir.clone();
match rpc_server::start_rpc_server(rpc_addr, Arc::clone(&server), base_dir).await {
Ok(handle) => {
println!("RPC management server started on port {}", args.rpc_port);
Some(handle)
}
Err(e) => {
eprintln!("Failed to start RPC server: {}", e);
None
}
}
} else {
None
};
// accept new connections
loop {
let stream = listener.accept().await;
@@ -66,9 +95,9 @@ async fn main() {
Ok((stream, _)) => {
println!("accepted new connection");
let mut sc = server.clone();
let sc = Arc::clone(&server);
tokio::spawn(async move {
if let Err(e) = sc.handle(stream).await {
if let Err(e) = Server::handle(sc, stream).await {
println!("error: {:?}, will close the connection. Bye", e);
}
});

View File

@@ -0,0 +1,2 @@
/// The OpenRPC specification for the HeroDB JSON-RPC API
pub const OPENRPC_SPEC: &str = include_str!("../docs/openrpc.json");

300
herodb/src/rpc.rs Normal file
View File

@@ -0,0 +1,300 @@
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::server::Server;
use crate::openrpc_spec::OPENRPC_SPEC;
/// Database backend types
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BackendType {
Redb,
// Future: InMemory, Custom(String)
}
/// Database configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseConfig {
pub name: Option<String>,
pub storage_path: Option<String>,
pub max_size: Option<u64>,
pub redis_version: Option<String>,
}
/// Database information returned by metadata queries
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseInfo {
pub id: u64,
pub name: Option<String>,
pub backend: BackendType,
pub encrypted: bool,
pub redis_version: Option<String>,
pub storage_path: Option<String>,
pub size_on_disk: Option<u64>,
pub key_count: Option<u64>,
pub created_at: u64,
pub last_access: Option<u64>,
}
/// RPC trait for HeroDB management
#[rpc(client, server, namespace = "herodb")]
pub trait Rpc {
/// Configure an existing database with specific settings
#[method(name = "configureDatabase")]
async fn configure_database(
&self,
db_index: u64,
config: DatabaseConfig
) -> RpcResult<bool>;
/// Create/pre-initialize a database at the specified index
#[method(name = "createDatabase")]
async fn create_database(&self, db_index: u64) -> RpcResult<bool>;
/// Set encryption for a specific database index (write-only key)
#[method(name = "setDatabaseEncryption")]
async fn set_database_encryption(&self, db_index: u64, encryption_key: String) -> RpcResult<bool>;
/// List all database indices that exist
#[method(name = "listDatabases")]
async fn list_databases(&self) -> RpcResult<Vec<u64>>;
/// Get detailed information about a specific database
#[method(name = "getDatabaseInfo")]
async fn get_database_info(&self, db_index: u64) -> RpcResult<DatabaseInfo>;
/// Delete a database and its files
#[method(name = "deleteDatabase")]
async fn delete_database(&self, db_index: u64) -> RpcResult<bool>;
/// Get server statistics
#[method(name = "getServerStats")]
async fn get_server_stats(&self) -> RpcResult<HashMap<String, serde_json::Value>>;
}
/// RPC Discovery trait for API introspection
#[rpc(client, server, namespace = "rpc", namespace_separator = ".")]
pub trait RpcDiscovery {
/// Get the OpenRPC specification for API discovery
#[method(name = "discover")]
async fn discover(&self) -> RpcResult<Value>;
}
/// RPC Server implementation
#[derive(Clone)]
pub struct RpcServerImpl {
/// Reference to the main Redis server
main_server: Arc<Mutex<Server>>,
/// Base directory for database files
base_dir: String,
}
impl RpcServerImpl {
/// Create a new RPC server instance with reference to main server
pub fn new(main_server: Arc<Mutex<Server>>, base_dir: String) -> Self {
Self {
main_server,
base_dir,
}
}
}
#[jsonrpsee::core::async_trait]
impl RpcServer for RpcServerImpl {
async fn configure_database(
&self,
db_index: u64,
config: DatabaseConfig
) -> RpcResult<bool> {
// For now, configuration is mainly informational
// In a full implementation, this could set database-specific settings
println!("Configured database {} with settings: {:?}", db_index, config);
Ok(true)
}
async fn create_database(&self, db_index: u64) -> RpcResult<bool> {
// Lock the main server to create the database
let mut server_guard = self.main_server.lock().await;
// Save the current selected_db to restore it later
let original_db = server_guard.selected_db;
// Temporarily set the selected_db to the target database
server_guard.selected_db = db_index;
// Call current_storage() which will create the database file if it doesn't exist
match server_guard.current_storage() {
Ok(_) => {
println!("Successfully created database at index {}", db_index);
// Restore the original selected_db
server_guard.selected_db = original_db;
Ok(true)
}
Err(e) => {
// Restore the original selected_db even on error
server_guard.selected_db = original_db;
Err(jsonrpsee::types::ErrorObjectOwned::owned(
-32000,
format!("Failed to create database {}: {}", db_index, e.0),
None::<()>
))
}
}
}
async fn set_database_encryption(&self, db_index: u64, _encryption_key: String) -> RpcResult<bool> {
// Note: Encryption is determined at database creation time based on db_index
// DB 0-9 are non-encrypted, DB 10+ are encrypted
// This method is mainly for documentation/configuration purposes
println!("Note: Database {} encryption is determined by index (10+ = encrypted)", db_index);
println!("Encryption key provided but not stored (write-only policy)");
Ok(db_index >= 10) // Return true if this DB would be encrypted
}
async fn list_databases(&self) -> RpcResult<Vec<u64>> {
// Scan the database directory for existing .db files
let mut db_indices = Vec::new();
if let Ok(entries) = std::fs::read_dir(&self.base_dir) {
for entry in entries.flatten() {
if let Some(file_name) = entry.file_name().to_str() {
if let Some(index_str) = file_name.strip_suffix(".db") {
if let Ok(index) = index_str.parse::<u64>() {
db_indices.push(index);
}
}
}
}
}
// Also include database 0 (default) even if file doesn't exist yet
if !db_indices.contains(&0) {
db_indices.push(0);
}
db_indices.sort();
Ok(db_indices)
}
async fn get_database_info(&self, db_index: u64) -> RpcResult<DatabaseInfo> {
// Check if database file exists
let db_path = std::path::PathBuf::from(&self.base_dir).join(format!("{}.db", db_index));
let file_exists = db_path.exists();
// If database doesn't exist, return an error
if !file_exists && db_index != 0 {
return Err(jsonrpsee::types::ErrorObjectOwned::owned(
-32000,
format!("Database {} does not exist", db_index),
None::<()>
));
}
// Get file metadata if it exists
let (size_on_disk, created_at) = if file_exists {
if let Ok(metadata) = std::fs::metadata(&db_path) {
let size = Some(metadata.len());
let created = metadata.created()
.unwrap_or(std::time::SystemTime::UNIX_EPOCH)
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
(size, created)
} else {
(None, 0)
}
} else {
// Database 0 might not have a file yet
(None, std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs())
};
Ok(DatabaseInfo {
id: db_index,
name: None, // Could be extended to store names
backend: BackendType::Redb,
encrypted: db_index >= 10, // Based on HeroDB's encryption rule
redis_version: Some("7.0".to_string()),
storage_path: Some(self.base_dir.clone()),
size_on_disk,
key_count: None, // Would need to open DB to count keys
created_at,
last_access: None,
})
}
async fn delete_database(&self, db_index: u64) -> RpcResult<bool> {
// Don't allow deletion of database 0 (default)
if db_index == 0 {
return Err(jsonrpsee::types::ErrorObjectOwned::owned(
-32000,
"Cannot delete default database (index 0)".to_string(),
None::<()>
));
}
let db_path = std::path::PathBuf::from(&self.base_dir).join(format!("{}.db", db_index));
if db_path.exists() {
match std::fs::remove_file(&db_path) {
Ok(_) => {
println!("Deleted database file: {}", db_path.display());
Ok(true)
}
Err(e) => {
Err(jsonrpsee::types::ErrorObjectOwned::owned(
-32000,
format!("Failed to delete database {}: {}", db_index, e),
None::<()>
))
}
}
} else {
Ok(false) // Database didn't exist
}
}
async fn get_server_stats(&self) -> RpcResult<HashMap<String, serde_json::Value>> {
let mut stats = HashMap::new();
// Get list of databases
let databases = self.list_databases().await.unwrap_or_default();
stats.insert("total_databases".to_string(), serde_json::json!(databases.len()));
stats.insert("database_indices".to_string(), serde_json::json!(databases));
stats.insert("uptime".to_string(), serde_json::json!(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs()
));
let server_guard = self.main_server.lock().await;
stats.insert("server_port".to_string(), serde_json::json!(server_guard.option.port));
stats.insert("data_directory".to_string(), serde_json::json!(self.base_dir));
Ok(stats)
}
}
#[jsonrpsee::core::async_trait]
impl RpcDiscoveryServer for RpcServerImpl {
async fn discover(&self) -> RpcResult<Value> {
// Parse the OpenRPC spec JSON and return it
match serde_json::from_str(OPENRPC_SPEC) {
Ok(spec) => Ok(spec),
Err(e) => Err(jsonrpsee::types::ErrorObjectOwned::owned(
-32000,
format!("Failed to parse OpenRPC specification: {}", e),
None::<()>
))
}
}
}

63
herodb/src/rpc_server.rs Normal file
View File

@@ -0,0 +1,63 @@
use std::net::SocketAddr;
use jsonrpsee::server::{ServerBuilder, ServerHandle};
use jsonrpsee::RpcModule;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::rpc::{RpcServer, RpcDiscoveryServer, RpcServerImpl};
use crate::server::Server;
/// Start the RPC server on the specified address
pub async fn start_rpc_server(
addr: SocketAddr,
main_server: Arc<Mutex<Server>>,
base_dir: String
) -> Result<ServerHandle, Box<dyn std::error::Error + Send + Sync>> {
// Create the RPC server implementation
let rpc_impl = RpcServerImpl::new(main_server, base_dir);
// Create the RPC module
let mut module = RpcModule::new(());
module.merge(RpcServer::into_rpc(rpc_impl.clone()))?;
module.merge(RpcDiscoveryServer::into_rpc(rpc_impl))?;
// Build the server with both HTTP and WebSocket support
let server = ServerBuilder::default()
.build(addr)
.await?;
// Start the server
let handle = server.start(module);
println!("RPC server started on {}", addr);
Ok(handle)
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
#[tokio::test]
async fn test_rpc_server_startup() {
let addr = "127.0.0.1:0".parse().unwrap(); // Use port 0 for auto-assignment
let base_dir = "/tmp/test_rpc".to_string();
let main_server = Arc::new(Mutex::new(crate::server::Server::new(crate::options::DBOption {
dir: "/tmp".to_string(),
port: 0,
debug: false,
encryption_key: None,
encrypt: false,
}).await));
let handle = start_rpc_server(addr, main_server, base_dir).await.unwrap();
// Give the server a moment to start
tokio::time::sleep(Duration::from_millis(100)).await;
// Stop the server
handle.stop().unwrap();
handle.stopped().await;
}
}

View File

@@ -167,7 +167,7 @@ impl Server {
}
pub async fn handle(
&mut self,
server: Arc<Mutex<Server>>,
mut stream: tokio::net::TcpStream,
) -> Result<(), DBError> {
// Accumulate incoming bytes to handle partial RESP frames
@@ -205,31 +205,49 @@ impl Server {
// Advance the accumulator to the unparsed remainder
acc = remaining.to_string();
if self.option.debug {
println!("\x1b[34;1mgot command: {:?}, protocol: {:?}\x1b[0m", cmd, protocol);
} else {
println!("got command: {:?}, protocol: {:?}", cmd, protocol);
}
// Check if this is a QUIT command before processing
let is_quit = matches!(cmd, Cmd::Quit);
let res = match cmd.run(self).await {
Ok(p) => p,
Err(e) => {
if self.option.debug {
eprintln!("[run error] {:?}", e);
}
Protocol::err(&format!("ERR {}", e.0))
// Lock the server only for command execution
let (res, debug_info) = {
let mut server_guard = server.lock().await;
if server_guard.option.debug {
println!("\x1b[34;1mgot command: {:?}, protocol: {:?}\x1b[0m", cmd, protocol);
} else {
println!("got command: {:?}, protocol: {:?}", cmd, protocol);
}
let res = match cmd.run(&mut server_guard).await {
Ok(p) => p,
Err(e) => {
if server_guard.option.debug {
eprintln!("[run error] {:?}", e);
}
Protocol::err(&format!("ERR {}", e.0))
}
};
let debug_info = if server_guard.option.debug {
Some((format!("queued cmd {:?}", server_guard.queued_cmd), format!("going to send response {}", res.encode())))
} else {
Some((format!("queued cmd {:?}", server_guard.queued_cmd), format!("going to send response {}", res.encode())))
};
(res, debug_info)
};
if self.option.debug {
println!("\x1b[34;1mqueued cmd {:?}\x1b[0m", self.queued_cmd);
println!("\x1b[32;1mgoing to send response {}\x1b[0m", res.encode());
} else {
print!("queued cmd {:?}", self.queued_cmd);
println!("going to send response {}", res.encode());
// Print debug info outside the lock
if let Some((queued_info, response_info)) = debug_info {
if let Some((_, response)) = response_info.split_once("going to send response ") {
if queued_info.contains("\x1b[34;1m") {
println!("\x1b[34;1m{}\x1b[0m", queued_info);
println!("\x1b[32;1mgoing to send response {}\x1b[0m", response);
} else {
println!("{}", queued_info);
println!("going to send response {}", response);
}
}
}
_ = stream.write(res.encode().as_bytes()).await?;

View File

@@ -1,4 +1,6 @@
use herodb::{server::Server, options::DBOption};
use std::sync::Arc;
use tokio::sync::Mutex;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
@@ -29,17 +31,20 @@ async fn debug_hset_simple() {
encryption_key: None,
};
let mut server = Server::new(option).await;
let server = Arc::new(Mutex::new(Server::new(option).await));
// Start server in background
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
.await
.unwrap();
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
let server_clone = Arc::clone(&server);
tokio::spawn(async move {
let _ = Server::handle(server_clone, stream).await;
});
}
}
});

View File

@@ -3,6 +3,8 @@ use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::sleep;
use std::sync::Arc;
use tokio::sync::Mutex;
#[tokio::test]
async fn debug_hset_return_value() {
@@ -20,17 +22,20 @@ async fn debug_hset_return_value() {
encryption_key: None,
};
let mut server = Server::new(option).await;
let server = Arc::new(Mutex::new(Server::new(option).await));
// Start server in background
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind("127.0.0.1:16390")
.await
.unwrap();
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
let server_clone = Arc::clone(&server);
tokio::spawn(async move {
let _ = Server::handle(server_clone, stream).await;
});
}
}
});

View File

@@ -1,21 +1,23 @@
use herodb::{server::Server, options::DBOption};
use std::sync::Arc;
use tokio::sync::Mutex;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::sleep;
// Helper function to start a test server
async fn start_test_server(test_name: &str) -> (Server, u16) {
async fn start_test_server(test_name: &str) -> (Arc<Mutex<Server>>, u16) {
use std::sync::atomic::{AtomicU16, Ordering};
static PORT_COUNTER: AtomicU16 = AtomicU16::new(16379);
let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst);
let test_dir = format!("/tmp/herodb_test_{}", test_name);
// Clean up and create test directory
let _ = std::fs::remove_dir_all(&test_dir);
std::fs::create_dir_all(&test_dir).unwrap();
let option = DBOption {
dir: test_dir,
port,
@@ -23,8 +25,8 @@ async fn start_test_server(test_name: &str) -> (Server, u16) {
encrypt: false,
encryption_key: None,
};
let server = Server::new(option).await;
let server = Arc::new(Mutex::new(Server::new(option).await));
(server, port)
}
@@ -54,7 +56,7 @@ async fn send_command(stream: &mut TcpStream, command: &str) -> String {
#[tokio::test]
async fn test_basic_ping() {
let (mut server, port) = start_test_server("ping").await;
let (server, port) = start_test_server("ping").await;
// Start server in background
tokio::spawn(async move {
@@ -64,7 +66,7 @@ async fn test_basic_ping() {
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
let _ = Server::handle(Arc::clone(&server), stream).await;
}
}
});
@@ -78,7 +80,7 @@ async fn test_basic_ping() {
#[tokio::test]
async fn test_string_operations() {
let (mut server, port) = start_test_server("string").await;
let (server, port) = start_test_server("string").await;
// Start server in background
tokio::spawn(async move {
@@ -88,7 +90,7 @@ async fn test_string_operations() {
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
let _ = Server::handle(Arc::clone(&server), stream).await;
}
}
});
@@ -120,7 +122,7 @@ async fn test_string_operations() {
#[tokio::test]
async fn test_incr_operations() {
let (mut server, port) = start_test_server("incr").await;
let (server, port) = start_test_server("incr").await;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
@@ -129,7 +131,7 @@ async fn test_incr_operations() {
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
let _ = Server::handle(Arc::clone(&server), stream).await;
}
}
});
@@ -154,7 +156,7 @@ async fn test_incr_operations() {
#[tokio::test]
async fn test_hash_operations() {
let (mut server, port) = start_test_server("hash").await;
let (server, port) = start_test_server("hash").await;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
@@ -163,7 +165,7 @@ async fn test_hash_operations() {
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
let _ = Server::handle(Arc::clone(&server), stream).await;
}
}
});
@@ -220,7 +222,7 @@ async fn test_hash_operations() {
#[tokio::test]
async fn test_expiration() {
let (mut server, port) = start_test_server("expiration").await;
let (server, port) = start_test_server("expiration").await;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
@@ -229,7 +231,7 @@ async fn test_expiration() {
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
let _ = Server::handle(Arc::clone(&server), stream).await;
}
}
});
@@ -268,7 +270,7 @@ async fn test_expiration() {
#[tokio::test]
async fn test_scan_operations() {
let (mut server, port) = start_test_server("scan").await;
let (server, port) = start_test_server("scan").await;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
@@ -277,7 +279,7 @@ async fn test_scan_operations() {
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
let _ = Server::handle(Arc::clone(&server), stream).await;
}
}
});
@@ -304,7 +306,7 @@ async fn test_scan_operations() {
#[tokio::test]
async fn test_hscan_operations() {
let (mut server, port) = start_test_server("hscan").await;
let (server, port) = start_test_server("hscan").await;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
@@ -313,7 +315,7 @@ async fn test_hscan_operations() {
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
let _ = Server::handle(Arc::clone(&server), stream).await;
}
}
});
@@ -336,7 +338,7 @@ async fn test_hscan_operations() {
#[tokio::test]
async fn test_transaction_operations() {
let (mut server, port) = start_test_server("transaction").await;
let (server, port) = start_test_server("transaction").await;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
@@ -345,7 +347,7 @@ async fn test_transaction_operations() {
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
let _ = Server::handle(Arc::clone(&server), stream).await;
}
}
});
@@ -379,7 +381,7 @@ async fn test_transaction_operations() {
#[tokio::test]
async fn test_discard_transaction() {
let (mut server, port) = start_test_server("discard").await;
let (server, port) = start_test_server("discard").await;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
@@ -388,7 +390,7 @@ async fn test_discard_transaction() {
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
let _ = Server::handle(Arc::clone(&server), stream).await;
}
}
});
@@ -416,7 +418,7 @@ async fn test_discard_transaction() {
#[tokio::test]
async fn test_type_command() {
let (mut server, port) = start_test_server("type").await;
let (server, port) = start_test_server("type").await;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
@@ -425,7 +427,7 @@ async fn test_type_command() {
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
let _ = Server::handle(Arc::clone(&server), stream).await;
}
}
});
@@ -451,7 +453,7 @@ async fn test_type_command() {
#[tokio::test]
async fn test_config_commands() {
let (mut server, port) = start_test_server("config").await;
let (server, port) = start_test_server("config").await;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
@@ -460,7 +462,7 @@ async fn test_config_commands() {
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
let _ = Server::handle(Arc::clone(&server), stream).await;
}
}
});
@@ -482,7 +484,7 @@ async fn test_config_commands() {
#[tokio::test]
async fn test_info_command() {
let (mut server, port) = start_test_server("info").await;
let (server, port) = start_test_server("info").await;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
@@ -491,7 +493,7 @@ async fn test_info_command() {
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
let _ = Server::handle(Arc::clone(&server), stream).await;
}
}
});
@@ -511,7 +513,7 @@ async fn test_info_command() {
#[tokio::test]
async fn test_error_handling() {
let (mut server, port) = start_test_server("error").await;
let (server, port) = start_test_server("error").await;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
@@ -520,7 +522,7 @@ async fn test_error_handling() {
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
let _ = Server::handle(Arc::clone(&server), stream).await;
}
}
});
@@ -549,7 +551,7 @@ async fn test_error_handling() {
#[tokio::test]
async fn test_list_operations() {
let (mut server, port) = start_test_server("list").await;
let (server, port) = start_test_server("list").await;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
@@ -558,7 +560,7 @@ async fn test_list_operations() {
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
let _ = Server::handle(Arc::clone(&server), stream).await;
}
}
});

37
herodb/tests/rpc_tests.rs Normal file
View File

@@ -0,0 +1,37 @@
use herodb::rpc::{BackendType, DatabaseConfig};
#[tokio::test]
async fn test_rpc_server_basic() {
// This test would require starting the RPC server in a separate thread
// For now, we'll just test that the types compile correctly
// Test serialization of types
let backend = BackendType::Redb;
let config = DatabaseConfig {
name: Some("test_db".to_string()),
storage_path: Some("/tmp/test".to_string()),
max_size: Some(1024 * 1024),
redis_version: Some("7.0".to_string()),
};
let backend_json = serde_json::to_string(&backend).unwrap();
let config_json = serde_json::to_string(&config).unwrap();
assert_eq!(backend_json, "\"Redb\"");
assert!(config_json.contains("test_db"));
}
#[tokio::test]
async fn test_database_config_serialization() {
let config = DatabaseConfig {
name: Some("my_db".to_string()),
storage_path: None,
max_size: Some(1000000),
redis_version: Some("7.0".to_string()),
};
let json = serde_json::to_value(&config).unwrap();
assert_eq!(json["name"], "my_db");
assert_eq!(json["max_size"], 1000000);
assert_eq!(json["redis_version"], "7.0");
}

View File

@@ -1,23 +1,25 @@
use herodb::{server::Server, options::DBOption};
use std::sync::Arc;
use tokio::sync::Mutex;
use std::time::Duration;
use tokio::time::sleep;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
// Helper function to start a test server with clean data directory
async fn start_test_server(test_name: &str) -> (Server, u16) {
async fn start_test_server(test_name: &str) -> (Arc<Mutex<Server>>, u16) {
use std::sync::atomic::{AtomicU16, Ordering};
static PORT_COUNTER: AtomicU16 = AtomicU16::new(17000);
// Get a unique port for this test
let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst);
let test_dir = format!("/tmp/herodb_test_{}", test_name);
// Clean up any existing test data
let _ = std::fs::remove_dir_all(&test_dir);
std::fs::create_dir_all(&test_dir).unwrap();
let option = DBOption {
dir: test_dir,
port,
@@ -25,8 +27,8 @@ async fn start_test_server(test_name: &str) -> (Server, u16) {
encrypt: false,
encryption_key: None,
};
let server = Server::new(option).await;
let server = Arc::new(Mutex::new(Server::new(option).await));
(server, port)
}
@@ -42,7 +44,7 @@ async fn send_redis_command(port: u16, command: &str) -> String {
#[tokio::test]
async fn test_basic_redis_functionality() {
let (mut server, port) = start_test_server("basic").await;
let (server, port) = start_test_server("basic").await;
// Start server in background with timeout
let server_handle = tokio::spawn(async move {
@@ -53,7 +55,7 @@ async fn test_basic_redis_functionality() {
// Accept only a few connections for testing
for _ in 0..10 {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
let _ = Server::handle(Arc::clone(&server), stream).await;
}
}
});
@@ -111,7 +113,7 @@ async fn test_basic_redis_functionality() {
#[tokio::test]
async fn test_hash_operations() {
let (mut server, port) = start_test_server("hash_ops").await;
let (server, port) = start_test_server("hash_ops").await;
// Start server in background with timeout
let server_handle = tokio::spawn(async move {
@@ -122,7 +124,7 @@ async fn test_hash_operations() {
// Accept only a few connections for testing
for _ in 0..5 {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
let _ = Server::handle(Arc::clone(&server), stream).await;
}
}
});
@@ -165,7 +167,7 @@ async fn test_hash_operations() {
#[tokio::test]
async fn test_transaction_operations() {
let (mut server, port) = start_test_server("transactions").await;
let (server, port) = start_test_server("transactions").await;
// Start server in background with timeout
let server_handle = tokio::spawn(async move {
@@ -176,7 +178,7 @@ async fn test_transaction_operations() {
// Accept only a few connections for testing
for _ in 0..5 {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
let _ = Server::handle(Arc::clone(&server), stream).await;
}
}
});

View File

@@ -1,21 +1,23 @@
use herodb::{server::Server, options::DBOption};
use std::sync::Arc;
use tokio::sync::Mutex;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::sleep;
// Helper function to start a test server with clean data directory
async fn start_test_server(test_name: &str) -> (Server, u16) {
async fn start_test_server(test_name: &str) -> (Arc<Mutex<Server>>, u16) {
use std::sync::atomic::{AtomicU16, Ordering};
static PORT_COUNTER: AtomicU16 = AtomicU16::new(16500);
let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst);
let test_dir = format!("/tmp/herodb_simple_test_{}", test_name);
// Clean up any existing test data
let _ = std::fs::remove_dir_all(&test_dir);
std::fs::create_dir_all(&test_dir).unwrap();
let option = DBOption {
dir: test_dir,
port,
@@ -23,8 +25,8 @@ async fn start_test_server(test_name: &str) -> (Server, u16) {
encrypt: false,
encryption_key: None,
};
let server = Server::new(option).await;
let server = Arc::new(Mutex::new(Server::new(option).await));
(server, port)
}
@@ -54,7 +56,7 @@ async fn connect_to_server(port: u16) -> TcpStream {
#[tokio::test]
async fn test_basic_ping_simple() {
let (mut server, port) = start_test_server("ping").await;
let (server, port) = start_test_server("ping").await;
// Start server in background
tokio::spawn(async move {
@@ -64,7 +66,8 @@ async fn test_basic_ping_simple() {
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
let server_clone = Arc::clone(&server);
let _ = Server::handle(server_clone, stream).await;
}
}
});
@@ -78,7 +81,7 @@ async fn test_basic_ping_simple() {
#[tokio::test]
async fn test_hset_clean_db() {
let (mut server, port) = start_test_server("hset_clean").await;
let (server, port) = start_test_server("hset_clean").await;
// Start server in background
tokio::spawn(async move {
@@ -88,7 +91,8 @@ async fn test_hset_clean_db() {
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
let server_clone = Arc::clone(&server);
let _ = Server::handle(server_clone, stream).await;
}
}
});
@@ -110,7 +114,7 @@ async fn test_hset_clean_db() {
#[tokio::test]
async fn test_type_command_simple() {
let (mut server, port) = start_test_server("type").await;
let (server, port) = start_test_server("type").await;
// Start server in background
tokio::spawn(async move {
@@ -120,7 +124,8 @@ async fn test_type_command_simple() {
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
let server_clone = Arc::clone(&server);
let _ = Server::handle(server_clone, stream).await;
}
}
});
@@ -149,7 +154,7 @@ async fn test_type_command_simple() {
#[tokio::test]
async fn test_hexists_simple() {
let (mut server, port) = start_test_server("hexists").await;
let (server, port) = start_test_server("hexists").await;
// Start server in background
tokio::spawn(async move {
@@ -159,7 +164,8 @@ async fn test_hexists_simple() {
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
let server_clone = Arc::clone(&server);
let _ = Server::handle(server_clone, stream).await;
}
}
});

View File

@@ -2,12 +2,14 @@ use herodb::{options::DBOption, server::Server};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::{sleep, Duration};
use std::sync::Arc;
use tokio::sync::Mutex;
// =========================
// Helpers
// =========================
async fn start_test_server(test_name: &str) -> (Server, u16) {
async fn start_test_server(test_name: &str) -> (Arc<Mutex<Server>>, u16) {
use std::sync::atomic::{AtomicU16, Ordering};
static PORT_COUNTER: AtomicU16 = AtomicU16::new(17100);
let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst);
@@ -24,11 +26,11 @@ async fn start_test_server(test_name: &str) -> (Server, u16) {
encryption_key: None,
};
let server = Server::new(option).await;
let server = Arc::new(Mutex::new(Server::new(option).await));
(server, port)
}
async fn spawn_listener(server: Server, port: u16) {
async fn spawn_listener(server: Arc<Mutex<Server>>, port: u16) {
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
.await
@@ -36,9 +38,9 @@ async fn spawn_listener(server: Server, port: u16) {
loop {
match listener.accept().await {
Ok((stream, _)) => {
let mut s_clone = server.clone();
let server_clone = Arc::clone(&server);
tokio::spawn(async move {
let _ = s_clone.handle(stream).await;
let _ = Server::handle(server_clone, stream).await;
});
}
Err(_e) => break,