diff --git a/herodb/src/main.rs b/herodb/src/main.rs index 3a3be12..2e1c3f7 100644 --- a/herodb/src/main.rs +++ b/herodb/src/main.rs @@ -5,6 +5,7 @@ use tokio::sync::Mutex; use tokio::net::TcpListener; use herodb::server; +use herodb::server::Server; use herodb::rpc_server; use clap::Parser; @@ -98,8 +99,7 @@ async fn main() { let sc = Arc::clone(&server); tokio::spawn(async move { - let mut server_guard = sc.lock().await; - if let Err(e) = server_guard.handle(stream).await { + if let Err(e) = Server::handle(sc, stream).await { println!("error: {:?}, will close the connection. Bye", e); } }); diff --git a/herodb/src/rpc.rs b/herodb/src/rpc.rs index 1ab27e3..fef642e 100644 --- a/herodb/src/rpc.rs +++ b/herodb/src/rpc.rs @@ -177,11 +177,34 @@ impl RpcServer for RpcServerImpl { let db_path = std::path::PathBuf::from(&self.base_dir).join(format!("{}.db", db_index)); let file_exists = db_path.exists(); - // Get file size if it exists - let size_on_disk = if file_exists { - std::fs::metadata(&db_path).ok().map(|m| m.len()) + // If database doesn't exist, return an error + if !file_exists && db_index != 0 { + return Err(jsonrpsee::types::ErrorObjectOwned::owned( + -32000, + format!("Database {} does not exist", db_index), + None::<()> + )); + } + + // Get file metadata if it exists + let (size_on_disk, created_at) = if file_exists { + if let Ok(metadata) = std::fs::metadata(&db_path) { + let size = Some(metadata.len()); + let created = metadata.created() + .unwrap_or(std::time::SystemTime::UNIX_EPOCH) + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + (size, created) + } else { + (None, 0) + } } else { - None + // Database 0 might not have a file yet + (None, std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs()) }; Ok(DatabaseInfo { @@ -193,10 +216,7 @@ impl RpcServer for RpcServerImpl { storage_path: Some(self.base_dir.clone()), size_on_disk, key_count: None, // Would need to open DB to count keys - created_at: std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_secs(), + created_at, last_access: None, }) } diff --git a/herodb/src/server.rs b/herodb/src/server.rs index 0c128c6..57aac94 100644 --- a/herodb/src/server.rs +++ b/herodb/src/server.rs @@ -167,7 +167,7 @@ impl Server { } pub async fn handle( - &mut self, + server: Arc>, mut stream: tokio::net::TcpStream, ) -> Result<(), DBError> { // Accumulate incoming bytes to handle partial RESP frames @@ -205,31 +205,49 @@ impl Server { // Advance the accumulator to the unparsed remainder acc = remaining.to_string(); - if self.option.debug { - println!("\x1b[34;1mgot command: {:?}, protocol: {:?}\x1b[0m", cmd, protocol); - } else { - println!("got command: {:?}, protocol: {:?}", cmd, protocol); - } - // Check if this is a QUIT command before processing let is_quit = matches!(cmd, Cmd::Quit); - let res = match cmd.run(self).await { - Ok(p) => p, - Err(e) => { - if self.option.debug { - eprintln!("[run error] {:?}", e); - } - Protocol::err(&format!("ERR {}", e.0)) + // Lock the server only for command execution + let (res, debug_info) = { + let mut server_guard = server.lock().await; + + if server_guard.option.debug { + println!("\x1b[34;1mgot command: {:?}, protocol: {:?}\x1b[0m", cmd, protocol); + } else { + println!("got command: {:?}, protocol: {:?}", cmd, protocol); } + + let res = match cmd.run(&mut server_guard).await { + Ok(p) => p, + Err(e) => { + if server_guard.option.debug { + eprintln!("[run error] {:?}", e); + } + Protocol::err(&format!("ERR {}", e.0)) + } + }; + + let debug_info = if server_guard.option.debug { + Some((format!("queued cmd {:?}", server_guard.queued_cmd), format!("going to send response {}", res.encode()))) + } else { + Some((format!("queued cmd {:?}", server_guard.queued_cmd), format!("going to send response {}", res.encode()))) + }; + + (res, debug_info) }; - if self.option.debug { - println!("\x1b[34;1mqueued cmd {:?}\x1b[0m", self.queued_cmd); - println!("\x1b[32;1mgoing to send response {}\x1b[0m", res.encode()); - } else { - print!("queued cmd {:?}", self.queued_cmd); - println!("going to send response {}", res.encode()); + // Print debug info outside the lock + if let Some((queued_info, response_info)) = debug_info { + if let Some((_, response)) = response_info.split_once("going to send response ") { + if queued_info.contains("\x1b[34;1m") { + println!("\x1b[34;1m{}\x1b[0m", queued_info); + println!("\x1b[32;1mgoing to send response {}\x1b[0m", response); + } else { + println!("{}", queued_info); + println!("going to send response {}", response); + } + } } _ = stream.write(res.encode().as_bytes()).await?; diff --git a/herodb/tests/debug_hset.rs b/herodb/tests/debug_hset.rs index 77f3191..2d2afdd 100644 --- a/herodb/tests/debug_hset.rs +++ b/herodb/tests/debug_hset.rs @@ -1,4 +1,6 @@ use herodb::{server::Server, options::DBOption}; +use std::sync::Arc; +use tokio::sync::Mutex; use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; @@ -29,17 +31,20 @@ async fn debug_hset_simple() { encryption_key: None, }; - let mut server = Server::new(option).await; - + let server = Arc::new(Mutex::new(Server::new(option).await)); + // Start server in background tokio::spawn(async move { let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) .await .unwrap(); - + loop { if let Ok((stream, _)) = listener.accept().await { - let _ = server.handle(stream).await; + let server_clone = Arc::clone(&server); + tokio::spawn(async move { + let _ = Server::handle(server_clone, stream).await; + }); } } }); diff --git a/herodb/tests/debug_hset_simple.rs b/herodb/tests/debug_hset_simple.rs index 571554d..2fd37ee 100644 --- a/herodb/tests/debug_hset_simple.rs +++ b/herodb/tests/debug_hset_simple.rs @@ -3,6 +3,8 @@ use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; use tokio::time::sleep; +use std::sync::Arc; +use tokio::sync::Mutex; #[tokio::test] async fn debug_hset_return_value() { @@ -20,17 +22,20 @@ async fn debug_hset_return_value() { encryption_key: None, }; - let mut server = Server::new(option).await; - + let server = Arc::new(Mutex::new(Server::new(option).await)); + // Start server in background tokio::spawn(async move { let listener = tokio::net::TcpListener::bind("127.0.0.1:16390") .await .unwrap(); - + loop { if let Ok((stream, _)) = listener.accept().await { - let _ = server.handle(stream).await; + let server_clone = Arc::clone(&server); + tokio::spawn(async move { + let _ = Server::handle(server_clone, stream).await; + }); } } }); diff --git a/herodb/tests/redis_tests.rs b/herodb/tests/redis_tests.rs index b689a9c..4e88e0d 100644 --- a/herodb/tests/redis_tests.rs +++ b/herodb/tests/redis_tests.rs @@ -1,21 +1,23 @@ use herodb::{server::Server, options::DBOption}; +use std::sync::Arc; +use tokio::sync::Mutex; use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; use tokio::time::sleep; // Helper function to start a test server -async fn start_test_server(test_name: &str) -> (Server, u16) { +async fn start_test_server(test_name: &str) -> (Arc>, u16) { use std::sync::atomic::{AtomicU16, Ordering}; static PORT_COUNTER: AtomicU16 = AtomicU16::new(16379); - + let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst); let test_dir = format!("/tmp/herodb_test_{}", test_name); - + // Clean up and create test directory let _ = std::fs::remove_dir_all(&test_dir); std::fs::create_dir_all(&test_dir).unwrap(); - + let option = DBOption { dir: test_dir, port, @@ -23,8 +25,8 @@ async fn start_test_server(test_name: &str) -> (Server, u16) { encrypt: false, encryption_key: None, }; - - let server = Server::new(option).await; + + let server = Arc::new(Mutex::new(Server::new(option).await)); (server, port) } @@ -54,7 +56,7 @@ async fn send_command(stream: &mut TcpStream, command: &str) -> String { #[tokio::test] async fn test_basic_ping() { - let (mut server, port) = start_test_server("ping").await; + let (server, port) = start_test_server("ping").await; // Start server in background tokio::spawn(async move { @@ -64,7 +66,7 @@ async fn test_basic_ping() { loop { if let Ok((stream, _)) = listener.accept().await { - let _ = server.handle(stream).await; + let _ = Server::handle(Arc::clone(&server), stream).await; } } }); @@ -78,7 +80,7 @@ async fn test_basic_ping() { #[tokio::test] async fn test_string_operations() { - let (mut server, port) = start_test_server("string").await; + let (server, port) = start_test_server("string").await; // Start server in background tokio::spawn(async move { @@ -88,7 +90,7 @@ async fn test_string_operations() { loop { if let Ok((stream, _)) = listener.accept().await { - let _ = server.handle(stream).await; + let _ = Server::handle(Arc::clone(&server), stream).await; } } }); @@ -120,7 +122,7 @@ async fn test_string_operations() { #[tokio::test] async fn test_incr_operations() { - let (mut server, port) = start_test_server("incr").await; + let (server, port) = start_test_server("incr").await; tokio::spawn(async move { let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) @@ -129,7 +131,7 @@ async fn test_incr_operations() { loop { if let Ok((stream, _)) = listener.accept().await { - let _ = server.handle(stream).await; + let _ = Server::handle(Arc::clone(&server), stream).await; } } }); @@ -154,7 +156,7 @@ async fn test_incr_operations() { #[tokio::test] async fn test_hash_operations() { - let (mut server, port) = start_test_server("hash").await; + let (server, port) = start_test_server("hash").await; tokio::spawn(async move { let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) @@ -163,7 +165,7 @@ async fn test_hash_operations() { loop { if let Ok((stream, _)) = listener.accept().await { - let _ = server.handle(stream).await; + let _ = Server::handle(Arc::clone(&server), stream).await; } } }); @@ -229,7 +231,7 @@ async fn test_expiration() { loop { if let Ok((stream, _)) = listener.accept().await { - let _ = server.handle(stream).await; + let _ = Server::handle(Arc::clone(&server), stream).await; } } }); @@ -277,7 +279,7 @@ async fn test_scan_operations() { loop { if let Ok((stream, _)) = listener.accept().await { - let _ = server.handle(stream).await; + let _ = Server::handle(Arc::clone(&server), stream).await; } } }); @@ -313,7 +315,7 @@ async fn test_hscan_operations() { loop { if let Ok((stream, _)) = listener.accept().await { - let _ = server.handle(stream).await; + let _ = Server::handle(Arc::clone(&server), stream).await; } } }); @@ -345,7 +347,7 @@ async fn test_transaction_operations() { loop { if let Ok((stream, _)) = listener.accept().await { - let _ = server.handle(stream).await; + let _ = Server::handle(Arc::clone(&server), stream).await; } } }); @@ -388,7 +390,7 @@ async fn test_discard_transaction() { loop { if let Ok((stream, _)) = listener.accept().await { - let _ = server.handle(stream).await; + let _ = Server::handle(Arc::clone(&server), stream).await; } } }); @@ -425,7 +427,7 @@ async fn test_type_command() { loop { if let Ok((stream, _)) = listener.accept().await { - let _ = server.handle(stream).await; + let _ = Server::handle(Arc::clone(&server), stream).await; } } }); @@ -460,7 +462,7 @@ async fn test_config_commands() { loop { if let Ok((stream, _)) = listener.accept().await { - let _ = server.handle(stream).await; + let _ = Server::handle(Arc::clone(&server), stream).await; } } }); @@ -491,7 +493,7 @@ async fn test_info_command() { loop { if let Ok((stream, _)) = listener.accept().await { - let _ = server.handle(stream).await; + let _ = Server::handle(Arc::clone(&server), stream).await; } } }); @@ -520,7 +522,7 @@ async fn test_error_handling() { loop { if let Ok((stream, _)) = listener.accept().await { - let _ = server.handle(stream).await; + let _ = Server::handle(Arc::clone(&server), stream).await; } } }); @@ -558,7 +560,7 @@ async fn test_list_operations() { loop { if let Ok((stream, _)) = listener.accept().await { - let _ = server.handle(stream).await; + let _ = Server::handle(Arc::clone(&server), stream).await; } } }); diff --git a/herodb/tests/simple_integration_test.rs b/herodb/tests/simple_integration_test.rs index 52e2eb9..8aade33 100644 --- a/herodb/tests/simple_integration_test.rs +++ b/herodb/tests/simple_integration_test.rs @@ -1,23 +1,25 @@ use herodb::{server::Server, options::DBOption}; +use std::sync::Arc; +use tokio::sync::Mutex; use std::time::Duration; use tokio::time::sleep; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; // Helper function to start a test server with clean data directory -async fn start_test_server(test_name: &str) -> (Server, u16) { +async fn start_test_server(test_name: &str) -> (Arc>, u16) { use std::sync::atomic::{AtomicU16, Ordering}; static PORT_COUNTER: AtomicU16 = AtomicU16::new(17000); - + // Get a unique port for this test let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst); - + let test_dir = format!("/tmp/herodb_test_{}", test_name); - + // Clean up any existing test data let _ = std::fs::remove_dir_all(&test_dir); std::fs::create_dir_all(&test_dir).unwrap(); - + let option = DBOption { dir: test_dir, port, @@ -25,8 +27,8 @@ async fn start_test_server(test_name: &str) -> (Server, u16) { encrypt: false, encryption_key: None, }; - - let server = Server::new(option).await; + + let server = Arc::new(Mutex::new(Server::new(option).await)); (server, port) } @@ -42,7 +44,7 @@ async fn send_redis_command(port: u16, command: &str) -> String { #[tokio::test] async fn test_basic_redis_functionality() { - let (mut server, port) = start_test_server("basic").await; + let (server, port) = start_test_server("basic").await; // Start server in background with timeout let server_handle = tokio::spawn(async move { @@ -53,7 +55,7 @@ async fn test_basic_redis_functionality() { // Accept only a few connections for testing for _ in 0..10 { if let Ok((stream, _)) = listener.accept().await { - let _ = server.handle(stream).await; + let _ = Server::handle(Arc::clone(&server), stream).await; } } }); @@ -111,7 +113,7 @@ async fn test_basic_redis_functionality() { #[tokio::test] async fn test_hash_operations() { - let (mut server, port) = start_test_server("hash_ops").await; + let (server, port) = start_test_server("hash_ops").await; // Start server in background with timeout let server_handle = tokio::spawn(async move { @@ -122,7 +124,7 @@ async fn test_hash_operations() { // Accept only a few connections for testing for _ in 0..5 { if let Ok((stream, _)) = listener.accept().await { - let _ = server.handle(stream).await; + let _ = Server::handle(Arc::clone(&server), stream).await; } } }); @@ -165,7 +167,7 @@ async fn test_hash_operations() { #[tokio::test] async fn test_transaction_operations() { - let (mut server, port) = start_test_server("transactions").await; + let (server, port) = start_test_server("transactions").await; // Start server in background with timeout let server_handle = tokio::spawn(async move { @@ -176,7 +178,7 @@ async fn test_transaction_operations() { // Accept only a few connections for testing for _ in 0..5 { if let Ok((stream, _)) = listener.accept().await { - let _ = server.handle(stream).await; + let _ = Server::handle(Arc::clone(&server), stream).await; } } }); diff --git a/herodb/tests/simple_redis_test.rs b/herodb/tests/simple_redis_test.rs index d2f30f6..b5fdaf4 100644 --- a/herodb/tests/simple_redis_test.rs +++ b/herodb/tests/simple_redis_test.rs @@ -1,21 +1,23 @@ use herodb::{server::Server, options::DBOption}; +use std::sync::Arc; +use tokio::sync::Mutex; use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; use tokio::time::sleep; // Helper function to start a test server with clean data directory -async fn start_test_server(test_name: &str) -> (Server, u16) { +async fn start_test_server(test_name: &str) -> (Arc>, u16) { use std::sync::atomic::{AtomicU16, Ordering}; static PORT_COUNTER: AtomicU16 = AtomicU16::new(16500); - + let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst); let test_dir = format!("/tmp/herodb_simple_test_{}", test_name); - + // Clean up any existing test data let _ = std::fs::remove_dir_all(&test_dir); std::fs::create_dir_all(&test_dir).unwrap(); - + let option = DBOption { dir: test_dir, port, @@ -23,8 +25,8 @@ async fn start_test_server(test_name: &str) -> (Server, u16) { encrypt: false, encryption_key: None, }; - - let server = Server::new(option).await; + + let server = Arc::new(Mutex::new(Server::new(option).await)); (server, port) } @@ -54,7 +56,7 @@ async fn connect_to_server(port: u16) -> TcpStream { #[tokio::test] async fn test_basic_ping_simple() { - let (mut server, port) = start_test_server("ping").await; + let (server, port) = start_test_server("ping").await; // Start server in background tokio::spawn(async move { @@ -64,7 +66,8 @@ async fn test_basic_ping_simple() { loop { if let Ok((stream, _)) = listener.accept().await { - let _ = server.handle(stream).await; + let server_clone = Arc::clone(&server); + let _ = Server::handle(server_clone, stream).await; } } }); @@ -78,7 +81,7 @@ async fn test_basic_ping_simple() { #[tokio::test] async fn test_hset_clean_db() { - let (mut server, port) = start_test_server("hset_clean").await; + let (server, port) = start_test_server("hset_clean").await; // Start server in background tokio::spawn(async move { @@ -88,7 +91,8 @@ async fn test_hset_clean_db() { loop { if let Ok((stream, _)) = listener.accept().await { - let _ = server.handle(stream).await; + let server_clone = Arc::clone(&server); + let _ = Server::handle(server_clone, stream).await; } } }); @@ -110,7 +114,7 @@ async fn test_hset_clean_db() { #[tokio::test] async fn test_type_command_simple() { - let (mut server, port) = start_test_server("type").await; + let (server, port) = start_test_server("type").await; // Start server in background tokio::spawn(async move { @@ -120,7 +124,8 @@ async fn test_type_command_simple() { loop { if let Ok((stream, _)) = listener.accept().await { - let _ = server.handle(stream).await; + let server_clone = Arc::clone(&server); + let _ = Server::handle(server_clone, stream).await; } } }); @@ -149,7 +154,7 @@ async fn test_type_command_simple() { #[tokio::test] async fn test_hexists_simple() { - let (mut server, port) = start_test_server("hexists").await; + let (server, port) = start_test_server("hexists").await; // Start server in background tokio::spawn(async move { @@ -159,7 +164,8 @@ async fn test_hexists_simple() { loop { if let Ok((stream, _)) = listener.accept().await { - let _ = server.handle(stream).await; + let server_clone = Arc::clone(&server); + let _ = Server::handle(server_clone, stream).await; } } }); diff --git a/herodb/tests/usage_suite.rs b/herodb/tests/usage_suite.rs index c6f5b38..6f1cc54 100644 --- a/herodb/tests/usage_suite.rs +++ b/herodb/tests/usage_suite.rs @@ -2,12 +2,14 @@ use herodb::{options::DBOption, server::Server}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; use tokio::time::{sleep, Duration}; +use std::sync::Arc; +use tokio::sync::Mutex; // ========================= // Helpers // ========================= -async fn start_test_server(test_name: &str) -> (Server, u16) { +async fn start_test_server(test_name: &str) -> (Arc>, u16) { use std::sync::atomic::{AtomicU16, Ordering}; static PORT_COUNTER: AtomicU16 = AtomicU16::new(17100); let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst); @@ -24,11 +26,11 @@ async fn start_test_server(test_name: &str) -> (Server, u16) { encryption_key: None, }; - let server = Server::new(option).await; + let server = Arc::new(Mutex::new(Server::new(option).await)); (server, port) } -async fn spawn_listener(server: Server, port: u16) { +async fn spawn_listener(server: Arc>, port: u16) { tokio::spawn(async move { let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) .await @@ -36,9 +38,9 @@ async fn spawn_listener(server: Server, port: u16) { loop { match listener.accept().await { Ok((stream, _)) => { - let mut s_clone = server.clone(); + let server_clone = Arc::clone(&server); tokio::spawn(async move { - let _ = s_clone.handle(stream).await; + let _ = Server::handle(server_clone, stream).await; }); } Err(_e) => break,