Compare commits

...

5 Commits

Author SHA1 Message Date
0511dddd99 ... 2025-08-16 08:50:28 +02:00
bec9b20ec7 ... 2025-08-16 08:41:19 +02:00
ad255a9f51 ... 2025-08-16 08:28:52 +02:00
7bcb673361 ... 2025-08-16 08:25:25 +02:00
0f6e595000 ... 2025-08-16 07:54:55 +02:00
24 changed files with 3488 additions and 155 deletions

354
Cargo.lock generated
View File

@ -178,6 +178,36 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0"
[[package]]
name = "combine"
version = "4.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd"
dependencies = [
"bytes",
"memchr",
]
[[package]]
name = "displaydoc"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "form_urlencoded"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456"
dependencies = [
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.31"
@ -285,18 +315,137 @@ version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024"
[[package]]
name = "icu_collections"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "200072f5d0e3614556f94a9930d5dc3e0662a652823904c3a75dc3b0af7fee47"
dependencies = [
"displaydoc",
"potential_utf",
"yoke",
"zerofrom",
"zerovec",
]
[[package]]
name = "icu_locale_core"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0cde2700ccaed3872079a65fb1a78f6c0a36c91570f28755dda67bc8f7d9f00a"
dependencies = [
"displaydoc",
"litemap",
"tinystr",
"writeable",
"zerovec",
]
[[package]]
name = "icu_normalizer"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "436880e8e18df4d7bbc06d58432329d6458cc84531f7ac5f024e93deadb37979"
dependencies = [
"displaydoc",
"icu_collections",
"icu_normalizer_data",
"icu_properties",
"icu_provider",
"smallvec",
"zerovec",
]
[[package]]
name = "icu_normalizer_data"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00210d6893afc98edb752b664b8890f0ef174c8adbb8d0be9710fa66fbbf72d3"
[[package]]
name = "icu_properties"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "016c619c1eeb94efb86809b015c58f479963de65bdb6253345c1a1276f22e32b"
dependencies = [
"displaydoc",
"icu_collections",
"icu_locale_core",
"icu_properties_data",
"icu_provider",
"potential_utf",
"zerotrie",
"zerovec",
]
[[package]]
name = "icu_properties_data"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "298459143998310acd25ffe6810ed544932242d3f07083eee1084d83a71bd632"
[[package]]
name = "icu_provider"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03c80da27b5f4187909049ee2d72f276f0d9f99a42c306bd0131ecfe04d8e5af"
dependencies = [
"displaydoc",
"icu_locale_core",
"stable_deref_trait",
"tinystr",
"writeable",
"yoke",
"zerofrom",
"zerotrie",
"zerovec",
]
[[package]]
name = "idna"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "686f825264d630750a544639377bae737628043f20d38bbc029e8f29ea968a7e"
dependencies = [
"idna_adapter",
"smallvec",
"utf8_iter",
]
[[package]]
name = "idna_adapter"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344"
dependencies = [
"icu_normalizer",
"icu_properties",
]
[[package]]
name = "is_terminal_polyfill"
version = "1.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
[[package]]
name = "itoa"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c"
[[package]]
name = "libc"
version = "0.2.155"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
[[package]]
name = "litemap"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "241eaef5fd12c88705a01fc1066c48c4b36e0dd4377dcdc7ec3942cea7a69956"
[[package]]
name = "lock_api"
version = "0.4.12"
@ -375,6 +524,12 @@ dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "percent-encoding"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "pin-project-lite"
version = "0.2.14"
@ -387,6 +542,15 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "potential_utf"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5a7c30837279ca13e7c867e9e40053bc68740f988cb07f7ca6df43cc734b585"
dependencies = [
"zerovec",
]
[[package]]
name = "proc-macro2"
version = "1.0.86"
@ -414,6 +578,21 @@ dependencies = [
"libc",
]
[[package]]
name = "redis"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c580d9cbbe1d1b479e8d67cf9daf6a62c957e6846048408b80b43ac3f6af84cd"
dependencies = [
"combine",
"itoa",
"percent-encoding",
"ryu",
"sha1_smol",
"socket2 0.4.10",
"url",
]
[[package]]
name = "redis-rs"
version = "0.0.1"
@ -425,6 +604,7 @@ dependencies = [
"clap",
"futures",
"redb",
"redis",
"serde",
"thiserror",
"tokio",
@ -445,6 +625,12 @@ version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]]
name = "ryu"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f"
[[package]]
name = "scopeguard"
version = "1.2.0"
@ -471,6 +657,12 @@ dependencies = [
"syn",
]
[[package]]
name = "sha1_smol"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d"
[[package]]
name = "signal-hook-registry"
version = "1.4.2"
@ -495,6 +687,16 @@ version = "1.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
[[package]]
name = "socket2"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "socket2"
version = "0.5.7"
@ -505,6 +707,12 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "strsim"
version = "0.11.1"
@ -522,6 +730,17 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "synstructure"
version = "0.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "thiserror"
version = "1.0.61"
@ -542,6 +761,16 @@ dependencies = [
"syn",
]
[[package]]
name = "tinystr"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d4f6d1145dcb577acf783d4e601bc1d76a13337bb54e6233add580b07344c8b"
dependencies = [
"displaydoc",
"zerovec",
]
[[package]]
name = "tokio"
version = "1.38.0"
@ -556,7 +785,7 @@ dependencies = [
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"socket2 0.5.7",
"tokio-macros",
"windows-sys 0.48.0",
]
@ -578,6 +807,23 @@ version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b"
[[package]]
name = "url"
version = "2.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60"
dependencies = [
"form_urlencoded",
"idna",
"percent-encoding",
]
[[package]]
name = "utf8_iter"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
[[package]]
name = "utf8parse"
version = "0.2.2"
@ -590,6 +836,28 @@ version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-sys"
version = "0.48.0"
@ -728,3 +996,87 @@ name = "windows_x86_64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]]
name = "writeable"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea2f10b9bb0928dfb1b42b65e1f9e36f7f54dbdf08457afefb38afcdec4fa2bb"
[[package]]
name = "yoke"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f41bb01b8226ef4bfd589436a297c53d118f65921786300e427be8d487695cc"
dependencies = [
"serde",
"stable_deref_trait",
"yoke-derive",
"zerofrom",
]
[[package]]
name = "yoke-derive"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38da3c9736e16c5d3c8c597a9aaa5d1fa565d0532ae05e27c24aa62fb32c0ab6"
dependencies = [
"proc-macro2",
"quote",
"syn",
"synstructure",
]
[[package]]
name = "zerofrom"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50cc42e0333e05660c3587f3bf9d0478688e15d870fab3346451ce7f8c9fbea5"
dependencies = [
"zerofrom-derive",
]
[[package]]
name = "zerofrom-derive"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502"
dependencies = [
"proc-macro2",
"quote",
"syn",
"synstructure",
]
[[package]]
name = "zerotrie"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "36f0bbd478583f79edad978b407914f61b2972f5af6fa089686016be8f9af595"
dependencies = [
"displaydoc",
"yoke",
"zerofrom",
]
[[package]]
name = "zerovec"
version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7aa2bd55086f1ab526693ecbe444205da57e25f4489879da80635a46d90e73b"
dependencies = [
"yoke",
"zerofrom",
"zerovec-derive",
]
[[package]]
name = "zerovec-derive"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b96237efa0c878c64bd89c436f661be4e46b2f3eff1ebb976f7ef2321d2f58f"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

View File

@ -16,3 +16,5 @@ redb = "2.1.3"
serde = { version = "1.0", features = ["derive"] }
bincode = "1.3.3"
[dev-dependencies]
redis = "0.24"

View File

@ -1,64 +0,0 @@
title: 从 0 到 1 由 Rust 构建 Redis
description: 从 0 到 1 由 Rust 构建 Redis
theme: just-the-docs
url: https://fangpin.github.io/redis-rs
aux_links:
GitHub: https://fangpin.github.io/redis-rs
# logo: "/assets/images/just-the-docs.png"
search_enabled: true
search:
# Split pages into sections that can be searched individually
# Supports 1 - 6, default: 2
heading_level: 2
# Maximum amount of previews per search result
# Default: 3
previews: 3
# Maximum amount of words to display before a matched word in the preview
# Default: 5
preview_words_before: 5
# Maximum amount of words to display after a matched word in the preview
# Default: 10
preview_words_after: 10
# Set the search token separator
# Default: /[\s\-/]+/
# Example: enable support for hyphenated search words
tokenizer_separator: /[\s/]+/
# Display the relative url in search results
# Supports true (default) or false
rel_url: true
# Enable or disable the search button that appears in the bottom right corner of every page
# Supports true or false (default)
button: false
# Heading anchor links appear on hover over h1-h6 tags in page content
# allowing users to deep link to a particular heading on a page.
#
# Supports true (default) or false
heading_anchors: true
# Footer content
# appears at the bottom of every page's main content
# Note: The footer_content option is deprecated and will be removed in a future major release. Please use `_includes/footer_custom.html` for more robust markup / liquid-based content.
footer_content: "Copyright © 2017-2024 Pin Fang"
# Footer last edited timestamp
last_edit_timestamp: true # show or hide edit time - page must have `last_modified_date` defined in the frontmatter
last_edit_time_format: "%b %e %Y at %I:%M %p" # uses ruby's time format: https://ruby-doc.org/stdlib-2.7.0/libdoc/time/rdoc/Time.html
# code
compress_html:
ignore:
envs: all
kramdown:
syntax_highlighter_opts:
block:
line_numbers: true

100
instructions/encrypt.md Normal file
View File

@ -0,0 +1,100 @@
Perfect — heres a tiny “factory” you can drop in.
### Cargo.toml
```toml
[dependencies]
chacha20poly1305 = { version = "0.10", features = ["xchacha20"] }
rand = "0.8"
sha2 = "0.10"
```
### `crypto_factory.rs`
```rust
use chacha20poly1305::{
aead::{Aead, KeyInit, OsRng},
XChaCha20Poly1305, Key, XNonce,
};
use rand::RngCore;
use sha2::{Digest, Sha256};
const VERSION: u8 = 1;
const NONCE_LEN: usize = 24;
const TAG_LEN: usize = 16;
#[derive(Debug)]
pub enum CryptoError {
Format, // wrong length / header
Version(u8), // unknown version
Decrypt, // wrong key or corrupted data
}
/// Super-simple factory: new(secret) + encrypt(bytes) + decrypt(bytes)
pub struct CryptoFactory {
key: Key<XChaCha20Poly1305>,
}
impl CryptoFactory {
/// Accepts any secret bytes; turns them into a 32-byte key (SHA-256).
/// (If your secret is already 32 bytes, this is still fine.)
pub fn new<S: AsRef<[u8]>>(secret: S) -> Self {
let mut h = Sha256::new();
h.update(b"xchacha20poly1305-factory:v1"); // domain separation
h.update(secret.as_ref());
let digest = h.finalize(); // 32 bytes
let key = Key::<XChaCha20Poly1305>::from_slice(&digest).to_owned();
Self { key }
}
/// Output layout: [version:1][nonce:24][ciphertext||tag]
pub fn encrypt(&self, plaintext: &[u8]) -> Vec<u8> {
let cipher = XChaCha20Poly1305::new(&self.key);
let mut nonce_bytes = [0u8; NONCE_LEN];
OsRng.fill_bytes(&mut nonce_bytes);
let nonce = XNonce::from_slice(&nonce_bytes);
let mut out = Vec::with_capacity(1 + NONCE_LEN + plaintext.len() + TAG_LEN);
out.push(VERSION);
out.extend_from_slice(&nonce_bytes);
let ct = cipher.encrypt(nonce, plaintext).expect("encrypt");
out.extend_from_slice(&ct);
out
}
pub fn decrypt(&self, blob: &[u8]) -> Result<Vec<u8>, CryptoError> {
if blob.len() < 1 + NONCE_LEN + TAG_LEN {
return Err(CryptoError::Format);
}
let ver = blob[0];
if ver != VERSION {
return Err(CryptoError::Version(ver));
}
let nonce = XNonce::from_slice(&blob[1..1 + NONCE_LEN]);
let ct = &blob[1 + NONCE_LEN..];
let cipher = XChaCha20Poly1305::new(&self.key);
cipher.decrypt(nonce, ct).map_err(|_| CryptoError::Decrypt)
}
}
```
### Tiny usage example
```rust
fn main() {
let f = CryptoFactory::new(b"super-secret-key-material");
let val = b"\x00\xFFbinary\x01\x02\x03";
let blob = f.encrypt(val);
let roundtrip = f.decrypt(&blob).unwrap();
assert_eq!(roundtrip, val);
}
```
Thats it: `new(secret)`, `encrypt(bytes)`, `decrypt(bytes)`.
You can stash the returned `blob` directly in your storage layer behind Redis.

View File

