use redis_rs::{server::Server, options::DBOption}; use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; use tokio::time::sleep; // Helper function to start a test server async fn start_test_server(test_name: &str) -> (Server, u16) { use std::sync::atomic::{AtomicU16, Ordering}; static PORT_COUNTER: AtomicU16 = AtomicU16::new(16379); let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst); let test_dir = format!("/tmp/herodb_test_{}", test_name); // Clean up and create test directory let _ = std::fs::remove_dir_all(&test_dir); std::fs::create_dir_all(&test_dir).unwrap(); let option = DBOption { dir: test_dir, port, debug: true, }; let server = Server::new(option).await; (server, port) } // Helper function to connect to the test server async fn connect_to_server(port: u16) -> TcpStream { let mut attempts = 0; loop { match TcpStream::connect(format!("127.0.0.1:{}", port)).await { Ok(stream) => return stream, Err(_) if attempts < 10 => { attempts += 1; sleep(Duration::from_millis(100)).await; } Err(e) => panic!("Failed to connect to test server: {}", e), } } } // Helper function to send command and get response async fn send_command(stream: &mut TcpStream, command: &str) -> String { stream.write_all(command.as_bytes()).await.unwrap(); let mut buffer = [0; 1024]; let n = stream.read(&mut buffer).await.unwrap(); String::from_utf8_lossy(&buffer[..n]).to_string() } #[tokio::test] async fn test_basic_ping() { let (mut server, port) = start_test_server("ping").await; // Start server in background tokio::spawn(async move { let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) .await .unwrap(); loop { if let Ok((stream, _)) = listener.accept().await { let _ = server.handle(stream).await; } } }); sleep(Duration::from_millis(100)).await; let mut stream = connect_to_server(port).await; let response = send_command(&mut stream, "*1\r\n$4\r\nPING\r\n").await; assert!(response.contains("PONG")); } #[tokio::test] async fn test_string_operations() { let (mut server, port) = start_test_server("string").await; // Start server in background tokio::spawn(async move { let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) .await .unwrap(); loop { if let Ok((stream, _)) = listener.accept().await { let _ = server.handle(stream).await; } } }); sleep(Duration::from_millis(100)).await; let mut stream = connect_to_server(port).await; // Test SET let response = send_command(&mut stream, "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n").await; assert!(response.contains("OK")); // Test GET let response = send_command(&mut stream, "*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n").await; assert!(response.contains("value")); // Test GET non-existent key let response = send_command(&mut stream, "*2\r\n$3\r\nGET\r\n$7\r\nnoexist\r\n").await; assert!(response.contains("$-1")); // NULL response // Test DEL let response = send_command(&mut stream, "*2\r\n$3\r\nDEL\r\n$3\r\nkey\r\n").await; assert!(response.contains("1")); // Test GET after DEL let response = send_command(&mut stream, "*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n").await; assert!(response.contains("$-1")); // NULL response } #[tokio::test] async fn test_incr_operations() { let (mut server, port) = start_test_server("incr").await; tokio::spawn(async move { let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) .await .unwrap(); loop { if let Ok((stream, _)) = listener.accept().await { let _ = server.handle(stream).await; } } }); sleep(Duration::from_millis(100)).await; let mut stream = connect_to_server(port).await; // Test INCR on non-existent key let response = send_command(&mut stream, "*2\r\n$4\r\nINCR\r\n$7\r\ncounter\r\n").await; assert!(response.contains("1")); // Test INCR on existing key let response = send_command(&mut stream, "*2\r\n$4\r\nINCR\r\n$7\r\ncounter\r\n").await; assert!(response.contains("2")); // Test INCR on string value (should fail) send_command(&mut stream, "*3\r\n$3\r\nSET\r\n$6\r\nstring\r\n$5\r\nhello\r\n").await; let response = send_command(&mut stream, "*2\r\n$4\r\nINCR\r\n$6\r\nstring\r\n").await; assert!(response.contains("ERR")); } #[tokio::test] async fn test_hash_operations() { let (mut server, port) = start_test_server("hash").await; tokio::spawn(async move { let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) .await .unwrap(); loop { if let Ok((stream, _)) = listener.accept().await { let _ = server.handle(stream).await; } } }); sleep(Duration::from_millis(100)).await; let mut stream = connect_to_server(port).await; // Test HSET let response = send_command(&mut stream, "*4\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n$6\r\nvalue1\r\n").await; assert!(response.contains("1")); // 1 new field // Test HGET let response = send_command(&mut stream, "*3\r\n$4\r\nHGET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n").await; assert!(response.contains("value1")); // Test HSET multiple fields let response = send_command(&mut stream, "*6\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$6\r\nfield2\r\n$6\r\nvalue2\r\n$6\r\nfield3\r\n$6\r\nvalue3\r\n").await; assert!(response.contains("2")); // 2 new fields // Test HGETALL let response = send_command(&mut stream, "*2\r\n$7\r\nHGETALL\r\n$4\r\nhash\r\n").await; assert!(response.contains("field1")); assert!(response.contains("value1")); assert!(response.contains("field2")); assert!(response.contains("value2")); // Test HLEN let response = send_command(&mut stream, "*2\r\n$4\r\nHLEN\r\n$4\r\nhash\r\n").await; assert!(response.contains("3")); // Test HEXISTS let response = send_command(&mut stream, "*3\r\n$7\r\nHEXISTS\r\n$4\r\nhash\r\n$6\r\nfield1\r\n").await; assert!(response.contains("1")); let response = send_command(&mut stream, "*3\r\n$7\r\nHEXISTS\r\n$4\r\nhash\r\n$7\r\nnoexist\r\n").await; assert!(response.contains("0")); // Test HDEL let response = send_command(&mut stream, "*3\r\n$4\r\nHDEL\r\n$4\r\nhash\r\n$6\r\nfield1\r\n").await; assert!(response.contains("1")); // Test HKEYS let response = send_command(&mut stream, "*2\r\n$5\r\nHKEYS\r\n$4\r\nhash\r\n").await; assert!(response.contains("field2")); assert!(response.contains("field3")); assert!(!response.contains("field1")); // Should be deleted // Test HVALS let response = send_command(&mut stream, "*2\r\n$5\r\nHVALS\r\n$4\r\nhash\r\n").await; assert!(response.contains("value2")); assert!(response.contains("value3")); } #[tokio::test] async fn test_expiration() { let (mut server, port) = start_test_server("expiration").await; tokio::spawn(async move { let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) .await .unwrap(); loop { if let Ok((stream, _)) = listener.accept().await { let _ = server.handle(stream).await; } } }); sleep(Duration::from_millis(100)).await; let mut stream = connect_to_server(port).await; // Test SETEX (expire in 1 second) let response = send_command(&mut stream, "*5\r\n$3\r\nSET\r\n$6\r\nexpkey\r\n$5\r\nvalue\r\n$2\r\nEX\r\n$1\r\n1\r\n").await; assert!(response.contains("OK")); // Test TTL let response = send_command(&mut stream, "*2\r\n$3\r\nTTL\r\n$6\r\nexpkey\r\n").await; assert!(response.contains("1") || response.contains("0")); // Should be 1 or 0 seconds // Test EXISTS let response = send_command(&mut stream, "*2\r\n$6\r\nEXISTS\r\n$6\r\nexpkey\r\n").await; assert!(response.contains("1")); // Wait for expiration sleep(Duration::from_millis(1100)).await; // Test GET after expiration let response = send_command(&mut stream, "*2\r\n$3\r\nGET\r\n$6\r\nexpkey\r\n").await; assert!(response.contains("$-1")); // Should be NULL // Test TTL after expiration let response = send_command(&mut stream, "*2\r\n$3\r\nTTL\r\n$6\r\nexpkey\r\n").await; assert!(response.contains("-2")); // Key doesn't exist // Test EXISTS after expiration let response = send_command(&mut stream, "*2\r\n$6\r\nEXISTS\r\n$6\r\nexpkey\r\n").await; assert!(response.contains("0")); } #[tokio::test] async fn test_scan_operations() { let (mut server, port) = start_test_server("scan").await; tokio::spawn(async move { let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) .await .unwrap(); loop { if let Ok((stream, _)) = listener.accept().await { let _ = server.handle(stream).await; } } }); sleep(Duration::from_millis(100)).await; let mut stream = connect_to_server(port).await; // Set up test data for i in 0..5 { let cmd = format!("*3\r\n$3\r\nSET\r\n$4\r\nkey{}\r\n$6\r\nvalue{}\r\n", i, i); send_command(&mut stream, &cmd).await; } // Test SCAN let response = send_command(&mut stream, "*6\r\n$4\r\nSCAN\r\n$1\r\n0\r\n$5\r\nMATCH\r\n$1\r\n*\r\n$5\r\nCOUNT\r\n$2\r\n10\r\n").await; assert!(response.contains("key")); // Test KEYS let response = send_command(&mut stream, "*2\r\n$4\r\nKEYS\r\n$1\r\n*\r\n").await; assert!(response.contains("key0")); assert!(response.contains("key1")); } #[tokio::test] async fn test_hscan_operations() { let (mut server, port) = start_test_server("hscan").await; tokio::spawn(async move { let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) .await .unwrap(); loop { if let Ok((stream, _)) = listener.accept().await { let _ = server.handle(stream).await; } } }); sleep(Duration::from_millis(100)).await; let mut stream = connect_to_server(port).await; // Set up hash data for i in 0..3 { let cmd = format!("*4\r\n$4\r\nHSET\r\n$8\r\ntesthash\r\n$6\r\nfield{}\r\n$6\r\nvalue{}\r\n", i, i); send_command(&mut stream, &cmd).await; } // Test HSCAN let response = send_command(&mut stream, "*7\r\n$5\r\nHSCAN\r\n$8\r\ntesthash\r\n$1\r\n0\r\n$5\r\nMATCH\r\n$1\r\n*\r\n$5\r\nCOUNT\r\n$2\r\n10\r\n").await; assert!(response.contains("field")); assert!(response.contains("value")); } #[tokio::test] async fn test_transaction_operations() { let (mut server, port) = start_test_server("transaction").await; tokio::spawn(async move { let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) .await .unwrap(); loop { if let Ok((stream, _)) = listener.accept().await { let _ = server.handle(stream).await; } } }); sleep(Duration::from_millis(100)).await; let mut stream = connect_to_server(port).await; // Test MULTI let response = send_command(&mut stream, "*1\r\n$5\r\nMULTI\r\n").await; assert!(response.contains("OK")); // Test queued commands let response = send_command(&mut stream, "*3\r\n$3\r\nSET\r\n$4\r\nkey1\r\n$6\r\nvalue1\r\n").await; assert!(response.contains("QUEUED")); let response = send_command(&mut stream, "*3\r\n$3\r\nSET\r\n$4\r\nkey2\r\n$6\r\nvalue2\r\n").await; assert!(response.contains("QUEUED")); // Test EXEC let response = send_command(&mut stream, "*1\r\n$4\r\nEXEC\r\n").await; assert!(response.contains("OK")); // Should contain results of executed commands // Verify commands were executed let response = send_command(&mut stream, "*2\r\n$3\r\nGET\r\n$4\r\nkey1\r\n").await; assert!(response.contains("value1")); let response = send_command(&mut stream, "*2\r\n$3\r\nGET\r\n$4\r\nkey2\r\n").await; assert!(response.contains("value2")); } #[tokio::test] async fn test_discard_transaction() { let (mut server, port) = start_test_server("discard").await; tokio::spawn(async move { let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) .await .unwrap(); loop { if let Ok((stream, _)) = listener.accept().await { let _ = server.handle(stream).await; } } }); sleep(Duration::from_millis(100)).await; let mut stream = connect_to_server(port).await; // Test MULTI let response = send_command(&mut stream, "*1\r\n$5\r\nMULTI\r\n").await; assert!(response.contains("OK")); // Test queued command let response = send_command(&mut stream, "*3\r\n$3\r\nSET\r\n$7\r\ndiscard\r\n$5\r\nvalue\r\n").await; assert!(response.contains("QUEUED")); // Test DISCARD let response = send_command(&mut stream, "*1\r\n$7\r\nDISCARD\r\n").await; assert!(response.contains("OK")); // Verify command was not executed let response = send_command(&mut stream, "*2\r\n$3\r\nGET\r\n$7\r\ndiscard\r\n").await; assert!(response.contains("$-1")); // Should be NULL } #[tokio::test] async fn test_type_command() { let (mut server, port) = start_test_server("type").await; tokio::spawn(async move { let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) .await .unwrap(); loop { if let Ok((stream, _)) = listener.accept().await { let _ = server.handle(stream).await; } } }); sleep(Duration::from_millis(100)).await; let mut stream = connect_to_server(port).await; // Test string type send_command(&mut stream, "*3\r\n$3\r\nSET\r\n$6\r\nstring\r\n$5\r\nvalue\r\n").await; let response = send_command(&mut stream, "*2\r\n$4\r\nTYPE\r\n$6\r\nstring\r\n").await; assert!(response.contains("string")); // Test hash type send_command(&mut stream, "*4\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$5\r\nfield\r\n$5\r\nvalue\r\n").await; let response = send_command(&mut stream, "*2\r\n$4\r\nTYPE\r\n$4\r\nhash\r\n").await; assert!(response.contains("hash")); // Test non-existent key let response = send_command(&mut stream, "*2\r\n$4\r\nTYPE\r\n$7\r\nnoexist\r\n").await; assert!(response.contains("none")); } #[tokio::test] async fn test_config_commands() { let (mut server, port) = start_test_server("config").await; tokio::spawn(async move { let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) .await .unwrap(); loop { if let Ok((stream, _)) = listener.accept().await { let _ = server.handle(stream).await; } } }); sleep(Duration::from_millis(100)).await; let mut stream = connect_to_server(port).await; // Test CONFIG GET databases let response = send_command(&mut stream, "*3\r\n$6\r\nCONFIG\r\n$3\r\nGET\r\n$9\r\ndatabases\r\n").await; assert!(response.contains("databases")); assert!(response.contains("16")); // Test CONFIG GET dir let response = send_command(&mut stream, "*3\r\n$6\r\nCONFIG\r\n$3\r\nGET\r\n$3\r\ndir\r\n").await; assert!(response.contains("dir")); assert!(response.contains("/tmp/herodb_test_config")); } #[tokio::test] async fn test_info_command() { let (mut server, port) = start_test_server("info").await; tokio::spawn(async move { let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) .await .unwrap(); loop { if let Ok((stream, _)) = listener.accept().await { let _ = server.handle(stream).await; } } }); sleep(Duration::from_millis(100)).await; let mut stream = connect_to_server(port).await; // Test INFO let response = send_command(&mut stream, "*1\r\n$4\r\nINFO\r\n").await; assert!(response.contains("redis_version")); // Test INFO replication let response = send_command(&mut stream, "*2\r\n$4\r\nINFO\r\n$11\r\nreplication\r\n").await; assert!(response.contains("role:master")); } #[tokio::test] async fn test_error_handling() { let (mut server, port) = start_test_server("error").await; tokio::spawn(async move { let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) .await .unwrap(); loop { if let Ok((stream, _)) = listener.accept().await { let _ = server.handle(stream).await; } } }); sleep(Duration::from_millis(100)).await; let mut stream = connect_to_server(port).await; // Test WRONGTYPE error - try to use hash command on string send_command(&mut stream, "*3\r\n$3\r\nSET\r\n$6\r\nstring\r\n$5\r\nvalue\r\n").await; let response = send_command(&mut stream, "*3\r\n$4\r\nHGET\r\n$6\r\nstring\r\n$5\r\nfield\r\n").await; assert!(response.contains("WRONGTYPE")); // Test unknown command let response = send_command(&mut stream, "*1\r\n$7\r\nUNKNOWN\r\n").await; assert!(response.contains("unknown cmd") || response.contains("ERR")); // Test EXEC without MULTI let response = send_command(&mut stream, "*1\r\n$4\r\nEXEC\r\n").await; assert!(response.contains("ERR")); // Test DISCARD without MULTI let response = send_command(&mut stream, "*1\r\n$7\r\nDISCARD\r\n").await; assert!(response.contains("ERR")); } #[tokio::test] async fn test_list_operations() { let (mut server, port) = start_test_server("list").await; tokio::spawn(async move { let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port)) .await .unwrap(); loop { if let Ok((stream, _)) = listener.accept().await { let _ = server.handle(stream).await; } } }); sleep(Duration::from_millis(100)).await; let mut stream = connect_to_server(port).await; // Test LPUSH let response = send_command(&mut stream, "*4\r\n$5\r\nLPUSH\r\n$4\r\nlist\r\n$1\r\na\r\n$1\r\nb\r\n").await; assert!(response.contains("2")); // 2 elements // Test RPUSH let response = send_command(&mut stream, "*4\r\n$5\r\nRPUSH\r\n$4\r\nlist\r\n$1\r\nc\r\n$1\r\nd\r\n").await; assert!(response.contains("4")); // 4 elements // Test LLEN let response = send_command(&mut stream, "*2\r\n$4\r\nLLEN\r\n$4\r\nlist\r\n").await; assert!(response.contains("4")); // Test LRANGE let response = send_command(&mut stream, "*4\r\n$6\r\nLRANGE\r\n$4\r\nlist\r\n$1\r\n0\r\n$2\r\n-1\r\n").await; assert!(response.contains("b")); assert!(response.contains("a")); assert!(response.contains("c")); assert!(response.contains("d")); // Test LINDEX let response = send_command(&mut stream, "*3\r\n$6\r\nLINDEX\r\n$4\r\nlist\r\n$1\r\n0\r\n").await; assert!(response.contains("b")); // Test LPOP let response = send_command(&mut stream, "*2\r\n$4\r\nLPOP\r\n$4\r\nlist\r\n").await; assert!(response.contains("b")); // Test RPOP let response = send_command(&mut stream, "*2\r\n$4\r\nRPOP\r\n$4\r\nlist\r\n").await; assert!(response.contains("d")); // Test LREM send_command(&mut stream, "*3\r\n$5\r\nLPUSH\r\n$4\r\nlist\r\n$1\r\na\r\n").await; // list is now a, c, a let response = send_command(&mut stream, "*4\r\n$4\r\nLREM\r\n$4\r\nlist\r\n$1\r\n1\r\n$1\r\na\r\n").await; assert!(response.contains("1")); // Test LTRIM let response = send_command(&mut stream, "*4\r\n$5\r\nLTRIM\r\n$4\r\nlist\r\n$1\r\n0\r\n$1\r\n0\r\n").await; assert!(response.contains("OK")); let response = send_command(&mut stream, "*2\r\n$4\r\nLLEN\r\n$4\r\nlist\r\n").await; assert!(response.contains("1")); }