Add postgres code
- Set is currently working - Other methods to test Signed-off-by: Lee Smet <lee.smet@hotmail.com>
This commit is contained in:
@@ -4,6 +4,7 @@ use heromodels_core::{Index, Model};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub mod hero;
|
||||
pub mod postgres;
|
||||
|
||||
pub trait Db {
|
||||
/// Error type returned by database operations.
|
||||
|
400
heromodels/src/db/postgres.rs
Normal file
400
heromodels/src/db/postgres.rs
Normal file
@@ -0,0 +1,400 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use heromodels_core::Model;
|
||||
use postgres::{Client, NoTls};
|
||||
use r2d2::Pool;
|
||||
use r2d2_postgres::PostgresConnectionManager;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Postgres {
|
||||
pool: Pool<PostgresConnectionManager<NoTls>>,
|
||||
}
|
||||
|
||||
/// Configuration used to connect to a postgres server. All values are optional, if they are
|
||||
/// [Option::None], the postgres defaults are used.
|
||||
#[derive(Default)]
|
||||
pub struct Config {
|
||||
pub user: Option<String>,
|
||||
pub password: Option<String>,
|
||||
pub dbname: Option<String>,
|
||||
pub host: Option<String>,
|
||||
pub port: Option<u16>,
|
||||
}
|
||||
|
||||
impl Config {
|
||||
/// Create a new empty config
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// Set the user value in the config
|
||||
pub fn user(mut self, user: Option<String>) -> Self {
|
||||
self.user = user;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the password value in the config
|
||||
pub fn password(mut self, password: Option<String>) -> Self {
|
||||
self.password = password;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the dbname value in the config
|
||||
pub fn dbname(mut self, dbname: Option<String>) -> Self {
|
||||
self.dbname = dbname;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the host value in the config
|
||||
pub fn host(mut self, host: Option<String>) -> Self {
|
||||
self.host = host;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the port value in the config
|
||||
pub fn port(mut self, port: Option<u16>) -> Self {
|
||||
self.port = port;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
/// An error communicating with the postgres server
|
||||
Postgres(postgres::error::Error),
|
||||
/// An error originating from the connection pool
|
||||
ConnectionPool(r2d2::Error),
|
||||
/// An error encoding to or decoding from json
|
||||
Json(serde_json::Error),
|
||||
/// An error encoding to or decoding from jsonb
|
||||
JsonB(jsonb::Error),
|
||||
/// We tried to insert a value but the row was not returned
|
||||
FailedInsert,
|
||||
/// We tried to update a value but we didn't get the expected 1 modified row
|
||||
FailedUpdate(usize),
|
||||
/// We tried to query the existence of a table but it failed
|
||||
TableExistenceQuery,
|
||||
}
|
||||
|
||||
impl Postgres {
|
||||
/// Create a new connection to a postgres instance.
|
||||
pub fn new(config: Config) -> Result<Self, Error> {
|
||||
let mut cfg = Client::configure();
|
||||
cfg.user(config.user.unwrap_or_default().as_ref())
|
||||
.password(config.password.unwrap_or_default())
|
||||
.dbname(config.dbname.unwrap_or_default().as_ref())
|
||||
.host(config.host.unwrap_or_default().as_ref())
|
||||
.port(config.port.unwrap_or_default());
|
||||
|
||||
// No TLS support for now
|
||||
// let client = cfg.connect(postgres::tls::NoTls)?;
|
||||
let manager = PostgresConnectionManager::new(cfg, NoTls);
|
||||
|
||||
let pool = Pool::builder()
|
||||
.connection_timeout(Duration::from_secs(1))
|
||||
.max_size(5)
|
||||
.build(manager)?;
|
||||
|
||||
Ok(Postgres { pool })
|
||||
}
|
||||
|
||||
/// Helper method which generates a table name for a model, to alleviate some keyword
|
||||
/// conflicts.
|
||||
fn collection_name<M: Model>() -> String {
|
||||
format!("model_{}", M::db_prefix())
|
||||
}
|
||||
}
|
||||
|
||||
impl super::Db for Postgres {
|
||||
type Error = Error;
|
||||
|
||||
fn collection<M: heromodels_core::Model>(
|
||||
&self,
|
||||
) -> Result<impl super::Collection<&str, M>, super::Error<Self::Error>> {
|
||||
// Check if the table exists, if not create it with proper indexes
|
||||
let mut con = self.pool.get().map_err(Error::from)?;
|
||||
|
||||
let row = con
|
||||
.query_one(
|
||||
r#"SELECT EXISTS (
|
||||
SELECT 1
|
||||
FROM information_schema.tables
|
||||
WHERE table_name = $1
|
||||
) AS table_existence;"#,
|
||||
&[&Self::collection_name::<M>()],
|
||||
)
|
||||
.map_err(Error::from)?;
|
||||
|
||||
let exists: bool = row.get("table_existence");
|
||||
|
||||
// If table does not exist set it up
|
||||
if !exists {
|
||||
eprintln!(
|
||||
"Table {} does not exist, create it now",
|
||||
Self::collection_name::<M>()
|
||||
);
|
||||
// Use a transaction here so if index creation failed the table is also not created.
|
||||
let mut tx = con.transaction().map_err(Error::from)?;
|
||||
|
||||
tx.execute(
|
||||
&format!(
|
||||
"CREATE TABLE {} (key SERIAL PRIMARY KEY, value JSONB NOT NULL);",
|
||||
Self::collection_name::<M>(),
|
||||
),
|
||||
&[],
|
||||
)
|
||||
.map_err(Error::from)?;
|
||||
|
||||
// TODO: Create indexes
|
||||
|
||||
tx.commit().map_err(Error::from)?;
|
||||
}
|
||||
|
||||
Ok(self.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl<M> super::Collection<&str, M> for Postgres
|
||||
where
|
||||
M: Model,
|
||||
{
|
||||
type Error = Error;
|
||||
|
||||
fn get<I, Q>(&self, key: &Q) -> Result<Vec<M>, super::Error<Self::Error>>
|
||||
where
|
||||
I: heromodels_core::Index<Model = M>,
|
||||
I::Key: std::borrow::Borrow<Q>,
|
||||
Q: ToString + ?Sized,
|
||||
{
|
||||
let mut con = self.pool.get().map_err(Error::from)?;
|
||||
|
||||
Ok(con
|
||||
.query(
|
||||
&format!(
|
||||
"SELECT (value) FROM {} WHERE value->$1 = $2;",
|
||||
Self::collection_name::<M>(),
|
||||
),
|
||||
&[&I::key(), &key.to_string()],
|
||||
)
|
||||
.map_err(Error::from)?
|
||||
.into_iter()
|
||||
.map(|row| serde_json::from_str::<M>(row.get("value")).map_err(Error::from))
|
||||
.collect::<Result<Vec<M>, _>>()?)
|
||||
}
|
||||
|
||||
fn get_by_id(&self, id: u32) -> Result<Option<M>, super::Error<Self::Error>> {
|
||||
let mut con = self.pool.get().map_err(Error::from)?;
|
||||
|
||||
if let Some(row) = con
|
||||
.query(
|
||||
&format!(
|
||||
"SELECT (value) FROM {} WHERE key = $1;",
|
||||
Self::collection_name::<M>()
|
||||
),
|
||||
&[&id],
|
||||
)
|
||||
.map_err(Error::from)?
|
||||
.into_iter()
|
||||
.next()
|
||||
{
|
||||
Ok(Some(
|
||||
serde_json::from_str::<M>(row.get("value")).map_err(Error::from)?,
|
||||
))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn set(&self, value: &M) -> Result<(u32, M), super::Error<Self::Error>> {
|
||||
let mut con = self.pool.get().map_err(Error::from)?;
|
||||
|
||||
// let ser_val = serde_json::to_string(&value).map_err(Error::from)?;
|
||||
let ser_val = jsonb::to_owned_jsonb(&value).map_err(Error::from)?;
|
||||
|
||||
if value.get_id() == 0 {
|
||||
// NOTE: We perform a query here since we want the returned value which has the updated ID
|
||||
let Some(row) = con
|
||||
.query(
|
||||
&format!(
|
||||
"INSERT INTO {} (value) VALUES ($1) RETURNING key, value;",
|
||||
Self::collection_name::<M>()
|
||||
),
|
||||
&[&postgres::types::Json(value)],
|
||||
)
|
||||
.map_err(Error::from)?
|
||||
.into_iter()
|
||||
.next()
|
||||
else {
|
||||
return Err(Error::FailedInsert.into());
|
||||
};
|
||||
|
||||
eprintln!("insert done");
|
||||
|
||||
// Get the generated ID
|
||||
let id = row.get::<_, i32>("key") as u32;
|
||||
let mut value = row.get::<_, postgres::types::Json<M>>("value").0;
|
||||
// .map_err(Error::from)?;
|
||||
value.base_data_mut().id = id;
|
||||
|
||||
// NOTE: Update the value so the id is set correctly in the value itself
|
||||
// let ser_val = serde_json::to_string(&value).map_err(Error::from)?;
|
||||
|
||||
let updated = con
|
||||
.execute(
|
||||
&format!(
|
||||
"UPDATE {} SET value = $1 WHERE key = $2;",
|
||||
Self::collection_name::<M>()
|
||||
),
|
||||
&[
|
||||
&postgres::types::Json(value.clone()),
|
||||
&(value.get_id() as i32),
|
||||
],
|
||||
)
|
||||
.map_err(Error::from)?;
|
||||
|
||||
if updated != 1 {
|
||||
return Err(Error::FailedUpdate(updated as usize).into());
|
||||
}
|
||||
|
||||
Ok((id, value))
|
||||
} else {
|
||||
let updated = con
|
||||
.execute(
|
||||
&format!(
|
||||
"UPDATE {} SET value = $1 WHERE key = $2;",
|
||||
Self::collection_name::<M>(),
|
||||
),
|
||||
&[
|
||||
&postgres::types::Json(value.clone()),
|
||||
&(value.get_id() as i32),
|
||||
],
|
||||
)
|
||||
.map_err(Error::from)?;
|
||||
|
||||
if updated != 1 {
|
||||
return Err(Error::FailedUpdate(updated as usize).into());
|
||||
}
|
||||
|
||||
Ok((value.get_id(), value.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
fn delete<I, Q>(&self, key: &Q) -> Result<(), super::Error<Self::Error>>
|
||||
where
|
||||
I: heromodels_core::Index<Model = M>,
|
||||
I::Key: std::borrow::Borrow<Q>,
|
||||
Q: ToString + ?Sized,
|
||||
{
|
||||
let mut con = self.pool.get().map_err(Error::from)?;
|
||||
|
||||
con.execute(
|
||||
&format!(
|
||||
"DELETE FROM {} WHERE value->$1 = $2;",
|
||||
Self::collection_name::<M>()
|
||||
),
|
||||
&[&I::key(), &key.to_string()],
|
||||
)
|
||||
.map_err(Error::from)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn delete_by_id(&self, id: u32) -> Result<(), super::Error<Self::Error>> {
|
||||
let mut con = self.pool.get().map_err(Error::from)?;
|
||||
|
||||
con.execute(
|
||||
&format!(
|
||||
"DELETE FROM {} WHERE key = $1;",
|
||||
Self::collection_name::<M>()
|
||||
),
|
||||
&[&id],
|
||||
)
|
||||
.map_err(Error::from)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_all(&self) -> Result<Vec<M>, super::Error<Self::Error>> {
|
||||
let mut con = self.pool.get().map_err(Error::from)?;
|
||||
|
||||
Ok(con
|
||||
.query(
|
||||
&format!("SELECT (value) FROM {};", Self::collection_name::<M>()),
|
||||
&[],
|
||||
)
|
||||
.map_err(Error::from)?
|
||||
.into_iter()
|
||||
.map(|row| serde_json::from_str::<M>(row.get("value")).map_err(Error::from))
|
||||
.collect::<Result<Vec<M>, _>>()?)
|
||||
}
|
||||
|
||||
fn begin_transaction(
|
||||
&self,
|
||||
) -> Result<Box<dyn super::Transaction<Error = Self::Error>>, super::Error<Self::Error>> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl core::fmt::Display for Error {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::Postgres(e) => f.write_fmt(format_args!("postgres error: {e}")),
|
||||
Self::ConnectionPool(e) => {
|
||||
f.write_fmt(format_args!("postgres connection pool error: {e}"))
|
||||
}
|
||||
Self::Json(e) => {
|
||||
f.write_fmt(format_args!("could not decode from or encode to json: {e}"))
|
||||
}
|
||||
Self::JsonB(e) => f.write_fmt(format_args!(
|
||||
"could not decode from or encode to jsonb: {e}"
|
||||
)),
|
||||
Self::FailedInsert => f.write_str("insert did not return any result"),
|
||||
Self::FailedUpdate(amount) => f.write_fmt(format_args!(
|
||||
"update did not return the expected 1 modified row (got {amount})"
|
||||
)),
|
||||
Self::TableExistenceQuery => f.write_str("query to check if table exists failed"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl core::error::Error for Error {
|
||||
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
||||
match self {
|
||||
Self::Postgres(e) => Some(e),
|
||||
Self::ConnectionPool(e) => Some(e),
|
||||
Self::Json(e) => Some(e),
|
||||
Self::JsonB(e) => Some(e),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<postgres::error::Error> for Error {
|
||||
fn from(value: postgres::error::Error) -> Self {
|
||||
Self::Postgres(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<r2d2::Error> for Error {
|
||||
fn from(value: r2d2::Error) -> Self {
|
||||
Self::ConnectionPool(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<serde_json::Error> for Error {
|
||||
fn from(value: serde_json::Error) -> Self {
|
||||
Self::Json(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<jsonb::Error> for Error {
|
||||
fn from(value: jsonb::Error) -> Self {
|
||||
Self::JsonB(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Error> for super::Error<Error> {
|
||||
fn from(value: Error) -> Self {
|
||||
super::Error::DB(value)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user