@ -0,0 +1,150 @@
]
# INFO
**What it does**
Returns server stats in a human-readable text block, optionally filtered by sections. Typical sections: `server`, `clients`, `memory`, `persistence`, `stats`, `replication`, `cpu`, `commandstats`, `latencystats`, `cluster`, `modules`, `keyspace`, `errorstats`. Special args: `all`, `default`, `everything`. The reply is a **Bulk String** with `# <Section>` headers and `key:value` lines. ([Redis][1])
**Syntax**
```
INFO [section [section ...]]
```
**Return (RESP2/RESP3)**: Bulk String. ([Redis][1])
**RESP request/response**
```
# Request: whole default set
*1\r\n$4\r\nINFO\r\n
# Request: a specific section, e.g., clients
*2\r\n$4\r\nINFO\r\n$7\r\nclients\r\n
# Response (prefix shown; body is long)
$1234\r\n# Server\r\nredis_version:7.4.0\r\n...\r\n# Clients\r\nconnected_clients:3\r\n...\r\n
```
(Reply type/format per RESP spec and the INFO page.) ([Redis][2])
---
# Connection “name” (there is **no** top-level `NAME` command)
Redis doesnt have a standalone `NAME` command. Connection names are handled via `CLIENT SETNAME` and retrieved via `CLIENT GETNAME`. ([Redis][3])
## CLIENT SETNAME
Assigns a human label to the current connection (shown in `CLIENT LIST`, logs, etc.). No spaces allowed in the name; empty string clears it. Length is limited by Redis string limits (practically huge). **Reply**: Simple String `OK`. ([Redis][4])
**Syntax**
```
CLIENT SETNAME connection-name
```
**RESP**
```
# Set the name "myapp"
*3\r\n$6\r\nCLIENT\r\n$7\r\nSETNAME\r\n$5\r\nmyapp\r\n
# Reply
+OK\r\n
```
## CLIENT GETNAME
Returns the current connections name or **Null Bulk String** if unset. ([Redis][5])
**Syntax**
```
CLIENT GETNAME
```
**RESP**
```
# Before SETNAME:
*2\r\n$6\r\nCLIENT\r\n$7\r\nGETNAME\r\n
$-1\r\n # nil (no name)
# After SETNAME myapp:
*2\r\n$6\r\nCLIENT\r\n$7\r\nGETNAME\r\n
$5\r\nmyapp\r\n
```
(Null/Bulk String encoding per RESP spec.) ([Redis][2])
---
# CLIENT (container command + key subcommands)
`CLIENT` is a **container**; use subcommands like `CLIENT LIST`, `CLIENT INFO`, `CLIENT ID`, `CLIENT KILL`, `CLIENT TRACKING`, etc. Call `CLIENT HELP` to enumerate them. ([Redis][3])
## CLIENT LIST
Shows all connections as a single **Bulk String**: one line per client with `field=value` pairs (includes `id`, `addr`, `name`, `db`, `user`, `resp`, and more). Filters: `TYPE` and `ID`. **Return**: Bulk String (RESP2/RESP3). ([Redis][6])
**Syntax**
```
CLIENT LIST [TYPE <NORMAL|MASTER|REPLICA|PUBSUB>] [ID client-id ...]
```
**RESP**
```
*2\r\n$6\r\nCLIENT\r\n$4\r\nLIST\r\n
# Reply (single Bulk String; example with one line shown)
$188\r\nid=7 addr=127.0.0.1:60840 laddr=127.0.0.1:6379 fd=8 name=myapp age=12 idle=3 flags=N db=0 ...\r\n
```
## CLIENT INFO
Returns info for **this** connection only (same format/fields as a single line of `CLIENT LIST`). **Return**: Bulk String. Available since 6.2.0. ([Redis][7])
**Syntax**
```
CLIENT INFO
```
**RESP**
```
*2\r\n$6\r\nCLIENT\r\n$4\r\nINFO\r\n
$160\r\nid=7 addr=127.0.0.1:60840 laddr=127.0.0.1:6379 fd=8 name=myapp db=0 user=default resp=2 ...\r\n
```
---
# RESP notes youll need for your parser
* **Requests** are Arrays: `*N\r\n` followed by `N` Bulk Strings for verb/args.
* **Common replies here**: Simple String (`+OK\r\n`), Bulk String (`$<len>\r\n...\r\n`), and **Null Bulk String** (`$-1\r\n`). (These cover `INFO`, `CLIENT LIST/INFO`, `CLIENT GETNAME`, `CLIENT SETNAME`.) ([Redis][2])
---
## Sources (checked)
* INFO command (syntax, sections, behavior). ([Redis][1])
* RESP spec (request/response framing, Bulk/Null Bulk Strings). ([Redis][2])
* CLIENT container + subcommands index. ([Redis][3])
* CLIENT LIST (fields, bulk-string return, filters). ([Redis][6])
* CLIENT INFO (exists since 6.2, reply format). ([Redis][7])
* CLIENT SETNAME (no spaces; clears with empty string; huge length OK). ([Redis][4])
* CLIENT GETNAME (nil if unset). ([Redis][5])
If you want, I can fold this into a tiny Rust “command + RESP” test harness that exercises `INFO`, `CLIENT SETNAME/GETNAME`, `CLIENT LIST`, and `CLIENT INFO` against your in-mem RESP parser.
[1]: https://redis.io/docs/latest/commands/info/ "INFO | Docs"
[2]: https://redis.io/docs/latest/develop/reference/protocol-spec/?utm_source=chatgpt.com "Redis serialization protocol specification | Docs"
[3]: https://redis.io/docs/latest/commands/client/ "CLIENT | Docs"
[4]: https://redis.io/docs/latest/commands/client-setname/?utm_source=chatgpt.com "CLIENT SETNAME | Docs"
[5]: https://redis.io/docs/latest/commands/client-getname/?utm_source=chatgpt.com "CLIENT GETNAME | Docs"
[6]: https://redis.io/docs/latest/commands/client-list/ "CLIENT LIST | Docs"
[7]: https://redis.io/docs/latest/commands/client-info/?utm_source=chatgpt.com "CLIENT INFO | Docs"

View File

@ -248,3 +248,4 @@ Protocol and reply structure same as SCAN.
| `SSCAN` | Iterate over set members | `[cursor, array]` |
| `ZSCAN` | Iterate over sorted set | `[cursor, array]` |
##

259
instructions/redis_lists.md Normal file
View File

@ -0,0 +1,259 @@
# 1) Data model & basics
* A **queue** is a List at key `queue:<name>`.
* Common patterns:
* **Producer**: `LPUSH queue item` (or `RPUSH`)
* **Consumer (non-blocking)**: `RPOP queue` (or `LPOP`)
* **Consumer (blocking)**: `BRPOP queue timeout` (or `BLPOP`)
* If a key doesnt exist, its treated as an **empty list**; push **creates** the list; when the **last element is popped, the key is deleted**. ([Redis][1])
---
# 2) Commands to implement (queues via Lists)
## LPUSH / RPUSH
Prepend/append one or more elements. Create the list if it doesnt exist.
**Return**: Integer = new length of the list.
**Syntax**
```
LPUSH key element [element ...]
RPUSH key element [element ...]
```
**RESP (example)**
```
*3\r\n$5\r\nLPUSH\r\n$5\r\nqueue\r\n$5\r\njob-1\r\n
:1\r\n
```
Refs: semantics & multi-arg ordering. ([Redis][1])
### LPUSHX / RPUSHX (optional but useful)
Like LPUSH/RPUSH, **but only if the list exists**.
**Return**: Integer = new length (0 if key didnt exist).
```
LPUSHX key element [element ...]
RPUSHX key element [element ...]
```
Refs: command index. ([Redis][2])
---
## LPOP / RPOP
Remove & return one (default) or **up to COUNT** elements since Redis 6.2.
If the list is empty or missing, **Null** is returned (Null Bulk or Null Array if COUNT>1).
**Return**:
* No COUNT: Bulk String or Null Bulk.
* With COUNT: Array of Bulk Strings (possibly empty) or Null Array if key missing.
**Syntax**
```
LPOP key [count]
RPOP key [count]
```
**RESP (no COUNT)**
```
*2\r\n$4\r\nRPOP\r\n$5\r\nqueue\r\n
$5\r\njob-1\r\n # or $-1\r\n if empty
```
**RESP (COUNT=2)**
```
*3\r\n$4\r\nLPOP\r\n$5\r\nqueue\r\n$1\r\n2\r\n
*2\r\n$5\r\njob-2\r\n$5\r\njob-3\r\n # or *-1\r\n if key missing
```
Refs: LPOP w/ COUNT; general pop semantics. ([Redis][3])
---
## BLPOP / BRPOP (blocking consumers)
Block until an element is available in any of the given lists or until `timeout` (seconds, **double**, `0` = forever).
**Return** on success: **Array \[key, element]**.
**Return** on timeout: **Null Array**.
**Syntax**
```
BLPOP key [key ...] timeout
BRPOP key [key ...] timeout
```
**RESP**
```
*3\r\n$5\r\nBRPOP\r\n$5\r\nqueue\r\n$1\r\n0\r\n # block forever
# Success reply
*2\r\n$5\r\nqueue\r\n$5\r\njob-4\r\n
# Timeout reply
*-1\r\n
```
**Implementation notes**
* If any listed key is non-empty at call time, reply **immediately** from the first non-empty key **by the commands key order**.
* Otherwise, put the client into a **blocked state** (register per-key waiters). On any `LPUSH/RPUSH` to those keys, **wake the earliest waiter** and serve it atomically.
* If timeout expires, return **Null Array** and clear the blocked state.
Refs: timeout semantics and return shape. ([Redis][4])
---
## LMOVE / BLMOVE (atomic move; replaces RPOPLPUSH/BRPOPLPUSH)
Atomically **pop from one side** of `source` and **push to one side** of `destination`.
* Use for **reliable queues** (move to a *processing* list).
* `BLMOVE` blocks like `BLPOP` when `source` is empty.
**Syntax**
```
LMOVE source destination LEFT|RIGHT LEFT|RIGHT
BLMOVE source destination LEFT|RIGHT LEFT|RIGHT timeout
```
**Return**: Bulk String element moved, or Null if `source` empty (LMOVE); `BLMOVE` blocks/Null on timeout.
**RESP (LMOVE RIGHT->LEFT)**
```
*5\r\n$5\r\nLMOVE\r\n$6\r\nsource\r\n$3\r\ndst\r\n$5\r\nRIGHT\r\n$4\r\nLEFT\r\n
$5\r\njob-5\r\n
```
**Notes**
* Prefer `LMOVE/BLMOVE` over deprecated `RPOPLPUSH/BRPOPLPUSH`.
* Pattern: consumer `LMOVE queue processing RIGHT LEFT` → work → `LREM processing 1 <elem>` to ACK; a reaper can requeue stale items.
Refs: LMOVE/BLMOVE behavior and reliable-queue pattern; deprecation of RPOPLPUSH. ([Redis][5])
*(Compat: you can still implement `RPOPLPUSH source dest` and `BRPOPLPUSH source dest timeout`, but mark them deprecated and map to LMOVE/BLMOVE.)* ([Redis][6])
---
## LLEN (length)
Useful for metrics/backpressure.
```
LLEN key
```
**RESP**
```
*2\r\n$4\r\nLLEN\r\n$5\r\nqueue\r\n
:3\r\n
```
Refs: list overview mentioning LLEN. ([Redis][7])
---
## LREM (ack for “reliable” processing)
Remove occurrences of `element` from the list (head→tail scan).
Use `count=1` to ACK a single processed item from `processing`.
```
LREM key count element
```
**RESP**
```
*4\r\n$4\r\nLREM\r\n$9\r\nprocessing\r\n$1\r\n1\r\n$5\r\njob-5\r\n
:1\r\n
```
Refs: reliable pattern mentions LREM to ACK. ([Redis][5])
---
## LTRIM (bounded queues / retention)
Keep only `[start, stop]` range; everything else is dropped.
Use to cap queue length after pushes.
```
LTRIM key start stop
```
**RESP**
```
*4\r\n$5\r\nLTRIM\r\n$5\r\nqueue\r\n$2\r\n0\r\n$3\r\n999\r\n
+OK\r\n
```
Refs: list overview includes LTRIM for retention. ([Redis][7])
---
## LRANGE / LINDEX (debugging / peeking)
* `LRANGE key start stop` → Array of elements (non-destructive).
* `LINDEX key index` → one element or Null.
These arent required for queue semantics, but handy. ([Redis][7])
---
# 3) Errors & types
* Wrong type: `-WRONGTYPE Operation against a key holding the wrong kind of value\r\n`
* Non-existing key:
* Push: creates the list (returns new length).
* Pop (non-blocking): returns **Null**.
* Blocking pop: **Null Array** on timeout. ([Redis][1])
---
# 4) Blocking engine (implementation sketch)
1. **Call time**: scan keys in user order. If a non-empty list is found, pop & reply immediately.
2. **Otherwise**: register the client as **blocked** on those keys with `deadline = now + timeout` (or infinite).
3. **On push to any key**: if waiters exist, **wake one** (FIFO) and serve its pop **atomically** with the push result.
4. **On timer**: for each blocked client whose deadline passed, reply `Null Array` and clear state.
5. **Connection close**: remove from any wait queues.
Refs for timeout/block semantics. ([Redis][4])
---
# 5) Reliable queue pattern (recommended)
* **Consume**: `LMOVE queue processing RIGHT LEFT` (or `BLMOVE ... 0`).
* **Process** the job.
* **ACK**: `LREM processing 1 <job>` when done.
* **Reaper**: auxiliary task that detects stale jobs (e.g., track job IDs + timestamps in a ZSET) and requeues them. (Lists dont include timestamps; pairing with a ZSET is standard practice.)
Refs: LMOVE docs pattern. ([Redis][5])
---
# 6) Minimal test matrix
* Push/pop happy path (both ends), with/without COUNT.
* Blocking pop: immediate availability, block + timeout, wake on push, multiple keys order, FIFO across multiple waiters.
* LMOVE/BLMOVE: RIGHT→LEFT pipeline, block + wake, cross-list atomicity, ACK via LREM.
* Type errors and key deletion on last pop.

25
run_tests.sh Executable file
View File

@ -0,0 +1,25 @@
#!/bin/bash
echo "🧪 Running HeroDB Redis Compatibility Tests"
echo "=========================================="
echo ""
echo "1⃣ Running Simple Redis Tests (4 tests)..."
echo "----------------------------------------------"
cargo test --test simple_redis_test -- --nocapture
echo ""
echo "2⃣ Running Comprehensive Redis Integration Tests (13 tests)..."
echo "----------------------------------------------------------------"
cargo test --test redis_integration_tests -- --nocapture
cargo test --test redis_basic_client -- --nocapture
cargo test --test debug_hset -- --nocapture
cargo test --test debug_hset_simple -- --nocapture
echo ""
echo "3⃣ Running All Tests..."
echo "------------------------"
cargo test -- --nocapture
echo ""
echo "✅ Test execution completed!"

View File

