management_rpc_server #13
926
Cargo.lock
generated
926
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -24,6 +24,12 @@ cargo build --release
|
||||
./target/release/herodb --dir /path/to/db --port 6379
|
||||
```
|
||||
|
||||
## RPC Server
|
||||
|
||||
HeroDB includes an optional JSON-RPC 2.0 management server for database administration tasks. Enable it with the `--enable-rpc` flag and specify the port with `--rpc-port` (default: 8080).
|
||||
|
||||
For a complete list of available RPC commands and usage examples, see [RPC_COMMANDS.md](RPC_COMMANDS.md).
|
||||
|
||||
### Options
|
||||
|
||||
- `--dir`: Database directory (required)
|
||||
@@ -31,6 +37,8 @@ cargo build --release
|
||||
- `--debug`: Enable debug logging
|
||||
- `--encrypt`: Enable database encryption
|
||||
- `--encryption-key`: Master encryption key for encrypted databases
|
||||
- `--enable-rpc`: Enable RPC management server
|
||||
- `--rpc-port`: RPC server port (default: 8080)
|
||||
|
||||
### Examples
|
||||
|
||||
|
93
RPC_COMMANDS.md
Normal file
93
RPC_COMMANDS.md
Normal file
@@ -0,0 +1,93 @@
|
||||
# HeroDB RPC Commands
|
||||
|
||||
HeroDB provides a JSON-RPC 2.0 interface for database management operations. The RPC server runs on a separate port (default 8080) and can be enabled with the `--enable-rpc` flag.
|
||||
|
||||
All RPC methods are prefixed with the namespace `herodb`. With the exception fo the `rpc.discover` call (using the `rpc` namespace), which returns the OpenRPC spec.
|
||||
|
||||
## Available Commands
|
||||
|
||||
### herodb_listDatabases
|
||||
Lists all database indices that exist.
|
||||
|
||||
**Example:**
|
||||
```bash
|
||||
curl -X POST http://localhost:8080 \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"jsonrpc": "2.0", "method": "herodb_listDatabases", "id": 1}'
|
||||
```
|
||||
|
||||
### herodb_createDatabase
|
||||
Creates a new database at the specified index.
|
||||
|
||||
**Parameters:**
|
||||
- `db_index` (number): Database index to create
|
||||
|
||||
**Example:**
|
||||
```bash
|
||||
curl -X POST http://localhost:8080 \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"jsonrpc": "2.0", "method": "herodb_createDatabase", "params": [1], "id": 1}'
|
||||
```
|
||||
|
||||
### herodb_getDatabaseInfo
|
||||
Retrieves detailed information about a specific database.
|
||||
|
||||
**Parameters:**
|
||||
- `db_index` (number): Database index
|
||||
|
||||
**Example:**
|
||||
```bash
|
||||
curl -X POST http://localhost:8080 \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"jsonrpc": "2.0", "method": "herodb_getDatabaseInfo", "params": [0], "id": 1}'
|
||||
```
|
||||
|
||||
### herodb_configureDatabase
|
||||
Configures an existing database with specific settings.
|
||||
|
||||
**Parameters:**
|
||||
- `db_index` (number): Database index
|
||||
- `config` (object): Configuration object
|
||||
|
||||
**Example:**
|
||||
```bash
|
||||
curl -X POST http://localhost:8080 \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"jsonrpc": "2.0", "method": "herodb_configureDatabase", "params": [0, {"name": "test", "max_size": 1048576}], "id": 1}'
|
||||
```
|
||||
|
||||
### herodb_setDatabaseEncryption
|
||||
Sets encryption for a specific database index.
|
||||
|
||||
**Parameters:**
|
||||
- `db_index` (number): Database index
|
||||
- `encryption_key` (string): Encryption key
|
||||
|
||||
**Example:**
|
||||
```bash
|
||||
curl -X POST http://localhost:8080 \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"jsonrpc": "2.0", "method": "herodb_setDatabaseEncryption", "params": [10, "my-secret-key"], "id": 1}'
|
||||
```
|
||||
|
||||
### herodb_deleteDatabase
|
||||
Deletes a database and its files.
|
||||
|
||||
**Parameters:**
|
||||
- `db_index` (number): Database index to delete
|
||||
|
||||
**Example:**
|
||||
```bash
|
||||
curl -X POST http://localhost:8080 \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"jsonrpc": "2.0", "method": "herodb_deleteDatabase", "params": [1], "id": 1}'
|
||||
```
|
||||
|
||||
### herodb_getServerStats
|
||||
Retrieves server statistics.
|
||||
|
||||
**Example:**
|
||||
```bash
|
||||
curl -X POST http://localhost:8080 \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"jsonrpc": "2.0", "method": "herodb_getServerStats", "id": 1}'
|
@@ -23,6 +23,7 @@ age = "0.10"
|
||||
secrecy = "0.8"
|
||||
ed25519-dalek = "2"
|
||||
base64 = "0.22"
|
||||
jsonrpsee = { version = "0.26", features = ["http-client", "ws-client", "server", "macros"] }
|
||||
|
||||
[dev-dependencies]
|
||||
redis = { version = "0.24", features = ["aio", "tokio-comp"] }
|
||||
|
290
herodb/docs/openrpc.json
Normal file
290
herodb/docs/openrpc.json
Normal file
@@ -0,0 +1,290 @@
|
||||
{
|
||||
"openrpc": "1.2.6",
|
||||
"info": {
|
||||
"title": "HeroDB RPC API",
|
||||
"version": "0.0.1",
|
||||
"description": "Database management API for HeroDB"
|
||||
},
|
||||
"servers": [
|
||||
{
|
||||
"name": "HeroDB Server",
|
||||
"url": "http://localhost:8080"
|
||||
}
|
||||
],
|
||||
"methods": [
|
||||
{
|
||||
"name": "herodb_configureDatabase",
|
||||
"summary": "Configure an existing database with specific settings",
|
||||
"params": [
|
||||
{
|
||||
"name": "db_index",
|
||||
"description": "Database index to configure",
|
||||
"schema": {
|
||||
"type": "integer",
|
||||
"minimum": 0
|
||||
},
|
||||
"required": true
|
||||
},
|
||||
{
|
||||
"name": "config",
|
||||
"description": "Configuration object",
|
||||
"schema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "string"
|
||||
},
|
||||
"storage_path": {
|
||||
"type": "string"
|
||||
},
|
||||
"max_size": {
|
||||
"type": "integer"
|
||||
},
|
||||
"redis_version": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": true
|
||||
}
|
||||
],
|
||||
"result": {
|
||||
"name": "success",
|
||||
"schema": {
|
||||
"type": "boolean"
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "herodb_createDatabase",
|
||||
"summary": "Create/pre-initialize a database at the specified index",
|
||||
"params": [
|
||||
{
|
||||
"name": "db_index",
|
||||
"description": "Database index to create",
|
||||
"schema": {
|
||||
"type": "integer",
|
||||
"minimum": 0
|
||||
},
|
||||
"required": true
|
||||
}
|
||||
],
|
||||
"result": {
|
||||
"name": "success",
|
||||
"schema": {
|
||||
"type": "boolean"
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "herodb_setDatabaseEncryption",
|
||||
"summary": "Set encryption for a specific database index",
|
||||
"params": [
|
||||
{
|
||||
"name": "db_index",
|
||||
"description": "Database index",
|
||||
"schema": {
|
||||
"type": "integer",
|
||||
"minimum": 0
|
||||
},
|
||||
"required": true
|
||||
},
|
||||
{
|
||||
"name": "encryption_key",
|
||||
"description": "Encryption key (write-only)",
|
||||
"schema": {
|
||||
"type": "string"
|
||||
},
|
||||
"required": true
|
||||
}
|
||||
],
|
||||
"result": {
|
||||
"name": "success",
|
||||
"schema": {
|
||||
"type": "boolean"
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "herodb_listDatabases",
|
||||
"summary": "List all database indices that exist",
|
||||
"params": [],
|
||||
"result": {
|
||||
"name": "database_indices",
|
||||
"schema": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "integer"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "herodb_getDatabaseInfo",
|
||||
"summary": "Get detailed information about a specific database",
|
||||
"params": [
|
||||
{
|
||||
"name": "db_index",
|
||||
"description": "Database index",
|
||||
"schema": {
|
||||
"type": "integer",
|
||||
"minimum": 0
|
||||
},
|
||||
"required": true
|
||||
}
|
||||
],
|
||||
"result": {
|
||||
"name": "database_info",
|
||||
"schema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"id": {
|
||||
"type": "integer"
|
||||
},
|
||||
"name": {
|
||||
"type": "string",
|
||||
"nullable": true
|
||||
},
|
||||
"backend": {
|
||||
"type": "string",
|
||||
"enum": ["Redb"]
|
||||
},
|
||||
"encrypted": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"redis_version": {
|
||||
"type": "string",
|
||||
"nullable": true
|
||||
},
|
||||
"storage_path": {
|
||||
"type": "string",
|
||||
"nullable": true
|
||||
},
|
||||
"size_on_disk": {
|
||||
"type": "integer",
|
||||
"nullable": true
|
||||
},
|
||||
"key_count": {
|
||||
"type": "integer",
|
||||
"nullable": true
|
||||
},
|
||||
"created_at": {
|
||||
"type": "integer"
|
||||
},
|
||||
"last_access": {
|
||||
"type": "integer",
|
||||
"nullable": true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "herodb_deleteDatabase",
|
||||
"summary": "Delete a database and its files",
|
||||
"params": [
|
||||
{
|
||||
"name": "db_index",
|
||||
"description": "Database index to delete",
|
||||
"schema": {
|
||||
"type": "integer",
|
||||
"minimum": 0
|
||||
},
|
||||
"required": true
|
||||
}
|
||||
],
|
||||
"result": {
|
||||
"name": "success",
|
||||
"schema": {
|
||||
"type": "boolean"
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "herodb_getServerStats",
|
||||
"summary": "Get server statistics",
|
||||
"params": [],
|
||||
"result": {
|
||||
"name": "stats",
|
||||
"schema": {
|
||||
"type": "object",
|
||||
"additionalProperties": {
|
||||
"oneOf": [
|
||||
{"type": "string"},
|
||||
{"type": "integer"},
|
||||
{"type": "boolean"},
|
||||
{"type": "array"}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"components": {
|
||||
"schemas": {
|
||||
"DatabaseConfig": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "string",
|
||||
"nullable": true
|
||||
},
|
||||
"storage_path": {
|
||||
"type": "string",
|
||||
"nullable": true
|
||||
},
|
||||
"max_size": {
|
||||
"type": "integer",
|
||||
"nullable": true
|
||||
},
|
||||
"redis_version": {
|
||||
"type": "string",
|
||||
"nullable": true
|
||||
}
|
||||
}
|
||||
},
|
||||
"DatabaseInfo": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"id": {
|
||||
"type": "integer"
|
||||
},
|
||||
"name": {
|
||||
"type": "string",
|
||||
"nullable": true
|
||||
},
|
||||
"backend": {
|
||||
"type": "string",
|
||||
"enum": ["Redb", "InMemory", "Custom"]
|
||||
},
|
||||
"encrypted": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"redis_version": {
|
||||
"type": "string",
|
||||
"nullable": true
|
||||
},
|
||||
"storage_path": {
|
||||
"type": "string",
|
||||
"nullable": true
|
||||
},
|
||||
"size_on_disk": {
|
||||
"type": "integer",
|
||||
"nullable": true
|
||||
},
|
||||
"key_count": {
|
||||
"type": "integer",
|
||||
"nullable": true
|
||||
},
|
||||
"created_at": {
|
||||
"type": "integer"
|
||||
},
|
||||
"last_access": {
|
||||
"type": "integer",
|
||||
"nullable": true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,8 +1,11 @@
|
||||
pub mod age; // NEW
|
||||
pub mod age;
|
||||
pub mod cmd;
|
||||
pub mod crypto;
|
||||
pub mod error;
|
||||
pub mod options;
|
||||
pub mod protocol;
|
||||
pub mod rpc;
|
||||
pub mod rpc_server;
|
||||
pub mod server;
|
||||
pub mod storage;
|
||||
pub mod openrpc_spec;
|
||||
|
@@ -1,8 +1,10 @@
|
||||
// #![allow(unused_imports)]
|
||||
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
use herodb::server;
|
||||
use herodb::server::Server;
|
||||
use herodb::rpc_server;
|
||||
|
||||
use clap::Parser;
|
||||
|
||||
@@ -30,6 +32,14 @@ struct Args {
|
||||
/// Encrypt the database
|
||||
#[arg(long)]
|
||||
encrypt: bool,
|
||||
|
||||
/// Enable RPC management server
|
||||
#[arg(long)]
|
||||
enable_rpc: bool,
|
||||
|
||||
/// RPC server port (default: 8080)
|
||||
#[arg(long, default_value = "8080")]
|
||||
rpc_port: u16,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
@@ -46,7 +56,7 @@ async fn main() {
|
||||
|
||||
// new DB option
|
||||
let option = herodb::options::DBOption {
|
||||
dir: args.dir,
|
||||
dir: args.dir.clone(),
|
||||
port,
|
||||
debug: args.debug,
|
||||
encryption_key: args.encryption_key,
|
||||
@@ -54,11 +64,30 @@ async fn main() {
|
||||
};
|
||||
|
||||
// new server
|
||||
let server = server::Server::new(option).await;
|
||||
let server = Arc::new(Mutex::new(server::Server::new(option).await));
|
||||
|
||||
// Add a small delay to ensure the port is ready
|
||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||
|
||||
// Start RPC server if enabled
|
||||
let _rpc_handle = if args.enable_rpc {
|
||||
let rpc_addr = format!("127.0.0.1:{}", args.rpc_port).parse().unwrap();
|
||||
let base_dir = args.dir.clone();
|
||||
|
||||
match rpc_server::start_rpc_server(rpc_addr, Arc::clone(&server), base_dir).await {
|
||||
Ok(handle) => {
|
||||
println!("RPC management server started on port {}", args.rpc_port);
|
||||
Some(handle)
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("Failed to start RPC server: {}", e);
|
||||
None
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// accept new connections
|
||||
loop {
|
||||
let stream = listener.accept().await;
|
||||
@@ -66,9 +95,9 @@ async fn main() {
|
||||
Ok((stream, _)) => {
|
||||
println!("accepted new connection");
|
||||
|
||||
let mut sc = server.clone();
|
||||
let sc = Arc::clone(&server);
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = sc.handle(stream).await {
|
||||
if let Err(e) = Server::handle(sc, stream).await {
|
||||
println!("error: {:?}, will close the connection. Bye", e);
|
||||
}
|
||||
});
|
||||
|
2
herodb/src/openrpc_spec.rs
Normal file
2
herodb/src/openrpc_spec.rs
Normal file
@@ -0,0 +1,2 @@
|
||||
/// The OpenRPC specification for the HeroDB JSON-RPC API
|
||||
pub const OPENRPC_SPEC: &str = include_str!("../docs/openrpc.json");
|
300
herodb/src/rpc.rs
Normal file
300
herodb/src/rpc.rs
Normal file
@@ -0,0 +1,300 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::server::Server;
|
||||
use crate::openrpc_spec::OPENRPC_SPEC;
|
||||
|
||||
/// Database backend types
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum BackendType {
|
||||
Redb,
|
||||
// Future: InMemory, Custom(String)
|
||||
}
|
||||
|
||||
/// Database configuration
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct DatabaseConfig {
|
||||
pub name: Option<String>,
|
||||
pub storage_path: Option<String>,
|
||||
pub max_size: Option<u64>,
|
||||
pub redis_version: Option<String>,
|
||||
}
|
||||
|
||||
/// Database information returned by metadata queries
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct DatabaseInfo {
|
||||
pub id: u64,
|
||||
pub name: Option<String>,
|
||||
pub backend: BackendType,
|
||||
pub encrypted: bool,
|
||||
pub redis_version: Option<String>,
|
||||
pub storage_path: Option<String>,
|
||||
pub size_on_disk: Option<u64>,
|
||||
pub key_count: Option<u64>,
|
||||
pub created_at: u64,
|
||||
pub last_access: Option<u64>,
|
||||
}
|
||||
|
||||
/// RPC trait for HeroDB management
|
||||
#[rpc(client, server, namespace = "herodb")]
|
||||
pub trait Rpc {
|
||||
/// Configure an existing database with specific settings
|
||||
#[method(name = "configureDatabase")]
|
||||
async fn configure_database(
|
||||
&self,
|
||||
db_index: u64,
|
||||
config: DatabaseConfig
|
||||
) -> RpcResult<bool>;
|
||||
|
||||
/// Create/pre-initialize a database at the specified index
|
||||
#[method(name = "createDatabase")]
|
||||
async fn create_database(&self, db_index: u64) -> RpcResult<bool>;
|
||||
|
||||
/// Set encryption for a specific database index (write-only key)
|
||||
#[method(name = "setDatabaseEncryption")]
|
||||
async fn set_database_encryption(&self, db_index: u64, encryption_key: String) -> RpcResult<bool>;
|
||||
|
||||
/// List all database indices that exist
|
||||
#[method(name = "listDatabases")]
|
||||
async fn list_databases(&self) -> RpcResult<Vec<u64>>;
|
||||
|
||||
/// Get detailed information about a specific database
|
||||
#[method(name = "getDatabaseInfo")]
|
||||
async fn get_database_info(&self, db_index: u64) -> RpcResult<DatabaseInfo>;
|
||||
|
||||
/// Delete a database and its files
|
||||
#[method(name = "deleteDatabase")]
|
||||
async fn delete_database(&self, db_index: u64) -> RpcResult<bool>;
|
||||
|
||||
/// Get server statistics
|
||||
#[method(name = "getServerStats")]
|
||||
async fn get_server_stats(&self) -> RpcResult<HashMap<String, serde_json::Value>>;
|
||||
}
|
||||
|
||||
/// RPC Discovery trait for API introspection
|
||||
#[rpc(client, server, namespace = "rpc", namespace_separator = ".")]
|
||||
pub trait RpcDiscovery {
|
||||
/// Get the OpenRPC specification for API discovery
|
||||
#[method(name = "discover")]
|
||||
async fn discover(&self) -> RpcResult<Value>;
|
||||
}
|
||||
|
||||
/// RPC Server implementation
|
||||
#[derive(Clone)]
|
||||
pub struct RpcServerImpl {
|
||||
/// Reference to the main Redis server
|
||||
main_server: Arc<Mutex<Server>>,
|
||||
/// Base directory for database files
|
||||
base_dir: String,
|
||||
}
|
||||
|
||||
impl RpcServerImpl {
|
||||
/// Create a new RPC server instance with reference to main server
|
||||
pub fn new(main_server: Arc<Mutex<Server>>, base_dir: String) -> Self {
|
||||
Self {
|
||||
main_server,
|
||||
base_dir,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[jsonrpsee::core::async_trait]
|
||||
impl RpcServer for RpcServerImpl {
|
||||
async fn configure_database(
|
||||
&self,
|
||||
db_index: u64,
|
||||
config: DatabaseConfig
|
||||
) -> RpcResult<bool> {
|
||||
// For now, configuration is mainly informational
|
||||
// In a full implementation, this could set database-specific settings
|
||||
println!("Configured database {} with settings: {:?}", db_index, config);
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
async fn create_database(&self, db_index: u64) -> RpcResult<bool> {
|
||||
// Lock the main server to create the database
|
||||
let mut server_guard = self.main_server.lock().await;
|
||||
|
||||
// Save the current selected_db to restore it later
|
||||
let original_db = server_guard.selected_db;
|
||||
|
||||
// Temporarily set the selected_db to the target database
|
||||
server_guard.selected_db = db_index;
|
||||
|
||||
// Call current_storage() which will create the database file if it doesn't exist
|
||||
match server_guard.current_storage() {
|
||||
Ok(_) => {
|
||||
println!("Successfully created database at index {}", db_index);
|
||||
|
||||
// Restore the original selected_db
|
||||
server_guard.selected_db = original_db;
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
Err(e) => {
|
||||
// Restore the original selected_db even on error
|
||||
server_guard.selected_db = original_db;
|
||||
|
||||
Err(jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
format!("Failed to create database {}: {}", db_index, e.0),
|
||||
None::<()>
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn set_database_encryption(&self, db_index: u64, _encryption_key: String) -> RpcResult<bool> {
|
||||
// Note: Encryption is determined at database creation time based on db_index
|
||||
// DB 0-9 are non-encrypted, DB 10+ are encrypted
|
||||
// This method is mainly for documentation/configuration purposes
|
||||
println!("Note: Database {} encryption is determined by index (10+ = encrypted)", db_index);
|
||||
println!("Encryption key provided but not stored (write-only policy)");
|
||||
Ok(db_index >= 10) // Return true if this DB would be encrypted
|
||||
}
|
||||
|
||||
async fn list_databases(&self) -> RpcResult<Vec<u64>> {
|
||||
// Scan the database directory for existing .db files
|
||||
let mut db_indices = Vec::new();
|
||||
|
||||
if let Ok(entries) = std::fs::read_dir(&self.base_dir) {
|
||||
for entry in entries.flatten() {
|
||||
if let Some(file_name) = entry.file_name().to_str() {
|
||||
if let Some(index_str) = file_name.strip_suffix(".db") {
|
||||
if let Ok(index) = index_str.parse::<u64>() {
|
||||
db_indices.push(index);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Also include database 0 (default) even if file doesn't exist yet
|
||||
if !db_indices.contains(&0) {
|
||||
db_indices.push(0);
|
||||
}
|
||||
|
||||
db_indices.sort();
|
||||
Ok(db_indices)
|
||||
}
|
||||
|
||||
async fn get_database_info(&self, db_index: u64) -> RpcResult<DatabaseInfo> {
|
||||
// Check if database file exists
|
||||
let db_path = std::path::PathBuf::from(&self.base_dir).join(format!("{}.db", db_index));
|
||||
let file_exists = db_path.exists();
|
||||
|
||||
// If database doesn't exist, return an error
|
||||
if !file_exists && db_index != 0 {
|
||||
return Err(jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
format!("Database {} does not exist", db_index),
|
||||
None::<()>
|
||||
));
|
||||
}
|
||||
|
||||
// Get file metadata if it exists
|
||||
let (size_on_disk, created_at) = if file_exists {
|
||||
if let Ok(metadata) = std::fs::metadata(&db_path) {
|
||||
let size = Some(metadata.len());
|
||||
let created = metadata.created()
|
||||
.unwrap_or(std::time::SystemTime::UNIX_EPOCH)
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs();
|
||||
(size, created)
|
||||
} else {
|
||||
(None, 0)
|
||||
}
|
||||
} else {
|
||||
// Database 0 might not have a file yet
|
||||
(None, std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs())
|
||||
};
|
||||
|
||||
Ok(DatabaseInfo {
|
||||
id: db_index,
|
||||
name: None, // Could be extended to store names
|
||||
backend: BackendType::Redb,
|
||||
encrypted: db_index >= 10, // Based on HeroDB's encryption rule
|
||||
redis_version: Some("7.0".to_string()),
|
||||
storage_path: Some(self.base_dir.clone()),
|
||||
size_on_disk,
|
||||
key_count: None, // Would need to open DB to count keys
|
||||
created_at,
|
||||
last_access: None,
|
||||
})
|
||||
}
|
||||
|
||||
async fn delete_database(&self, db_index: u64) -> RpcResult<bool> {
|
||||
// Don't allow deletion of database 0 (default)
|
||||
if db_index == 0 {
|
||||
return Err(jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
"Cannot delete default database (index 0)".to_string(),
|
||||
None::<()>
|
||||
));
|
||||
}
|
||||
|
||||
let db_path = std::path::PathBuf::from(&self.base_dir).join(format!("{}.db", db_index));
|
||||
|
||||
if db_path.exists() {
|
||||
match std::fs::remove_file(&db_path) {
|
||||
Ok(_) => {
|
||||
println!("Deleted database file: {}", db_path.display());
|
||||
Ok(true)
|
||||
}
|
||||
Err(e) => {
|
||||
Err(jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
format!("Failed to delete database {}: {}", db_index, e),
|
||||
None::<()>
|
||||
))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Ok(false) // Database didn't exist
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_server_stats(&self) -> RpcResult<HashMap<String, serde_json::Value>> {
|
||||
let mut stats = HashMap::new();
|
||||
|
||||
// Get list of databases
|
||||
let databases = self.list_databases().await.unwrap_or_default();
|
||||
|
||||
stats.insert("total_databases".to_string(), serde_json::json!(databases.len()));
|
||||
stats.insert("database_indices".to_string(), serde_json::json!(databases));
|
||||
stats.insert("uptime".to_string(), serde_json::json!(
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs()
|
||||
));
|
||||
let server_guard = self.main_server.lock().await;
|
||||
stats.insert("server_port".to_string(), serde_json::json!(server_guard.option.port));
|
||||
stats.insert("data_directory".to_string(), serde_json::json!(self.base_dir));
|
||||
|
||||
Ok(stats)
|
||||
}
|
||||
}
|
||||
|
||||
#[jsonrpsee::core::async_trait]
|
||||
impl RpcDiscoveryServer for RpcServerImpl {
|
||||
async fn discover(&self) -> RpcResult<Value> {
|
||||
// Parse the OpenRPC spec JSON and return it
|
||||
match serde_json::from_str(OPENRPC_SPEC) {
|
||||
Ok(spec) => Ok(spec),
|
||||
Err(e) => Err(jsonrpsee::types::ErrorObjectOwned::owned(
|
||||
-32000,
|
||||
format!("Failed to parse OpenRPC specification: {}", e),
|
||||
None::<()>
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
63
herodb/src/rpc_server.rs
Normal file
63
herodb/src/rpc_server.rs
Normal file
@@ -0,0 +1,63 @@
|
||||
use std::net::SocketAddr;
|
||||
use jsonrpsee::server::{ServerBuilder, ServerHandle};
|
||||
use jsonrpsee::RpcModule;
|
||||
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
use crate::rpc::{RpcServer, RpcDiscoveryServer, RpcServerImpl};
|
||||
use crate::server::Server;
|
||||
|
||||
/// Start the RPC server on the specified address
|
||||
pub async fn start_rpc_server(
|
||||
addr: SocketAddr,
|
||||
main_server: Arc<Mutex<Server>>,
|
||||
base_dir: String
|
||||
) -> Result<ServerHandle, Box<dyn std::error::Error + Send + Sync>> {
|
||||
// Create the RPC server implementation
|
||||
let rpc_impl = RpcServerImpl::new(main_server, base_dir);
|
||||
|
||||
// Create the RPC module
|
||||
let mut module = RpcModule::new(());
|
||||
module.merge(RpcServer::into_rpc(rpc_impl.clone()))?;
|
||||
module.merge(RpcDiscoveryServer::into_rpc(rpc_impl))?;
|
||||
|
||||
// Build the server with both HTTP and WebSocket support
|
||||
let server = ServerBuilder::default()
|
||||
.build(addr)
|
||||
.await?;
|
||||
|
||||
// Start the server
|
||||
let handle = server.start(module);
|
||||
|
||||
println!("RPC server started on {}", addr);
|
||||
|
||||
Ok(handle)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::time::Duration;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_rpc_server_startup() {
|
||||
let addr = "127.0.0.1:0".parse().unwrap(); // Use port 0 for auto-assignment
|
||||
let base_dir = "/tmp/test_rpc".to_string();
|
||||
|
||||
let main_server = Arc::new(Mutex::new(crate::server::Server::new(crate::options::DBOption {
|
||||
dir: "/tmp".to_string(),
|
||||
port: 0,
|
||||
debug: false,
|
||||
encryption_key: None,
|
||||
encrypt: false,
|
||||
}).await));
|
||||
let handle = start_rpc_server(addr, main_server, base_dir).await.unwrap();
|
||||
|
||||
// Give the server a moment to start
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
|
||||
// Stop the server
|
||||
handle.stop().unwrap();
|
||||
handle.stopped().await;
|
||||
}
|
||||
}
|
@@ -167,7 +167,7 @@ impl Server {
|
||||
}
|
||||
|
||||
pub async fn handle(
|
||||
&mut self,
|
||||
server: Arc<Mutex<Server>>,
|
||||
mut stream: tokio::net::TcpStream,
|
||||
) -> Result<(), DBError> {
|
||||
// Accumulate incoming bytes to handle partial RESP frames
|
||||
@@ -205,31 +205,49 @@ impl Server {
|
||||
// Advance the accumulator to the unparsed remainder
|
||||
acc = remaining.to_string();
|
||||
|
||||
if self.option.debug {
|
||||
println!("\x1b[34;1mgot command: {:?}, protocol: {:?}\x1b[0m", cmd, protocol);
|
||||
} else {
|
||||
println!("got command: {:?}, protocol: {:?}", cmd, protocol);
|
||||
}
|
||||
|
||||
// Check if this is a QUIT command before processing
|
||||
let is_quit = matches!(cmd, Cmd::Quit);
|
||||
|
||||
let res = match cmd.run(self).await {
|
||||
Ok(p) => p,
|
||||
Err(e) => {
|
||||
if self.option.debug {
|
||||
eprintln!("[run error] {:?}", e);
|
||||
}
|
||||
Protocol::err(&format!("ERR {}", e.0))
|
||||
// Lock the server only for command execution
|
||||
let (res, debug_info) = {
|
||||
let mut server_guard = server.lock().await;
|
||||
|
||||
if server_guard.option.debug {
|
||||
println!("\x1b[34;1mgot command: {:?}, protocol: {:?}\x1b[0m", cmd, protocol);
|
||||
} else {
|
||||
println!("got command: {:?}, protocol: {:?}", cmd, protocol);
|
||||
}
|
||||
|
||||
let res = match cmd.run(&mut server_guard).await {
|
||||
Ok(p) => p,
|
||||
Err(e) => {
|
||||
if server_guard.option.debug {
|
||||
eprintln!("[run error] {:?}", e);
|
||||
}
|
||||
Protocol::err(&format!("ERR {}", e.0))
|
||||
}
|
||||
};
|
||||
|
||||
let debug_info = if server_guard.option.debug {
|
||||
Some((format!("queued cmd {:?}", server_guard.queued_cmd), format!("going to send response {}", res.encode())))
|
||||
} else {
|
||||
Some((format!("queued cmd {:?}", server_guard.queued_cmd), format!("going to send response {}", res.encode())))
|
||||
};
|
||||
|
||||
(res, debug_info)
|
||||
};
|
||||
|
||||
if self.option.debug {
|
||||
println!("\x1b[34;1mqueued cmd {:?}\x1b[0m", self.queued_cmd);
|
||||
println!("\x1b[32;1mgoing to send response {}\x1b[0m", res.encode());
|
||||
} else {
|
||||
print!("queued cmd {:?}", self.queued_cmd);
|
||||
println!("going to send response {}", res.encode());
|
||||
// Print debug info outside the lock
|
||||
if let Some((queued_info, response_info)) = debug_info {
|
||||
if let Some((_, response)) = response_info.split_once("going to send response ") {
|
||||
if queued_info.contains("\x1b[34;1m") {
|
||||
println!("\x1b[34;1m{}\x1b[0m", queued_info);
|
||||
println!("\x1b[32;1mgoing to send response {}\x1b[0m", response);
|
||||
} else {
|
||||
println!("{}", queued_info);
|
||||
println!("going to send response {}", response);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_ = stream.write(res.encode().as_bytes()).await?;
|
||||
|
@@ -1,4 +1,6 @@
|
||||
use herodb::{server::Server, options::DBOption};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::TcpStream;
|
||||
@@ -29,17 +31,20 @@ async fn debug_hset_simple() {
|
||||
encryption_key: None,
|
||||
};
|
||||
|
||||
let mut server = Server::new(option).await;
|
||||
|
||||
let server = Arc::new(Mutex::new(Server::new(option).await));
|
||||
|
||||
// Start server in background
|
||||
tokio::spawn(async move {
|
||||
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
||||
loop {
|
||||
if let Ok((stream, _)) = listener.accept().await {
|
||||
let _ = server.handle(stream).await;
|
||||
let server_clone = Arc::clone(&server);
|
||||
tokio::spawn(async move {
|
||||
let _ = Server::handle(server_clone, stream).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@@ -3,6 +3,8 @@ use std::time::Duration;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time::sleep;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
#[tokio::test]
|
||||
async fn debug_hset_return_value() {
|
||||
@@ -20,17 +22,20 @@ async fn debug_hset_return_value() {
|
||||
encryption_key: None,
|
||||
};
|
||||
|
||||
let mut server = Server::new(option).await;
|
||||
|
||||
let server = Arc::new(Mutex::new(Server::new(option).await));
|
||||
|
||||
// Start server in background
|
||||
tokio::spawn(async move {
|
||||
let listener = tokio::net::TcpListener::bind("127.0.0.1:16390")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
||||
loop {
|
||||
if let Ok((stream, _)) = listener.accept().await {
|
||||
let _ = server.handle(stream).await;
|
||||
let server_clone = Arc::clone(&server);
|
||||
tokio::spawn(async move {
|
||||
let _ = Server::handle(server_clone, stream).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@@ -1,21 +1,23 @@
|
||||
use herodb::{server::Server, options::DBOption};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time::sleep;
|
||||
|
||||
// Helper function to start a test server
|
||||
async fn start_test_server(test_name: &str) -> (Server, u16) {
|
||||
async fn start_test_server(test_name: &str) -> (Arc<Mutex<Server>>, u16) {
|
||||
use std::sync::atomic::{AtomicU16, Ordering};
|
||||
static PORT_COUNTER: AtomicU16 = AtomicU16::new(16379);
|
||||
|
||||
|
||||
let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst);
|
||||
let test_dir = format!("/tmp/herodb_test_{}", test_name);
|
||||
|
||||
|
||||
// Clean up and create test directory
|
||||
let _ = std::fs::remove_dir_all(&test_dir);
|
||||
std::fs::create_dir_all(&test_dir).unwrap();
|
||||
|
||||
|
||||
let option = DBOption {
|
||||
dir: test_dir,
|
||||
port,
|
||||
@@ -23,8 +25,8 @@ async fn start_test_server(test_name: &str) -> (Server, u16) {
|
||||
encrypt: false,
|
||||
encryption_key: None,
|
||||
};
|
||||
|
||||
let server = Server::new(option).await;
|
||||
|
||||
let server = Arc::new(Mutex::new(Server::new(option).await));
|
||||
(server, port)
|
||||
}
|
||||
|
||||
@@ -54,7 +56,7 @@ async fn send_command(stream: &mut TcpStream, command: &str) -> String {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_basic_ping() {
|
||||
let (mut server, port) = start_test_server("ping").await;
|
||||
let (server, port) = start_test_server("ping").await;
|
||||
|
||||
// Start server in background
|
||||
tokio::spawn(async move {
|
||||
@@ -64,7 +66,7 @@ async fn test_basic_ping() {
|
||||
|
||||
loop {
|
||||
if let Ok((stream, _)) = listener.accept().await {
|
||||
let _ = server.handle(stream).await;
|
||||
let _ = Server::handle(Arc::clone(&server), stream).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -78,7 +80,7 @@ async fn test_basic_ping() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_string_operations() {
|
||||
let (mut server, port) = start_test_server("string").await;
|
||||
let (server, port) = start_test_server("string").await;
|
||||
|
||||
// Start server in background
|
||||
tokio::spawn(async move {
|
||||
@@ -88,7 +90,7 @@ async fn test_string_operations() {
|
||||
|
||||
loop {
|
||||
if let Ok((stream, _)) = listener.accept().await {
|
||||
let _ = server.handle(stream).await;
|
||||
let _ = Server::handle(Arc::clone(&server), stream).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -120,7 +122,7 @@ async fn test_string_operations() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_incr_operations() {
|
||||
let (mut server, port) = start_test_server("incr").await;
|
||||
let (server, port) = start_test_server("incr").await;
|
||||
|
||||
tokio::spawn(async move {
|
||||
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
|
||||
@@ -129,7 +131,7 @@ async fn test_incr_operations() {
|
||||
|
||||
loop {
|
||||
if let Ok((stream, _)) = listener.accept().await {
|
||||
let _ = server.handle(stream).await;
|
||||
let _ = Server::handle(Arc::clone(&server), stream).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -154,7 +156,7 @@ async fn test_incr_operations() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_hash_operations() {
|
||||
let (mut server, port) = start_test_server("hash").await;
|
||||
let (server, port) = start_test_server("hash").await;
|
||||
|
||||
tokio::spawn(async move {
|
||||
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
|
||||
@@ -163,7 +165,7 @@ async fn test_hash_operations() {
|
||||
|
||||
loop {
|
||||
if let Ok((stream, _)) = listener.accept().await {
|
||||
let _ = server.handle(stream).await;
|
||||
let _ = Server::handle(Arc::clone(&server), stream).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -220,7 +222,7 @@ async fn test_hash_operations() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_expiration() {
|
||||
let (mut server, port) = start_test_server("expiration").await;
|
||||
let (server, port) = start_test_server("expiration").await;
|
||||
|
||||
tokio::spawn(async move {
|
||||
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
|
||||
@@ -229,7 +231,7 @@ async fn test_expiration() {
|
||||
|
||||
loop {
|
||||
if let Ok((stream, _)) = listener.accept().await {
|
||||
let _ = server.handle(stream).await;
|
||||
let _ = Server::handle(Arc::clone(&server), stream).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -268,7 +270,7 @@ async fn test_expiration() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_scan_operations() {
|
||||
let (mut server, port) = start_test_server("scan").await;
|
||||
let (server, port) = start_test_server("scan").await;
|
||||
|
||||
tokio::spawn(async move {
|
||||
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
|
||||
@@ -277,7 +279,7 @@ async fn test_scan_operations() {
|
||||
|
||||
loop {
|
||||
if let Ok((stream, _)) = listener.accept().await {
|
||||
let _ = server.handle(stream).await;
|
||||
let _ = Server::handle(Arc::clone(&server), stream).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -304,7 +306,7 @@ async fn test_scan_operations() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_hscan_operations() {
|
||||
let (mut server, port) = start_test_server("hscan").await;
|
||||
let (server, port) = start_test_server("hscan").await;
|
||||
|
||||
tokio::spawn(async move {
|
||||
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
|
||||
@@ -313,7 +315,7 @@ async fn test_hscan_operations() {
|
||||
|
||||
loop {
|
||||
if let Ok((stream, _)) = listener.accept().await {
|
||||
let _ = server.handle(stream).await;
|
||||
let _ = Server::handle(Arc::clone(&server), stream).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -336,7 +338,7 @@ async fn test_hscan_operations() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_transaction_operations() {
|
||||
let (mut server, port) = start_test_server("transaction").await;
|
||||
let (server, port) = start_test_server("transaction").await;
|
||||
|
||||
tokio::spawn(async move {
|
||||
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
|
||||
@@ -345,7 +347,7 @@ async fn test_transaction_operations() {
|
||||
|
||||
loop {
|
||||
if let Ok((stream, _)) = listener.accept().await {
|
||||
let _ = server.handle(stream).await;
|
||||
let _ = Server::handle(Arc::clone(&server), stream).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -379,7 +381,7 @@ async fn test_transaction_operations() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_discard_transaction() {
|
||||
let (mut server, port) = start_test_server("discard").await;
|
||||
let (server, port) = start_test_server("discard").await;
|
||||
|
||||
tokio::spawn(async move {
|
||||
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
|
||||
@@ -388,7 +390,7 @@ async fn test_discard_transaction() {
|
||||
|
||||
loop {
|
||||
if let Ok((stream, _)) = listener.accept().await {
|
||||
let _ = server.handle(stream).await;
|
||||
let _ = Server::handle(Arc::clone(&server), stream).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -416,7 +418,7 @@ async fn test_discard_transaction() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_type_command() {
|
||||
let (mut server, port) = start_test_server("type").await;
|
||||
let (server, port) = start_test_server("type").await;
|
||||
|
||||
tokio::spawn(async move {
|
||||
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
|
||||
@@ -425,7 +427,7 @@ async fn test_type_command() {
|
||||
|
||||
loop {
|
||||
if let Ok((stream, _)) = listener.accept().await {
|
||||
let _ = server.handle(stream).await;
|
||||
let _ = Server::handle(Arc::clone(&server), stream).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -451,7 +453,7 @@ async fn test_type_command() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_config_commands() {
|
||||
let (mut server, port) = start_test_server("config").await;
|
||||
let (server, port) = start_test_server("config").await;
|
||||
|
||||
tokio::spawn(async move {
|
||||
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
|
||||
@@ -460,7 +462,7 @@ async fn test_config_commands() {
|
||||
|
||||
loop {
|
||||
if let Ok((stream, _)) = listener.accept().await {
|
||||
let _ = server.handle(stream).await;
|
||||
let _ = Server::handle(Arc::clone(&server), stream).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -482,7 +484,7 @@ async fn test_config_commands() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_info_command() {
|
||||
let (mut server, port) = start_test_server("info").await;
|
||||
let (server, port) = start_test_server("info").await;
|
||||
|
||||
tokio::spawn(async move {
|
||||
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
|
||||
@@ -491,7 +493,7 @@ async fn test_info_command() {
|
||||
|
||||
loop {
|
||||
if let Ok((stream, _)) = listener.accept().await {
|
||||
let _ = server.handle(stream).await;
|
||||
let _ = Server::handle(Arc::clone(&server), stream).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -511,7 +513,7 @@ async fn test_info_command() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_error_handling() {
|
||||
let (mut server, port) = start_test_server("error").await;
|
||||
let (server, port) = start_test_server("error").await;
|
||||
|
||||
tokio::spawn(async move {
|
||||
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
|
||||
@@ -520,7 +522,7 @@ async fn test_error_handling() {
|
||||
|
||||
loop {
|
||||
if let Ok((stream, _)) = listener.accept().await {
|
||||
let _ = server.handle(stream).await;
|
||||
let _ = Server::handle(Arc::clone(&server), stream).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -549,7 +551,7 @@ async fn test_error_handling() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_list_operations() {
|
||||
let (mut server, port) = start_test_server("list").await;
|
||||
let (server, port) = start_test_server("list").await;
|
||||
|
||||
tokio::spawn(async move {
|
||||
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
|
||||
@@ -558,7 +560,7 @@ async fn test_list_operations() {
|
||||
|
||||
loop {
|
||||
if let Ok((stream, _)) = listener.accept().await {
|
||||
let _ = server.handle(stream).await;
|
||||
let _ = Server::handle(Arc::clone(&server), stream).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
37
herodb/tests/rpc_tests.rs
Normal file
37
herodb/tests/rpc_tests.rs
Normal file
@@ -0,0 +1,37 @@
|
||||
use herodb::rpc::{BackendType, DatabaseConfig};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_rpc_server_basic() {
|
||||
// This test would require starting the RPC server in a separate thread
|
||||
// For now, we'll just test that the types compile correctly
|
||||
|
||||
// Test serialization of types
|
||||
let backend = BackendType::Redb;
|
||||
let config = DatabaseConfig {
|
||||
name: Some("test_db".to_string()),
|
||||
storage_path: Some("/tmp/test".to_string()),
|
||||
max_size: Some(1024 * 1024),
|
||||
redis_version: Some("7.0".to_string()),
|
||||
};
|
||||
|
||||
let backend_json = serde_json::to_string(&backend).unwrap();
|
||||
let config_json = serde_json::to_string(&config).unwrap();
|
||||
|
||||
assert_eq!(backend_json, "\"Redb\"");
|
||||
assert!(config_json.contains("test_db"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_database_config_serialization() {
|
||||
let config = DatabaseConfig {
|
||||
name: Some("my_db".to_string()),
|
||||
storage_path: None,
|
||||
max_size: Some(1000000),
|
||||
redis_version: Some("7.0".to_string()),
|
||||
};
|
||||
|
||||
let json = serde_json::to_value(&config).unwrap();
|
||||
assert_eq!(json["name"], "my_db");
|
||||
assert_eq!(json["max_size"], 1000000);
|
||||
assert_eq!(json["redis_version"], "7.0");
|
||||
}
|
@@ -1,23 +1,25 @@
|
||||
use herodb::{server::Server, options::DBOption};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use tokio::time::sleep;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
// Helper function to start a test server with clean data directory
|
||||
async fn start_test_server(test_name: &str) -> (Server, u16) {
|
||||
async fn start_test_server(test_name: &str) -> (Arc<Mutex<Server>>, u16) {
|
||||
use std::sync::atomic::{AtomicU16, Ordering};
|
||||
static PORT_COUNTER: AtomicU16 = AtomicU16::new(17000);
|
||||
|
||||
|
||||
// Get a unique port for this test
|
||||
let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
|
||||
let test_dir = format!("/tmp/herodb_test_{}", test_name);
|
||||
|
||||
|
||||
// Clean up any existing test data
|
||||
let _ = std::fs::remove_dir_all(&test_dir);
|
||||
std::fs::create_dir_all(&test_dir).unwrap();
|
||||
|
||||
|
||||
let option = DBOption {
|
||||
dir: test_dir,
|
||||
port,
|
||||
@@ -25,8 +27,8 @@ async fn start_test_server(test_name: &str) -> (Server, u16) {
|
||||
encrypt: false,
|
||||
encryption_key: None,
|
||||
};
|
||||
|
||||
let server = Server::new(option).await;
|
||||
|
||||
let server = Arc::new(Mutex::new(Server::new(option).await));
|
||||
(server, port)
|
||||
}
|
||||
|
||||
@@ -42,7 +44,7 @@ async fn send_redis_command(port: u16, command: &str) -> String {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_basic_redis_functionality() {
|
||||
let (mut server, port) = start_test_server("basic").await;
|
||||
let (server, port) = start_test_server("basic").await;
|
||||
|
||||
// Start server in background with timeout
|
||||
let server_handle = tokio::spawn(async move {
|
||||
@@ -53,7 +55,7 @@ async fn test_basic_redis_functionality() {
|
||||
// Accept only a few connections for testing
|
||||
for _ in 0..10 {
|
||||
if let Ok((stream, _)) = listener.accept().await {
|
||||
let _ = server.handle(stream).await;
|
||||
let _ = Server::handle(Arc::clone(&server), stream).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -111,7 +113,7 @@ async fn test_basic_redis_functionality() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_hash_operations() {
|
||||
let (mut server, port) = start_test_server("hash_ops").await;
|
||||
let (server, port) = start_test_server("hash_ops").await;
|
||||
|
||||
// Start server in background with timeout
|
||||
let server_handle = tokio::spawn(async move {
|
||||
@@ -122,7 +124,7 @@ async fn test_hash_operations() {
|
||||
// Accept only a few connections for testing
|
||||
for _ in 0..5 {
|
||||
if let Ok((stream, _)) = listener.accept().await {
|
||||
let _ = server.handle(stream).await;
|
||||
let _ = Server::handle(Arc::clone(&server), stream).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -165,7 +167,7 @@ async fn test_hash_operations() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_transaction_operations() {
|
||||
let (mut server, port) = start_test_server("transactions").await;
|
||||
let (server, port) = start_test_server("transactions").await;
|
||||
|
||||
// Start server in background with timeout
|
||||
let server_handle = tokio::spawn(async move {
|
||||
@@ -176,7 +178,7 @@ async fn test_transaction_operations() {
|
||||
// Accept only a few connections for testing
|
||||
for _ in 0..5 {
|
||||
if let Ok((stream, _)) = listener.accept().await {
|
||||
let _ = server.handle(stream).await;
|
||||
let _ = Server::handle(Arc::clone(&server), stream).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@@ -1,21 +1,23 @@
|
||||
use herodb::{server::Server, options::DBOption};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time::sleep;
|
||||
|
||||
// Helper function to start a test server with clean data directory
|
||||
async fn start_test_server(test_name: &str) -> (Server, u16) {
|
||||
async fn start_test_server(test_name: &str) -> (Arc<Mutex<Server>>, u16) {
|
||||
use std::sync::atomic::{AtomicU16, Ordering};
|
||||
static PORT_COUNTER: AtomicU16 = AtomicU16::new(16500);
|
||||
|
||||
|
||||
let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst);
|
||||
let test_dir = format!("/tmp/herodb_simple_test_{}", test_name);
|
||||
|
||||
|
||||
// Clean up any existing test data
|
||||
let _ = std::fs::remove_dir_all(&test_dir);
|
||||
std::fs::create_dir_all(&test_dir).unwrap();
|
||||
|
||||
|
||||
let option = DBOption {
|
||||
dir: test_dir,
|
||||
port,
|
||||
@@ -23,8 +25,8 @@ async fn start_test_server(test_name: &str) -> (Server, u16) {
|
||||
encrypt: false,
|
||||
encryption_key: None,
|
||||
};
|
||||
|
||||
let server = Server::new(option).await;
|
||||
|
||||
let server = Arc::new(Mutex::new(Server::new(option).await));
|
||||
(server, port)
|
||||
}
|
||||
|
||||
@@ -54,7 +56,7 @@ async fn connect_to_server(port: u16) -> TcpStream {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_basic_ping_simple() {
|
||||
let (mut server, port) = start_test_server("ping").await;
|
||||
let (server, port) = start_test_server("ping").await;
|
||||
|
||||
// Start server in background
|
||||
tokio::spawn(async move {
|
||||
@@ -64,7 +66,8 @@ async fn test_basic_ping_simple() {
|
||||
|
||||
loop {
|
||||
if let Ok((stream, _)) = listener.accept().await {
|
||||
let _ = server.handle(stream).await;
|
||||
let server_clone = Arc::clone(&server);
|
||||
let _ = Server::handle(server_clone, stream).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -78,7 +81,7 @@ async fn test_basic_ping_simple() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_hset_clean_db() {
|
||||
let (mut server, port) = start_test_server("hset_clean").await;
|
||||
let (server, port) = start_test_server("hset_clean").await;
|
||||
|
||||
// Start server in background
|
||||
tokio::spawn(async move {
|
||||
@@ -88,7 +91,8 @@ async fn test_hset_clean_db() {
|
||||
|
||||
loop {
|
||||
if let Ok((stream, _)) = listener.accept().await {
|
||||
let _ = server.handle(stream).await;
|
||||
let server_clone = Arc::clone(&server);
|
||||
let _ = Server::handle(server_clone, stream).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -110,7 +114,7 @@ async fn test_hset_clean_db() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_type_command_simple() {
|
||||
let (mut server, port) = start_test_server("type").await;
|
||||
let (server, port) = start_test_server("type").await;
|
||||
|
||||
// Start server in background
|
||||
tokio::spawn(async move {
|
||||
@@ -120,7 +124,8 @@ async fn test_type_command_simple() {
|
||||
|
||||
loop {
|
||||
if let Ok((stream, _)) = listener.accept().await {
|
||||
let _ = server.handle(stream).await;
|
||||
let server_clone = Arc::clone(&server);
|
||||
let _ = Server::handle(server_clone, stream).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -149,7 +154,7 @@ async fn test_type_command_simple() {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_hexists_simple() {
|
||||
let (mut server, port) = start_test_server("hexists").await;
|
||||
let (server, port) = start_test_server("hexists").await;
|
||||
|
||||
// Start server in background
|
||||
tokio::spawn(async move {
|
||||
@@ -159,7 +164,8 @@ async fn test_hexists_simple() {
|
||||
|
||||
loop {
|
||||
if let Ok((stream, _)) = listener.accept().await {
|
||||
let _ = server.handle(stream).await;
|
||||
let server_clone = Arc::clone(&server);
|
||||
let _ = Server::handle(server_clone, stream).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@@ -2,12 +2,14 @@ use herodb::{options::DBOption, server::Server};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time::{sleep, Duration};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
// =========================
|
||||
// Helpers
|
||||
// =========================
|
||||
|
||||
async fn start_test_server(test_name: &str) -> (Server, u16) {
|
||||
async fn start_test_server(test_name: &str) -> (Arc<Mutex<Server>>, u16) {
|
||||
use std::sync::atomic::{AtomicU16, Ordering};
|
||||
static PORT_COUNTER: AtomicU16 = AtomicU16::new(17100);
|
||||
let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst);
|
||||
@@ -24,11 +26,11 @@ async fn start_test_server(test_name: &str) -> (Server, u16) {
|
||||
encryption_key: None,
|
||||
};
|
||||
|
||||
let server = Server::new(option).await;
|
||||
let server = Arc::new(Mutex::new(Server::new(option).await));
|
||||
(server, port)
|
||||
}
|
||||
|
||||
async fn spawn_listener(server: Server, port: u16) {
|
||||
async fn spawn_listener(server: Arc<Mutex<Server>>, port: u16) {
|
||||
tokio::spawn(async move {
|
||||
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
|
||||
.await
|
||||
@@ -36,9 +38,9 @@ async fn spawn_listener(server: Server, port: u16) {
|
||||
loop {
|
||||
match listener.accept().await {
|
||||
Ok((stream, _)) => {
|
||||
let mut s_clone = server.clone();
|
||||
let server_clone = Arc::clone(&server);
|
||||
tokio::spawn(async move {
|
||||
let _ = s_clone.handle(stream).await;
|
||||
let _ = Server::handle(server_clone, stream).await;
|
||||
});
|
||||
}
|
||||
Err(_e) => break,
|
||||
|
Reference in New Issue
Block a user