Compare commits
	
		
			7 Commits
		
	
	
		
			8798bc202e
			...
			052cf2ecdb
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					052cf2ecdb | ||
| 
						 | 
					e5b844deee | ||
| 
						 | 
					764fcb68fa | ||
| 
						 | 
					271c6cb0ae | ||
| 
						 | 
					e84f7b7e3b | ||
| 
						 | 
					7e5da9c6eb | ||
| 
						 | 
					bd77a7db48 | 
							
								
								
									
										926
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										926
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							@@ -24,6 +24,12 @@ cargo build --release
 | 
			
		||||
./target/release/herodb --dir /path/to/db --port 6379
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
## RPC Server
 | 
			
		||||
 | 
			
		||||
HeroDB includes an optional JSON-RPC 2.0 management server for database administration tasks. Enable it with the `--enable-rpc` flag and specify the port with `--rpc-port` (default: 8080).
 | 
			
		||||
 | 
			
		||||
For a complete list of available RPC commands and usage examples, see [RPC_COMMANDS.md](RPC_COMMANDS.md).
 | 
			
		||||
 | 
			
		||||
### Options
 | 
			
		||||
 | 
			
		||||
- `--dir`: Database directory (required)
 | 
			
		||||
@@ -31,6 +37,8 @@ cargo build --release
 | 
			
		||||
- `--debug`: Enable debug logging
 | 
			
		||||
- `--encrypt`: Enable database encryption
 | 
			
		||||
- `--encryption-key`: Master encryption key for encrypted databases
 | 
			
		||||
- `--enable-rpc`: Enable RPC management server
 | 
			
		||||
- `--rpc-port`: RPC server port (default: 8080)
 | 
			
		||||
 | 
			
		||||
### Examples
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										93
									
								
								RPC_COMMANDS.md
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										93
									
								
								RPC_COMMANDS.md
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,93 @@
 | 
			
		||||
# HeroDB RPC Commands
 | 
			
		||||
 | 
			
		||||
HeroDB provides a JSON-RPC 2.0 interface for database management operations. The RPC server runs on a separate port (default 8080) and can be enabled with the `--enable-rpc` flag.
 | 
			
		||||
 | 
			
		||||
All RPC methods are prefixed with the namespace `herodb`. With the exception fo the `rpc.discover` call (using the `rpc` namespace), which returns the OpenRPC spec.
 | 
			
		||||
 | 
			
		||||
## Available Commands
 | 
			
		||||
 | 
			
		||||
### herodb_listDatabases
 | 
			
		||||
Lists all database indices that exist.
 | 
			
		||||
 | 
			
		||||
**Example:**
 | 
			
		||||
```bash
 | 
			
		||||
curl -X POST http://localhost:8080 \
 | 
			
		||||
  -H "Content-Type: application/json" \
 | 
			
		||||
  -d '{"jsonrpc": "2.0", "method": "herodb_listDatabases", "id": 1}'
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
### herodb_createDatabase
 | 
			
		||||
Creates a new database at the specified index.
 | 
			
		||||
 | 
			
		||||
**Parameters:**
 | 
			
		||||
- `db_index` (number): Database index to create
 | 
			
		||||
 | 
			
		||||
**Example:**
 | 
			
		||||
```bash
 | 
			
		||||
curl -X POST http://localhost:8080 \
 | 
			
		||||
  -H "Content-Type: application/json" \
 | 
			
		||||
  -d '{"jsonrpc": "2.0", "method": "herodb_createDatabase", "params": [1], "id": 1}'
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
### herodb_getDatabaseInfo
 | 
			
		||||
Retrieves detailed information about a specific database.
 | 
			
		||||
 | 
			
		||||
**Parameters:**
 | 
			
		||||
- `db_index` (number): Database index
 | 
			
		||||
 | 
			
		||||
**Example:**
 | 
			
		||||
```bash
 | 
			
		||||
curl -X POST http://localhost:8080 \
 | 
			
		||||
  -H "Content-Type: application/json" \
 | 
			
		||||
  -d '{"jsonrpc": "2.0", "method": "herodb_getDatabaseInfo", "params": [0], "id": 1}'
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
### herodb_configureDatabase
 | 
			
		||||
Configures an existing database with specific settings.
 | 
			
		||||
 | 
			
		||||
**Parameters:**
 | 
			
		||||
- `db_index` (number): Database index
 | 
			
		||||
- `config` (object): Configuration object
 | 
			
		||||
 | 
			
		||||
**Example:**
 | 
			
		||||
```bash
 | 
			
		||||
curl -X POST http://localhost:8080 \
 | 
			
		||||
  -H "Content-Type: application/json" \
 | 
			
		||||
  -d '{"jsonrpc": "2.0", "method": "herodb_configureDatabase", "params": [0, {"name": "test", "max_size": 1048576}], "id": 1}'
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
### herodb_setDatabaseEncryption
 | 
			
		||||
Sets encryption for a specific database index.
 | 
			
		||||
 | 
			
		||||
**Parameters:**
 | 
			
		||||
- `db_index` (number): Database index
 | 
			
		||||
- `encryption_key` (string): Encryption key
 | 
			
		||||
 | 
			
		||||
**Example:**
 | 
			
		||||
```bash
 | 
			
		||||
curl -X POST http://localhost:8080 \
 | 
			
		||||
  -H "Content-Type: application/json" \
 | 
			
		||||
  -d '{"jsonrpc": "2.0", "method": "herodb_setDatabaseEncryption", "params": [10, "my-secret-key"], "id": 1}'
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
### herodb_deleteDatabase
 | 
			
		||||
Deletes a database and its files.
 | 
			
		||||
 | 
			
		||||
**Parameters:**
 | 
			
		||||
- `db_index` (number): Database index to delete
 | 
			
		||||
 | 
			
		||||
**Example:**
 | 
			
		||||
```bash
 | 
			
		||||
curl -X POST http://localhost:8080 \
 | 
			
		||||
  -H "Content-Type: application/json" \
 | 
			
		||||
  -d '{"jsonrpc": "2.0", "method": "herodb_deleteDatabase", "params": [1], "id": 1}'
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
### herodb_getServerStats
 | 
			
		||||
Retrieves server statistics.
 | 
			
		||||
 | 
			
		||||
**Example:**
 | 
			
		||||