@ -28,14 +28,31 @@ pub enum Cmd {
HLen(String),
HMGet(String, Vec<String>),
HSetNx(String, String, String),
HScan(String, u64, Option<String>, Option<u64>), // key, cursor, pattern, count
Scan(u64, Option<String>, Option<u64>), // cursor, pattern, count
Unknow,
Ttl(String),
Exists(String),
Quit,
Client(Vec<String>),
ClientSetName(String),
ClientGetName,
// List commands
LPush(String, Vec<String>),
RPush(String, Vec<String>),
LPop(String, Option<u64>),
RPop(String, Option<u64>),
LLen(String),
LRem(String, i64, String),
LTrim(String, i64, i64),
LIndex(String, i64),
LRange(String, i64, i64),
Unknow(String),
}
impl Cmd {
pub fn from(s: &str) -> Result<(Self, Protocol), DBError> {
let protocol = Protocol::from(s)?;
match protocol.clone().0 {
pub fn from(s: &str) -> Result<(Self, Protocol, &str), DBError> {
let (protocol, remaining) = Protocol::from(s)?;
match protocol.clone() {
Protocol::Array(p) => {
let cmd = p.into_iter().map(|x| x.decode()).collect::<Vec<_>>();
if cmd.is_empty() {
@ -57,6 +74,12 @@ impl Cmd {
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
}
}
"setex" => {
if cmd.len() != 4 {
return Err(DBError(format!("wrong number of arguments for SETEX command")));
}
Cmd::SetEx(cmd[1].clone(), cmd[3].clone(), cmd[2].parse().unwrap())
}
"config" => {
if cmd.len() != 3 || cmd[1].to_lowercase() != "get" {
return Err(DBError(format!("unsupported cmd {:?}", cmd)));
@ -117,7 +140,7 @@ impl Cmd {
}
let mut pairs = Vec::new();
let mut i = 2;
while i < cmd.len() - 1 {
while i + 1 < cmd.len() {
pairs.push((cmd[i].clone(), cmd[i + 1].clone()));
i += 2;
}
@ -177,6 +200,44 @@ impl Cmd {
}
Cmd::HSetNx(cmd[1].clone(), cmd[2].clone(), cmd[3].clone())
}
"hscan" => {
if cmd.len() < 3 {
return Err(DBError(format!("wrong number of arguments for HSCAN command")));
}
let key = cmd[1].clone();
let cursor = cmd[2].parse::<u64>().map_err(|_|
DBError("ERR invalid cursor".to_string()))?;
let mut pattern = None;
let mut count = None;
let mut i = 3;
while i < cmd.len() {
match cmd[i].to_lowercase().as_str() {
"match" => {
if i + 1 >= cmd.len() {
return Err(DBError("ERR syntax error".to_string()));
}
pattern = Some(cmd[i + 1].clone());
i += 2;
}
"count" => {
if i + 1 >= cmd.len() {
return Err(DBError("ERR syntax error".to_string()));
}
count = Some(cmd[i + 1].parse::<u64>().map_err(|_|
DBError("ERR value is not an integer or out of range".to_string()))?);
i += 2;
}
_ => {
return Err(DBError(format!("ERR syntax error")));
}
}
}
Cmd::HScan(key, cursor, pattern, count)
}
"scan" => {
if cmd.len() < 2 {
return Err(DBError(format!("wrong number of arguments for SCAN command")));
@ -214,21 +275,133 @@ impl Cmd {
Cmd::Scan(cursor, pattern, count)
}
_ => Cmd::Unknow,
"ttl" => {
if cmd.len() != 2 {
return Err(DBError(format!("wrong number of arguments for TTL command")));
}
Cmd::Ttl(cmd[1].clone())
}
"exists" => {
if cmd.len() != 2 {
return Err(DBError(format!("wrong number of arguments for EXISTS command")));
}
Cmd::Exists(cmd[1].clone())
}
"quit" => {
if cmd.len() != 1 {
return Err(DBError(format!("wrong number of arguments for QUIT command")));
}
Cmd::Quit
}
"client" => {
if cmd.len() > 1 {
match cmd[1].to_lowercase().as_str() {
"setname" => {
if cmd.len() == 3 {
Cmd::ClientSetName(cmd[2].clone())
} else {
return Err(DBError("wrong number of arguments for 'client setname' command".to_string()));
}
}
"getname" => {
if cmd.len() == 2 {
Cmd::ClientGetName
} else {
return Err(DBError("wrong number of arguments for 'client getname' command".to_string()));
}
}
_ => Cmd::Client(cmd[1..].to_vec()),
}
} else {
Cmd::Client(vec![])
}
}
"lpush" => {
if cmd.len() < 3 {
return Err(DBError(format!("wrong number of arguments for LPUSH command")));
}
Cmd::LPush(cmd[1].clone(), cmd[2..].to_vec())
}
"rpush" => {
if cmd.len() < 3 {
return Err(DBError(format!("wrong number of arguments for RPUSH command")));
}
Cmd::RPush(cmd[1].clone(), cmd[2..].to_vec())
}
"lpop" => {
if cmd.len() < 2 || cmd.len() > 3 {
return Err(DBError(format!("wrong number of arguments for LPOP command")));
}
let count = if cmd.len() == 3 {
Some(cmd[2].parse::<u64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?)
} else {
None
};
Cmd::LPop(cmd[1].clone(), count)
}
"rpop" => {
if cmd.len() < 2 || cmd.len() > 3 {
return Err(DBError(format!("wrong number of arguments for RPOP command")));
}
let count = if cmd.len() == 3 {
Some(cmd[2].parse::<u64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?)
} else {
None
};
Cmd::RPop(cmd[1].clone(), count)
}
"llen" => {
if cmd.len() != 2 {
return Err(DBError(format!("wrong number of arguments for LLEN command")));
}
Cmd::LLen(cmd[1].clone())
}
"lrem" => {
if cmd.len() != 4 {
return Err(DBError(format!("wrong number of arguments for LREM command")));
}
let count = cmd[2].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
Cmd::LRem(cmd[1].clone(), count, cmd[3].clone())
}
"ltrim" => {
if cmd.len() != 4 {
return Err(DBError(format!("wrong number of arguments for LTRIM command")));
}
let start = cmd[2].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
let stop = cmd[3].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
Cmd::LTrim(cmd[1].clone(), start, stop)
}
"lindex" => {
if cmd.len() != 3 {
return Err(DBError(format!("wrong number of arguments for LINDEX command")));
}
let index = cmd[2].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
Cmd::LIndex(cmd[1].clone(), index)
}
"lrange" => {
if cmd.len() != 4 {
return Err(DBError(format!("wrong number of arguments for LRANGE command")));
}
let start = cmd[2].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
let stop = cmd[3].parse::<i64>().map_err(|_| DBError("ERR value is not an integer or out of range".to_string()))?;
Cmd::LRange(cmd[1].clone(), start, stop)
}
_ => Cmd::Unknow(cmd[0].clone()),
},
protocol.0,
protocol,
remaining
))
}
_ => Err(DBError(format!(
"fail to parse as cmd for {:?}",
protocol.0
protocol
))),
}
}
pub async fn run(
&self,
server: &Server,
server: &mut Server,
protocol: Protocol,
queued_cmd: &mut Option<Vec<(Cmd, Protocol)>>,
) -> Result<Protocol, DBError> {
@ -282,15 +455,125 @@ impl Cmd {
Cmd::HLen(key) => hlen_cmd(server, key).await,
Cmd::HMGet(key, fields) => hmget_cmd(server, key, fields).await,
Cmd::HSetNx(key, field, value) => hsetnx_cmd(server, key, field, value).await,
Cmd::HScan(key, cursor, pattern, count) => hscan_cmd(server, key, cursor, pattern.as_deref(), count).await,
Cmd::Scan(cursor, pattern, count) => scan_cmd(server, cursor, pattern.as_deref(), count).await,
Cmd::Unknow => Ok(Protocol::err("unknown cmd")),
Cmd::Ttl(key) => ttl_cmd(server, key).await,
Cmd::Exists(key) => exists_cmd(server, key).await,
Cmd::Quit => Ok(Protocol::SimpleString("OK".to_string())),
Cmd::Client(_) => Ok(Protocol::SimpleString("OK".to_string())),
Cmd::ClientSetName(name) => client_setname_cmd(server, name).await,
Cmd::ClientGetName => client_getname_cmd(server).await,
// List commands
Cmd::LPush(key, elements) => lpush_cmd(server, key, elements).await,
Cmd::RPush(key, elements) => rpush_cmd(server, key, elements).await,
Cmd::LPop(key, count) => lpop_cmd(server, key, count).await,
Cmd::RPop(key, count) => rpop_cmd(server, key, count).await,
Cmd::LLen(key) => llen_cmd(server, key).await,
Cmd::LRem(key, count, element) => lrem_cmd(server, key, *count, element).await,
Cmd::LTrim(key, start, stop) => ltrim_cmd(server, key, *start, *stop).await,
Cmd::LIndex(key, index) => lindex_cmd(server, key, *index).await,
Cmd::LRange(key, start, stop) => lrange_cmd(server, key, *start, *stop).await,
Cmd::Unknow(s) => {
println!("\x1b[31;1munknown command: {}\x1b[0m", s);
Ok(Protocol::err(&format!("ERR unknown command '{}'", s)))
}
}
}
}
async fn lindex_cmd(server: &Server, key: &str, index: i64) -> Result<Protocol, DBError> {
match server.storage.lindex(key, index) {
Ok(Some(element)) => Ok(Protocol::BulkString(element)),
Ok(None) => Ok(Protocol::Null),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn lrange_cmd(server: &Server, key: &str, start: i64, stop: i64) -> Result<Protocol, DBError> {
match server.storage.lrange(key, start, stop) {
Ok(elements) => Ok(Protocol::Array(elements.into_iter().map(Protocol::BulkString).collect())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn ltrim_cmd(server: &Server, key: &str, start: i64, stop: i64) -> Result<Protocol, DBError> {
match server.storage.ltrim(key, start, stop) {
Ok(_) => Ok(Protocol::SimpleString("OK".to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn lrem_cmd(server: &Server, key: &str, count: i64, element: &str) -> Result<Protocol, DBError> {
match server.storage.lrem(key, count, element) {
Ok(removed_count) => Ok(Protocol::SimpleString(removed_count.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn llen_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
match server.storage.llen(key) {
Ok(len) => Ok(Protocol::SimpleString(len.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn lpop_cmd(server: &Server, key: &str, count: &Option<u64>) -> Result<Protocol, DBError> {
match server.storage.lpop(key, *count) {
Ok(Some(elements)) => {
if count.is_some() {
Ok(Protocol::Array(elements.into_iter().map(Protocol::BulkString).collect()))
} else {
Ok(Protocol::BulkString(elements[0].clone()))
}
},
Ok(None) => {
if count.is_some() {
Ok(Protocol::Array(vec![]))
} else {
Ok(Protocol::Null)
}
},
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn rpop_cmd(server: &Server, key: &str, count: &Option<u64>) -> Result<Protocol, DBError> {
match server.storage.rpop(key, *count) {
Ok(Some(elements)) => {
if count.is_some() {
Ok(Protocol::Array(elements.into_iter().map(Protocol::BulkString).collect()))
} else {
Ok(Protocol::BulkString(elements[0].clone()))
}
},
Ok(None) => {
if count.is_some() {
Ok(Protocol::Array(vec![]))
} else {
Ok(Protocol::Null)
}
},
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn lpush_cmd(server: &Server, key: &str, elements: &[String]) -> Result<Protocol, DBError> {
match server.storage.lpush(key, elements.to_vec()) {
Ok(len) => Ok(Protocol::SimpleString(len.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn rpush_cmd(server: &Server, key: &str, elements: &[String]) -> Result<Protocol, DBError> {
match server.storage.rpush(key, elements.to_vec()) {
Ok(len) => Ok(Protocol::SimpleString(len.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn exec_cmd(
queued_cmd: &mut Option<Vec<(Cmd, Protocol)>>,
server: &Server,
server: &mut Server,
) -> Result<Protocol, DBError> {
if queued_cmd.is_some() {
let mut vec = Vec::new();
@ -332,7 +615,11 @@ fn config_get_cmd(name: &String, server: &Server) -> Result<Protocol, DBError> {
Protocol::BulkString(name.clone()),
Protocol::BulkString("herodb.redb".to_string()),
])),
_ => Err(DBError(format!("unsupported config {:?}", name))),
"databases" => Ok(Protocol::Array(vec![
Protocol::BulkString(name.clone()),
Protocol::BulkString("16".to_string()),
])),
_ => Ok(Protocol::Array(vec![])), // Return empty array for unknown configs instead of error
}
}
@ -497,3 +784,43 @@ async fn scan_cmd(server: &Server, cursor: &u64, pattern: Option<&str>, count: &
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn hscan_cmd(server: &Server, key: &str, cursor: &u64, pattern: Option<&str>, count: &Option<u64>) -> Result<Protocol, DBError> {
match server.storage.hscan(key, *cursor, pattern, *count) {
Ok((next_cursor, fields)) => {
let mut result = Vec::new();
result.push(Protocol::BulkString(next_cursor.to_string()));
result.push(Protocol::Array(
fields.into_iter().map(Protocol::BulkString).collect(),
));
Ok(Protocol::Array(result))
}
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn ttl_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
match server.storage.ttl(key) {
Ok(ttl) => Ok(Protocol::SimpleString(ttl.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn exists_cmd(server: &Server, key: &str) -> Result<Protocol, DBError> {
match server.storage.exists(key) {
Ok(exists) => Ok(Protocol::SimpleString(if exists { "1" } else { "0" }.to_string())),
Err(e) => Ok(Protocol::err(&e.0)),
}
}
async fn client_setname_cmd(server: &mut Server, name: &str) -> Result<Protocol, DBError> {
server.client_name = Some(name.to_string());
Ok(Protocol::SimpleString("OK".to_string()))
}
async fn client_getname_cmd(server: &Server) -> Result<Protocol, DBError> {
match &server.client_name {
Some(name) => Ok(Protocol::BulkString(name.clone())),
None => Ok(Protocol::Null),
}
}

View File

@ -4,7 +4,6 @@ use tokio::sync::mpsc;
use redb;
use bincode;
use crate::protocol::Protocol;
// todo: more error types
#[derive(Debug)]

View File

@ -1,6 +1,6 @@
mod cmd;
pub mod cmd;
pub mod error;
pub mod options;
mod protocol;
pub mod protocol;
pub mod server;
mod storage;
pub mod storage;

View File

@ -18,6 +18,10 @@ struct Args {
/// The port of the Redis server, default is 6379 if not specified
#[arg(long)]
port: Option<u16>,
/// Enable debug mode
#[arg(long)]
debug: bool,
}
#[tokio::main]
@ -36,11 +40,15 @@ async fn main() {
let option = redis_rs::options::DBOption {
dir: args.dir,
port,
debug: args.debug,
};
// new server
let server = server::Server::new(option).await;
// Add a small delay to ensure the port is ready
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// accept new connections
loop {
let stream = listener.accept().await;

View File

@ -2,4 +2,5 @@
pub struct DBOption {
pub dir: String,
pub port: u16,
pub debug: bool,
}

View File

@ -17,7 +17,7 @@ impl fmt::Display for Protocol {
}
impl Protocol {
pub fn from(protocol: &str) -> Result<(Self, usize), DBError> {
pub fn from(protocol: &str) -> Result<(Self, &str), DBError> {
let ret = match protocol.chars().nth(0) {
Some('+') => Self::parse_simple_string_sfx(&protocol[1..]),
Some('$') => Self::parse_bulk_string_sfx(&protocol[1..]),
@ -27,10 +27,7 @@ impl Protocol {
protocol
))),
};
match ret {
Ok((p, s)) => Ok((p, s + 1)),
Err(e) => Err(e),
}
ret
}
pub fn from_vec(array: Vec<&str>) -> Self {
@ -91,9 +88,9 @@ impl Protocol {
}
}
fn parse_simple_string_sfx(protocol: &str) -> Result<(Self, usize), DBError> {
fn parse_simple_string_sfx(protocol: &str) -> Result<(Self, &str), DBError> {
match protocol.find("\r\n") {
Some(x) => Ok((Self::SimpleString(protocol[..x].to_string()), x + 2)),
Some(x) => Ok((Self::SimpleString(protocol[..x].to_string()), &protocol[x + 2..])),
_ => Err(DBError(format!(
"[new simple string] unsupported protocol: {:?}",
protocol
@ -101,27 +98,20 @@ impl Protocol {
}
}
fn parse_bulk_string_sfx(protocol: &str) -> Result<(Self, usize), DBError> {
if let Some(len) = protocol.find("\r\n") {
let size = Self::parse_usize(&protocol[..len])?;
if let Some(data_len) = protocol[len + 2..].find("\r\n") {
let s = Self::parse_string(&protocol[len + 2..len + 2 + data_len])?;
if size != s.len() {
Err(DBError(format!(
"[new bulk string] unmatched string length in prototocl {:?}",
protocol,
)))
} else {
Ok((
Protocol::BulkString(s.to_lowercase()),
len + 2 + data_len + 2,
))
}
} else {
Err(DBError(format!(
"[new bulk string] unsupported protocol: {:?}",
protocol
fn parse_bulk_string_sfx(protocol: &str) -> Result<(Self, &str), DBError> {
if let Some(len_end) = protocol.find("\r\n") {
let size = Self::parse_usize(&protocol[..len_end])?;
let data_start = len_end + 2;
let data_end = data_start + size;
let s = Self::parse_string(&protocol[data_start..data_end])?;
if protocol.len() < data_end + 2 || &protocol[data_end..data_end+2] != "\r\n" {
Err(DBError(format!(
"[new bulk string] unmatched string length in prototocl {:?}",
protocol,
)))
} else {
Ok((Protocol::BulkString(s), &protocol[data_end + 2..]))
}
} else {
Err(DBError(format!(
@ -131,46 +121,41 @@ impl Protocol {
}
}
fn parse_array_sfx(s: &str) -> Result<(Self, usize), DBError> {
let mut offset = 0;
match s.find("\r\n") {
Some(x) => {
let array_len = s[..x].parse::<usize>()?;
offset += x + 2;
let mut vec = vec![];
for _ in 0..array_len {
match Protocol::from(&s[offset..]) {
Ok((p, len)) => {
offset += len;
vec.push(p);
}
Err(e) => {
return Err(e);
}
}
}
Ok((Protocol::Array(vec), offset))
fn parse_array_sfx(s: &str) -> Result<(Self, &str), DBError> {
if let Some(len_end) = s.find("\r\n") {
let array_len = s[..len_end].parse::<usize>()?;
let mut remaining = &s[len_end + 2..];
let mut vec = vec![];
for _ in 0..array_len {
let (p, rem) = Protocol::from(remaining)?;
vec.push(p);
remaining = rem;
}
_ => Err(DBError(format!(
Ok((Protocol::Array(vec), remaining))
} else {
Err(DBError(format!(
"[new array] unsupported protocol: {:?}",
s
))),
)))
}
}
fn parse_usize(protocol: &str) -> Result<usize, DBError> {
match protocol.len() {
0 => Err(DBError(format!("parse usize error: {:?}", protocol))),
_ => Ok(protocol
if protocol.is_empty() {
Err(DBError("Cannot parse usize from empty string".to_string()))
} else {
protocol
.parse::<usize>()
.map_err(|_| DBError(format!("parse usize error: {}", protocol)))?),
.map_err(|_| DBError(format!("Failed to parse usize from: {}", protocol)))
}
}
fn parse_string(protocol: &str) -> Result<String, DBError> {
match protocol.len() {
0 => Err(DBError(format!("parse usize error: {:?}", protocol))),
_ => Ok(protocol.to_string()),
if protocol.is_empty() {
// Allow empty strings, but handle appropriately
Ok("".to_string())
} else {
Ok(protocol.to_string())
}
}
}

View File

@ -14,6 +14,7 @@ use crate::storage::Storage;
pub struct Server {
pub storage: Arc<Storage>,
pub option: options::DBOption,
pub client_name: Option<String>,
}
impl Server {
@ -28,6 +29,7 @@ impl Server {
Server {
storage: Arc::new(storage),
option,
client_name: None,
}
}
@ -39,28 +41,58 @@ impl Server {
let mut queued_cmd: Option<Vec<(Cmd, Protocol)>> = None;
loop {
if let Ok(len) = stream.read(&mut buf).await {
if len == 0 {
let len = match stream.read(&mut buf).await {
Ok(0) => {
println!("[handle] connection closed");
return Ok(());
}
let s = str::from_utf8(&buf[..len])?;
let (cmd, protocol) =
Cmd::from(s).unwrap_or((Cmd::Unknow, Protocol::err("unknow cmd")));
println!("got command: {:?}, protocol: {:?}", cmd, protocol);
Ok(len) => len,
Err(e) => {
println!("[handle] read error: {:?}", e);
return Err(e.into());
}
};
let mut s = str::from_utf8(&buf[..len])?;
while !s.is_empty() {
let (cmd, protocol, remaining) = match Cmd::from(s) {
Ok((cmd, protocol, remaining)) => (cmd, protocol, remaining),
Err(e) => {
println!("\x1b[31;1mprotocol error: {:?}\x1b[0m", e);
(Cmd::Unknow("protocol_error".to_string()), Protocol::err(&format!("protocol error: {}", e.0)), "")
}
};
s = remaining;
if self.option.debug {
println!("\x1b[34;1mgot command: {:?}, protocol: {:?}\x1b[0m", cmd, protocol);
} else {
println!("got command: {:?}, protocol: {:?}", cmd, protocol);
}
// Check if this is a QUIT command before processing
let is_quit = matches!(cmd, Cmd::Quit);
let res = cmd
.run(self, protocol, &mut queued_cmd)
.run(&mut self.clone(), protocol.clone(), &mut queued_cmd)
.await
.unwrap_or(Protocol::err("unknow cmd"));
print!("queued cmd {:?}", queued_cmd);
.unwrap_or(Protocol::err("unknown cmd from server"));
if self.option.debug {
println!("\x1b[34;1mqueued cmd {:?}\x1b[0m", queued_cmd);
println!("\x1b[32;1mgoing to send response {}\x1b[0m", res.encode());
} else {
print!("queued cmd {:?}", queued_cmd);
println!("going to send response {}", res.encode());
}
println!("going to send response {}", res.encode());
_ = stream.write(res.encode().as_bytes()).await?;
} else {
println!("[handle] going to break");
break;
// If this was a QUIT command, close the connection
if is_quit {
println!("[handle] QUIT command received, closing connection");
return Ok(());
}
}
}
Ok(())

View File

@ -3,7 +3,7 @@ use std::{
time::{SystemTime, UNIX_EPOCH},
};
use redb::{Database, Error, ReadableTable, Table, TableDefinition, WriteTransaction, ReadTransaction};
use redb::{Database, ReadableTable, TableDefinition};
use serde::{Deserialize, Serialize};
use crate::error::DBError;
@ -12,6 +12,7 @@ use crate::error::DBError;
const TYPES_TABLE: TableDefinition<&str, &str> = TableDefinition::new("types");
const STRINGS_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("strings");
const HASHES_TABLE: TableDefinition<(&str, &str), &str> = TableDefinition::new("hashes");
const LISTS_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("lists");
const STREAMS_META_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("streams_meta");
const STREAMS_DATA_TABLE: TableDefinition<(&str, &str), &[u8]> = TableDefinition::new("streams_data");
@ -26,6 +27,12 @@ pub struct StreamEntry {
pub fields: Vec<(String, String)>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ListValue {
pub elements: Vec<String>,
}
#[inline]
pub fn now_in_millis() -> u128 {
let start = SystemTime::now();
@ -47,6 +54,7 @@ impl Storage {
let _ = write_txn.open_table(TYPES_TABLE)?;
let _ = write_txn.open_table(STRINGS_TABLE)?;
let _ = write_txn.open_table(HASHES_TABLE)?;
let _ = write_txn.open_table(LISTS_TABLE)?;
let _ = write_txn.open_table(STREAMS_META_TABLE)?;
let _ = write_txn.open_table(STREAMS_DATA_TABLE)?;
}
@ -143,6 +151,7 @@ impl Storage {
let mut types_table = write_txn.open_table(TYPES_TABLE)?;
let mut strings_table = write_txn.open_table(STRINGS_TABLE)?;
let mut hashes_table = write_txn.open_table(HASHES_TABLE)?;
let mut lists_table = write_txn.open_table(LISTS_TABLE)?;
// Remove from type table
types_table.remove(key.as_str())?;
@ -165,6 +174,9 @@ impl Storage {
for (hash_key, field) in to_remove {
hashes_table.remove((hash_key.as_str(), field.as_str()))?;
}
// Remove from lists table
lists_table.remove(key.as_str())?;
}
write_txn.commit()?;
@ -493,7 +505,6 @@ impl Storage {
// Stop if we've returned enough keys
if returned_keys >= count {
current_cursor += 1;
break;
}
}
@ -502,8 +513,546 @@ impl Storage {
}
// If we've reached the end of iteration, return cursor 0 to indicate completion
let next_cursor = if returned_keys < count { 0 } else { current_cursor };
let next_cursor = if iter.next().is_none() { 0 } else { current_cursor };
Ok((next_cursor, keys))
}
}
pub fn hscan(&self, key: &str, cursor: u64, pattern: Option<&str>, count: Option<u64>) -> Result<(u64, Vec<String>), DBError> {
let read_txn = self.db.begin_read()?;
// Check if key exists and is a hash
let types_table = read_txn.open_table(TYPES_TABLE)?;
match types_table.get(key)? {
Some(type_val) if type_val.value() == "hash" => {
let hashes_table = read_txn.open_table(HASHES_TABLE)?;
let count = count.unwrap_or(10);
let mut fields = Vec::new();
let mut current_cursor = 0u64;
let mut returned_fields = 0u64;
let mut iter = hashes_table.iter()?;
while let Some(entry) = iter.next() {
let entry = entry?;
let (hash_key, field) = entry.0.value();
let value = entry.1.value();
if hash_key != key {
continue;
}
// Skip fields until we reach the cursor position
if current_cursor < cursor {
current_cursor += 1;
continue;
}
// Check if field matches pattern
let matches = match pattern {
Some(pat) => {
if pat == "*" {
true
} else if pat.contains('*') {
let pattern_parts: Vec<&str> = pat.split('*').collect();
if pattern_parts.len() == 2 {
let prefix = pattern_parts[0];
let suffix = pattern_parts[1];
field.starts_with(prefix) && field.ends_with(suffix)
} else {
field.contains(&pat.replace('*', ""))
}
} else {
field.contains(pat)
}
}
None => true,
};
if matches {
fields.push(field.to_string());
fields.push(value.to_string());
returned_fields += 1;
if returned_fields >= count {
break;
}
}
current_cursor += 1;
}
let next_cursor = if iter.next().is_none() { 0 } else { current_cursor };
Ok((next_cursor, fields))
}
Some(_) => Err(DBError("WRONGTYPE Operation against a key holding the wrong kind of value".to_string())),
None => Ok((0, Vec::new())),
}
}
pub fn ttl(&self, key: &str) -> Result<i64, DBError> {
let read_txn = self.db.begin_read()?;
// Check if key exists
let types_table = read_txn.open_table(TYPES_TABLE)?;
match types_table.get(key)? {
Some(type_val) if type_val.value() == "string" => {
let strings_table = read_txn.open_table(STRINGS_TABLE)?;
match strings_table.get(key)? {
Some(data) => {
let string_value: StringValue = bincode::deserialize(data.value())?;
match string_value.expires_at_ms {
Some(expires_at) => {
let now = now_in_millis();
if now > expires_at {
Ok(-2) // Key expired
} else {
Ok(((expires_at - now) / 1000) as i64) // TTL in seconds
}
}
None => Ok(-1), // No expiration
}
}
None => Ok(-2), // Key doesn't exist
}
}
Some(_) => Ok(-1), // Other types don't have TTL implemented yet
None => Ok(-2), // Key doesn't exist
}
}
pub fn exists(&self, key: &str) -> Result<bool, DBError> {
let read_txn = self.db.begin_read()?;
let types_table = read_txn.open_table(TYPES_TABLE)?;
match types_table.get(key)? {
Some(_) => {
// For string types, check if not expired
if let Some(type_val) = types_table.get(key)? {
if type_val.value() == "string" {
let strings_table = read_txn.open_table(STRINGS_TABLE)?;
if let Some(data) = strings_table.get(key)? {
let string_value: StringValue = bincode::deserialize(data.value())?;
if let Some(expires_at) = string_value.expires_at_ms {
if now_in_millis() > expires_at {
return Ok(false); // Expired
}
}
}
}
}
Ok(true)
}
None => Ok(false),
}
}
// List operations
pub fn lpush(&self, key: &str, elements: Vec<String>) -> Result<u64, DBError> {
let write_txn = self.db.begin_write()?;
let mut new_len = 0u64;
{
let mut types_table = write_txn.open_table(TYPES_TABLE)?;
let mut lists_table = write_txn.open_table(LISTS_TABLE)?;
let existing_type = match types_table.get(key)? {
Some(type_val) => Some(type_val.value().to_string()),
None => None,
};
match existing_type {
Some(ref type_str) if type_str != "list" => {
return Err(DBError("WRONGTYPE Operation against a key holding the wrong kind of value".to_string()));
}
None => {
types_table.insert(key, "list")?;
}
_ => {}
}
let mut list_value: ListValue = match lists_table.get(key)? {
Some(data) => bincode::deserialize(data.value())?,
None => ListValue { elements: Vec::new() },
};
for element in elements.into_iter().rev() {
list_value.elements.insert(0, element);
}
new_len = list_value.elements.len() as u64;
let serialized = bincode::serialize(&list_value)?;
lists_table.insert(key, serialized.as_slice())?;
}
write_txn.commit()?;
Ok(new_len)
}
pub fn rpush(&self, key: &str, elements: Vec<String>) -> Result<u64, DBError> {
let write_txn = self.db.begin_write()?;
let mut new_len = 0u64;
{
let mut types_table = write_txn.open_table(TYPES_TABLE)?;
let mut lists_table = write_txn.open_table(LISTS_TABLE)?;
let existing_type = match types_table.get(key)? {
Some(type_val) => Some(type_val.value().to_string()),
None => None,
};
match existing_type {
Some(ref type_str) if type_str != "list" => {
return Err(DBError("WRONGTYPE Operation against a key holding the wrong kind of value".to_string()));
}
None => {
types_table.insert(key, "list")?;
}
_ => {}
}
let mut list_value: ListValue = match lists_table.get(key)? {
Some(data) => bincode::deserialize(data.value())?,
None => ListValue { elements: Vec::new() },
};
for element in elements {
list_value.elements.push(element);
}
new_len = list_value.elements.len() as u64;
let serialized = bincode::serialize(&list_value)?;
lists_table.insert(key, serialized.as_slice())?;
}
write_txn.commit()?;
Ok(new_len)
}
pub fn lpop(&self, key: &str, count: Option<u64>) -> Result<Option<Vec<String>>, DBError> {
let write_txn = self.db.begin_write()?;
let mut result_elements = Vec::new();
{
let mut types_table = write_txn.open_table(TYPES_TABLE)?;
let mut lists_table = write_txn.open_table(LISTS_TABLE)?;
let existing_type = match types_table.get(key)? {
Some(type_val) => Some(type_val.value().to_string()),
None => None,
};
match existing_type {
Some(ref type_str) if type_str != "list" => {
return Err(DBError("WRONGTYPE Operation against a key holding the wrong kind of value".to_string()));
}
Some(_) => {
let mut list_value: ListValue = match lists_table.get(key)? {
Some(data) => bincode::deserialize(data.value())?,
None => return Ok(None), // Key exists but list is empty (shouldn't happen if type is "list")
};
let num_to_pop = count.unwrap_or(1) as usize;
for _ in 0..num_to_pop {
if !list_value.elements.is_empty() {
result_elements.push(list_value.elements.remove(0));
} else {
break;
}
}
if list_value.elements.is_empty() {
lists_table.remove(key)?;
types_table.remove(key)?;
} else {
let serialized = bincode::serialize(&list_value)?;
lists_table.insert(key, serialized.as_slice())?;
}
}
None => return Ok(None),
}
}
write_txn.commit()?;
if result_elements.is_empty() {
Ok(None)
} else {
Ok(Some(result_elements))
}
}
pub fn rpop(&self, key: &str, count: Option<u64>) -> Result<Option<Vec<String>>, DBError> {
let write_txn = self.db.begin_write()?;
let mut result_elements = Vec::new();
{
let mut types_table = write_txn.open_table(TYPES_TABLE)?;
let mut lists_table = write_txn.open_table(LISTS_TABLE)?;
let existing_type = match types_table.get(key)? {
Some(type_val) => Some(type_val.value().to_string()),
None => None,
};
match existing_type {
Some(ref type_str) if type_str != "list" => {
return Err(DBError("WRONGTYPE Operation against a key holding the wrong kind of value".to_string()));
}
Some(_) => {
let mut list_value: ListValue = match lists_table.get(key)? {
Some(data) => bincode::deserialize(data.value())?,
None => return Ok(None),
};
let num_to_pop = count.unwrap_or(1) as usize;
for _ in 0..num_to_pop {
if let Some(element) = list_value.elements.pop() {
result_elements.push(element);
} else {
break;
}
}
if list_value.elements.is_empty() {
lists_table.remove(key)?;
types_table.remove(key)?;
} else {
let serialized = bincode::serialize(&list_value)?;
lists_table.insert(key, serialized.as_slice())?;
}
}
None => return Ok(None),
}
}
write_txn.commit()?;
if result_elements.is_empty() {
Ok(None)
} else {
Ok(Some(result_elements))
}
}
pub fn llen(&self, key: &str) -> Result<u64, DBError> {
let read_txn = self.db.begin_read()?;
let types_table = read_txn.open_table(TYPES_TABLE)?;
match types_table.get(key)? {
Some(type_val) if type_val.value() == "list" => {
let lists_table = read_txn.open_table(LISTS_TABLE)?;
match lists_table.get(key)? {
Some(data) => {
let list_value: ListValue = bincode::deserialize(data.value())?;
Ok(list_value.elements.len() as u64)
}
None => Ok(0), // Key exists but list is empty
}
}
Some(_) => Err(DBError("WRONGTYPE Operation against a key holding the wrong kind of value".to_string())),
None => Ok(0), // Key does not exist
}
}
pub fn lrem(&self, key: &str, count: i64, element: &str) -> Result<u64, DBError> {
let write_txn = self.db.begin_write()?;
let mut removed_count = 0u64;
{
let mut types_table = write_txn.open_table(TYPES_TABLE)?;
let mut lists_table = write_txn.open_table(LISTS_TABLE)?;
let existing_type = match types_table.get(key)? {
Some(type_val) => Some(type_val.value().to_string()),
None => None,
};
match existing_type {
Some(ref type_str) if type_str != "list" => {
return Err(DBError("WRONGTYPE Operation against a key holding the wrong kind of value".to_string()));
}
Some(_) => {
let mut list_value: ListValue = match lists_table.get(key)? {
Some(data) => bincode::deserialize(data.value())?,
None => return Ok(0),
};
let initial_len = list_value.elements.len();
if count > 0 {
let mut i = 0;
let mut removed = 0;
while i < list_value.elements.len() && removed < count {
if list_value.elements[i] == element {
list_value.elements.remove(i);
removed += 1;
} else {
i += 1;
}
}
} else if count < 0 {
let mut i = list_value.elements.len() as i32 - 1;
let mut removed = 0;
while i >= 0 && removed < -count {
if list_value.elements[i as usize] == element {
list_value.elements.remove(i as usize);
removed += 1;
}
i -= 1;
}
} else { // count == 0
list_value.elements.retain(|el| el != element);
}
removed_count = (initial_len - list_value.elements.len()) as u64;
if list_value.elements.is_empty() {
lists_table.remove(key)?;
types_table.remove(key)?;
} else {
let serialized = bincode::serialize(&list_value)?;
lists_table.insert(key, serialized.as_slice())?;
}
}
None => return Ok(0),
}
}
write_txn.commit()?;
Ok(removed_count)
}
pub fn ltrim(&self, key: &str, start: i64, stop: i64) -> Result<(), DBError> {
let write_txn = self.db.begin_write()?;
{
let mut types_table = write_txn.open_table(TYPES_TABLE)?;
let mut lists_table = write_txn.open_table(LISTS_TABLE)?;
let existing_type = match types_table.get(key)? {
Some(type_val) => Some(type_val.value().to_string()),
None => None,
};
match existing_type {
Some(ref type_str) if type_str != "list" => {
return Err(DBError("WRONGTYPE Operation against a key holding the wrong kind of value".to_string()));
}
Some(_) => {
let mut list_value: ListValue = match lists_table.get(key)? {
Some(data) => bincode::deserialize(data.value())?,
None => return Ok(()),
};
let len = list_value.elements.len() as i64;
let mut start = start;
let mut stop = stop;
if start < 0 {
start += len;
}
if stop < 0 {
stop += len;
}
if start < 0 {
start = 0;
}
if start > stop || start >= len {
list_value.elements.clear();
} else {
if stop >= len {
stop = len - 1;
}
let start = start as usize;
let stop = stop as usize;
list_value.elements = list_value.elements.drain(start..=stop).collect();
}
if list_value.elements.is_empty() {
lists_table.remove(key)?;
types_table.remove(key)?;
} else {
let serialized = bincode::serialize(&list_value)?;
lists_table.insert(key, serialized.as_slice())?;
}
}
None => {}
}
}
write_txn.commit()?;
Ok(())
}
pub fn lindex(&self, key: &str, index: i64) -> Result<Option<String>, DBError> {
let read_txn = self.db.begin_read()?;
let types_table = read_txn.open_table(TYPES_TABLE)?;
match types_table.get(key)? {
Some(type_val) if type_val.value() == "list" => {
let lists_table = read_txn.open_table(LISTS_TABLE)?;
match lists_table.get(key)? {
Some(data) => {
let list_value: ListValue = bincode::deserialize(data.value())?;
let len = list_value.elements.len() as i64;
let mut index = index;
if index < 0 {
index += len;
}
if index < 0 || index >= len {
Ok(None)
} else {
Ok(list_value.elements.get(index as usize).cloned())
}
}
None => Ok(None),
}
}
Some(_) => Err(DBError("WRONGTYPE Operation against a key holding the wrong kind of value".to_string())),
None => Ok(None),
}
}
pub fn lrange(&self, key: &str, start: i64, stop: i64) -> Result<Vec<String>, DBError> {
let read_txn = self.db.begin_read()?;
let types_table = read_txn.open_table(TYPES_TABLE)?;
match types_table.get(key)? {
Some(type_val) if type_val.value() == "list" => {
let lists_table = read_txn.open_table(LISTS_TABLE)?;
match lists_table.get(key)? {
Some(data) => {
let list_value: ListValue = bincode::deserialize(data.value())?;
let len = list_value.elements.len() as i64;
let mut start = start;
let mut stop = stop;
if start < 0 {
start += len;
}
if stop < 0 {
stop += len;
}
if start < 0 {
start = 0;
}
if start > stop || start >= len {
Ok(Vec::new())
} else {
if stop >= len {
stop = len - 1;
}
Ok(list_value.elements[start as usize..=stop as usize].to_vec())
}
}
None => Ok(Vec::new()),
}
}
Some(_) => Err(DBError("WRONGTYPE Operation against a key holding the wrong kind of value".to_string())),
None => Ok(Vec::new()),
}
}
}

View File

@ -14,7 +14,7 @@ NC='\033[0m' # No Color
# Configuration
DB_DIR="./test_db"
PORT=6379
PORT=6381
SERVER_PID=""
# Function to print colored output

60
tests/debug_hset.rs Normal file
View File

@ -0,0 +1,60 @@
use redis_rs::{server::Server, options::DBOption};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::sleep;
// Helper function to send command and get response
async fn send_command(stream: &mut TcpStream, command: &str) -> String {
stream.write_all(command.as_bytes()).await.unwrap();
let mut buffer = [0; 1024];
let n = stream.read(&mut buffer).await.unwrap();
String::from_utf8_lossy(&buffer[..n]).to_string()
}
#[tokio::test]
async fn debug_hset_simple() {
// Clean up any existing test database
let test_dir = "/tmp/herodb_debug_hset";
let _ = std::fs::remove_dir_all(test_dir);
std::fs::create_dir_all(test_dir).unwrap();
let port = 16500;
let option = DBOption {
dir: test_dir.to_string(),
port,
debug: false,
};
let mut server = Server::new(option).await;
// Start server in background
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
.await
.unwrap();
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
}
}
});
sleep(Duration::from_millis(200)).await;
let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port)).await.unwrap();
// Test simple HSET
println!("Testing HSET...");
let response = send_command(&mut stream, "*4\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n$6\r\nvalue1\r\n").await;
println!("HSET response: {}", response);
assert!(response.contains("1"), "Expected '1' but got: {}", response);
// Test HGET
println!("Testing HGET...");
let response = send_command(&mut stream, "*3\r\n$4\r\nHGET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n").await;
println!("HGET response: {}", response);
assert!(response.contains("value1"), "Expected 'value1' but got: {}", response);
}

View File

@ -0,0 +1,54 @@
use redis_rs::{server::Server, options::DBOption};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::sleep;
#[tokio::test]
async fn debug_hset_return_value() {
let test_dir = "/tmp/herodb_debug_hset_return";
// Clean up any existing test data
let _ = std::fs::remove_dir_all(&test_dir);
std::fs::create_dir_all(&test_dir).unwrap();
let option = DBOption {
dir: test_dir.to_string(),
port: 16390,
debug: false,
};
let mut server = Server::new(option).await;
// Start server in background
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind("127.0.0.1:16390")
.await
.unwrap();
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
}
}
});
sleep(Duration::from_millis(200)).await;
// Connect and test HSET
let mut stream = TcpStream::connect("127.0.0.1:16390").await.unwrap();
// Send HSET command
let cmd = "*4\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n$6\r\nvalue1\r\n";
stream.write_all(cmd.as_bytes()).await.unwrap();
let mut buffer = [0; 1024];
let n = stream.read(&mut buffer).await.unwrap();
let response = String::from_utf8_lossy(&buffer[..n]);
println!("HSET response: {}", response);
println!("Response bytes: {:?}", &buffer[..n]);
// Check if response contains "1"
assert!(response.contains("1"), "Expected response to contain '1', got: {}", response);
}

35
tests/debug_protocol.rs Normal file
View File

@ -0,0 +1,35 @@
use redis_rs::protocol::Protocol;
use redis_rs::cmd::Cmd;
#[test]
fn test_protocol_parsing() {
// Test TYPE command parsing
let type_cmd = "*2\r\n$4\r\nTYPE\r\n$7\r\nnoexist\r\n";
println!("Parsing TYPE command: {}", type_cmd.replace("\r\n", "\\r\\n"));
match Protocol::from(type_cmd) {
Ok((protocol, _)) => {
println!("Protocol parsed successfully: {:?}", protocol);
match Cmd::from(type_cmd) {
Ok((cmd, _, _)) => println!("Command parsed successfully: {:?}", cmd),
Err(e) => println!("Command parsing failed: {:?}", e),
}
}
Err(e) => println!("Protocol parsing failed: {:?}", e),
}
// Test HEXISTS command parsing
let hexists_cmd = "*3\r\n$7\r\nHEXISTS\r\n$4\r\nhash\r\n$7\r\nnoexist\r\n";
println!("\nParsing HEXISTS command: {}", hexists_cmd.replace("\r\n", "\\r\\n"));
match Protocol::from(hexists_cmd) {
Ok((protocol, _)) => {
println!("Protocol parsed successfully: {:?}", protocol);
match Cmd::from(hexists_cmd) {
Ok((cmd, _, _)) => println!("Command parsed successfully: {:?}", cmd),
Err(e) => println!("Command parsing failed: {:?}", e),
}
}
Err(e) => println!("Protocol parsing failed: {:?}", e),
}
}

View File

@ -0,0 +1,461 @@
use redis::{Client, Commands, Connection};
use std::process::{Child, Command};
use std::time::Duration;
use tokio::time::sleep;
// Helper function to get Redis connection, retrying until successful
fn get_redis_connection(port: u16) -> Connection {
let connection_info = format!("redis://127.0.0.1:{}", port);
let client = Client::open(connection_info).unwrap();
let mut attempts = 0;
loop {
match client.get_connection() {
Ok(mut conn) => {
if redis::cmd("PING").query::<String>(&mut conn).is_ok() {
return conn;
}
}
Err(e) => {
if attempts >= 20 {
panic!(
"Failed to connect to Redis server after 20 attempts: {}",
e
);
}
}
}
attempts += 1;
std::thread::sleep(Duration::from_millis(100));
}
}
// A guard to ensure the server process is killed when it goes out of scope
struct ServerProcessGuard {
process: Child,
test_dir: String,
}
impl Drop for ServerProcessGuard {
fn drop(&mut self) {
println!("Killing server process (pid: {})...", self.process.id());
if let Err(e) = self.process.kill() {
eprintln!("Failed to kill server process: {}", e);
}
match self.process.wait() {
Ok(status) => println!("Server process exited with: {}", status),
Err(e) => eprintln!("Failed to wait on server process: {}", e),
}
// Clean up the specific test directory
println!("Cleaning up test directory: {}", self.test_dir);
if let Err(e) = std::fs::remove_dir_all(&self.test_dir) {
eprintln!("Failed to clean up test directory: {}", e);
}
}
}
// Helper to set up the server and return a connection
fn setup_server() -> (ServerProcessGuard, u16) {
use std::sync::atomic::{AtomicU16, Ordering};
static PORT_COUNTER: AtomicU16 = AtomicU16::new(16400);
let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst);
let test_dir = format!("/tmp/herodb_test_{}", port);
// Clean up previous test data
if std::path::Path::new(&test_dir).exists() {
let _ = std::fs::remove_dir_all(&test_dir);
}
std::fs::create_dir_all(&test_dir).unwrap();
// Start the server in a subprocess
let child = Command::new("cargo")
.args(&[
"run",
"--",
"--dir",
&test_dir,
"--port",
&port.to_string(),
])
.spawn()
.expect("Failed to start server process");
// Create a new guard that also owns the test directory path
let guard = ServerProcessGuard {
process: child,
test_dir,
};
// Give the server a moment to start
std::thread::sleep(Duration::from_millis(100));
(guard, port)
}
#[tokio::test]
async fn all_tests() {
let (_server_guard, port) = setup_server();
let mut conn = get_redis_connection(port);
// Run all tests using the same connection
cleanup_keys(&mut conn).await;
test_basic_ping(&mut conn).await;
cleanup_keys(&mut conn).await;
test_string_operations(&mut conn).await;
cleanup_keys(&mut conn).await;
test_incr_operations(&mut conn).await;
// cleanup_keys(&mut conn).await;
// test_hash_operations(&mut conn).await;
cleanup_keys(&mut conn).await;
test_expiration(&mut conn).await;
cleanup_keys(&mut conn).await;
test_scan_operations(&mut conn).await;
cleanup_keys(&mut conn).await;
test_scan_with_count(&mut conn).await;
cleanup_keys(&mut conn).await;
test_hscan_operations(&mut conn).await;
cleanup_keys(&mut conn).await;
test_transaction_operations(&mut conn).await;
cleanup_keys(&mut conn).await;
test_discard_transaction(&mut conn).await;
cleanup_keys(&mut conn).await;
test_type_command(&mut conn).await;
cleanup_keys(&mut conn).await;
test_config_commands(&mut conn).await;
cleanup_keys(&mut conn).await;
test_info_command(&mut conn).await;
cleanup_keys(&mut conn).await;
test_error_handling(&mut conn).await;
// Clean up keys after all tests
cleanup_keys(&mut conn).await;
}
async fn cleanup_keys(conn: &mut Connection) {
let keys: Vec<String> = redis::cmd("KEYS").arg("*").query(conn).unwrap();
if !keys.is_empty() {
let _: () = redis::cmd("DEL").arg(keys).query(conn).unwrap();
}
}
async fn test_basic_ping(conn: &mut Connection) {
let result: String = redis::cmd("PING").query(conn).unwrap();
assert_eq!(result, "PONG");
}
async fn test_string_operations(conn: &mut Connection) {
// Test SET
let _: () = conn.set("key", "value").unwrap();
// Test GET
let result: String = conn.get("key").unwrap();
assert_eq!(result, "value");
// Test GET non-existent key
let result: Option<String> = conn.get("noexist").unwrap();
assert_eq!(result, None);
// Test DEL
let deleted: i32 = conn.del("key").unwrap();
assert_eq!(deleted, 1);
// Test GET after DEL
let result: Option<String> = conn.get("key").unwrap();
assert_eq!(result, None);
}
async fn test_incr_operations(conn: &mut Connection) {
// Test INCR on non-existent key
let result: i32 = redis::cmd("INCR").arg("counter").query(conn).unwrap();
assert_eq!(result, 1);
// Test INCR on existing key
let result: i32 = redis::cmd("INCR").arg("counter").query(conn).unwrap();
assert_eq!(result, 2);
// Test INCR on string value (should fail)
let _: () = conn.set("string", "hello").unwrap();
let result: Result<i32, _> = redis::cmd("INCR").arg("string").query(conn);
assert!(result.is_err());
}
async fn test_hash_operations(conn: &mut Connection) {
// Test HSET
let result: i32 = conn.hset("hash", "field1", "value1").unwrap();
assert_eq!(result, 1); // 1 new field
// Test HGET
let result: String = conn.hget("hash", "field1").unwrap();
assert_eq!(result, "value1");
// Test HSET multiple fields
let _: () = conn.hset_multiple("hash", &[("field2", "value2"), ("field3", "value3")]).unwrap();
// Test HGETALL
let result: std::collections::HashMap<String, String> = conn.hgetall("hash").unwrap();
assert_eq!(result.len(), 3);
assert_eq!(result.get("field1").unwrap(), "value1");
assert_eq!(result.get("field2").unwrap(), "value2");
assert_eq!(result.get("field3").unwrap(), "value3");
// Test HLEN
let result: i32 = conn.hlen("hash").unwrap();
assert_eq!(result, 3);
// Test HEXISTS
let result: bool = conn.hexists("hash", "field1").unwrap();
assert_eq!(result, true);
let result: bool = conn.hexists("hash", "noexist").unwrap();
assert_eq!(result, false);
// Test HDEL
let result: i32 = conn.hdel("hash", "field1").unwrap();
assert_eq!(result, 1);
// Test HKEYS
let mut result: Vec<String> = conn.hkeys("hash").unwrap();
result.sort();
assert_eq!(result, vec!["field2", "field3"]);
// Test HVALS
let mut result: Vec<String> = conn.hvals("hash").unwrap();
result.sort();
assert_eq!(result, vec!["value2", "value3"]);
}
async fn test_expiration(conn: &mut Connection) {
// Test SETEX (expire in 1 second)
let _: () = conn.set_ex("expkey", "value", 1).unwrap();
// Test TTL
let result: i32 = conn.ttl("expkey").unwrap();
assert!(result == 1 || result == 0); // Should be 1 or 0 seconds
// Test EXISTS
let result: bool = conn.exists("expkey").unwrap();
assert_eq!(result, true);
// Wait for expiration
sleep(Duration::from_millis(1100)).await;
// Test GET after expiration
let result: Option<String> = conn.get("expkey").unwrap();
assert_eq!(result, None);
// Test TTL after expiration
let result: i32 = conn.ttl("expkey").unwrap();
assert_eq!(result, -2); // Key doesn't exist
// Test EXISTS after expiration
let result: bool = conn.exists("expkey").unwrap();
assert_eq!(result, false);
}
async fn test_scan_operations(conn: &mut Connection) {
// Set up test data
for i in 0..5 {
let _: () = conn.set(format!("key{}", i), format!("value{}", i)).unwrap();
}
// Test SCAN
let result: (u64, Vec<String>) = redis::cmd("SCAN")
.arg(0)
.arg("MATCH")
.arg("*")
.arg("COUNT")
.arg(10)
.query(conn)
.unwrap();
let (cursor, keys) = result;
assert_eq!(cursor, 0); // Should complete in one scan
assert_eq!(keys.len(), 5);
// Test KEYS
let mut result: Vec<String> = redis::cmd("KEYS").arg("*").query(conn).unwrap();
result.sort();
assert_eq!(result, vec!["key0", "key1", "key2", "key3", "key4"]);
}
async fn test_scan_with_count(conn: &mut Connection) {
// Clean up previous keys
let keys: Vec<String> = redis::cmd("KEYS").arg("scan_key*").query(conn).unwrap();
if !keys.is_empty() {
let _: () = redis::cmd("DEL").arg(keys).query(conn).unwrap();
}
// Set up test data
for i in 0..15 {
let _: () = conn.set(format!("scan_key{}", i), i).unwrap();
}
let mut cursor = 0;
let mut all_keys = std::collections::HashSet::new();
// First SCAN
let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg("scan_key*")
.arg("COUNT")
.arg(5)
.query(conn)
.unwrap();
assert_ne!(next_cursor, 0);
assert_eq!(keys.len(), 5);
for key in keys {
all_keys.insert(key);
}
cursor = next_cursor;
// Second SCAN
let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg("scan_key*")
.arg("COUNT")
.arg(5)
.query(conn)
.unwrap();
assert_ne!(next_cursor, 0);
assert_eq!(keys.len(), 5);
for key in keys {
all_keys.insert(key);
}
cursor = next_cursor;
// Final SCAN
let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
.arg(cursor)
.arg("MATCH")
.arg("scan_key*")
.arg("COUNT")
.arg(5)
.query(conn)
.unwrap();
assert_eq!(next_cursor, 0);
assert_eq!(keys.len(), 5);
for key in keys {
all_keys.insert(key);
}
assert_eq!(all_keys.len(), 15);
}
async fn test_hscan_operations(conn: &mut Connection) {
// Set up hash data
for i in 0..3 {
let _: () = conn.hset("testhash", format!("field{}", i), format!("value{}", i)).unwrap();
}
// Test HSCAN
let result: (u64, Vec<String>) = redis::cmd("HSCAN")
.arg("testhash")
.arg(0)
.arg("MATCH")
.arg("*")
.arg("COUNT")
.arg(10)
.query(conn)
.unwrap();
let (cursor, fields) = result;
assert_eq!(cursor, 0); // Should complete in one scan
assert_eq!(fields.len(), 6); // 3 field-value pairs = 6 elements
}
async fn test_transaction_operations(conn: &mut Connection) {
// Test MULTI/EXEC
let _: () = redis::cmd("MULTI").query(conn).unwrap();
let _: () = redis::cmd("SET").arg("key1").arg("value1").query(conn).unwrap();
let _: () = redis::cmd("SET").arg("key2").arg("value2").query(conn).unwrap();
let _: Vec<String> = redis::cmd("EXEC").query(conn).unwrap();
// Verify commands were executed
let result: String = conn.get("key1").unwrap();
assert_eq!(result, "value1");
let result: String = conn.get("key2").unwrap();
assert_eq!(result, "value2");
}
async fn test_discard_transaction(conn: &mut Connection) {
// Test MULTI/DISCARD
let _: () = redis::cmd("MULTI").query(conn).unwrap();
let _: () = redis::cmd("SET").arg("discard").arg("value").query(conn).unwrap();
let _: () = redis::cmd("DISCARD").query(conn).unwrap();
// Verify command was not executed
let result: Option<String> = conn.get("discard").unwrap();
assert_eq!(result, None);
}
async fn test_type_command(conn: &mut Connection) {
// Test string type
let _: () = conn.set("string", "value").unwrap();
let result: String = redis::cmd("TYPE").arg("string").query(conn).unwrap();
assert_eq!(result, "string");
// Test hash type
let _: () = conn.hset("hash", "field", "value").unwrap();
let result: String = redis::cmd("TYPE").arg("hash").query(conn).unwrap();
assert_eq!(result, "hash");
// Test non-existent key
let result: String = redis::cmd("TYPE").arg("noexist").query(conn).unwrap();
assert_eq!(result, "none");
}
async fn test_config_commands(conn: &mut Connection) {
// Test CONFIG GET databases
let result: Vec<String> = redis::cmd("CONFIG")
.arg("GET")
.arg("databases")
.query(conn)
.unwrap();
assert_eq!(result, vec!["databases", "16"]);
// Test CONFIG GET dir
let result: Vec<String> = redis::cmd("CONFIG")
.arg("GET")
.arg("dir")
.query(conn)
.unwrap();
assert_eq!(result[0], "dir");
assert!(result[1].contains("/tmp/herodb_test_"));
}
async fn test_info_command(conn: &mut Connection) {
// Test INFO
let result: String = redis::cmd("INFO").query(conn).unwrap();
assert!(result.contains("redis_version"));
// Test INFO replication
let result: String = redis::cmd("INFO").arg("replication").query(conn).unwrap();
assert!(result.contains("role:master"));
}
async fn test_error_handling(conn: &mut Connection) {
// Test WRONGTYPE error - try to use hash command on string
let _: () = conn.set("string", "value").unwrap();
let result: Result<String, _> = conn.hget("string", "field");
assert!(result.is_err());
// Test unknown command
let result: Result<String, _> = redis::cmd("UNKNOWN").query(conn);
assert!(result.is_err());
// Test EXEC without MULTI
let result: Result<Vec<String>, _> = redis::cmd("EXEC").query(conn);
assert!(result.is_err());
// Test DISCARD without MULTI
let result: Result<(), _> = redis::cmd("DISCARD").query(conn);
assert!(result.is_err());
}

609
tests/redis_tests.rs Normal file
View File

@ -0,0 +1,609 @@
use redis_rs::{server::Server, options::DBOption};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::sleep;
// Helper function to start a test server
async fn start_test_server(test_name: &str) -> (Server, u16) {
use std::sync::atomic::{AtomicU16, Ordering};
static PORT_COUNTER: AtomicU16 = AtomicU16::new(16379);
let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst);
let test_dir = format!("/tmp/herodb_test_{}", test_name);
// Clean up and create test directory
let _ = std::fs::remove_dir_all(&test_dir);
std::fs::create_dir_all(&test_dir).unwrap();
let option = DBOption {
dir: test_dir,
port,
debug: true,
};
let server = Server::new(option).await;
(server, port)
}
// Helper function to connect to the test server
async fn connect_to_server(port: u16) -> TcpStream {
let mut attempts = 0;
loop {
match TcpStream::connect(format!("127.0.0.1:{}", port)).await {
Ok(stream) => return stream,
Err(_) if attempts < 10 => {
attempts += 1;
sleep(Duration::from_millis(100)).await;
}
Err(e) => panic!("Failed to connect to test server: {}", e),
}
}
}
// Helper function to send command and get response
async fn send_command(stream: &mut TcpStream, command: &str) -> String {
stream.write_all(command.as_bytes()).await.unwrap();
let mut buffer = [0; 1024];
let n = stream.read(&mut buffer).await.unwrap();
String::from_utf8_lossy(&buffer[..n]).to_string()
}
#[tokio::test]
async fn test_basic_ping() {
let (mut server, port) = start_test_server("ping").await;
// Start server in background
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
.await
.unwrap();
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
}
}
});
sleep(Duration::from_millis(100)).await;
let mut stream = connect_to_server(port).await;
let response = send_command(&mut stream, "*1\r\n$4\r\nPING\r\n").await;
assert!(response.contains("PONG"));
}
#[tokio::test]
async fn test_string_operations() {
let (mut server, port) = start_test_server("string").await;
// Start server in background
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
.await
.unwrap();
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
}
}
});
sleep(Duration::from_millis(100)).await;
let mut stream = connect_to_server(port).await;
// Test SET
let response = send_command(&mut stream, "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n").await;
assert!(response.contains("OK"));
// Test GET
let response = send_command(&mut stream, "*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n").await;
assert!(response.contains("value"));
// Test GET non-existent key
let response = send_command(&mut stream, "*2\r\n$3\r\nGET\r\n$7\r\nnoexist\r\n").await;
assert!(response.contains("$-1")); // NULL response
// Test DEL
let response = send_command(&mut stream, "*2\r\n$3\r\nDEL\r\n$3\r\nkey\r\n").await;
assert!(response.contains("1"));
// Test GET after DEL
let response = send_command(&mut stream, "*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n").await;
assert!(response.contains("$-1")); // NULL response
}
#[tokio::test]
async fn test_incr_operations() {
let (mut server, port) = start_test_server("incr").await;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
.await
.unwrap();
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
}
}
});
sleep(Duration::from_millis(100)).await;
let mut stream = connect_to_server(port).await;
// Test INCR on non-existent key
let response = send_command(&mut stream, "*2\r\n$4\r\nINCR\r\n$7\r\ncounter\r\n").await;
assert!(response.contains("1"));
// Test INCR on existing key
let response = send_command(&mut stream, "*2\r\n$4\r\nINCR\r\n$7\r\ncounter\r\n").await;
assert!(response.contains("2"));
// Test INCR on string value (should fail)
send_command(&mut stream, "*3\r\n$3\r\nSET\r\n$6\r\nstring\r\n$5\r\nhello\r\n").await;
let response = send_command(&mut stream, "*2\r\n$4\r\nINCR\r\n$6\r\nstring\r\n").await;
assert!(response.contains("ERR"));
}
#[tokio::test]
async fn test_hash_operations() {
let (mut server, port) = start_test_server("hash").await;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
.await
.unwrap();
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
}
}
});
sleep(Duration::from_millis(100)).await;
let mut stream = connect_to_server(port).await;
// Test HSET
let response = send_command(&mut stream, "*4\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n$6\r\nvalue1\r\n").await;
assert!(response.contains("1")); // 1 new field
// Test HGET
let response = send_command(&mut stream, "*3\r\n$4\r\nHGET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n").await;
assert!(response.contains("value1"));
// Test HSET multiple fields
let response = send_command(&mut stream, "*6\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$6\r\nfield2\r\n$6\r\nvalue2\r\n$6\r\nfield3\r\n$6\r\nvalue3\r\n").await;
assert!(response.contains("2")); // 2 new fields
// Test HGETALL
let response = send_command(&mut stream, "*2\r\n$7\r\nHGETALL\r\n$4\r\nhash\r\n").await;
assert!(response.contains("field1"));
assert!(response.contains("value1"));
assert!(response.contains("field2"));
assert!(response.contains("value2"));
// Test HLEN
let response = send_command(&mut stream, "*2\r\n$4\r\nHLEN\r\n$4\r\nhash\r\n").await;
assert!(response.contains("3"));
// Test HEXISTS
let response = send_command(&mut stream, "*3\r\n$7\r\nHEXISTS\r\n$4\r\nhash\r\n$6\r\nfield1\r\n").await;
assert!(response.contains("1"));
let response = send_command(&mut stream, "*3\r\n$7\r\nHEXISTS\r\n$4\r\nhash\r\n$7\r\nnoexist\r\n").await;
assert!(response.contains("0"));
// Test HDEL
let response = send_command(&mut stream, "*3\r\n$4\r\nHDEL\r\n$4\r\nhash\r\n$6\r\nfield1\r\n").await;
assert!(response.contains("1"));
// Test HKEYS
let response = send_command(&mut stream, "*2\r\n$5\r\nHKEYS\r\n$4\r\nhash\r\n").await;
assert!(response.contains("field2"));
assert!(response.contains("field3"));
assert!(!response.contains("field1")); // Should be deleted
// Test HVALS
let response = send_command(&mut stream, "*2\r\n$5\r\nHVALS\r\n$4\r\nhash\r\n").await;
assert!(response.contains("value2"));
assert!(response.contains("value3"));
}
#[tokio::test]
async fn test_expiration() {
let (mut server, port) = start_test_server("expiration").await;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
.await
.unwrap();
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
}
}
});
sleep(Duration::from_millis(100)).await;
let mut stream = connect_to_server(port).await;
// Test SETEX (expire in 1 second)
let response = send_command(&mut stream, "*5\r\n$3\r\nSET\r\n$6\r\nexpkey\r\n$5\r\nvalue\r\n$2\r\nEX\r\n$1\r\n1\r\n").await;
assert!(response.contains("OK"));
// Test TTL
let response = send_command(&mut stream, "*2\r\n$3\r\nTTL\r\n$6\r\nexpkey\r\n").await;
assert!(response.contains("1") || response.contains("0")); // Should be 1 or 0 seconds
// Test EXISTS
let response = send_command(&mut stream, "*2\r\n$6\r\nEXISTS\r\n$6\r\nexpkey\r\n").await;
assert!(response.contains("1"));
// Wait for expiration
sleep(Duration::from_millis(1100)).await;
// Test GET after expiration
let response = send_command(&mut stream, "*2\r\n$3\r\nGET\r\n$6\r\nexpkey\r\n").await;
assert!(response.contains("$-1")); // Should be NULL
// Test TTL after expiration
let response = send_command(&mut stream, "*2\r\n$3\r\nTTL\r\n$6\r\nexpkey\r\n").await;
assert!(response.contains("-2")); // Key doesn't exist
// Test EXISTS after expiration
let response = send_command(&mut stream, "*2\r\n$6\r\nEXISTS\r\n$6\r\nexpkey\r\n").await;
assert!(response.contains("0"));
}
#[tokio::test]
async fn test_scan_operations() {
let (mut server, port) = start_test_server("scan").await;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
.await
.unwrap();
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
}
}
});
sleep(Duration::from_millis(100)).await;
let mut stream = connect_to_server(port).await;
// Set up test data
for i in 0..5 {
let cmd = format!("*3\r\n$3\r\nSET\r\n$4\r\nkey{}\r\n$6\r\nvalue{}\r\n", i, i);
send_command(&mut stream, &cmd).await;
}
// Test SCAN
let response = send_command(&mut stream, "*6\r\n$4\r\nSCAN\r\n$1\r\n0\r\n$5\r\nMATCH\r\n$1\r\n*\r\n$5\r\nCOUNT\r\n$2\r\n10\r\n").await;
assert!(response.contains("key"));
// Test KEYS
let response = send_command(&mut stream, "*2\r\n$4\r\nKEYS\r\n$1\r\n*\r\n").await;
assert!(response.contains("key0"));
assert!(response.contains("key1"));
}
#[tokio::test]
async fn test_hscan_operations() {
let (mut server, port) = start_test_server("hscan").await;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
.await
.unwrap();
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
}
}
});
sleep(Duration::from_millis(100)).await;
let mut stream = connect_to_server(port).await;
// Set up hash data
for i in 0..3 {
let cmd = format!("*4\r\n$4\r\nHSET\r\n$8\r\ntesthash\r\n$6\r\nfield{}\r\n$6\r\nvalue{}\r\n", i, i);
send_command(&mut stream, &cmd).await;
}
// Test HSCAN
let response = send_command(&mut stream, "*7\r\n$5\r\nHSCAN\r\n$8\r\ntesthash\r\n$1\r\n0\r\n$5\r\nMATCH\r\n$1\r\n*\r\n$5\r\nCOUNT\r\n$2\r\n10\r\n").await;
assert!(response.contains("field"));
assert!(response.contains("value"));
}
#[tokio::test]
async fn test_transaction_operations() {
let (mut server, port) = start_test_server("transaction").await;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
.await
.unwrap();
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
}
}
});
sleep(Duration::from_millis(100)).await;
let mut stream = connect_to_server(port).await;
// Test MULTI
let response = send_command(&mut stream, "*1\r\n$5\r\nMULTI\r\n").await;
assert!(response.contains("OK"));
// Test queued commands
let response = send_command(&mut stream, "*3\r\n$3\r\nSET\r\n$4\r\nkey1\r\n$6\r\nvalue1\r\n").await;
assert!(response.contains("QUEUED"));
let response = send_command(&mut stream, "*3\r\n$3\r\nSET\r\n$4\r\nkey2\r\n$6\r\nvalue2\r\n").await;
assert!(response.contains("QUEUED"));
// Test EXEC
let response = send_command(&mut stream, "*1\r\n$4\r\nEXEC\r\n").await;
assert!(response.contains("OK")); // Should contain results of executed commands
// Verify commands were executed
let response = send_command(&mut stream, "*2\r\n$3\r\nGET\r\n$4\r\nkey1\r\n").await;
assert!(response.contains("value1"));
let response = send_command(&mut stream, "*2\r\n$3\r\nGET\r\n$4\r\nkey2\r\n").await;
assert!(response.contains("value2"));
}
#[tokio::test]
async fn test_discard_transaction() {
let (mut server, port) = start_test_server("discard").await;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
.await
.unwrap();
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
}
}
});
sleep(Duration::from_millis(100)).await;
let mut stream = connect_to_server(port).await;
// Test MULTI
let response = send_command(&mut stream, "*1\r\n$5\r\nMULTI\r\n").await;
assert!(response.contains("OK"));
// Test queued command
let response = send_command(&mut stream, "*3\r\n$3\r\nSET\r\n$7\r\ndiscard\r\n$5\r\nvalue\r\n").await;
assert!(response.contains("QUEUED"));
// Test DISCARD
let response = send_command(&mut stream, "*1\r\n$7\r\nDISCARD\r\n").await;
assert!(response.contains("OK"));
// Verify command was not executed
let response = send_command(&mut stream, "*2\r\n$3\r\nGET\r\n$7\r\ndiscard\r\n").await;
assert!(response.contains("$-1")); // Should be NULL
}
#[tokio::test]
async fn test_type_command() {
let (mut server, port) = start_test_server("type").await;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
.await
.unwrap();
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
}
}
});
sleep(Duration::from_millis(100)).await;
let mut stream = connect_to_server(port).await;
// Test string type
send_command(&mut stream, "*3\r\n$3\r\nSET\r\n$6\r\nstring\r\n$5\r\nvalue\r\n").await;
let response = send_command(&mut stream, "*2\r\n$4\r\nTYPE\r\n$6\r\nstring\r\n").await;
assert!(response.contains("string"));
// Test hash type
send_command(&mut stream, "*4\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$5\r\nfield\r\n$5\r\nvalue\r\n").await;
let response = send_command(&mut stream, "*2\r\n$4\r\nTYPE\r\n$4\r\nhash\r\n").await;
assert!(response.contains("hash"));
// Test non-existent key
let response = send_command(&mut stream, "*2\r\n$4\r\nTYPE\r\n$7\r\nnoexist\r\n").await;
assert!(response.contains("none"));
}
#[tokio::test]
async fn test_config_commands() {
let (mut server, port) = start_test_server("config").await;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
.await
.unwrap();
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
}
}
});
sleep(Duration::from_millis(100)).await;
let mut stream = connect_to_server(port).await;
// Test CONFIG GET databases
let response = send_command(&mut stream, "*3\r\n$6\r\nCONFIG\r\n$3\r\nGET\r\n$9\r\ndatabases\r\n").await;
assert!(response.contains("databases"));
assert!(response.contains("16"));
// Test CONFIG GET dir
let response = send_command(&mut stream, "*3\r\n$6\r\nCONFIG\r\n$3\r\nGET\r\n$3\r\ndir\r\n").await;
assert!(response.contains("dir"));
assert!(response.contains("/tmp/herodb_test_config"));
}
#[tokio::test]
async fn test_info_command() {
let (mut server, port) = start_test_server("info").await;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
.await
.unwrap();
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
}
}
});
sleep(Duration::from_millis(100)).await;
let mut stream = connect_to_server(port).await;
// Test INFO
let response = send_command(&mut stream, "*1\r\n$4\r\nINFO\r\n").await;
assert!(response.contains("redis_version"));
// Test INFO replication
let response = send_command(&mut stream, "*2\r\n$4\r\nINFO\r\n$11\r\nreplication\r\n").await;
assert!(response.contains("role:master"));
}
#[tokio::test]
async fn test_error_handling() {
let (mut server, port) = start_test_server("error").await;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
.await
.unwrap();
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
}
}
});
sleep(Duration::from_millis(100)).await;
let mut stream = connect_to_server(port).await;
// Test WRONGTYPE error - try to use hash command on string
send_command(&mut stream, "*3\r\n$3\r\nSET\r\n$6\r\nstring\r\n$5\r\nvalue\r\n").await;
let response = send_command(&mut stream, "*3\r\n$4\r\nHGET\r\n$6\r\nstring\r\n$5\r\nfield\r\n").await;
assert!(response.contains("WRONGTYPE"));
// Test unknown command
let response = send_command(&mut stream, "*1\r\n$7\r\nUNKNOWN\r\n").await;
assert!(response.contains("unknown cmd") || response.contains("ERR"));
// Test EXEC without MULTI
let response = send_command(&mut stream, "*1\r\n$4\r\nEXEC\r\n").await;
assert!(response.contains("ERR"));
// Test DISCARD without MULTI
let response = send_command(&mut stream, "*1\r\n$7\r\nDISCARD\r\n").await;
assert!(response.contains("ERR"));
}
#[tokio::test]
async fn test_list_operations() {
let (mut server, port) = start_test_server("list").await;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
.await
.unwrap();
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
}
}
});
sleep(Duration::from_millis(100)).await;
let mut stream = connect_to_server(port).await;
// Test LPUSH
let response = send_command(&mut stream, "*4\r\n$5\r\nLPUSH\r\n$4\r\nlist\r\n$1\r\na\r\n$1\r\nb\r\n").await;
assert!(response.contains("2")); // 2 elements
// Test RPUSH
let response = send_command(&mut stream, "*4\r\n$5\r\nRPUSH\r\n$4\r\nlist\r\n$1\r\nc\r\n$1\r\nd\r\n").await;
assert!(response.contains("4")); // 4 elements
// Test LLEN
let response = send_command(&mut stream, "*2\r\n$4\r\nLLEN\r\n$4\r\nlist\r\n").await;
assert!(response.contains("4"));
// Test LRANGE
let response = send_command(&mut stream, "*4\r\n$6\r\nLRANGE\r\n$4\r\nlist\r\n$1\r\n0\r\n$2\r\n-1\r\n").await;
assert!(response.contains("b"));
assert!(response.contains("a"));
assert!(response.contains("c"));
assert!(response.contains("d"));
// Test LINDEX
let response = send_command(&mut stream, "*3\r\n$6\r\nLINDEX\r\n$4\r\nlist\r\n$1\r\n0\r\n").await;
assert!(response.contains("b"));
// Test LPOP
let response = send_command(&mut stream, "*2\r\n$4\r\nLPOP\r\n$4\r\nlist\r\n").await;
assert!(response.contains("b"));
// Test RPOP
let response = send_command(&mut stream, "*2\r\n$4\r\nRPOP\r\n$4\r\nlist\r\n").await;
assert!(response.contains("d"));
// Test LREM
send_command(&mut stream, "*3\r\n$5\r\nLPUSH\r\n$4\r\nlist\r\n$1\r\na\r\n").await; // list is now a, c, a
let response = send_command(&mut stream, "*4\r\n$4\r\nLREM\r\n$4\r\nlist\r\n$1\r\n1\r\n$1\r\na\r\n").await;
assert!(response.contains("1"));
// Test LTRIM
let response = send_command(&mut stream, "*4\r\n$5\r\nLTRIM\r\n$4\r\nlist\r\n$1\r\n0\r\n$1\r\n0\r\n").await;
assert!(response.contains("OK"));
let response = send_command(&mut stream, "*2\r\n$4\r\nLLEN\r\n$4\r\nlist\r\n").await;
assert!(response.contains("1"));
}

