...
This commit is contained in:
178
src/redisclient/redisclient.rs
Normal file
178
src/redisclient/redisclient.rs
Normal file
@@ -0,0 +1,178 @@
|
||||
use redis::{Client, Connection, Commands, RedisError, RedisResult, Cmd};
|
||||
use std::env;
|
||||
use std::path::Path;
|
||||
use std::sync::{Arc, Mutex, Once};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use lazy_static::lazy_static;
|
||||
|
||||
// Global Redis client instance using lazy_static
|
||||
lazy_static! {
|
||||
static ref REDIS_CLIENT: Mutex<Option<Arc<RedisClientWrapper>>> = Mutex::new(None);
|
||||
static ref INIT: Once = Once::new();
|
||||
}
|
||||
|
||||
// Wrapper for Redis client to handle connection and DB selection
|
||||
pub struct RedisClientWrapper {
|
||||
client: Client,
|
||||
connection: Mutex<Option<Connection>>,
|
||||
db: i64,
|
||||
initialized: AtomicBool,
|
||||
}
|
||||
|
||||
impl RedisClientWrapper {
|
||||
// Create a new Redis client wrapper
|
||||
fn new(client: Client, db: i64) -> Self {
|
||||
RedisClientWrapper {
|
||||
client,
|
||||
connection: Mutex::new(None),
|
||||
db,
|
||||
initialized: AtomicBool::new(false),
|
||||
}
|
||||
}
|
||||
|
||||
// Execute a command on the Redis connection
|
||||
pub fn execute<T: redis::FromRedisValue>(&self, cmd: &mut Cmd) -> RedisResult<T> {
|
||||
let mut conn_guard = self.connection.lock().unwrap();
|
||||
|
||||
// If we don't have a connection or it's not working, create a new one
|
||||
if conn_guard.is_none() || {
|
||||
if let Some(ref mut conn) = *conn_guard {
|
||||
let ping_result: RedisResult<String> = redis::cmd("PING").query(conn);
|
||||
ping_result.is_err()
|
||||
} else {
|
||||
true
|
||||
}
|
||||
} {
|
||||
*conn_guard = Some(self.client.get_connection()?);
|
||||
}
|
||||
cmd.query(&mut conn_guard.as_mut().unwrap())
|
||||
}
|
||||
|
||||
// Initialize the client (ping and select DB)
|
||||
fn initialize(&self) -> RedisResult<()> {
|
||||
if self.initialized.load(Ordering::Relaxed) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut conn = self.client.get_connection()?;
|
||||
|
||||
// Ping Redis to ensure it works
|
||||
let ping_result: String = redis::cmd("PING").query(&mut conn)?;
|
||||
if ping_result != "PONG" {
|
||||
return Err(RedisError::from((redis::ErrorKind::ResponseError, "Failed to ping Redis server")));
|
||||
}
|
||||
|
||||
// Select the database
|
||||
redis::cmd("SELECT").arg(self.db).execute(&mut conn);
|
||||
|
||||
self.initialized.store(true, Ordering::Relaxed);
|
||||
|
||||
// Store the connection
|
||||
let mut conn_guard = self.connection.lock().unwrap();
|
||||
*conn_guard = Some(conn);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// Get the Redis client instance
|
||||
pub fn get_redis_client() -> RedisResult<Arc<RedisClientWrapper>> {
|
||||
// Check if we already have a client
|
||||
{
|
||||
let guard = REDIS_CLIENT.lock().unwrap();
|
||||
if let Some(ref client) = &*guard {
|
||||
return Ok(Arc::clone(client));
|
||||
}
|
||||
}
|
||||
|
||||
// Create a new client
|
||||
let client = create_redis_client()?;
|
||||
|
||||
// Store the client globally
|
||||
{
|
||||
let mut guard = REDIS_CLIENT.lock().unwrap();
|
||||
*guard = Some(Arc::clone(&client));
|
||||
}
|
||||
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
// Create a new Redis client
|
||||
fn create_redis_client() -> RedisResult<Arc<RedisClientWrapper>> {
|
||||
// First try: Connect via Unix socket
|
||||
let home_dir = env::var("HOME").unwrap_or_else(|_| String::from("/root"));
|
||||
let socket_path = format!("{}/hero/var/myredis.sock", home_dir);
|
||||
|
||||
if Path::new(&socket_path).exists() {
|
||||
// Try to connect via Unix socket
|
||||
let socket_url = format!("unix://{}", socket_path);
|
||||
match Client::open(socket_url) {
|
||||
Ok(client) => {
|
||||
let db = get_redis_db();
|
||||
let wrapper = Arc::new(RedisClientWrapper::new(client, db));
|
||||
|
||||
// Initialize the client
|
||||
if let Err(err) = wrapper.initialize() {
|
||||
eprintln!("Socket exists at {} but connection failed: {}", socket_path, err);
|
||||
} else {
|
||||
return Ok(wrapper);
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
eprintln!("Socket exists at {} but connection failed: {}", socket_path, err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Second try: Connect via TCP to localhost
|
||||
let tcp_url = "redis://127.0.0.1/";
|
||||
match Client::open(tcp_url) {
|
||||
Ok(client) => {
|
||||
let db = get_redis_db();
|
||||
let wrapper = Arc::new(RedisClientWrapper::new(client, db));
|
||||
|
||||
// Initialize the client
|
||||
wrapper.initialize()?;
|
||||
|
||||
Ok(wrapper)
|
||||
},
|
||||
Err(err) => {
|
||||
Err(RedisError::from((
|
||||
redis::ErrorKind::IoError,
|
||||
"Failed to connect to Redis",
|
||||
format!("Could not connect via socket at {} or via TCP to localhost: {}", socket_path, err)
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Get the Redis DB number from environment variable
|
||||
fn get_redis_db() -> i64 {
|
||||
env::var("REDISDB")
|
||||
.ok()
|
||||
.and_then(|db_str| db_str.parse::<i64>().ok())
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
// Reload the Redis client
|
||||
pub fn reset() -> RedisResult<()> {
|
||||
// Clear the existing client
|
||||
{
|
||||
let mut client_guard = REDIS_CLIENT.lock().unwrap();
|
||||
*client_guard = None;
|
||||
}
|
||||
|
||||
// Create a new client, only return error if it fails
|
||||
// We don't need to return the client itself
|
||||
get_redis_client()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Execute a Redis command
|
||||
pub fn execute<T>(cmd: &mut Cmd) -> RedisResult<T>
|
||||
where
|
||||
T: redis::FromRedisValue,
|
||||
{
|
||||
let client = get_redis_client()?;
|
||||
client.execute(cmd)
|
||||
}
|
Reference in New Issue
Block a user