```bash
 | 
			
		||||
curl -X POST http://localhost:8080 \
 | 
			
		||||
  -H "Content-Type: application/json" \
 | 
			
		||||
  -d '{"jsonrpc": "2.0", "method": "herodb_getServerStats", "id": 1}'
 | 
			
		||||
@@ -23,6 +23,7 @@ age = "0.10"
 | 
			
		||||
secrecy = "0.8"
 | 
			
		||||
ed25519-dalek = "2"
 | 
			
		||||
base64 = "0.22"
 | 
			
		||||
jsonrpsee = { version = "0.26", features = ["http-client", "ws-client", "server", "macros"] }
 | 
			
		||||
 | 
			
		||||
[dev-dependencies]
 | 
			
		||||
redis = { version = "0.24", features = ["aio", "tokio-comp"] }
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										290
									
								
								herodb/docs/openrpc.json
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										290
									
								
								herodb/docs/openrpc.json
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,290 @@
 | 
			
		||||
{
 | 
			
		||||
  "openrpc": "1.2.6",
 | 
			
		||||
  "info": {
 | 
			
		||||
    "title": "HeroDB RPC API",
 | 
			
		||||
    "version": "0.0.1",
 | 
			
		||||
    "description": "Database management API for HeroDB"
 | 
			
		||||
  },
 | 
			
		||||
  "servers": [
 | 
			
		||||
    {
 | 
			
		||||
      "name": "HeroDB Server",
 | 
			
		||||
      "url": "http://localhost:8080"
 | 
			
		||||
    }
 | 
			
		||||
  ],
 | 
			
		||||
  "methods": [
 | 
			
		||||
    {
 | 
			
		||||
      "name": "herodb_configureDatabase",
 | 
			
		||||
      "summary": "Configure an existing database with specific settings",
 | 
			
		||||
      "params": [
 | 
			
		||||
        {
 | 
			
		||||
          "name": "db_index",
 | 
			
		||||
          "description": "Database index to configure",
 | 
			
		||||
          "schema": {
 | 
			
		||||
            "type": "integer",
 | 
			
		||||
            "minimum": 0
 | 
			
		||||
          },
 | 
			
		||||
          "required": true
 | 
			
		||||
        },
 | 
			
		||||
        {
 | 
			
		||||
          "name": "config",
 | 
			
		||||
          "description": "Configuration object",
 | 
			
		||||
          "schema": {
 | 
			
		||||
            "type": "object",
 | 
			
		||||
            "properties": {
 | 
			
		||||
              "name": {
 | 
			
		||||
                "type": "string"
 | 
			
		||||
              },
 | 
			
		||||
              "storage_path": {
 | 
			
		||||
                "type": "string"
 | 
			
		||||
              },
 | 
			
		||||
              "max_size": {
 | 
			
		||||
                "type": "integer"
 | 
			
		||||
              },
 | 
			
		||||
              "redis_version": {
 | 
			
		||||
                "type": "string"
 | 
			
		||||
              }
 | 
			
		||||
            }
 | 
			
		||||
          },
 | 
			
		||||
          "required": true
 | 
			
		||||
        }
 | 
			
		||||
      ],
 | 
			
		||||
      "result": {
 | 
			
		||||
        "name": "success",
 | 
			
		||||
        "schema": {
 | 
			
		||||
          "type": "boolean"
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    },
 | 
			
		||||
    {
 | 
			
		||||
      "name": "herodb_createDatabase",
 | 
			
		||||
      "summary": "Create/pre-initialize a database at the specified index",
 | 
			
		||||
      "params": [
 | 
			
		||||
        {
 | 
			
		||||
          "name": "db_index",
 | 
			
		||||
          "description": "Database index to create",
 | 
			
		||||
          "schema": {
 | 
			
		||||
            "type": "integer",
 | 
			
		||||
            "minimum": 0
 | 
			
		||||
          },
 | 
			
		||||
          "required": true
 | 
			
		||||
        }
 | 
			
		||||
      ],
 | 
			
		||||
      "result": {
 | 
			
		||||
        "name": "success",
 | 
			
		||||
        "schema": {
 | 
			
		||||
          "type": "boolean"
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    },
 | 
			
		||||
    {
 | 
			
		||||
      "name": "herodb_setDatabaseEncryption",
 | 
			
		||||
      "summary": "Set encryption for a specific database index",
 | 
			
		||||
      "params": [
 | 
			
		||||
        {
 | 
			
		||||
          "name": "db_index",
 | 
			
		||||
          "description": "Database index",
 | 
			
		||||
          "schema": {
 | 
			
		||||
            "type": "integer",
 | 
			
		||||
            "minimum": 0
 | 
			
		||||
          },
 | 
			
		||||
          "required": true
 | 
			
		||||
        },
 | 
			
		||||
        {
 | 
			
		||||
          "name": "encryption_key",
 | 
			
		||||
          "description": "Encryption key (write-only)",
 | 
			
		||||
          "schema": {
 | 
			
		||||
            "type": "string"
 | 
			
		||||
          },
 | 
			
		||||
          "required": true
 | 
			
		||||
        }
 | 
			
		||||
      ],
 | 
			
		||||
      "result": {
 | 
			
		||||
        "name": "success",
 | 
			
		||||
        "schema": {
 | 
			
		||||
          "type": "boolean"
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    },
 | 
			
		||||
    {
 | 
			
		||||
      "name": "herodb_listDatabases",
 | 
			
		||||
      "summary": "List all database indices that exist",
 | 
			
		||||
      "params": [],
 | 
			
		||||
      "result": {
 | 
			
		||||
        "name": "database_indices",
 | 
			
		||||
        "schema": {
 | 
			
		||||
          "type": "array",
 | 
			
		||||
          "items": {
 | 
			
		||||
            "type": "integer"
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    },
 | 
			
		||||
    {
 | 
			
		||||
      "name": "herodb_getDatabaseInfo",
 | 
			
		||||
      "summary": "Get detailed information about a specific database",
 | 
			
		||||
      "params": [
 | 
			
		||||
        {
 | 
			
		||||
          "name": "db_index",
 | 
			
		||||
          "description": "Database index",
 | 
			
		||||
          "schema": {
 | 
			
		||||
            "type": "integer",
 | 
			
		||||
            "minimum": 0
 | 
			
		||||
          },
 | 
			
		||||
          "required": true
 | 
			
		||||
        }
 | 
			
		||||
      ],
 | 
			
		||||
      "result": {
 | 
			
		||||
        "name": "database_info",
 | 
			
		||||
        "schema": {
 | 
			
		||||
          "type": "object",
 | 
			
		||||
          "properties": {
 | 
			
		||||
            "id": {
 | 
			
		||||
              "type": "integer"
 | 
			
		||||
            },
 | 
			
		||||
            "name": {
 | 
			
		||||
              "type": "string",
 | 
			
		||||
              "nullable": true
 | 
			
		||||
            },
 | 
			
		||||
            "backend": {
 | 
			
		||||
              "type": "string",
 | 
			
		||||
              "enum": ["Redb"]
 | 
			
		||||
            },
 | 
			
		||||
            "encrypted": {
 | 
			
		||||
              "type": "boolean"
 | 
			
		||||
            },
 | 
			
		||||
            "redis_version": {
 | 
			
		||||
              "type": "string",
 | 
			
		||||
              "nullable": true
 | 
			
		||||
            },
 | 
			
		||||
            "storage_path": {
 | 
			
		||||
              "type": "string",
 | 
			
		||||
              "nullable": true
 | 
			
		||||
            },
 | 
			
		||||
            "size_on_disk": {
 | 
			
		||||
              "type": "integer",
 | 
			
		||||
              "nullable": true
 | 
			
		||||
            },
 | 
			
		||||
            "key_count": {
 | 
			
		||||
              "type": "integer",
 | 
			
		||||
              "nullable": true
 | 
			
		||||
            },
 | 
			
		||||
            "created_at": {
 | 
			
		||||
              "type": "integer"
 | 
			
		||||
            },
 | 
			
		||||
            "last_access": {
 | 
			
		||||
              "type": "integer",
 | 
			
		||||
              "nullable": true
 | 
			
		||||
            }
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    },
 | 
			
		||||
    {
 | 
			
		||||
      "name": "herodb_deleteDatabase",
 | 
			
		||||
      "summary": "Delete a database and its files",
 | 
			
		||||
      "params": [
 | 
			
		||||
        {
 | 
			
		||||
          "name": "db_index",
 | 
			
		||||
          "description": "Database index to delete",
 | 
			
		||||
          "schema": {
 | 
			
		||||
            "type": "integer",
 | 
			
		||||
            "minimum": 0
 | 
			
		||||
          },
 | 
			
		||||
          "required": true
 | 
			
		||||
        }
 | 
			
		||||
      ],
 | 
			
		||||
      "result": {
 | 
			
		||||
        "name": "success",
 | 
			
		||||
        "schema": {
 | 
			
		||||
          "type": "boolean"
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    },
 | 
			
		||||
    {
 | 
			
		||||
      "name": "herodb_getServerStats",
 | 
			
		||||
      "summary": "Get server statistics",
 | 
			
		||||
      "params": [],
 | 
			
		||||
      "result": {
 | 
			
		||||
        "name": "stats",
 | 
			
		||||
        "schema": {
 | 
			
		||||
          "type": "object",
 | 
			
		||||
          "additionalProperties": {
 | 
			
		||||
            "oneOf": [
 | 
			
		||||
              {"type": "string"},
 | 
			
		||||
              {"type": "integer"},
 | 
			
		||||
              {"type": "boolean"},
 | 
			
		||||
              {"type": "array"}
 | 
			
		||||
            ]
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  ],
 | 
			
		||||
  "components": {
 | 
			
		||||
    "schemas": {
 | 
			
		||||
      "DatabaseConfig": {
 | 
			
		||||
        "type": "object",
 | 
			
		||||
        "properties": {
 | 
			
		||||
          "name": {
 | 
			
		||||
            "type": "string",
 | 
			
		||||
            "nullable": true
 | 
			
		||||
          },
 | 
			
		||||
          "storage_path": {
 | 
			
		||||
            "type": "string",
 | 
			
		||||
            "nullable": true
 | 
			
		||||
          },
 | 
			
		||||
          "max_size": {
 | 
			
		||||
            "type": "integer",
 | 
			
		||||
            "nullable": true
 | 
			
		||||
          },
 | 
			
		||||
          "redis_version": {
 | 
			
		||||
            "type": "string",
 | 
			
		||||
            "nullable": true
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      },
 | 
			
		||||
      "DatabaseInfo": {
 | 
			
		||||
        "type": "object",
 | 
			
		||||
        "properties": {
 | 
			
		||||
          "id": {
 | 
			
		||||
            "type": "integer"
 | 
			
		||||
          },
 | 
			
		||||
          "name": {
 | 
			
		||||
            "type": "string",
 | 
			
		||||
            "nullable": true
 | 
			
		||||
          },
 | 
			
		||||
          "backend": {
 | 
			
		||||
            "type": "string",
 | 
			
		||||
            "enum": ["Redb", "InMemory", "Custom"]
 | 
			
		||||
          },
 | 
			
		||||
          "encrypted": {
 | 
			
		||||
            "type": "boolean"
 | 
			
		||||
          },
 | 
			
		||||
          "redis_version": {
 | 
			
		||||
            "type": "string",
 | 
			
		||||
            "nullable": true
 | 
			
		||||
          },
 | 
			
		||||
          "storage_path": {
 | 
			
		||||
            "type": "string",
 | 
			
		||||
            "nullable": true
 | 
			
		||||
          },
 | 
			
		||||
          "size_on_disk": {
 | 
			
		||||
            "type": "integer",
 | 
			
		||||
            "nullable": true
 | 
			
		||||
          },
 | 
			
		||||
          "key_count": {
 | 
			
		||||
            "type": "integer",
 | 
			
		||||
            "nullable": true
 | 
			
		||||
          },
 | 
			
		||||
          "created_at": {
 | 
			
		||||
            "type": "integer"
 | 
			
		||||
          },
 | 
			
		||||
          "last_access": {
 | 
			
		||||
            "type": "integer",
 | 
			
		||||
            "nullable": true
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
@@ -1,8 +1,11 @@
 | 
			
		||||
pub mod age;   // NEW
 | 
			
		||||
pub mod age;
 | 
			
		||||
pub mod cmd;
 | 
			
		||||
pub mod crypto;
 | 
			
		||||
pub mod error;
 | 
			
		||||
pub mod options;
 | 
			
		||||
pub mod protocol;
 | 
			
		||||
pub mod rpc;
 | 
			
		||||
pub mod rpc_server;
 | 
			
		||||
pub mod server;
 | 
			
		||||
pub mod storage;
 | 
			
		||||
pub mod openrpc_spec;
 | 
			
		||||
 
 | 
			
		||||
@@ -1,8 +1,10 @@
 | 
			
		||||
// #![allow(unused_imports)]
 | 
			
		||||
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use tokio::sync::Mutex;
 | 
			
		||||
use tokio::net::TcpListener;
 | 
			
		||||
 | 
			
		||||
use herodb::server;
 | 
			
		||||
use herodb::server::Server;
 | 
			
		||||
use herodb::rpc_server;
 | 
			
		||||
 | 
			
		||||
use clap::Parser;
 | 
			
		||||
 | 
			
		||||
@@ -30,6 +32,14 @@ struct Args {
 | 
			
		||||
    /// Encrypt the database
 | 
			
		||||
    #[arg(long)]
 | 
			
		||||
    encrypt: bool,
 | 
			
		||||
 | 
			
		||||
    /// Enable RPC management server
 | 
			
		||||
    #[arg(long)]
 | 
			
		||||
    enable_rpc: bool,
 | 
			
		||||
 | 
			
		||||
    /// RPC server port (default: 8080)
 | 
			
		||||
    #[arg(long, default_value = "8080")]
 | 
			
		||||
    rpc_port: u16,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[tokio::main]
 | 
			
		||||
@@ -46,7 +56,7 @@ async fn main() {
 | 
			
		||||
 | 
			
		||||
    // new DB option
 | 
			
		||||
    let option = herodb::options::DBOption {
 | 
			
		||||
        dir: args.dir,
 | 
			
		||||
        dir: args.dir.clone(),
 | 
			
		||||
        port,
 | 
			
		||||
        debug: args.debug,
 | 
			
		||||
        encryption_key: args.encryption_key,
 | 
			
		||||
@@ -54,11 +64,30 @@ async fn main() {
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    // new server
 | 
			
		||||
    let server = server::Server::new(option).await;
 | 
			
		||||
    let server = Arc::new(Mutex::new(server::Server::new(option).await));
 | 
			
		||||
 | 
			
		||||
    // Add a small delay to ensure the port is ready
 | 
			
		||||
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
 | 
			
		||||
 | 
			
		||||
    // Start RPC server if enabled
 | 
			
		||||
    let _rpc_handle = if args.enable_rpc {
 | 
			
		||||
        let rpc_addr = format!("127.0.0.1:{}", args.rpc_port).parse().unwrap();
 | 
			
		||||
        let base_dir = args.dir.clone();
 | 
			
		||||
 | 
			
		||||
        match rpc_server::start_rpc_server(rpc_addr, Arc::clone(&server), base_dir).await {
 | 
			
		||||
            Ok(handle) => {
 | 
			
		||||
                println!("RPC management server started on port {}", args.rpc_port);
 | 
			
		||||
                Some(handle)
 | 
			
		||||
            }
 | 
			
		||||
            Err(e) => {
 | 
			
		||||
                eprintln!("Failed to start RPC server: {}", e);
 | 
			
		||||
                None
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    } else {
 | 
			
		||||
        None
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    // accept new connections
 | 
			
		||||
    loop {
 | 
			
		||||
        let stream = listener.accept().await;
 | 
			
		||||
@@ -66,9 +95,9 @@ async fn main() {
 | 
			
		||||
            Ok((stream, _)) => {
 | 
			
		||||
                println!("accepted new connection");
 | 
			
		||||
 | 
			
		||||
                let mut sc = server.clone();
 | 
			
		||||
                let sc = Arc::clone(&server);
 | 
			
		||||
                tokio::spawn(async move {
 | 
			
		||||
                    if let Err(e) = sc.handle(stream).await {
 | 
			
		||||
                    if let Err(e) = Server::handle(sc, stream).await {
 | 
			
		||||
                        println!("error: {:?}, will close the connection. Bye", e);
 | 
			
		||||
                    }
 | 
			
		||||
                });
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										2
									
								
								herodb/src/openrpc_spec.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										2
									
								
								herodb/src/openrpc_spec.rs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,2 @@
 | 
			
		||||
/// The OpenRPC specification for the HeroDB JSON-RPC API
 | 
			
		||||
pub const OPENRPC_SPEC: &str = include_str!("../docs/openrpc.json");
 | 
			
		||||
							
								
								
									
										300
									
								
								herodb/src/rpc.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										300
									
								
								herodb/src/rpc.rs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,300 @@
 | 
			
		||||
use std::collections::HashMap;
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use tokio::sync::Mutex;
 | 
			
		||||
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
 | 
			
		||||
use serde::{Deserialize, Serialize};
 | 
			
		||||
use serde_json::Value;
 | 
			
		||||
 | 
			
		||||
use crate::server::Server;
 | 
			
		||||
use crate::openrpc_spec::OPENRPC_SPEC;
 | 
			
		||||
 | 
			
		||||
/// Database backend types
 | 
			
		||||
#[derive(Debug, Clone, Serialize, Deserialize)]
 | 
			
		||||
pub enum BackendType {
 | 
			
		||||
    Redb,
 | 
			
		||||
    // Future: InMemory, Custom(String)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Database configuration
 | 
			
		||||
#[derive(Debug, Clone, Serialize, Deserialize)]
 | 
			
		||||
pub struct DatabaseConfig {
 | 
			
		||||
    pub name: Option<String>,
 | 
			
		||||
    pub storage_path: Option<String>,
 | 
			
		||||
    pub max_size: Option<u64>,
 | 
			
		||||
    pub redis_version: Option<String>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// Database information returned by metadata queries
 | 
			
		||||
#[derive(Debug, Clone, Serialize, Deserialize)]
 | 
			
		||||
pub struct DatabaseInfo {
 | 
			
		||||
    pub id: u64,
 | 
			
		||||
    pub name: Option<String>,
 | 
			
		||||
    pub backend: BackendType,
 | 
			
		||||
    pub encrypted: bool,
 | 
			
		||||
    pub redis_version: Option<String>,
 | 
			
		||||
    pub storage_path: Option<String>,
 | 
			
		||||
    pub size_on_disk: Option<u64>,
 | 
			
		||||
    pub key_count: Option<u64>,
 | 
			
		||||
    pub created_at: u64,
 | 
			
		||||
    pub last_access: Option<u64>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// RPC trait for HeroDB management
 | 
			
		||||
#[rpc(client, server, namespace = "herodb")]
 | 
			
		||||
pub trait Rpc {
 | 
			
		||||
    /// Configure an existing database with specific settings
 | 
			
		||||
    #[method(name = "configureDatabase")]
 | 
			
		||||
    async fn configure_database(
 | 
			
		||||
        &self,
 | 
			
		||||
        db_index: u64,
 | 
			
		||||
        config: DatabaseConfig
 | 
			
		||||
    ) -> RpcResult<bool>;
 | 
			
		||||
 | 
			
		||||
    /// Create/pre-initialize a database at the specified index
 | 
			
		||||
    #[method(name = "createDatabase")]
 | 
			
		||||
    async fn create_database(&self, db_index: u64) -> RpcResult<bool>;
 | 
			
		||||
 | 
			
		||||
    /// Set encryption for a specific database index (write-only key)
 | 
			
		||||
    #[method(name = "setDatabaseEncryption")]
 | 
			
		||||
    async fn set_database_encryption(&self, db_index: u64, encryption_key: String) -> RpcResult<bool>;
 | 
			
		||||
 | 
			
		||||
    /// List all database indices that exist
 | 
			
		||||
    #[method(name = "listDatabases")]
 | 
			
		||||
    async fn list_databases(&self) -> RpcResult<Vec<u64>>;
 | 
			
		||||
 | 
			
		||||
    /// Get detailed information about a specific database
 | 
			
		||||
    #[method(name = "getDatabaseInfo")]
 | 
			
		||||
    async fn get_database_info(&self, db_index: u64) -> RpcResult<DatabaseInfo>;
 | 
			
		||||
 | 
			
		||||
    /// Delete a database and its files
 | 
			
		||||
    #[method(name = "deleteDatabase")]
 | 
			
		||||
    async fn delete_database(&self, db_index: u64) -> RpcResult<bool>;
 | 
			
		||||
 | 
			
		||||
    /// Get server statistics
 | 
			
		||||
    #[method(name = "getServerStats")]
 | 
			
		||||
    async fn get_server_stats(&self) -> RpcResult<HashMap<String, serde_json::Value>>;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// RPC Discovery trait for API introspection
 | 
			
		||||
#[rpc(client, server, namespace = "rpc", namespace_separator = ".")]
 | 
			
		||||
pub trait RpcDiscovery {
 | 
			
		||||
    /// Get the OpenRPC specification for API discovery
 | 
			
		||||
    #[method(name = "discover")]
 | 
			
		||||
    async fn discover(&self) -> RpcResult<Value>;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// RPC Server implementation
 | 
			
		||||
#[derive(Clone)]
 | 
			
		||||
pub struct RpcServerImpl {
 | 
			
		||||
    /// Reference to the main Redis server
 | 
			
		||||
    main_server: Arc<Mutex<Server>>,
 | 
			
		||||
    /// Base directory for database files
 | 
			
		||||
    base_dir: String,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl RpcServerImpl {
 | 
			
		||||
    /// Create a new RPC server instance with reference to main server
 | 
			
		||||
    pub fn new(main_server: Arc<Mutex<Server>>, base_dir: String) -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            main_server,
 | 
			
		||||
            base_dir,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[jsonrpsee::core::async_trait]
 | 
			
		||||
impl RpcServer for RpcServerImpl {
 | 
			
		||||
    async fn configure_database(
 | 
			
		||||
        &self,
 | 
			
		||||
        db_index: u64,
 | 
			
		||||
        config: DatabaseConfig
 | 
			
		||||
    ) -> RpcResult<bool> {
 | 
			
		||||
        // For now, configuration is mainly informational
 | 
			
		||||
        // In a full implementation, this could set database-specific settings
 | 
			
		||||
        println!("Configured database {} with settings: {:?}", db_index, config);
 | 
			
		||||
        Ok(true)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn create_database(&self, db_index: u64) -> RpcResult<bool> {
 | 
			
		||||
        // Lock the main server to create the database
 | 
			
		||||
        let mut server_guard = self.main_server.lock().await;
 | 
			
		||||
 | 
			
		||||
        // Save the current selected_db to restore it later
 | 
			
		||||
        let original_db = server_guard.selected_db;
 | 
			
		||||
 | 
			
		||||
        // Temporarily set the selected_db to the target database
 | 
			
		||||
        server_guard.selected_db = db_index;
 | 
			
		||||
 | 
			
		||||
        // Call current_storage() which will create the database file if it doesn't exist
 | 
			
		||||
        match server_guard.current_storage() {
 | 
			
		||||
            Ok(_) => {
 | 
			
		||||
                println!("Successfully created database at index {}", db_index);
 | 
			
		||||
 | 
			
		||||
                // Restore the original selected_db
 | 
			
		||||
                server_guard.selected_db = original_db;
 | 
			
		||||
 | 
			
		||||
                Ok(true)
 | 
			
		||||
            }
 | 
			
		||||
            Err(e) => {
 | 
			
		||||
                // Restore the original selected_db even on error
 | 
			
		||||
                server_guard.selected_db = original_db;
 | 
			
		||||
 | 
			
		||||
                Err(jsonrpsee::types::ErrorObjectOwned::owned(
 | 
			
		||||
                    -32000,
 | 
			
		||||
                    format!("Failed to create database {}: {}", db_index, e.0),
 | 
			
		||||
                    None::<()>
 | 
			
		||||
                ))
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn set_database_encryption(&self, db_index: u64, _encryption_key: String) -> RpcResult<bool> {
 | 
			
		||||
        // Note: Encryption is determined at database creation time based on db_index
 | 
			
		||||
        // DB 0-9 are non-encrypted, DB 10+ are encrypted
 | 
			
		||||
        // This method is mainly for documentation/configuration purposes
 | 
			
		||||
        println!("Note: Database {} encryption is determined by index (10+ = encrypted)", db_index);
 | 
			
		||||
        println!("Encryption key provided but not stored (write-only policy)");
 | 
			
		||||
        Ok(db_index >= 10) // Return true if this DB would be encrypted
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn list_databases(&self) -> RpcResult<Vec<u64>> {
 | 
			
		||||
        // Scan the database directory for existing .db files
 | 
			
		||||
        let mut db_indices = Vec::new();
 | 
			
		||||
 | 
			
		||||
        if let Ok(entries) = std::fs::read_dir(&self.base_dir) {
 | 
			
		||||
            for entry in entries.flatten() {
 | 
			
		||||
                if let Some(file_name) = entry.file_name().to_str() {
 | 
			
		||||
                    if let Some(index_str) = file_name.strip_suffix(".db") {
 | 
			
		||||
                        if let Ok(index) = index_str.parse::<u64>() {
 | 
			
		||||
                            db_indices.push(index);
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // Also include database 0 (default) even if file doesn't exist yet
 | 
			
		||||
        if !db_indices.contains(&0) {
 | 
			
		||||
            db_indices.push(0);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        db_indices.sort();
 | 
			
		||||
        Ok(db_indices)
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn get_database_info(&self, db_index: u64) -> RpcResult<DatabaseInfo> {
 | 
			
		||||
        // Check if database file exists
 | 
			
		||||
        let db_path = std::path::PathBuf::from(&self.base_dir).join(format!("{}.db", db_index));
 | 
			
		||||
        let file_exists = db_path.exists();
 | 
			
		||||
 | 
			
		||||
        // If database doesn't exist, return an error
 | 
			
		||||
        if !file_exists && db_index != 0 {
 | 
			
		||||
            return Err(jsonrpsee::types::ErrorObjectOwned::owned(
 | 
			
		||||
                -32000,
 | 
			
		||||
                format!("Database {} does not exist", db_index),
 | 
			
		||||
                None::<()>
 | 
			
		||||
            ));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // Get file metadata if it exists
 | 
			
		||||
        let (size_on_disk, created_at) = if file_exists {
 | 
			
		||||
            if let Ok(metadata) = std::fs::metadata(&db_path) {
 | 
			
		||||
                let size = Some(metadata.len());
 | 
			
		||||
                let created = metadata.created()
 | 
			
		||||
                    .unwrap_or(std::time::SystemTime::UNIX_EPOCH)
 | 
			
		||||
                    .duration_since(std::time::UNIX_EPOCH)
 | 
			
		||||
                    .unwrap_or_default()
 | 
			
		||||
                    .as_secs();
 | 
			
		||||
                (size, created)
 | 
			
		||||
            } else {
 | 
			
		||||
                (None, 0)
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            // Database 0 might not have a file yet
 | 
			
		||||
            (None, std::time::SystemTime::now()
 | 
			
		||||
                .duration_since(std::time::UNIX_EPOCH)
 | 
			
		||||
                .unwrap()
 | 
			
		||||
                .as_secs())
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        Ok(DatabaseInfo {
 | 
			
		||||
            id: db_index,
 | 
			
		||||
            name: None, // Could be extended to store names
 | 
			
		||||
            backend: BackendType::Redb,
 | 
			
		||||
            encrypted: db_index >= 10, // Based on HeroDB's encryption rule
 | 
			
		||||
            redis_version: Some("7.0".to_string()),
 | 
			
		||||
            storage_path: Some(self.base_dir.clone()),
 | 
			
		||||
            size_on_disk,
 | 
			
		||||
            key_count: None, // Would need to open DB to count keys
 | 
			
		||||
            created_at,
 | 
			
		||||
            last_access: None,
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn delete_database(&self, db_index: u64) -> RpcResult<bool> {
 | 
			
		||||
        // Don't allow deletion of database 0 (default)
 | 
			
		||||
        if db_index == 0 {
 | 
			
		||||
            return Err(jsonrpsee::types::ErrorObjectOwned::owned(
 | 
			
		||||
                -32000,
 | 
			
		||||
                "Cannot delete default database (index 0)".to_string(),
 | 
			
		||||
                None::<()>
 | 
			
		||||
            ));
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        let db_path = std::path::PathBuf::from(&self.base_dir).join(format!("{}.db", db_index));
 | 
			
		||||
 | 
			
		||||
        if db_path.exists() {
 | 
			
		||||
            match std::fs::remove_file(&db_path) {
 | 
			
		||||
                Ok(_) => {
 | 
			
		||||
                    println!("Deleted database file: {}", db_path.display());
 | 
			
		||||
                    Ok(true)
 | 
			
		||||
                }
 | 
			
		||||
                Err(e) => {
 | 
			
		||||
                    Err(jsonrpsee::types::ErrorObjectOwned::owned(
 | 
			
		||||
                        -32000,
 | 
			
		||||
                        format!("Failed to delete database {}: {}", db_index, e),
 | 
			
		||||
                        None::<()>
 | 
			
		||||
                    ))
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        } else {
 | 
			
		||||
            Ok(false) // Database didn't exist
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    async fn get_server_stats(&self) -> RpcResult<HashMap<String, serde_json::Value>> {
 | 
			
		||||
        let mut stats = HashMap::new();
 | 
			
		||||
 | 
			
		||||
        // Get list of databases
 | 
			
		||||
        let databases = self.list_databases().await.unwrap_or_default();
 | 
			
		||||
 | 
			
		||||
        stats.insert("total_databases".to_string(), serde_json::json!(databases.len()));
 | 
			
		||||
        stats.insert("database_indices".to_string(), serde_json::json!(databases));
 | 
			
		||||
        stats.insert("uptime".to_string(), serde_json::json!(
 | 
			
		||||
            std::time::SystemTime::now()
 | 
			
		||||
                .duration_since(std::time::UNIX_EPOCH)
 | 
			
		||||
                .unwrap()
 | 
			
		||||
                .as_secs()
 | 
			
		||||
        ));
 | 
			
		||||
        let server_guard = self.main_server.lock().await;
 | 
			
		||||
        stats.insert("server_port".to_string(), serde_json::json!(server_guard.option.port));
 | 
			
		||||
        stats.insert("data_directory".to_string(), serde_json::json!(self.base_dir));
 | 
			
		||||
 | 
			
		||||
        Ok(stats)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[jsonrpsee::core::async_trait]
 | 
			
		||||
impl RpcDiscoveryServer for RpcServerImpl {
 | 
			
		||||
    async fn discover(&self) -> RpcResult<Value> {
 | 
			
		||||
        // Parse the OpenRPC spec JSON and return it
 | 
			
		||||
        match serde_json::from_str(OPENRPC_SPEC) {
 | 
			
		||||
            Ok(spec) => Ok(spec),
 | 
			
		||||
            Err(e) => Err(jsonrpsee::types::ErrorObjectOwned::owned(
 | 
			
		||||
                -32000,
 | 
			
		||||
                format!("Failed to parse OpenRPC specification: {}", e),
 | 
			
		||||
                None::<()>
 | 
			
		||||
            ))
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										63
									
								
								herodb/src/rpc_server.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										63
									
								
								herodb/src/rpc_server.rs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,63 @@
 | 
			
		||||
use std::net::SocketAddr;
 | 
			
		||||
use jsonrpsee::server::{ServerBuilder, ServerHandle};
 | 
			
		||||
use jsonrpsee::RpcModule;
 | 
			
		||||
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use tokio::sync::Mutex;
 | 
			
		||||
use crate::rpc::{RpcServer, RpcDiscoveryServer, RpcServerImpl};
 | 
			
		||||
use crate::server::Server;
 | 
			
		||||
 | 
			
		||||
/// Start the RPC server on the specified address
 | 
			
		||||
pub async fn start_rpc_server(
 | 
			
		||||
    addr: SocketAddr,
 | 
			
		||||
    main_server: Arc<Mutex<Server>>,
 | 
			
		||||
    base_dir: String
 | 
			
		||||
) -> Result<ServerHandle, Box<dyn std::error::Error + Send + Sync>> {
 | 
			
		||||
    // Create the RPC server implementation
 | 
			
		||||
    let rpc_impl = RpcServerImpl::new(main_server, base_dir);
 | 
			
		||||
 | 
			
		||||
    // Create the RPC module
 | 
			
		||||
    let mut module = RpcModule::new(());
 | 
			
		||||
    module.merge(RpcServer::into_rpc(rpc_impl.clone()))?;
 | 
			
		||||
    module.merge(RpcDiscoveryServer::into_rpc(rpc_impl))?;
 | 
			
		||||
 | 
			
		||||
    // Build the server with both HTTP and WebSocket support
 | 
			
		||||
    let server = ServerBuilder::default()
 | 
			
		||||
        .build(addr)
 | 
			
		||||
        .await?;
 | 
			
		||||
 | 
			
		||||
    // Start the server
 | 
			
		||||
    let handle = server.start(module);
 | 
			
		||||
 | 
			
		||||
    println!("RPC server started on {}", addr);
 | 
			
		||||
 | 
			
		||||
    Ok(handle)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[cfg(test)]
 | 
			
		||||
mod tests {
 | 
			
		||||
    use super::*;
 | 
			
		||||
    use std::time::Duration;
 | 
			
		||||
 | 
			
		||||
    #[tokio::test]
 | 
			
		||||
    async fn test_rpc_server_startup() {
 | 
			
		||||
        let addr = "127.0.0.1:0".parse().unwrap(); // Use port 0 for auto-assignment
 | 
			
		||||
        let base_dir = "/tmp/test_rpc".to_string();
 | 
			
		||||
 | 
			
		||||
        let main_server = Arc::new(Mutex::new(crate::server::Server::new(crate::options::DBOption {
 | 
			
		||||
            dir: "/tmp".to_string(),
 | 
			
		||||
            port: 0,
 | 
			
		||||
            debug: false,
 | 
			
		||||
            encryption_key: None,
 | 
			
		||||
            encrypt: false,
 | 
			
		||||
        }).await));
 | 
			
		||||
        let handle = start_rpc_server(addr, main_server, base_dir).await.unwrap();
 | 
			
		||||
 | 
			
		||||
        // Give the server a moment to start
 | 
			
		||||
        tokio::time::sleep(Duration::from_millis(100)).await;
 | 
			
		||||
 | 
			
		||||
        // Stop the server
 | 
			
		||||
        handle.stop().unwrap();
 | 
			
		||||
        handle.stopped().await;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@@ -167,7 +167,7 @@ impl Server {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    pub async fn handle(
 | 
			
		||||
        &mut self,
 | 
			
		||||
        server: Arc<Mutex<Server>>,
 | 
			
		||||
        mut stream: tokio::net::TcpStream,
 | 
			
		||||
    ) -> Result<(), DBError> {
 | 
			
		||||
        // Accumulate incoming bytes to handle partial RESP frames
 | 
			
		||||
@@ -205,31 +205,49 @@ impl Server {
 | 
			
		||||
                // Advance the accumulator to the unparsed remainder
 | 
			
		||||
                acc = remaining.to_string();
 | 
			
		||||
 | 
			
		||||
                if self.option.debug {
 | 
			
		||||
                    println!("\x1b[34;1mgot command: {:?}, protocol: {:?}\x1b[0m", cmd, protocol);
 | 
			
		||||
                } else {
 | 
			
		||||
                    println!("got command: {:?}, protocol: {:?}", cmd, protocol);
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                // Check if this is a QUIT command before processing
 | 
			
		||||
                let is_quit = matches!(cmd, Cmd::Quit);
 | 
			
		||||
 | 
			
		||||
                let res = match cmd.run(self).await {
 | 
			
		||||
                    Ok(p) => p,
 | 
			
		||||
                    Err(e) => {
 | 
			
		||||
                        if self.option.debug {
 | 
			
		||||
                            eprintln!("[run error] {:?}", e);
 | 
			
		||||
                        }
 | 
			
		||||
                        Protocol::err(&format!("ERR {}", e.0))
 | 
			
		||||
                // Lock the server only for command execution
 | 
			
		||||
                let (res, debug_info) = {
 | 
			
		||||
                    let mut server_guard = server.lock().await;
 | 
			
		||||
 | 
			
		||||
                    if server_guard.option.debug {
 | 
			
		||||
                        println!("\x1b[34;1mgot command: {:?}, protocol: {:?}\x1b[0m", cmd, protocol);
 | 
			
		||||
                    } else {
 | 
			
		||||
                        println!("got command: {:?}, protocol: {:?}", cmd, protocol);
 | 
			
		||||
                    }
 | 
			
		||||
 | 
			
		||||
                    let res = match cmd.run(&mut server_guard).await {
 | 
			
		||||
                        Ok(p) => p,
 | 
			
		||||
                        Err(e) => {
 | 
			
		||||
                            if server_guard.option.debug {
 | 
			
		||||
                                eprintln!("[run error] {:?}", e);
 | 
			
		||||
                            }
 | 
			
		||||
                            Protocol::err(&format!("ERR {}", e.0))
 | 
			
		||||
                        }
 | 
			
		||||
                    };
 | 
			
		||||
 | 
			
		||||
                    let debug_info = if server_guard.option.debug {
 | 
			
		||||
                        Some((format!("queued cmd {:?}", server_guard.queued_cmd), format!("going to send response {}", res.encode())))
 | 
			
		||||
                    } else {
 | 
			
		||||
                        Some((format!("queued cmd {:?}", server_guard.queued_cmd), format!("going to send response {}", res.encode())))
 | 
			
		||||
                    };
 | 
			
		||||
 | 
			
		||||
                    (res, debug_info)
 | 
			
		||||
                };
 | 
			
		||||
 | 
			
		||||
                if self.option.debug {
 | 
			
		||||
                    println!("\x1b[34;1mqueued cmd {:?}\x1b[0m", self.queued_cmd);
 | 
			
		||||
                    println!("\x1b[32;1mgoing to send response {}\x1b[0m", res.encode());
 | 
			
		||||
                } else {
 | 
			
		||||
                    print!("queued cmd {:?}", self.queued_cmd);
 | 
			
		||||
                    println!("going to send response {}", res.encode());
 | 
			
		||||
                // Print debug info outside the lock
 | 
			
		||||
                if let Some((queued_info, response_info)) = debug_info {
 | 
			
		||||
                    if let Some((_, response)) = response_info.split_once("going to send response ") {
 | 
			
		||||
                        if queued_info.contains("\x1b[34;1m") {
 | 
			
		||||
                            println!("\x1b[34;1m{}\x1b[0m", queued_info);
 | 
			
		||||
                            println!("\x1b[32;1mgoing to send response {}\x1b[0m", response);
 | 
			
		||||
                        } else {
 | 
			
		||||
                            println!("{}", queued_info);
 | 
			
		||||
                            println!("going to send response {}", response);
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                _ = stream.write(res.encode().as_bytes()).await?;
 | 
			
		||||
 
 | 
			
		||||
@@ -1,4 +1,6 @@
 | 
			
		||||
use herodb::{server::Server, options::DBOption};
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use tokio::sync::Mutex;
 | 
			
		||||
use std::time::Duration;
 | 
			
		||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
 | 
			
		||||
use tokio::net::TcpStream;
 | 
			
		||||
@@ -29,17 +31,20 @@ async fn debug_hset_simple() {
 | 
			
		||||
        encryption_key: None,
 | 
			
		||||
    };
 | 
			
		||||
    
 | 
			
		||||
    let mut server = Server::new(option).await;
 | 
			
		||||
    
 | 
			
		||||
    let server = Arc::new(Mutex::new(Server::new(option).await));
 | 
			
		||||
 | 
			
		||||
    // Start server in background
 | 
			
		||||
    tokio::spawn(async move {
 | 
			
		||||
        let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
 | 
			
		||||
            .await
 | 
			
		||||
            .unwrap();
 | 
			
		||||
        
 | 
			
		||||
 | 
			
		||||
        loop {
 | 
			
		||||
            if let Ok((stream, _)) = listener.accept().await {
 | 
			
		||||
                let _ = server.handle(stream).await;
 | 
			
		||||
                let server_clone = Arc::clone(&server);
 | 
			
		||||
                tokio::spawn(async move {
 | 
			
		||||
                    let _ = Server::handle(server_clone, stream).await;
 | 
			
		||||
                });
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
 
 | 
			
		||||
@@ -3,6 +3,8 @@ use std::time::Duration;
 | 
			
		||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
 | 
			
		||||
use tokio::net::TcpStream;
 | 
			
		||||
use tokio::time::sleep;
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use tokio::sync::Mutex;
 | 
			
		||||
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn debug_hset_return_value() {
 | 
			
		||||
@@ -20,17 +22,20 @@ async fn debug_hset_return_value() {
 | 
			
		||||
        encryption_key: None,
 | 
			
		||||
    };
 | 
			
		||||
    
 | 
			
		||||
    let mut server = Server::new(option).await;
 | 
			
		||||
    
 | 
			
		||||
    let server = Arc::new(Mutex::new(Server::new(option).await));
 | 
			
		||||
 | 
			
		||||
    // Start server in background
 | 
			
		||||
    tokio::spawn(async move {
 | 
			
		||||
        let listener = tokio::net::TcpListener::bind("127.0.0.1:16390")
 | 
			
		||||
            .await
 | 
			
		||||
            .unwrap();
 | 
			
		||||
        
 | 
			
		||||
 | 
			
		||||
        loop {
 | 
			
		||||
            if let Ok((stream, _)) = listener.accept().await {
 | 
			
		||||
                let _ = server.handle(stream).await;
 | 
			
		||||
                let server_clone = Arc::clone(&server);
 | 
			
		||||
                tokio::spawn(async move {
 | 
			
		||||
                    let _ = Server::handle(server_clone, stream).await;
 | 
			
		||||
                });
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
 
 | 
			
		||||
@@ -1,21 +1,23 @@
 | 
			
		||||
use herodb::{server::Server, options::DBOption};
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use tokio::sync::Mutex;
 | 
			
		||||
use std::time::Duration;
 | 
			
		||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
 | 
			
		||||
use tokio::net::TcpStream;
 | 
			
		||||
use tokio::time::sleep;
 | 
			
		||||
 | 
			
		||||
// Helper function to start a test server
 | 
			
		||||
async fn start_test_server(test_name: &str) -> (Server, u16) {
 | 
			
		||||
async fn start_test_server(test_name: &str) -> (Arc<Mutex<Server>>, u16) {
 | 
			
		||||
    use std::sync::atomic::{AtomicU16, Ordering};
 | 
			
		||||
    static PORT_COUNTER: AtomicU16 = AtomicU16::new(16379);
 | 
			
		||||
    
 | 
			
		||||
 | 
			
		||||
    let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst);
 | 
			
		||||
    let test_dir = format!("/tmp/herodb_test_{}", test_name);
 | 
			
		||||
    
 | 
			
		||||
 | 
			
		||||
    // Clean up and create test directory
 | 
			
		||||
    let _ = std::fs::remove_dir_all(&test_dir);
 | 
			
		||||
    std::fs::create_dir_all(&test_dir).unwrap();
 | 
			
		||||
    
 | 
			
		||||
 | 
			
		||||
    let option = DBOption {
 | 
			
		||||
        dir: test_dir,
 | 
			
		||||
        port,
 | 
			
		||||
@@ -23,8 +25,8 @@ async fn start_test_server(test_name: &str) -> (Server, u16) {
 | 
			
		||||
        encrypt: false,
 | 
			
		||||
        encryption_key: None,
 | 
			
		||||
    };
 | 
			
		||||
    
 | 
			
		||||
    let server = Server::new(option).await;
 | 
			
		||||
 | 
			
		||||
    let server = Arc::new(Mutex::new(Server::new(option).await));
 | 
			
		||||
    (server, port)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -54,7 +56,7 @@ async fn send_command(stream: &mut TcpStream, command: &str) -> String {
 | 
			
		||||
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn test_basic_ping() {
 | 
			
		||||
    let (mut server, port) = start_test_server("ping").await;
 | 
			
		||||
    let (server, port) = start_test_server("ping").await;
 | 
			
		||||
    
 | 
			
		||||
    // Start server in background
 | 
			
		||||
    tokio::spawn(async move {
 | 
			
		||||
@@ -64,7 +66,7 @@ async fn test_basic_ping() {
 | 
			
		||||
        
 | 
			
		||||
        loop {
 | 
			
		||||
            if let Ok((stream, _)) = listener.accept().await {
 | 
			
		||||
                let _ = server.handle(stream).await;
 | 
			
		||||
                let _ = Server::handle(Arc::clone(&server), stream).await;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
@@ -78,7 +80,7 @@ async fn test_basic_ping() {
 | 
			
		||||
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn test_string_operations() {
 | 
			
		||||
    let (mut server, port) = start_test_server("string").await;
 | 
			
		||||
    let (server, port) = start_test_server("string").await;
 | 
			
		||||
    
 | 
			
		||||
    // Start server in background
 | 
			
		||||
    tokio::spawn(async move {
 | 
			
		||||
@@ -88,7 +90,7 @@ async fn test_string_operations() {
 | 
			
		||||
        
 | 
			
		||||
        loop {
 | 
			
		||||
            if let Ok((stream, _)) = listener.accept().await {
 | 
			
		||||
                let _ = server.handle(stream).await;
 | 
			
		||||
                let _ = Server::handle(Arc::clone(&server), stream).await;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
@@ -120,7 +122,7 @@ async fn test_string_operations() {
 | 
			
		||||
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn test_incr_operations() {
 | 
			
		||||
    let (mut server, port) = start_test_server("incr").await;
 | 
			
		||||
    let (server, port) = start_test_server("incr").await;
 | 
			
		||||
    
 | 
			
		||||
    tokio::spawn(async move {
 | 
			
		||||
        let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
 | 
			
		||||
@@ -129,7 +131,7 @@ async fn test_incr_operations() {
 | 
			
		||||
        
 | 
			
		||||
        loop {
 | 
			
		||||
            if let Ok((stream, _)) = listener.accept().await {
 | 
			
		||||
                let _ = server.handle(stream).await;
 | 
			
		||||
                let _ = Server::handle(Arc::clone(&server), stream).await;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
@@ -154,7 +156,7 @@ async fn test_incr_operations() {
 | 
			
		||||
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn test_hash_operations() {
 | 
			
		||||
    let (mut server, port) = start_test_server("hash").await;
 | 
			
		||||
    let (server, port) = start_test_server("hash").await;
 | 
			
		||||
    
 | 
			
		||||
    tokio::spawn(async move {
 | 
			
		||||
        let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
 | 
			
		||||
@@ -163,7 +165,7 @@ async fn test_hash_operations() {
 | 
			
		||||
        
 | 
			
		||||
        loop {
 | 
			
		||||
            if let Ok((stream, _)) = listener.accept().await {
 | 
			
		||||
                let _ = server.handle(stream).await;
 | 
			
		||||
                let _ = Server::handle(Arc::clone(&server), stream).await;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
@@ -220,7 +222,7 @@ async fn test_hash_operations() {
 | 
			
		||||
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn test_expiration() {
 | 
			
		||||
    let (mut server, port) = start_test_server("expiration").await;
 | 
			
		||||
    let (server, port) = start_test_server("expiration").await;
 | 
			
		||||
    
 | 
			
		||||
    tokio::spawn(async move {
 | 
			
		||||
        let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
 | 
			
		||||
@@ -229,7 +231,7 @@ async fn test_expiration() {
 | 
			
		||||
        
 | 
			
		||||
        loop {
 | 
			
		||||
            if let Ok((stream, _)) = listener.accept().await {
 | 
			
		||||
                let _ = server.handle(stream).await;
 | 
			
		||||
                let _ = Server::handle(Arc::clone(&server), stream).await;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
@@ -268,7 +270,7 @@ async fn test_expiration() {
 | 
			
		||||
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn test_scan_operations() {
 | 
			
		||||
    let (mut server, port) = start_test_server("scan").await;
 | 
			
		||||
    let (server, port) = start_test_server("scan").await;
 | 
			
		||||
    
 | 
			
		||||
    tokio::spawn(async move {
 | 
			
		||||
        let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
 | 
			
		||||
@@ -277,7 +279,7 @@ async fn test_scan_operations() {
 | 
			
		||||
        
 | 
			
		||||
        loop {
 | 
			
		||||
            if let Ok((stream, _)) = listener.accept().await {
 | 
			
		||||
                let _ = server.handle(stream).await;
 | 
			
		||||
                let _ = Server::handle(Arc::clone(&server), stream).await;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
@@ -304,7 +306,7 @@ async fn test_scan_operations() {
 | 
			
		||||
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn test_hscan_operations() {
 | 
			
		||||
    let (mut server, port) = start_test_server("hscan").await;
 | 
			
		||||
    let (server, port) = start_test_server("hscan").await;
 | 
			
		||||
    
 | 
			
		||||
    tokio::spawn(async move {
 | 
			
		||||
        let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
 | 
			
		||||
@@ -313,7 +315,7 @@ async fn test_hscan_operations() {
 | 
			
		||||
        
 | 
			
		||||
        loop {
 | 
			
		||||
            if let Ok((stream, _)) = listener.accept().await {
 | 
			
		||||
                let _ = server.handle(stream).await;
 | 
			
		||||
                let _ = Server::handle(Arc::clone(&server), stream).await;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
@@ -336,7 +338,7 @@ async fn test_hscan_operations() {
 | 
			
		||||
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn test_transaction_operations() {
 | 
			
		||||
    let (mut server, port) = start_test_server("transaction").await;
 | 
			
		||||
    let (server, port) = start_test_server("transaction").await;
 | 
			
		||||
    
 | 
			
		||||
    tokio::spawn(async move {
 | 
			
		||||
        let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
 | 
			
		||||
@@ -345,7 +347,7 @@ async fn test_transaction_operations() {
 | 
			
		||||
        
 | 
			
		||||
        loop {
 | 
			
		||||
            if let Ok((stream, _)) = listener.accept().await {
 | 
			
		||||
                let _ = server.handle(stream).await;
 | 
			
		||||
                let _ = Server::handle(Arc::clone(&server), stream).await;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
@@ -379,7 +381,7 @@ async fn test_transaction_operations() {
 | 
			
		||||
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn test_discard_transaction() {
 | 
			
		||||
    let (mut server, port) = start_test_server("discard").await;
 | 
			
		||||
    let (server, port) = start_test_server("discard").await;
 | 
			
		||||
    
 | 
			
		||||
    tokio::spawn(async move {
 | 
			
		||||
        let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
 | 
			
		||||
@@ -388,7 +390,7 @@ async fn test_discard_transaction() {
 | 
			
		||||
        
 | 
			
		||||
        loop {
 | 
			
		||||
            if let Ok((stream, _)) = listener.accept().await {
 | 
			
		||||
                let _ = server.handle(stream).await;
 | 
			
		||||
                let _ = Server::handle(Arc::clone(&server), stream).await;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
@@ -416,7 +418,7 @@ async fn test_discard_transaction() {
 | 
			
		||||
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn test_type_command() {
 | 
			
		||||
    let (mut server, port) = start_test_server("type").await;
 | 
			
		||||
    let (server, port) = start_test_server("type").await;
 | 
			
		||||
    
 | 
			
		||||
    tokio::spawn(async move {
 | 
			
		||||
        let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
 | 
			
		||||
@@ -425,7 +427,7 @@ async fn test_type_command() {
 | 
			
		||||
        
 | 
			
		||||
        loop {
 | 
			
		||||
            if let Ok((stream, _)) = listener.accept().await {
 | 
			
		||||
                let _ = server.handle(stream).await;
 | 
			
		||||
                let _ = Server::handle(Arc::clone(&server), stream).await;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
@@ -451,7 +453,7 @@ async fn test_type_command() {
 | 
			
		||||
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn test_config_commands() {
 | 
			
		||||
    let (mut server, port) = start_test_server("config").await;
 | 
			
		||||
    let (server, port) = start_test_server("config").await;
 | 
			
		||||
    
 | 
			
		||||
    tokio::spawn(async move {
 | 
			
		||||
        let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
 | 
			
		||||
@@ -460,7 +462,7 @@ async fn test_config_commands() {
 | 
			
		||||
        
 | 
			
		||||
        loop {
 | 
			
		||||
            if let Ok((stream, _)) = listener.accept().await {
 | 
			
		||||
                let _ = server.handle(stream).await;
 | 
			
		||||
                let _ = Server::handle(Arc::clone(&server), stream).await;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
@@ -482,7 +484,7 @@ async fn test_config_commands() {
 | 
			
		||||
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn test_info_command() {
 | 
			
		||||
    let (mut server, port) = start_test_server("info").await;
 | 
			
		||||
    let (server, port) = start_test_server("info").await;
 | 
			
		||||
    
 | 
			
		||||
    tokio::spawn(async move {
 | 
			
		||||
        let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
 | 
			
		||||
@@ -491,7 +493,7 @@ async fn test_info_command() {
 | 
			
		||||
        
 | 
			
		||||
        loop {
 | 
			
		||||
            if let Ok((stream, _)) = listener.accept().await {
 | 
			
		||||
                let _ = server.handle(stream).await;
 | 
			
		||||
                let _ = Server::handle(Arc::clone(&server), stream).await;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
@@ -511,7 +513,7 @@ async fn test_info_command() {
 | 
			
		||||
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn test_error_handling() {
 | 
			
		||||
    let (mut server, port) = start_test_server("error").await;
 | 
			
		||||
    let (server, port) = start_test_server("error").await;
 | 
			
		||||
    
 | 
			
		||||
    tokio::spawn(async move {
 | 
			
		||||
        let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
 | 
			
		||||
@@ -520,7 +522,7 @@ async fn test_error_handling() {
 | 
			
		||||
        
 | 
			
		||||
        loop {
 | 
			
		||||
            if let Ok((stream, _)) = listener.accept().await {
 | 
			
		||||
                let _ = server.handle(stream).await;
 | 
			
		||||
                let _ = Server::handle(Arc::clone(&server), stream).await;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
@@ -549,7 +551,7 @@ async fn test_error_handling() {
 | 
			
		||||
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn test_list_operations() {
 | 
			
		||||
    let (mut server, port) = start_test_server("list").await;
 | 
			
		||||
    let (server, port) = start_test_server("list").await;
 | 
			
		||||
    
 | 
			
		||||
    tokio::spawn(async move {
 | 
			
		||||
        let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
 | 
			
		||||
@@ -558,7 +560,7 @@ async fn test_list_operations() {
 | 
			
		||||
        
 | 
			
		||||
        loop {
 | 
			
		||||
            if let Ok((stream, _)) = listener.accept().await {
 | 
			
		||||
                let _ = server.handle(stream).await;
 | 
			
		||||
                let _ = Server::handle(Arc::clone(&server), stream).await;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										37
									
								
								herodb/tests/rpc_tests.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										37
									
								
								herodb/tests/rpc_tests.rs
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,37 @@
 | 
			
		||||
use herodb::rpc::{BackendType, DatabaseConfig};
 | 
			
		||||
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn test_rpc_server_basic() {
 | 
			
		||||
    // This test would require starting the RPC server in a separate thread
 | 
			
		||||
    // For now, we'll just test that the types compile correctly
 | 
			
		||||
 | 
			
		||||
    // Test serialization of types
 | 
			
		||||
    let backend = BackendType::Redb;
 | 
			
		||||
    let config = DatabaseConfig {
 | 
			
		||||
        name: Some("test_db".to_string()),
 | 
			
		||||
        storage_path: Some("/tmp/test".to_string()),
 | 
			
		||||
        max_size: Some(1024 * 1024),
 | 
			
		||||
        redis_version: Some("7.0".to_string()),
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    let backend_json = serde_json::to_string(&backend).unwrap();
 | 
			
		||||
    let config_json = serde_json::to_string(&config).unwrap();
 | 
			
		||||
 | 
			
		||||
    assert_eq!(backend_json, "\"Redb\"");
 | 
			
		||||
    assert!(config_json.contains("test_db"));
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn test_database_config_serialization() {
 | 
			
		||||
    let config = DatabaseConfig {
 | 
			
		||||
        name: Some("my_db".to_string()),
 | 
			
		||||
        storage_path: None,
 | 
			
		||||
        max_size: Some(1000000),
 | 
			
		||||
        redis_version: Some("7.0".to_string()),
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    let json = serde_json::to_value(&config).unwrap();
 | 
			
		||||
    assert_eq!(json["name"], "my_db");
 | 
			
		||||
    assert_eq!(json["max_size"], 1000000);
 | 
			
		||||
    assert_eq!(json["redis_version"], "7.0");
 | 
			
		||||
}
 | 
			
		||||
@@ -1,23 +1,25 @@
 | 
			
		||||
use herodb::{server::Server, options::DBOption};
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use tokio::sync::Mutex;
 | 
			
		||||
use std::time::Duration;
 | 
			
		||||
use tokio::time::sleep;
 | 
			
		||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
 | 
			
		||||
use tokio::net::TcpStream;
 | 
			
		||||
 | 
			
		||||
// Helper function to start a test server with clean data directory
 | 
			
		||||
async fn start_test_server(test_name: &str) -> (Server, u16) {
 | 
			
		||||
async fn start_test_server(test_name: &str) -> (Arc<Mutex<Server>>, u16) {
 | 
			
		||||
    use std::sync::atomic::{AtomicU16, Ordering};
 | 
			
		||||
    static PORT_COUNTER: AtomicU16 = AtomicU16::new(17000);
 | 
			
		||||
    
 | 
			
		||||
 | 
			
		||||
    // Get a unique port for this test
 | 
			
		||||
    let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst);
 | 
			
		||||
    
 | 
			
		||||
 | 
			
		||||
    let test_dir = format!("/tmp/herodb_test_{}", test_name);
 | 
			
		||||
    
 | 
			
		||||
 | 
			
		||||
    // Clean up any existing test data
 | 
			
		||||
    let _ = std::fs::remove_dir_all(&test_dir);
 | 
			
		||||
    std::fs::create_dir_all(&test_dir).unwrap();
 | 
			
		||||
    
 | 
			
		||||
 | 
			
		||||
    let option = DBOption {
 | 
			
		||||
        dir: test_dir,
 | 
			
		||||
        port,
 | 
			
		||||
@@ -25,8 +27,8 @@ async fn start_test_server(test_name: &str) -> (Server, u16) {
 | 
			
		||||
        encrypt: false,
 | 
			
		||||
        encryption_key: None,
 | 
			
		||||
    };
 | 
			
		||||
    
 | 
			
		||||
    let server = Server::new(option).await;
 | 
			
		||||
 | 
			
		||||
    let server = Arc::new(Mutex::new(Server::new(option).await));
 | 
			
		||||
    (server, port)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -42,7 +44,7 @@ async fn send_redis_command(port: u16, command: &str) -> String {
 | 
			
		||||
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn test_basic_redis_functionality() {
 | 
			
		||||
    let (mut server, port) = start_test_server("basic").await;
 | 
			
		||||
    let (server, port) = start_test_server("basic").await;
 | 
			
		||||
    
 | 
			
		||||
    // Start server in background with timeout
 | 
			
		||||
    let server_handle = tokio::spawn(async move {
 | 
			
		||||
@@ -53,7 +55,7 @@ async fn test_basic_redis_functionality() {
 | 
			
		||||
        // Accept only a few connections for testing
 | 
			
		||||
        for _ in 0..10 {
 | 
			
		||||
            if let Ok((stream, _)) = listener.accept().await {
 | 
			
		||||
                let _ = server.handle(stream).await;
 | 
			
		||||
                let _ = Server::handle(Arc::clone(&server), stream).await;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
@@ -111,7 +113,7 @@ async fn test_basic_redis_functionality() {
 | 
			
		||||
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn test_hash_operations() {
 | 
			
		||||
    let (mut server, port) = start_test_server("hash_ops").await;
 | 
			
		||||
    let (server, port) = start_test_server("hash_ops").await;
 | 
			
		||||
    
 | 
			
		||||
    // Start server in background with timeout
 | 
			
		||||
    let server_handle = tokio::spawn(async move {
 | 
			
		||||
@@ -122,7 +124,7 @@ async fn test_hash_operations() {
 | 
			
		||||
        // Accept only a few connections for testing
 | 
			
		||||
        for _ in 0..5 {
 | 
			
		||||
            if let Ok((stream, _)) = listener.accept().await {
 | 
			
		||||
                let _ = server.handle(stream).await;
 | 
			
		||||
                let _ = Server::handle(Arc::clone(&server), stream).await;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
@@ -165,7 +167,7 @@ async fn test_hash_operations() {
 | 
			
		||||
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn test_transaction_operations() {
 | 
			
		||||
    let (mut server, port) = start_test_server("transactions").await;
 | 
			
		||||
    let (server, port) = start_test_server("transactions").await;
 | 
			
		||||
    
 | 
			
		||||
    // Start server in background with timeout
 | 
			
		||||
    let server_handle = tokio::spawn(async move {
 | 
			
		||||
@@ -176,7 +178,7 @@ async fn test_transaction_operations() {
 | 
			
		||||
        // Accept only a few connections for testing
 | 
			
		||||
        for _ in 0..5 {
 | 
			
		||||
            if let Ok((stream, _)) = listener.accept().await {
 | 
			
		||||
                let _ = server.handle(stream).await;
 | 
			
		||||
                let _ = Server::handle(Arc::clone(&server), stream).await;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
 
 | 
			
		||||
@@ -1,21 +1,23 @@
 | 
			
		||||
use herodb::{server::Server, options::DBOption};
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use tokio::sync::Mutex;
 | 
			
		||||
use std::time::Duration;
 | 
			
		||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
 | 
			
		||||
use tokio::net::TcpStream;
 | 
			
		||||
use tokio::time::sleep;
 | 
			
		||||
 | 
			
		||||
// Helper function to start a test server with clean data directory
 | 
			
		||||
async fn start_test_server(test_name: &str) -> (Server, u16) {
 | 
			
		||||
async fn start_test_server(test_name: &str) -> (Arc<Mutex<Server>>, u16) {
 | 
			
		||||
    use std::sync::atomic::{AtomicU16, Ordering};
 | 
			
		||||
    static PORT_COUNTER: AtomicU16 = AtomicU16::new(16500);
 | 
			
		||||
    
 | 
			
		||||
 | 
			
		||||
    let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst);
 | 
			
		||||
    let test_dir = format!("/tmp/herodb_simple_test_{}", test_name);
 | 
			
		||||
    
 | 
			
		||||
 | 
			
		||||
    // Clean up any existing test data
 | 
			
		||||
    let _ = std::fs::remove_dir_all(&test_dir);
 | 
			
		||||
    std::fs::create_dir_all(&test_dir).unwrap();
 | 
			
		||||
    
 | 
			
		||||
 | 
			
		||||
    let option = DBOption {
 | 
			
		||||
        dir: test_dir,
 | 
			
		||||
        port,
 | 
			
		||||
@@ -23,8 +25,8 @@ async fn start_test_server(test_name: &str) -> (Server, u16) {
 | 
			
		||||
        encrypt: false,
 | 
			
		||||
        encryption_key: None,
 | 
			
		||||
    };
 | 
			
		||||
    
 | 
			
		||||
    let server = Server::new(option).await;
 | 
			
		||||
 | 
			
		||||
    let server = Arc::new(Mutex::new(Server::new(option).await));
 | 
			
		||||
    (server, port)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -54,7 +56,7 @@ async fn connect_to_server(port: u16) -> TcpStream {
 | 
			
		||||
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn test_basic_ping_simple() {
 | 
			
		||||
    let (mut server, port) = start_test_server("ping").await;
 | 
			
		||||
    let (server, port) = start_test_server("ping").await;
 | 
			
		||||
    
 | 
			
		||||
    // Start server in background
 | 
			
		||||
    tokio::spawn(async move {
 | 
			
		||||
@@ -64,7 +66,8 @@ async fn test_basic_ping_simple() {
 | 
			
		||||
        
 | 
			
		||||
        loop {
 | 
			
		||||
            if let Ok((stream, _)) = listener.accept().await {
 | 
			
		||||
                let _ = server.handle(stream).await;
 | 
			
		||||
                let server_clone = Arc::clone(&server);
 | 
			
		||||
                let _ = Server::handle(server_clone, stream).await;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
@@ -78,7 +81,7 @@ async fn test_basic_ping_simple() {
 | 
			
		||||
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn test_hset_clean_db() {
 | 
			
		||||
    let (mut server, port) = start_test_server("hset_clean").await;
 | 
			
		||||
    let (server, port) = start_test_server("hset_clean").await;
 | 
			
		||||
    
 | 
			
		||||
    // Start server in background
 | 
			
		||||
    tokio::spawn(async move {
 | 
			
		||||
@@ -88,7 +91,8 @@ async fn test_hset_clean_db() {
 | 
			
		||||
        
 | 
			
		||||
        loop {
 | 
			
		||||
            if let Ok((stream, _)) = listener.accept().await {
 | 
			
		||||
                let _ = server.handle(stream).await;
 | 
			
		||||
                let server_clone = Arc::clone(&server);
 | 
			
		||||
                let _ = Server::handle(server_clone, stream).await;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
@@ -110,7 +114,7 @@ async fn test_hset_clean_db() {
 | 
			
		||||
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn test_type_command_simple() {
 | 
			
		||||
    let (mut server, port) = start_test_server("type").await;
 | 
			
		||||
    let (server, port) = start_test_server("type").await;
 | 
			
		||||
    
 | 
			
		||||
    // Start server in background
 | 
			
		||||
    tokio::spawn(async move {
 | 
			
		||||
@@ -120,7 +124,8 @@ async fn test_type_command_simple() {
 | 
			
		||||
        
 | 
			
		||||
        loop {
 | 
			
		||||
            if let Ok((stream, _)) = listener.accept().await {
 | 
			
		||||
                let _ = server.handle(stream).await;
 | 
			
		||||
                let server_clone = Arc::clone(&server);
 | 
			
		||||
                let _ = Server::handle(server_clone, stream).await;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
@@ -149,7 +154,7 @@ async fn test_type_command_simple() {
 | 
			
		||||
 | 
			
		||||
#[tokio::test]
 | 
			
		||||
async fn test_hexists_simple() {
 | 
			
		||||
    let (mut server, port) = start_test_server("hexists").await;
 | 
			
		||||
    let (server, port) = start_test_server("hexists").await;
 | 
			
		||||
    
 | 
			
		||||
    // Start server in background
 | 
			
		||||
    tokio::spawn(async move {
 | 
			
		||||
@@ -159,7 +164,8 @@ async fn test_hexists_simple() {
 | 
			
		||||
        
 | 
			
		||||
        loop {
 | 
			
		||||
            if let Ok((stream, _)) = listener.accept().await {
 | 
			
		||||
                let _ = server.handle(stream).await;
 | 
			
		||||
                let server_clone = Arc::clone(&server);
 | 
			
		||||
                let _ = Server::handle(server_clone, stream).await;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    });
 | 
			
		||||
 
 | 
			
		||||
@@ -2,12 +2,14 @@ use herodb::{options::DBOption, server::Server};
 | 
			
		||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
 | 
			
		||||
use tokio::net::TcpStream;
 | 
			
		||||
use tokio::time::{sleep, Duration};
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use tokio::sync::Mutex;
 | 
			
		||||
 | 
			
		||||
// =========================
 | 
			
		||||
// Helpers
 | 
			
		||||
// =========================
 | 
			
		||||
 | 
			
		||||
async fn start_test_server(test_name: &str) -> (Server, u16) {
 | 
			
		||||
async fn start_test_server(test_name: &str) -> (Arc<Mutex<Server>>, u16) {
 | 
			
		||||
    use std::sync::atomic::{AtomicU16, Ordering};
 | 
			
		||||
    static PORT_COUNTER: AtomicU16 = AtomicU16::new(17100);
 | 
			
		||||
    let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst);
 | 
			
		||||
@@ -24,11 +26,11 @@ async fn start_test_server(test_name: &str) -> (Server, u16) {
 | 
			
		||||
        encryption_key: None,
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    let server = Server::new(option).await;
 | 
			
		||||
    let server = Arc::new(Mutex::new(Server::new(option).await));
 | 
			
		||||
    (server, port)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
async fn spawn_listener(server: Server, port: u16) {
 | 
			
		||||
async fn spawn_listener(server: Arc<Mutex<Server>>, port: u16) {
 | 
			
		||||
    tokio::spawn(async move {
 | 
			
		||||
        let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
 | 
			
		||||
            .await
 | 
			
		||||
@@ -36,9 +38,9 @@ async fn spawn_listener(server: Server, port: u16) {
 | 
			
		||||
        loop {
 | 
			
		||||
            match listener.accept().await {
 | 
			
		||||
                Ok((stream, _)) => {
 | 
			
		||||
                    let mut s_clone = server.clone();
 | 
			
		||||
                    let server_clone = Arc::clone(&server);
 | 
			
		||||
                    tokio::spawn(async move {
 | 
			
		||||
                        let _ = s_clone.handle(stream).await;
 | 
			
		||||
                        let _ = Server::handle(server_clone, stream).await;
 | 
			
		||||
                    });
 | 
			
		||||
                }
 | 
			
		||||
                Err(_e) => break,
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user