View File

@ -0,0 +1,207 @@
use redis_rs::{server::Server, options::DBOption};
use std::time::Duration;
use tokio::time::sleep;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
// Helper function to start a test server with clean data directory
async fn start_test_server(test_name: &str) -> (Server, u16) {
use std::sync::atomic::{AtomicU16, Ordering};
static PORT_COUNTER: AtomicU16 = AtomicU16::new(17000);
// Get a unique port for this test
let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst);
let test_dir = format!("/tmp/herodb_test_{}", test_name);
// Clean up any existing test data
let _ = std::fs::remove_dir_all(&test_dir);
std::fs::create_dir_all(&test_dir).unwrap();
let option = DBOption {
dir: test_dir,
port,
debug: true,
};
let server = Server::new(option).await;
(server, port)
}
// Helper function to send Redis command and get response
async fn send_redis_command(port: u16, command: &str) -> String {
let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port)).await.unwrap();
stream.write_all(command.as_bytes()).await.unwrap();
let mut buffer = [0; 1024];
let n = stream.read(&mut buffer).await.unwrap();
String::from_utf8_lossy(&buffer[..n]).to_string()
}
#[tokio::test]
async fn test_basic_redis_functionality() {
let (mut server, port) = start_test_server("basic").await;
// Start server in background with timeout
let server_handle = tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
.await
.unwrap();
// Accept only a few connections for testing
for _ in 0..10 {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
}
}
});
sleep(Duration::from_millis(100)).await;
// Test PING
let response = send_redis_command(port, "*1\r\n$4\r\nPING\r\n").await;
assert!(response.contains("PONG"));
// Test SET
let response = send_redis_command(port, "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n").await;
assert!(response.contains("OK"));
// Test GET
let response = send_redis_command(port, "*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n").await;
assert!(response.contains("value"));
// Test HSET
let response = send_redis_command(port, "*4\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$5\r\nfield\r\n$5\r\nvalue\r\n").await;
assert!(response.contains("1"));
// Test HGET
let response = send_redis_command(port, "*3\r\n$4\r\nHGET\r\n$4\r\nhash\r\n$5\r\nfield\r\n").await;
assert!(response.contains("value"));
// Test EXISTS
let response = send_redis_command(port, "*2\r\n$6\r\nEXISTS\r\n$3\r\nkey\r\n").await;
assert!(response.contains("1"));
// Test TTL
let response = send_redis_command(port, "*2\r\n$3\r\nTTL\r\n$3\r\nkey\r\n").await;
assert!(response.contains("-1")); // No expiration
// Test TYPE
let response = send_redis_command(port, "*2\r\n$4\r\nTYPE\r\n$3\r\nkey\r\n").await;
assert!(response.contains("string"));
// Test QUIT to close connection gracefully
let response = send_redis_command(port, "*1\r\n$4\r\nQUIT\r\n").await;
assert!(response.contains("OK"));
// Stop the server
server_handle.abort();
println!("✅ All basic Redis functionality tests passed!");
}
#[tokio::test]
async fn test_hash_operations() {
let (mut server, port) = start_test_server("hash_ops").await;
// Start server in background with timeout
let server_handle = tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
.await
.unwrap();
// Accept only a few connections for testing
for _ in 0..5 {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
}
}
});
sleep(Duration::from_millis(100)).await;
// Test HSET multiple fields
let response = send_redis_command(port, "*6\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n$6\r\nvalue1\r\n$6\r\nfield2\r\n$6\r\nvalue2\r\n").await;
assert!(response.contains("2")); // 2 new fields
// Test HGETALL
let response = send_redis_command(port, "*2\r\n$7\r\nHGETALL\r\n$4\r\nhash\r\n").await;
assert!(response.contains("field1"));
assert!(response.contains("value1"));
assert!(response.contains("field2"));
assert!(response.contains("value2"));
// Test HEXISTS
let response = send_redis_command(port, "*3\r\n$7\r\nHEXISTS\r\n$4\r\nhash\r\n$6\r\nfield1\r\n").await;
assert!(response.contains("1"));
// Test HLEN
let response = send_redis_command(port, "*2\r\n$4\r\nHLEN\r\n$4\r\nhash\r\n").await;
assert!(response.contains("2"));
// Test HSCAN
let response = send_redis_command(port, "*6\r\n$5\r\nHSCAN\r\n$4\r\nhash\r\n$1\r\n0\r\n$5\r\nMATCH\r\n$1\r\n*\r\n$5\r\nCOUNT\r\n$2\r\n10\r\n").await;
assert!(response.contains("*2\r\n$1\r\n0\r\n*4\r\n$6\r\nfield1\r\n$6\r\nvalue1\r\n$6\r\nfield2\r\n$6\r\nvalue2\r\n"));
// Stop the server
server_handle.abort();
println!("✅ All hash operations tests passed!");
}
#[tokio::test]
async fn test_transaction_operations() {
let (mut server, port) = start_test_server("transactions").await;
// Start server in background with timeout
let server_handle = tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
.await
.unwrap();
// Accept only a few connections for testing
for _ in 0..5 {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
}
}
});
sleep(Duration::from_millis(100)).await;
// Use a single connection for the transaction
let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port)).await.unwrap();
// Test MULTI
stream.write_all("*1\r\n$5\r\nMULTI\r\n".as_bytes()).await.unwrap();
let mut buffer = [0; 1024];
let n = stream.read(&mut buffer).await.unwrap();
let response = String::from_utf8_lossy(&buffer[..n]);
assert!(response.contains("OK"));
// Test queued commands
stream.write_all("*3\r\n$3\r\nSET\r\n$4\r\nkey1\r\n$6\r\nvalue1\r\n".as_bytes()).await.unwrap();
let n = stream.read(&mut buffer).await.unwrap();
let response = String::from_utf8_lossy(&buffer[..n]);
assert!(response.contains("QUEUED"));
stream.write_all("*3\r\n$3\r\nSET\r\n$4\r\nkey2\r\n$6\r\nvalue2\r\n".as_bytes()).await.unwrap();
let n = stream.read(&mut buffer).await.unwrap();
let response = String::from_utf8_lossy(&buffer[..n]);
assert!(response.contains("QUEUED"));
// Test EXEC
stream.write_all("*1\r\n$4\r\nEXEC\r\n".as_bytes()).await.unwrap();
let n = stream.read(&mut buffer).await.unwrap();
let response = String::from_utf8_lossy(&buffer[..n]);
assert!(response.contains("OK")); // Should contain array of OK responses
// Verify commands were executed
let response = send_redis_command(port, "*2\r\n$3\r\nGET\r\n$4\r\nkey1\r\n").await;
assert!(response.contains("value1"));
// Stop the server
server_handle.abort();
println!("✅ All transaction operations tests passed!");
}

181
tests/simple_redis_test.rs Normal file
View File

@ -0,0 +1,181 @@
use redis_rs::{server::Server, options::DBOption};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::sleep;
// Helper function to start a test server with clean data directory
async fn start_test_server(test_name: &str) -> (Server, u16) {
use std::sync::atomic::{AtomicU16, Ordering};
static PORT_COUNTER: AtomicU16 = AtomicU16::new(16500);
let port = PORT_COUNTER.fetch_add(1, Ordering::SeqCst);
let test_dir = format!("/tmp/herodb_simple_test_{}", test_name);
// Clean up any existing test data
let _ = std::fs::remove_dir_all(&test_dir);
std::fs::create_dir_all(&test_dir).unwrap();
let option = DBOption {
dir: test_dir,
port,
debug: false,
};
let server = Server::new(option).await;
(server, port)
}
// Helper function to send command and get response
async fn send_command(stream: &mut TcpStream, command: &str) -> String {
stream.write_all(command.as_bytes()).await.unwrap();
let mut buffer = [0; 1024];
let n = stream.read(&mut buffer).await.unwrap();
String::from_utf8_lossy(&buffer[..n]).to_string()
}
// Helper function to connect to the test server
async fn connect_to_server(port: u16) -> TcpStream {
let mut attempts = 0;
loop {
match TcpStream::connect(format!("127.0.0.1:{}", port)).await {
Ok(stream) => return stream,
Err(_) if attempts < 10 => {
attempts += 1;
sleep(Duration::from_millis(100)).await;
}
Err(e) => panic!("Failed to connect to test server: {}", e),
}
}
}
#[tokio::test]
async fn test_basic_ping_simple() {
let (mut server, port) = start_test_server("ping").await;
// Start server in background
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
.await
.unwrap();
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
}
}
});
sleep(Duration::from_millis(200)).await;
let mut stream = connect_to_server(port).await;
let response = send_command(&mut stream, "*1\r\n$4\r\nPING\r\n").await;
assert!(response.contains("PONG"));
}
#[tokio::test]
async fn test_hset_clean_db() {
let (mut server, port) = start_test_server("hset_clean").await;
// Start server in background
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
.await
.unwrap();
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
}
}
});
sleep(Duration::from_millis(200)).await;
let mut stream = connect_to_server(port).await;
// Test HSET - should return 1 for new field
let response = send_command(&mut stream, "*4\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n$6\r\nvalue1\r\n").await;
println!("HSET response: {}", response);
assert!(response.contains("1"), "Expected HSET to return 1, got: {}", response);
// Test HGET
let response = send_command(&mut stream, "*3\r\n$4\r\nHGET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n").await;
println!("HGET response: {}", response);
assert!(response.contains("value1"));
}
#[tokio::test]
async fn test_type_command_simple() {
let (mut server, port) = start_test_server("type").await;
// Start server in background
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
.await
.unwrap();
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
}
}
});
sleep(Duration::from_millis(200)).await;
let mut stream = connect_to_server(port).await;
// Test string type
send_command(&mut stream, "*3\r\n$3\r\nSET\r\n$6\r\nstring\r\n$5\r\nvalue\r\n").await;
let response = send_command(&mut stream, "*2\r\n$4\r\nTYPE\r\n$6\r\nstring\r\n").await;
println!("TYPE string response: {}", response);
assert!(response.contains("string"));
// Test hash type
send_command(&mut stream, "*4\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$5\r\nfield\r\n$5\r\nvalue\r\n").await;
let response = send_command(&mut stream, "*2\r\n$4\r\nTYPE\r\n$4\r\nhash\r\n").await;
println!("TYPE hash response: {}", response);
assert!(response.contains("hash"));
// Test non-existent key
let response = send_command(&mut stream, "*2\r\n$4\r\nTYPE\r\n$7\r\nnoexist\r\n").await;
println!("TYPE noexist response: {}", response);
assert!(response.contains("none"), "Expected 'none' for non-existent key, got: {}", response);
}
#[tokio::test]
async fn test_hexists_simple() {
let (mut server, port) = start_test_server("hexists").await;
// Start server in background
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("127.0.0.1:{}", port))
.await
.unwrap();
loop {
if let Ok((stream, _)) = listener.accept().await {
let _ = server.handle(stream).await;
}
}
});
sleep(Duration::from_millis(200)).await;
let mut stream = connect_to_server(port).await;
// Set up hash
send_command(&mut stream, "*4\r\n$4\r\nHSET\r\n$4\r\nhash\r\n$6\r\nfield1\r\n$6\r\nvalue1\r\n").await;
// Test HEXISTS for existing field
let response = send_command(&mut stream, "*3\r\n$7\r\nHEXISTS\r\n$4\r\nhash\r\n$6\r\nfield1\r\n").await;
println!("HEXISTS existing field response: {}", response);
assert!(response.contains("1"));
// Test HEXISTS for non-existent field
let response = send_command(&mut stream, "*3\r\n$7\r\nHEXISTS\r\n$4\r\nhash\r\n$7\r\nnoexist\r\n").await;
println!("HEXISTS non-existent field response: {}", response);
assert!(response.contains("0"), "Expected HEXISTS to return 0 for non-existent field, got: {}", response);
}