feat: make database access more uniform and stop locking as many pool connections
All checks were successful
ci/woodpecker/push/build-and-publish Pipeline was successful
All checks were successful
ci/woodpecker/push/build-and-publish Pipeline was successful
This commit is contained in:
parent
f5d4211fad
commit
fa52412b43
30 changed files with 516 additions and 373 deletions
|
@ -2,15 +2,15 @@ use diesel::{
|
|||
ExpressionMethods, Insertable, QueryDsl, Queryable, Selectable, SelectableHelper, delete,
|
||||
insert_into, update,
|
||||
};
|
||||
use diesel_async::{RunQueryDsl, pooled_connection::AsyncDieselConnectionManager};
|
||||
use diesel_async::RunQueryDsl;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
AppState, Conn,
|
||||
Conn,
|
||||
error::Error,
|
||||
schema::{channel_permissions, channels, messages},
|
||||
utils::{CHANNEL_REGEX, order_by_is_above},
|
||||
utils::{CHANNEL_REGEX, CacheFns, order_by_is_above},
|
||||
};
|
||||
|
||||
use super::{HasIsAbove, HasUuid, Message, load_or_empty, message::MessageBuilder};
|
||||
|
@ -79,49 +79,44 @@ impl HasIsAbove for Channel {
|
|||
}
|
||||
|
||||
impl Channel {
|
||||
pub async fn fetch_all(
|
||||
pool: &deadpool::managed::Pool<
|
||||
AsyncDieselConnectionManager<diesel_async::AsyncPgConnection>,
|
||||
Conn,
|
||||
>,
|
||||
guild_uuid: Uuid,
|
||||
) -> Result<Vec<Self>, Error> {
|
||||
let mut conn = pool.get().await?;
|
||||
|
||||
pub async fn fetch_all(conn: &mut Conn, guild_uuid: Uuid) -> Result<Vec<Self>, Error> {
|
||||
use channels::dsl;
|
||||
let channel_builders: Vec<ChannelBuilder> = load_or_empty(
|
||||
dsl::channels
|
||||
.filter(dsl::guild_uuid.eq(guild_uuid))
|
||||
.select(ChannelBuilder::as_select())
|
||||
.load(&mut conn)
|
||||
.load(conn)
|
||||
.await,
|
||||
)?;
|
||||
|
||||
let channel_futures = channel_builders.iter().map(async move |c| {
|
||||
let mut conn = pool.get().await?;
|
||||
c.clone().build(&mut conn).await
|
||||
});
|
||||
let mut channels = vec![];
|
||||
|
||||
futures_util::future::try_join_all(channel_futures).await
|
||||
}
|
||||
|
||||
pub async fn fetch_one(app_state: &AppState, channel_uuid: Uuid) -> Result<Self, Error> {
|
||||
if let Ok(cache_hit) = app_state.get_cache_key(channel_uuid.to_string()).await {
|
||||
return Ok(serde_json::from_str(&cache_hit)?);
|
||||
for builder in channel_builders {
|
||||
channels.push(builder.build(conn).await?);
|
||||
}
|
||||
|
||||
let mut conn = app_state.pool.get().await?;
|
||||
Ok(channels)
|
||||
}
|
||||
|
||||
pub async fn fetch_one(
|
||||
conn: &mut Conn,
|
||||
cache_pool: &redis::Client,
|
||||
channel_uuid: Uuid,
|
||||
) -> Result<Self, Error> {
|
||||
if let Ok(cache_hit) = cache_pool.get_cache_key(channel_uuid.to_string()).await {
|
||||
return Ok(cache_hit);
|
||||
}
|
||||
|
||||
use channels::dsl;
|
||||
let channel_builder: ChannelBuilder = dsl::channels
|
||||
.filter(dsl::uuid.eq(channel_uuid))
|
||||
.select(ChannelBuilder::as_select())
|
||||
.get_result(&mut conn)
|
||||
.get_result(conn)
|
||||
.await?;
|
||||
|
||||
let channel = channel_builder.build(&mut conn).await?;
|
||||
let channel = channel_builder.build(conn).await?;
|
||||
|
||||
app_state
|
||||
cache_pool
|
||||
.set_cache_key(channel_uuid.to_string(), channel.clone(), 60)
|
||||
.await?;
|
||||
|
||||
|
@ -129,7 +124,8 @@ impl Channel {
|
|||
}
|
||||
|
||||
pub async fn new(
|
||||
app_state: &AppState,
|
||||
conn: &mut Conn,
|
||||
cache_pool: &redis::Client,
|
||||
guild_uuid: Uuid,
|
||||
name: String,
|
||||
description: Option<String>,
|
||||
|
@ -138,11 +134,9 @@ impl Channel {
|
|||
return Err(Error::BadRequest("Channel name is invalid".to_string()));
|
||||
}
|
||||
|
||||
let mut conn = app_state.pool.get().await?;
|
||||
|
||||
let channel_uuid = Uuid::now_v7();
|
||||
|
||||
let channels = Self::fetch_all(&app_state.pool, guild_uuid).await?;
|
||||
let channels = Self::fetch_all(conn, guild_uuid).await?;
|
||||
|
||||
let channels_ordered = order_by_is_above(channels).await?;
|
||||
|
||||
|
@ -158,7 +152,7 @@ impl Channel {
|
|||
|
||||
insert_into(channels::table)
|
||||
.values(new_channel.clone())
|
||||
.execute(&mut conn)
|
||||
.execute(conn)
|
||||
.await?;
|
||||
|
||||
if let Some(old_last_channel) = last_channel {
|
||||
|
@ -166,7 +160,7 @@ impl Channel {
|
|||
update(channels::table)
|
||||
.filter(dsl::uuid.eq(old_last_channel.uuid))
|
||||
.set(dsl::is_above.eq(new_channel.uuid))
|
||||
.execute(&mut conn)
|
||||
.execute(conn)
|
||||
.await?;
|
||||
}
|
||||
|
||||
|
@ -180,16 +174,16 @@ impl Channel {
|
|||
permissions: vec![],
|
||||
};
|
||||
|
||||
app_state
|
||||
cache_pool
|
||||
.set_cache_key(channel_uuid.to_string(), channel.clone(), 1800)
|
||||
.await?;
|
||||
|
||||
if app_state
|
||||
.get_cache_key(format!("{guild_uuid}_channels"))
|
||||
if cache_pool
|
||||
.get_cache_key::<Vec<Channel>>(format!("{guild_uuid}_channels"))
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
app_state
|
||||
cache_pool
|
||||
.del_cache_key(format!("{guild_uuid}_channels"))
|
||||
.await?;
|
||||
}
|
||||
|
@ -197,14 +191,12 @@ impl Channel {
|
|||
Ok(channel)
|
||||
}
|
||||
|
||||
pub async fn delete(self, app_state: &AppState) -> Result<(), Error> {
|
||||
let mut conn = app_state.pool.get().await?;
|
||||
|
||||
pub async fn delete(self, conn: &mut Conn, cache_pool: &redis::Client) -> Result<(), Error> {
|
||||
use channels::dsl;
|
||||
match update(channels::table)
|
||||
.filter(dsl::is_above.eq(self.uuid))
|
||||
.set(dsl::is_above.eq(None::<Uuid>))
|
||||
.execute(&mut conn)
|
||||
.execute(conn)
|
||||
.await
|
||||
{
|
||||
Ok(r) => Ok(r),
|
||||
|
@ -214,13 +206,13 @@ impl Channel {
|
|||
|
||||
delete(channels::table)
|
||||
.filter(dsl::uuid.eq(self.uuid))
|
||||
.execute(&mut conn)
|
||||
.execute(conn)
|
||||
.await?;
|
||||
|
||||
match update(channels::table)
|
||||
.filter(dsl::is_above.eq(self.uuid))
|
||||
.set(dsl::is_above.eq(self.is_above))
|
||||
.execute(&mut conn)
|
||||
.execute(conn)
|
||||
.await
|
||||
{
|
||||
Ok(r) => Ok(r),
|
||||
|
@ -228,16 +220,20 @@ impl Channel {
|
|||
Err(e) => Err(e),
|
||||
}?;
|
||||
|
||||
if app_state.get_cache_key(self.uuid.to_string()).await.is_ok() {
|
||||
app_state.del_cache_key(self.uuid.to_string()).await?;
|
||||
}
|
||||
|
||||
if app_state
|
||||
.get_cache_key(format!("{}_channels", self.guild_uuid))
|
||||
if cache_pool
|
||||
.get_cache_key::<Channel>(self.uuid.to_string())
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
app_state
|
||||
cache_pool.del_cache_key(self.uuid.to_string()).await?;
|
||||
}
|
||||
|
||||
if cache_pool
|
||||
.get_cache_key::<Vec<Channel>>(format!("{}_channels", self.guild_uuid))
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
cache_pool
|
||||
.del_cache_key(format!("{}_channels", self.guild_uuid))
|
||||
.await?;
|
||||
}
|
||||
|
@ -247,32 +243,36 @@ impl Channel {
|
|||
|
||||
pub async fn fetch_messages(
|
||||
&self,
|
||||
app_state: &AppState,
|
||||
conn: &mut Conn,
|
||||
cache_pool: &redis::Client,
|
||||
amount: i64,
|
||||
offset: i64,
|
||||
) -> Result<Vec<Message>, Error> {
|
||||
let mut conn = app_state.pool.get().await?;
|
||||
|
||||
use messages::dsl;
|
||||
let messages: Vec<MessageBuilder> = load_or_empty(
|
||||
let message_builders: Vec<MessageBuilder> = load_or_empty(
|
||||
dsl::messages
|
||||
.filter(dsl::channel_uuid.eq(self.uuid))
|
||||
.select(MessageBuilder::as_select())
|
||||
.order(dsl::uuid.desc())
|
||||
.limit(amount)
|
||||
.offset(offset)
|
||||
.load(&mut conn)
|
||||
.load(conn)
|
||||
.await,
|
||||
)?;
|
||||
|
||||
let message_futures = messages.iter().map(async move |b| b.build(app_state).await);
|
||||
let mut messages = vec![];
|
||||
|
||||
futures_util::future::try_join_all(message_futures).await
|
||||
for builder in message_builders {
|
||||
messages.push(builder.build(conn, cache_pool).await?);
|
||||
}
|
||||
|
||||
Ok(messages)
|
||||
}
|
||||
|
||||
pub async fn new_message(
|
||||
&self,
|
||||
app_state: &AppState,
|
||||
conn: &mut Conn,
|
||||
cache_pool: &redis::Client,
|
||||
user_uuid: Uuid,
|
||||
message: String,
|
||||
reply_to: Option<Uuid>,
|
||||
|
@ -287,66 +287,101 @@ impl Channel {
|
|||
reply_to,
|
||||
};
|
||||
|
||||
let mut conn = app_state.pool.get().await?;
|
||||
|
||||
insert_into(messages::table)
|
||||
.values(message.clone())
|
||||
.execute(&mut conn)
|
||||
.execute(conn)
|
||||
.await?;
|
||||
|
||||
message.build(app_state).await
|
||||
message.build(conn, cache_pool).await
|
||||
}
|
||||
|
||||
pub async fn set_name(&mut self, app_state: &AppState, new_name: String) -> Result<(), Error> {
|
||||
pub async fn set_name(
|
||||
&mut self,
|
||||
conn: &mut Conn,
|
||||
cache_pool: &redis::Client,
|
||||
new_name: String,
|
||||
) -> Result<(), Error> {
|
||||
if !CHANNEL_REGEX.is_match(&new_name) {
|
||||
return Err(Error::BadRequest("Channel name is invalid".to_string()));
|
||||
}
|
||||
|
||||
let mut conn = app_state.pool.get().await?;
|
||||
|
||||
use channels::dsl;
|
||||
update(channels::table)
|
||||
.filter(dsl::uuid.eq(self.uuid))
|
||||
.set(dsl::name.eq(&new_name))
|
||||
.execute(&mut conn)
|
||||
.execute(conn)
|
||||
.await?;
|
||||
|
||||
self.name = new_name;
|
||||
|
||||
if cache_pool
|
||||
.get_cache_key::<Channel>(self.uuid.to_string())
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
cache_pool.del_cache_key(self.uuid.to_string()).await?;
|
||||
}
|
||||
|
||||
if cache_pool
|
||||
.get_cache_key::<Vec<Channel>>(format!("{}_channels", self.guild_uuid))
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
cache_pool
|
||||
.del_cache_key(format!("{}_channels", self.guild_uuid))
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn set_description(
|
||||
&mut self,
|
||||
app_state: &AppState,
|
||||
conn: &mut Conn,
|
||||
cache_pool: &redis::Client,
|
||||
new_description: String,
|
||||
) -> Result<(), Error> {
|
||||
let mut conn = app_state.pool.get().await?;
|
||||
|
||||
use channels::dsl;
|
||||
update(channels::table)
|
||||
.filter(dsl::uuid.eq(self.uuid))
|
||||
.set(dsl::description.eq(&new_description))
|
||||
.execute(&mut conn)
|
||||
.execute(conn)
|
||||
.await?;
|
||||
|
||||
self.description = Some(new_description);
|
||||
|
||||
if cache_pool
|
||||
.get_cache_key::<Channel>(self.uuid.to_string())
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
cache_pool.del_cache_key(self.uuid.to_string()).await?;
|
||||
}
|
||||
|
||||
if cache_pool
|
||||
.get_cache_key::<Vec<Channel>>(format!("{}_channels", self.guild_uuid))
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
cache_pool
|
||||
.del_cache_key(format!("{}_channels", self.guild_uuid))
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn move_channel(
|
||||
&mut self,
|
||||
app_state: &AppState,
|
||||
conn: &mut Conn,
|
||||
cache_pool: &redis::Client,
|
||||
new_is_above: Uuid,
|
||||
) -> Result<(), Error> {
|
||||
let mut conn = app_state.pool.get().await?;
|
||||
|
||||
use channels::dsl;
|
||||
let old_above_uuid: Option<Uuid> = match dsl::channels
|
||||
.filter(dsl::is_above.eq(self.uuid))
|
||||
.select(dsl::uuid)
|
||||
.get_result(&mut conn)
|
||||
.get_result(conn)
|
||||
.await
|
||||
{
|
||||
Ok(r) => Ok(Some(r)),
|
||||
|
@ -358,14 +393,14 @@ impl Channel {
|
|||
update(channels::table)
|
||||
.filter(dsl::uuid.eq(uuid))
|
||||
.set(dsl::is_above.eq(None::<Uuid>))
|
||||
.execute(&mut conn)
|
||||
.execute(conn)
|
||||
.await?;
|
||||
}
|
||||
|
||||
match update(channels::table)
|
||||
.filter(dsl::is_above.eq(new_is_above))
|
||||
.set(dsl::is_above.eq(self.uuid))
|
||||
.execute(&mut conn)
|
||||
.execute(conn)
|
||||
.await
|
||||
{
|
||||
Ok(r) => Ok(r),
|
||||
|
@ -376,19 +411,37 @@ impl Channel {
|
|||
update(channels::table)
|
||||
.filter(dsl::uuid.eq(self.uuid))
|
||||
.set(dsl::is_above.eq(new_is_above))
|
||||
.execute(&mut conn)
|
||||
.execute(conn)
|
||||
.await?;
|
||||
|
||||
if let Some(uuid) = old_above_uuid {
|
||||
update(channels::table)
|
||||
.filter(dsl::uuid.eq(uuid))
|
||||
.set(dsl::is_above.eq(self.is_above))
|
||||
.execute(&mut conn)
|
||||
.execute(conn)
|
||||
.await?;
|
||||
}
|
||||
|
||||
self.is_above = Some(new_is_above);
|
||||
|
||||
if cache_pool
|
||||
.get_cache_key::<Channel>(self.uuid.to_string())
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
cache_pool.del_cache_key(self.uuid.to_string()).await?;
|
||||
}
|
||||
|
||||
if cache_pool
|
||||
.get_cache_key::<Vec<Channel>>(format!("{}_channels", self.guild_uuid))
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
cache_pool
|
||||
.del_cache_key(format!("{}_channels", self.guild_uuid))
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue