feat: add event enum for sending and receiving messages on the socket
All checks were successful
ci/woodpecker/push/build-and-publish Pipeline was successful
ci/woodpecker/pr/build-and-publish Pipeline was successful

Added in message editing and deleting with this change
This commit is contained in:
Radical 2025-08-07 21:38:01 +02:00
parent 447c577a2a
commit b4bb83b7f5
2 changed files with 173 additions and 24 deletions

View file

@ -5,24 +5,60 @@ use axum::{
http::HeaderMap, http::HeaderMap,
response::IntoResponse, response::IntoResponse,
}; };
use diesel::{ExpressionMethods, QueryDsl, SelectableHelper, delete, update};
use diesel_async::RunQueryDsl;
use futures_util::{SinkExt, StreamExt}; use futures_util::{SinkExt, StreamExt};
use serde::Deserialize; use serde::{Deserialize, Serialize};
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
AppState, AppState,
api::v1::auth::check_access_token, api::v1::auth::check_access_token,
error::Error, error::Error,
objects::{Channel, Member}, objects::{self, Channel, Member, message::MessageBuilder},
schema::messages,
utils::global_checks, utils::global_checks,
}; };
#[derive(Deserialize)] #[derive(Deserialize)]
struct MessageBody { #[serde(tag = "event")]
message: String, enum ReceiveEvent {
MessageSend { entity: MessageSend },
MessageEdit { entity: MessageEdit },
MessageDelete { entity: MessageDelete },
}
#[derive(Deserialize)]
struct MessageSend {
text: String,
reply_to: Option<Uuid>, reply_to: Option<Uuid>,
} }
#[derive(Deserialize)]
struct MessageEdit {
uuid: Uuid,
text: String,
}
#[derive(Deserialize, Serialize)]
struct MessageDelete {
uuid: Uuid,
}
#[derive(Serialize)]
#[serde(tag = "event")]
enum SendEvent {
MessageSend { entity: objects::Message },
MessageEdit { entity: objects::Message },
MessageDelete { entity: MessageDelete },
Error { entity: SendError },
}
#[derive(Serialize)]
struct SendError {
message: String,
}
pub async fn ws( pub async fn ws(
ws: WebSocketUpgrade, ws: WebSocketUpgrade,
State(app_state): State<Arc<AppState>>, State(app_state): State<Arc<AppState>>,
@ -99,20 +135,27 @@ pub async fn ws(
tokio::spawn(async move { tokio::spawn(async move {
while let Some(msg) = receiver.next().await { while let Some(msg) = receiver.next().await {
if let Ok(Message::Text(text)) = msg { if let Ok(Message::Text(text)) = msg {
let message_body: MessageBody = serde_json::from_str(&text)?; let message_body: ReceiveEvent = serde_json::from_str(&text)?;
match message_body {
ReceiveEvent::MessageSend { entity } => {
let message = channel let message = channel
.new_message( .new_message(
&mut conn, &mut app_state.pool.get().await?,
&app_state.cache_pool, &app_state.cache_pool,
uuid, uuid,
message_body.message, entity.text,
message_body.reply_to, entity.reply_to,
) )
.await?; .await?;
redis::cmd("PUBLISH") redis::cmd("PUBLISH")
.arg(&[channel_uuid.to_string(), serde_json::to_string(&message)?]) .arg(&[
channel_uuid.to_string(),
serde_json::to_string(&SendEvent::MessageSend {
entity: message,
})?,
])
.exec_async( .exec_async(
&mut app_state &mut app_state
.cache_pool .cache_pool
@ -121,6 +164,112 @@ pub async fn ws(
) )
.await?; .await?;
} }
ReceiveEvent::MessageEdit { entity } => {
use messages::dsl;
let mut message: MessageBuilder = dsl::messages
.filter(dsl::uuid.eq(entity.uuid))
.select(MessageBuilder::as_select())
.get_result(&mut app_state.pool.get().await?)
.await?;
if uuid != message.user_uuid {
redis::cmd("PUBLISH")
.arg(&[
channel_uuid.to_string(),
serde_json::to_string(&SendEvent::Error {
entity: SendError {
message: "Not allowed".to_string(),
},
})?,
])
.exec_async(
&mut app_state
.cache_pool
.get_multiplexed_tokio_connection()
.await?,
)
.await?;
continue;
}
update(messages::table)
.filter(dsl::uuid.eq(entity.uuid))
.set(dsl::message.eq(&entity.text))
.execute(&mut app_state.pool.get().await?)
.await?;
message.message = entity.text;
redis::cmd("PUBLISH")
.arg(&[
channel_uuid.to_string(),
serde_json::to_string(&SendEvent::MessageEdit {
entity: message
.build(
&mut app_state.pool.get().await?,
&app_state.cache_pool,
)
.await?,
})?,
])
.exec_async(
&mut app_state
.cache_pool
.get_multiplexed_tokio_connection()
.await?,
)
.await?;
}
ReceiveEvent::MessageDelete { entity } => {
use messages::dsl;
let message: MessageBuilder = dsl::messages
.filter(dsl::uuid.eq(entity.uuid))
.select(MessageBuilder::as_select())
.get_result(&mut app_state.pool.get().await?)
.await?;
if uuid != message.user_uuid {
redis::cmd("PUBLISH")
.arg(&[
channel_uuid.to_string(),
serde_json::to_string(&SendEvent::Error {
entity: SendError {
message: "Not allowed".to_string(),
},
})?,
])
.exec_async(
&mut app_state
.cache_pool
.get_multiplexed_tokio_connection()
.await?,
)
.await?;
continue;
}
delete(messages::table)
.filter(dsl::uuid.eq(entity.uuid))
.execute(&mut app_state.pool.get().await?)
.await?;
redis::cmd("PUBLISH")
.arg(&[
channel_uuid.to_string(),
serde_json::to_string(&SendEvent::MessageDelete { entity })?,
])
.exec_async(
&mut app_state
.cache_pool
.get_multiplexed_tokio_connection()
.await?,
)
.await?;
}
}
}
} }
Ok::<(), crate::error::Error>(()) Ok::<(), crate::error::Error>(())

View file

@ -15,7 +15,7 @@ mod guild;
mod invite; mod invite;
mod me; mod me;
mod member; mod member;
mod message; pub mod message;
mod password_reset_token; mod password_reset_token;
mod role; mod role;
mod user; mod user;