Compare commits

...

2 Commits

Author SHA1 Message Date
9177fa4091 ... 2025-08-18 12:30:20 +02:00
51ab90c4ad ... 2025-08-16 18:24:46 +02:00
7 changed files with 176 additions and 348 deletions

View File

@ -1,267 +0,0 @@
# Build Your Own Redis in Rust
This project is to build a toy Redis-Server clone that's capable of parsing Redis protocol and handling basic Redis commands, parsing and initializing Redis from RDB file,
supporting leader-follower replication, redis streams (queue), redis batch commands in transaction.
You can find all the source code and commit history in [my github repo](https://github.com/fangpin/redis-rs).
## Main features
+ Parse Redis protocol
+ Handle basic Redis commands
+ Parse and initialize Redis from RDB file
+ Leader-follower Replication
## Prerequisites
install `redis-cli` first (an implementation of redis client for test purpose)
```sh
cargo install mini-redis
```
Learn about:
- [Redis protocoal](https://redis.io/docs/latest/develop/reference/protocol-spec)
- [RDB file format](https://rdb.fnordig.de/file_format.html)
- [Redis replication](https://redis.io/docs/management/replication/)
## Start the Redis-rs server
```sh
# start as master
cargo run -- --dir /some/db/path --dbfilename dump.rdb
# start as slave
cargo run -- --dir /some/db/path --dbfilename dump.rdb --port 6380 --replicaof "localhost 6379"
```
## Supported Commands
```sh
# basic commands
redis-cli PING
redis-cli ECHO hey
redis-cli SET foo bar
redis-cli SET foo bar px/ex 100
redis-cli GET foo
redis-cli SET foo 2
redis-cli INCR foo
redis-cli INCR missing_key
redis-cli TYPE some_key
redis-cli KEYS "*"
# leader-follower replication related commands
redis-cli CONFIG GET dbfilename
redis-cli INFO replication
# streams related commands
redis-cli XADD stream_key 1526919030474-0 temperature 36 humidity 95
redis-cli XADD stream_key 1526919030474-* temperature 37 humidity 94
redis-cli XADD stream_key "*" foo bar
## read stream
redis-cli XRANGE stream_key 0-2 0-3
## query with + -
redis-cli XRANGE some_key - 1526985054079
## query single stream using xread
redis-cli XREAD streams some_key 1526985054069-0
## query multiple stream using xread
redis-cli XREAD streams stream_key other_stream_key 0-0 0-1
## blocking reads without timeout
redis-cli XREAD block 0 streams some_key 1526985054069-0
# transactions related commands
## start a transaction and exec all queued commands in a transaction
redis-cli
> MULTI
> set foo 1
> incr foo
> exec
## start a transaction and queued commands and cancel transaction then
redis-cli
> MULTI
> set foo 1
> incr foo
> discard
```
## RDB Persistence
Get Redis-rs server config
```sh
redis-cli CONFIG GET dbfilename
```
### RDB file format overview
Here are the different sections of the [RDB file](https://rdb.fnordig.de/file_format.html), in order:
+ Header section
+ Metadata section
+ Database section
+ End of file section
#### Header section
start with some magic number
```sh
52 45 44 49 53 30 30 31 31 // Magic string + version number (ASCII): "REDIS0011".
```
#### Metadata section
contains zero or more "metadata subsections", which each specify a single metadata attribute
e.g.
```sh
FA // Indicates the start of a metadata subsection.
09 72 65 64 69 73 2D 76 65 72 // The name of the metadata attribute (string encoded): "redis-ver".
06 36 2E 30 2E 31 36 // The value of the metadata attribute (string encoded): "6.0.16".
```
#### Database section
contains zero or more "database subsections," which each describe a single database.
e.g.
```sh
FE // Indicates the start of a database subsection.
00 /* The index of the database (size encoded). Here, the index is 0. */
FB // Indicates that hash table size information follows.
03 /* The size of the hash table that stores the keys and values (size encoded). Here, the total key-value hash table size is 3. */
02 /* The size of the hash table that stores the expires of the keys (size encoded). Here, the number of keys with an expiry is 2. */
```
```sh
00 /* The 1-byte flag that specifies the values type and encoding. Here, the flag is 0, which means "string." */
06 66 6F 6F 62 61 72 // The name of the key (string encoded). Here, it's "foobar".
06 62 61 7A 71 75 78 // The value (string encoded). Here, it's "bazqux".
```
```sh
FC /* Indicates that this key ("foo") has an expire, and that the expire timestamp is expressed in milliseconds. */
15 72 E7 07 8F 01 00 00 /* The expire timestamp, expressed in Unix time, stored as an 8-byte unsigned long, in little-endian (read right-to-left). Here, the expire timestamp is 1713824559637. */
00 // Value type is string.
03 66 6F 6F // Key name is "foo".
03 62 61 72 // Value is "bar".
```
```sh
FD /* Indicates that this key ("baz") has an expire, and that the expire timestamp is expressed in seconds. */
52 ED 2A 66 /* The expire timestamp, expressed in Unix time, stored as an 4-byte unsigned integer, in little-endian (read right-to-left). Here, the expire timestamp is 1714089298. */
00 // Value type is string.
03 62 61 7A // Key name is "baz".
03 71 75 78 // Value is "qux".
```
In summary,
- Optional expire information (one of the following):
- Timestamp in seconds:
- FD
- Expire timestamp in seconds (4-byte unsigned integer)
- Timestamp in milliseconds:
- FC
- Expire timestamp in milliseconds (8-byte unsigned long)
- Value type (1-byte flag)
- Key (string encoded)
- Value (encoding depends on value type)
#### End of file section
```sh
FF /* Indicates that the file is ending, and that the checksum follows. */
89 3b b7 4e f8 0f 77 19 // An 8-byte CRC64 checksum of the entire file.
```
#### Size encoding
```sh
/* If the first two bits are 0b00:
The size is the remaining 6 bits of the byte.
In this example, the size is 10: */
0A
00001010
/* If the first two bits are 0b01:
The size is the next 14 bits
(remaining 6 bits in the first byte, combined with the next byte),
in big-endian (read left-to-right).
In this example, the size is 700: */
42 BC
01000010 10111100
/* If the first two bits are 0b10:
Ignore the remaining 6 bits of the first byte.
The size is the next 4 bytes, in big-endian (read left-to-right).
In this example, the size is 17000: */
80 00 00 42 68
10000000 00000000 00000000 01000010 01101000
/* If the first two bits are 0b11:
The remaining 6 bits specify a type of string encoding.
See string encoding section. */
```
#### String encoding
+ The size of the string (size encoded).
+ The string.
```sh
/* The 0x0D size specifies that the string is 13 characters long. The remaining characters spell out "Hello, World!". */
0D 48 65 6C 6C 6F 2C 20 57 6F 72 6C 64 21
```
For sizes that begin with 0b11, the remaining 6 bits indicate a type of string format:
```sh
/* The 0xC0 size indicates the string is an 8-bit integer. In this example, the string is "123". */
C0 7B
/* The 0xC1 size indicates the string is a 16-bit integer. The remaining bytes are in little-endian (read right-to-left). In this example, the string is "12345". */
C1 39 30
/* The 0xC2 size indicates the string is a 32-bit integer. The remaining bytes are in little-endian (read right-to-left), In this example, the string is "1234567". */
C2 87 D6 12 00
/* The 0xC3 size indicates that the string is compressed with the LZF algorithm. You will not encounter LZF-compressed strings in this challenge. */
C3 ...
```
## Replication
Redis server [leader-follower replication](https://redis.io/docs/management/replication/).
Run multiple Redis servers with one acting as the "master" and the others as "replicas". Changes made to the master will be automatically replicated to replicas.
### Send Handshake (follower -> master)
1. When the follower starts, it will send a PING command to the master as RESP Array.
2. Then 2 REPLCONF (replication config) commands are sent to master from follower to communicate the port and the sync protocol. One is *REPLCONF listening-port <PORT>* and the other is *REPLCONF capa psync2*. psnync2 is an example sync protocol supported in this project.
3. The follower sends the *PSYNC* command to master with replication id and offset to start the replication process.
### Receive Handshake (master -> follower)
1. Response a PONG message to follower.
2. Response an OK message to follower for both REPLCONF commands.
3. Response a *+FULLRESYNC <REPL_ID> 0\r\n* to follower with the replication id and offset.
### RDB file transfer
When the follower starts, it sends a *PSYNC ? -1* command to tell master that it doesn't have any data yet, and needs a full resync.
Then the master send a *FULLRESYNC* response to the follower as an acknowledgement.
Finally, the master send the RDB file to represent its current state to the follower. The follower should load the RDB file received to the memory, replacing its current state.
### Receive write commands (master -> follower)
The master sends following write commands to the follower with the offset info.
The sending is to reuse the same TCP connection of handshake and RDB file transfer.
As the all the commands are encoded as RESP Array just like a normal client command, so the follower could reuse the same logic to handler the replicate commands from master. The only difference is the commands are coming from the master and no need response back.
## Streams
A stream is identified by a key, and it contains multiple entries.
Each entry consists of one or more key-value pairs, and is assigned a unique ID.
[More about redis streams](https://redis.io/docs/latest/develop/data-types/streams/)
[Radix tree](https://en.wikipedia.org/wiki/Radix_tree)
It looks like a list of key-value pairs.
```sh
entries:
- id: 1526985054069-0 # (ID of the first entry)
temperature: 36 # (A key value pair in the first entry)
humidity: 95 # (Another key value pair in the first entry)
- id: 1526985054079-0 # (ID of the second entry)
temperature: 37 # (A key value pair in the first entry)
humidity: 94 # (Another key value pair in the first entry)
# ... (and so on)
```
Examples of Redis stream use cases include:
- Event sourcing (e.g., tracking user actions, clicks, etc.)
- Sensor monitoring (e.g., readings from devices in the field)
- Notifications (e.g., storing a record of each user's notifications in a separate stream)
## Transaction
When *MULTI* command is called in a connection, redis just queued all following commands until *EXEC* or *DISCARD* command is called.
*EXEC* command will execute all queued commands and return an array representation of all execution result (including), instead the *DISCARD* command just clear all queued commands.
The transactions among each client connection are independent.

9
herodb/build.sh Executable file
View File

@ -0,0 +1,9 @@
#!/bin/bash
set -euo pipefail
export SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
echo "I am in $SCRIPT_DIR"
cd "$SCRIPT_DIR"
cargo build

View File

@ -12,7 +12,6 @@ echo ""
echo "2⃣ Running Comprehensive Redis Integration Tests (13 tests)..." echo "2⃣ Running Comprehensive Redis Integration Tests (13 tests)..."
echo "----------------------------------------------------------------" echo "----------------------------------------------------------------"
cargo test -p herodb --test redis_integration_tests -- --nocapture cargo test -p herodb --test redis_integration_tests -- --nocapture
cargo test -p herodb --test redis_basic_client -- --nocapture
cargo test -p herodb --test debug_hset -- --nocapture cargo test -p herodb --test debug_hset -- --nocapture
cargo test -p herodb --test debug_hset_simple -- --nocapture cargo test -p herodb --test debug_hset_simple -- --nocapture

View File

@ -12,20 +12,30 @@ impl Storage {
let mut types_table = write_txn.open_table(TYPES_TABLE)?; let mut types_table = write_txn.open_table(TYPES_TABLE)?;
let mut hashes_table = write_txn.open_table(HASHES_TABLE)?; let mut hashes_table = write_txn.open_table(HASHES_TABLE)?;
// Set the type to hash let key_type = {
types_table.insert(key, "hash")?; let access_guard = types_table.get(key)?;
access_guard.map(|v| v.value().to_string())
for (field, value) in pairs { };
// Check if field already exists
let exists = hashes_table.get((key, field.as_str()))?.is_some(); match key_type.as_deref() {
Some("hash") | None => { // Proceed if hash or new key
// Encrypt the value before storing // Set the type to hash (only if new key or existing hash)
let encrypted = self.encrypt_if_needed(value.as_bytes())?; types_table.insert(key, "hash")?;
hashes_table.insert((key, field.as_str()), encrypted.as_slice())?;
for (field, value) in pairs {
if !exists { // Check if field already exists
new_fields += 1; let exists = hashes_table.get((key, field.as_str()))?.is_some();
// Encrypt the value before storing
let encrypted = self.encrypt_if_needed(value.as_bytes())?;
hashes_table.insert((key, field.as_str()), encrypted.as_slice())?;
if !exists {
new_fields += 1;
}
}
} }
Some(_) => return Err(DBError("WRONGTYPE Operation against a key holding the wrong kind of value".to_string())),
} }
} }
@ -38,8 +48,10 @@ impl Storage {
let read_txn = self.db.begin_read()?; let read_txn = self.db.begin_read()?;
let types_table = read_txn.open_table(TYPES_TABLE)?; let types_table = read_txn.open_table(TYPES_TABLE)?;
match types_table.get(key)? { let key_type = types_table.get(key)?.map(|v| v.value().to_string());
Some(type_val) if type_val.value() == "hash" => {
match key_type.as_deref() {
Some("hash") => {
let hashes_table = read_txn.open_table(HASHES_TABLE)?; let hashes_table = read_txn.open_table(HASHES_TABLE)?;
match hashes_table.get((key, field))? { match hashes_table.get((key, field))? {
Some(data) => { Some(data) => {
@ -50,7 +62,8 @@ impl Storage {
None => Ok(None), None => Ok(None),
} }
} }
_ => Ok(None), Some(_) => Err(DBError("WRONGTYPE Operation against a key holding the wrong kind of value".to_string())),
None => Ok(None),
} }
} }
@ -58,9 +71,13 @@ impl Storage {
pub fn hgetall(&self, key: &str) -> Result<Vec<(String, String)>, DBError> { pub fn hgetall(&self, key: &str) -> Result<Vec<(String, String)>, DBError> {
let read_txn = self.db.begin_read()?; let read_txn = self.db.begin_read()?;
let types_table = read_txn.open_table(TYPES_TABLE)?; let types_table = read_txn.open_table(TYPES_TABLE)?;
let key_type = {
match types_table.get(key)? { let access_guard = types_table.get(key)?;
Some(type_val) if type_val.value() == "hash" => { access_guard.map(|v| v.value().to_string())
};
match key_type.as_deref() {
Some("hash") => {
let hashes_table = read_txn.open_table(HASHES_TABLE)?; let hashes_table = read_txn.open_table(HASHES_TABLE)?;
let mut result = Vec::new(); let mut result = Vec::new();
@ -77,7 +94,8 @@ impl Storage {
Ok(result) Ok(result)
} }
_ => Ok(Vec::new()), Some(_) => Err(DBError("WRONGTYPE Operation against a key holding the wrong kind of value".to_string())),
None => Ok(Vec::new()),
} }
} }
@ -86,41 +104,42 @@ impl Storage {
let mut deleted = 0i64; let mut deleted = 0i64;
// First check if key exists and is a hash // First check if key exists and is a hash
let is_hash = { let key_type = {
let types_table = write_txn.open_table(TYPES_TABLE)?; let types_table = write_txn.open_table(TYPES_TABLE)?;
let result = match types_table.get(key)? { let access_guard = types_table.get(key)?;
Some(type_val) => type_val.value() == "hash", access_guard.map(|v| v.value().to_string())
None => false,
};
result
}; };
if is_hash { match key_type.as_deref() {
let mut hashes_table = write_txn.open_table(HASHES_TABLE)?; Some("hash") => {
let mut hashes_table = write_txn.open_table(HASHES_TABLE)?;
for field in fields {
if hashes_table.remove((key, field.as_str()))?.is_some() { for field in fields {
deleted += 1; if hashes_table.remove((key, field.as_str()))?.is_some() {
deleted += 1;
}
}
// Check if hash is now empty and remove type if so
let mut has_fields = false;
let mut iter = hashes_table.iter()?;
while let Some(entry) = iter.next() {
let entry = entry?;
let (hash_key, _) = entry.0.value();
if hash_key == key {
has_fields = true;
break;
}
}
drop(iter);
if !has_fields {
let mut types_table = write_txn.open_table(TYPES_TABLE)?;
types_table.remove(key)?;
} }
} }
Some(_) => return Err(DBError("WRONGTYPE Operation against a key holding the wrong kind of value".to_string())),
// Check if hash is now empty and remove type if so None => {} // Key does not exist, nothing to delete, return 0 deleted
let mut has_fields = false;
let mut iter = hashes_table.iter()?;
while let Some(entry) = iter.next() {
let entry = entry?;
let (hash_key, _) = entry.0.value();
if hash_key == key {
has_fields = true;
break;
}
}
drop(iter);
if !has_fields {
let mut types_table = write_txn.open_table(TYPES_TABLE)?;
types_table.remove(key)?;
}
} }
write_txn.commit()?; write_txn.commit()?;
@ -131,12 +150,19 @@ impl Storage {
let read_txn = self.db.begin_read()?; let read_txn = self.db.begin_read()?;
let types_table = read_txn.open_table(TYPES_TABLE)?; let types_table = read_txn.open_table(TYPES_TABLE)?;
match types_table.get(key)? { let types_table = read_txn.open_table(TYPES_TABLE)?;
Some(type_val) if type_val.value() == "hash" => { let key_type = {
let access_guard = types_table.get(key)?;
access_guard.map(|v| v.value().to_string())
};
match key_type.as_deref() {
Some("hash") => {
let hashes_table = read_txn.open_table(HASHES_TABLE)?; let hashes_table = read_txn.open_table(HASHES_TABLE)?;
Ok(hashes_table.get((key, field))?.is_some()) Ok(hashes_table.get((key, field))?.is_some())
} }
_ => Ok(false), Some(_) => Err(DBError("WRONGTYPE Operation against a key holding the wrong kind of value".to_string())),
None => Ok(false),
} }
} }
@ -144,8 +170,14 @@ impl Storage {
let read_txn = self.db.begin_read()?; let read_txn = self.db.begin_read()?;
let types_table = read_txn.open_table(TYPES_TABLE)?; let types_table = read_txn.open_table(TYPES_TABLE)?;
match types_table.get(key)? { let types_table = read_txn.open_table(TYPES_TABLE)?;
Some(type_val) if type_val.value() == "hash" => { let key_type = {
let access_guard = types_table.get(key)?;
access_guard.map(|v| v.value().to_string())
};
match key_type.as_deref() {
Some("hash") => {
let hashes_table = read_txn.open_table(HASHES_TABLE)?; let hashes_table = read_txn.open_table(HASHES_TABLE)?;
let mut result = Vec::new(); let mut result = Vec::new();
@ -160,7 +192,8 @@ impl Storage {
Ok(result) Ok(result)
} }
_ => Ok(Vec::new()), Some(_) => Err(DBError("WRONGTYPE Operation against a key holding the wrong kind of value".to_string())),
None => Ok(Vec::new()),
} }
} }
@ -169,8 +202,14 @@ impl Storage {
let read_txn = self.db.begin_read()?; let read_txn = self.db.begin_read()?;
let types_table = read_txn.open_table(TYPES_TABLE)?; let types_table = read_txn.open_table(TYPES_TABLE)?;
match types_table.get(key)? { let types_table = read_txn.open_table(TYPES_TABLE)?;
Some(type_val) if type_val.value() == "hash" => { let key_type = {
let access_guard = types_table.get(key)?;
access_guard.map(|v| v.value().to_string())
};
match key_type.as_deref() {
Some("hash") => {
let hashes_table = read_txn.open_table(HASHES_TABLE)?; let hashes_table = read_txn.open_table(HASHES_TABLE)?;
let mut result = Vec::new(); let mut result = Vec::new();
@ -187,7 +226,8 @@ impl Storage {
Ok(result) Ok(result)
} }
_ => Ok(Vec::new()), Some(_) => Err(DBError("WRONGTYPE Operation against a key holding the wrong kind of value".to_string())),
None => Ok(Vec::new()),
} }
} }
@ -195,8 +235,14 @@ impl Storage {
let read_txn = self.db.begin_read()?; let read_txn = self.db.begin_read()?;
let types_table = read_txn.open_table(TYPES_TABLE)?; let types_table = read_txn.open_table(TYPES_TABLE)?;
match types_table.get(key)? { let types_table = read_txn.open_table(TYPES_TABLE)?;
Some(type_val) if type_val.value() == "hash" => { let key_type = {
let access_guard = types_table.get(key)?;
access_guard.map(|v| v.value().to_string())
};
match key_type.as_deref() {
Some("hash") => {
let hashes_table = read_txn.open_table(HASHES_TABLE)?; let hashes_table = read_txn.open_table(HASHES_TABLE)?;
let mut count = 0i64; let mut count = 0i64;
@ -211,7 +257,8 @@ impl Storage {
Ok(count) Ok(count)
} }
_ => Ok(0), Some(_) => Err(DBError("WRONGTYPE Operation against a key holding the wrong kind of value".to_string())),
None => Ok(0),
} }
} }
@ -220,8 +267,14 @@ impl Storage {
let read_txn = self.db.begin_read()?; let read_txn = self.db.begin_read()?;
let types_table = read_txn.open_table(TYPES_TABLE)?; let types_table = read_txn.open_table(TYPES_TABLE)?;
match types_table.get(key)? { let types_table = read_txn.open_table(TYPES_TABLE)?;
Some(type_val) if type_val.value() == "hash" => { let key_type = {
let access_guard = types_table.get(key)?;
access_guard.map(|v| v.value().to_string())
};
match key_type.as_deref() {
Some("hash") => {
let hashes_table = read_txn.open_table(HASHES_TABLE)?; let hashes_table = read_txn.open_table(HASHES_TABLE)?;
let mut result = Vec::new(); let mut result = Vec::new();
@ -238,7 +291,8 @@ impl Storage {
Ok(result) Ok(result)
} }
_ => Ok(fields.into_iter().map(|_| None).collect()), Some(_) => Err(DBError("WRONGTYPE Operation against a key holding the wrong kind of value".to_string())),
None => Ok(fields.into_iter().map(|_| None).collect()),
} }
} }
@ -251,15 +305,25 @@ impl Storage {
let mut types_table = write_txn.open_table(TYPES_TABLE)?; let mut types_table = write_txn.open_table(TYPES_TABLE)?;
let mut hashes_table = write_txn.open_table(HASHES_TABLE)?; let mut hashes_table = write_txn.open_table(HASHES_TABLE)?;
// Check if field already exists let key_type = {
if hashes_table.get((key, field))?.is_none() { let access_guard = types_table.get(key)?;
// Set the type to hash access_guard.map(|v| v.value().to_string())
types_table.insert(key, "hash")?; };
// Encrypt the value before storing match key_type.as_deref() {
let encrypted = self.encrypt_if_needed(value.as_bytes())?; Some("hash") | None => { // Proceed if hash or new key
hashes_table.insert((key, field), encrypted.as_slice())?; // Check if field already exists
result = true; if hashes_table.get((key, field))?.is_none() {
// Set the type to hash (only if new key or existing hash)
types_table.insert(key, "hash")?;
// Encrypt the value before storing
let encrypted = self.encrypt_if_needed(value.as_bytes())?;
hashes_table.insert((key, field), encrypted.as_slice())?;
result = true;
}
}
Some(_) => return Err(DBError("WRONGTYPE Operation against a key holding the wrong kind of value".to_string())),
} }
} }
@ -272,8 +336,14 @@ impl Storage {
let read_txn = self.db.begin_read()?; let read_txn = self.db.begin_read()?;
let types_table = read_txn.open_table(TYPES_TABLE)?; let types_table = read_txn.open_table(TYPES_TABLE)?;
match types_table.get(key)? { let types_table = read_txn.open_table(TYPES_TABLE)?;
Some(type_val) if type_val.value() == "hash" => { let key_type = {
let access_guard = types_table.get(key)?;
access_guard.map(|v| v.value().to_string())
};
match key_type.as_deref() {
Some("hash") => {
let hashes_table = read_txn.open_table(HASHES_TABLE)?; let hashes_table = read_txn.open_table(HASHES_TABLE)?;
let mut result = Vec::new(); let mut result = Vec::new();
let mut current_cursor = 0u64; let mut current_cursor = 0u64;
@ -312,7 +382,8 @@ impl Storage {
let next_cursor = if result.len() < limit { 0 } else { current_cursor }; let next_cursor = if result.len() < limit { 0 } else { current_cursor };
Ok((next_cursor, result)) Ok((next_cursor, result))
} }
_ => Ok((0, Vec::new())), Some(_) => Err(DBError("WRONGTYPE Operation against a key holding the wrong kind of value".to_string())),
None => Ok((0, Vec::new())),
} }
} }
} }

View File

@ -25,7 +25,7 @@ impl Storage {
}; };
// Add elements to the front (left) // Add elements to the front (left)
for element in elements.into_iter().rev() { for element in elements.into_iter() {
list.insert(0, element); list.insert(0, element);
} }

View File

@ -93,9 +93,16 @@ async fn test_basic_redis_functionality() {
assert!(response.contains("string")); assert!(response.contains("string"));
// Test QUIT to close connection gracefully // Test QUIT to close connection gracefully
let response = send_redis_command(port, "*1\r\n$4\r\nQUIT\r\n").await; let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port)).await.unwrap();
stream.write_all("*1\r\n$4\r\nQUIT\r\n".as_bytes()).await.unwrap();
let mut buffer = [0; 1024];
let n = stream.read(&mut buffer).await.unwrap();
let response = String::from_utf8_lossy(&buffer[..n]);
assert!(response.contains("OK")); assert!(response.contains("OK"));
// Ensure the stream is closed
stream.shutdown().await.unwrap();
// Stop the server // Stop the server
server_handle.abort(); server_handle.abort();
@ -149,6 +156,8 @@ async fn test_hash_operations() {
assert!(response.contains("value2")); assert!(response.contains("value2"));
// Stop the server // Stop the server
// For hash operations, we don't have a persistent stream, so we'll just abort the server.
// The server should handle closing its connections.
server_handle.abort(); server_handle.abort();
println!("✅ All hash operations tests passed!"); println!("✅ All hash operations tests passed!");
@ -202,9 +211,16 @@ async fn test_transaction_operations() {
assert!(response.contains("OK")); // Should contain array of OK responses assert!(response.contains("OK")); // Should contain array of OK responses
// Verify commands were executed // Verify commands were executed
let response = send_redis_command(port, "*2\r\n$3\r\nGET\r\n$4\r\nkey1\r\n").await; stream.write_all("*2\r\n$3\r\nGET\r\n$4\r\nkey1\r\n".as_bytes()).await.unwrap();
let n = stream.read(&mut buffer).await.unwrap();
let response = String::from_utf8_lossy(&buffer[..n]);
assert!(response.contains("value1")); assert!(response.contains("value1"));
stream.write_all("*2\r\n$3\r\nGET\r\n$4\r\nkey2\r\n".as_bytes()).await.unwrap();
let n = stream.read(&mut buffer).await.unwrap();
let response = String::from_utf8_lossy(&buffer[..n]);
assert!(response.contains("value2"));
// Stop the server // Stop the server
server_handle.abort(); server_handle.abort();