diff --git a/src/api/v1/channels/uuid/socket.rs b/src/api/v1/channels/uuid/socket.rs index b6d449b..ac04301 100644 --- a/src/api/v1/channels/uuid/socket.rs +++ b/src/api/v1/channels/uuid/socket.rs @@ -5,58 +5,22 @@ use axum::{ http::HeaderMap, response::IntoResponse, }; -use diesel::{ExpressionMethods, QueryDsl, SelectableHelper, delete, update}; -use diesel_async::RunQueryDsl; use futures_util::{SinkExt, StreamExt}; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; use uuid::Uuid; use crate::{ AppState, api::v1::auth::check_access_token, error::Error, - objects::{self, Channel, Member, message::MessageBuilder}, - schema::messages, + objects::{Channel, Member}, utils::global_checks, }; #[derive(Deserialize)] -#[serde(tag = "event")] -enum ReceiveEvent { - MessageSend { entity: MessageSend }, - MessageEdit { entity: MessageEdit }, - MessageDelete { entity: MessageDelete }, -} - -#[derive(Deserialize)] -struct MessageSend { - text: String, - reply_to: Option, -} - -#[derive(Deserialize)] -struct MessageEdit { - uuid: Uuid, - text: String, -} - -#[derive(Deserialize, Serialize)] -struct MessageDelete { - uuid: Uuid, -} - -#[derive(Serialize)] -#[serde(tag = "event")] -enum SendEvent { - MessageSend { entity: objects::Message }, - MessageEdit { entity: objects::Message }, - MessageDelete { entity: MessageDelete }, - Error { entity: SendError }, -} - -#[derive(Serialize)] -struct SendError { +struct MessageBody { message: String, + reply_to: Option, } pub async fn ws( @@ -135,140 +99,27 @@ pub async fn ws( tokio::spawn(async move { while let Some(msg) = receiver.next().await { if let Ok(Message::Text(text)) = msg { - let message_body: ReceiveEvent = serde_json::from_str(&text)?; + let message_body: MessageBody = serde_json::from_str(&text)?; - match message_body { - ReceiveEvent::MessageSend { entity } => { - let message = channel - .new_message( - &mut app_state.pool.get().await?, - &app_state.cache_pool, - uuid, - entity.text, - entity.reply_to, - ) - .await?; + let message = channel + .new_message( + &mut conn, + &app_state.cache_pool, + uuid, + message_body.message, + message_body.reply_to, + ) + .await?; - redis::cmd("PUBLISH") - .arg(&[ - channel_uuid.to_string(), - serde_json::to_string(&SendEvent::MessageSend { - entity: message, - })?, - ]) - .exec_async( - &mut app_state - .cache_pool - .get_multiplexed_tokio_connection() - .await?, - ) - .await?; - } - ReceiveEvent::MessageEdit { entity } => { - use messages::dsl; - let mut message: MessageBuilder = dsl::messages - .filter(dsl::uuid.eq(entity.uuid)) - .select(MessageBuilder::as_select()) - .get_result(&mut app_state.pool.get().await?) - .await?; - - if uuid != message.user_uuid { - redis::cmd("PUBLISH") - .arg(&[ - channel_uuid.to_string(), - serde_json::to_string(&SendEvent::Error { - entity: SendError { - message: "Not allowed".to_string(), - }, - })?, - ]) - .exec_async( - &mut app_state - .cache_pool - .get_multiplexed_tokio_connection() - .await?, - ) - .await?; - - continue; - } - - update(messages::table) - .filter(dsl::uuid.eq(entity.uuid)) - .set(dsl::message.eq(&entity.text)) - .execute(&mut app_state.pool.get().await?) - .await?; - - message.message = entity.text; - - redis::cmd("PUBLISH") - .arg(&[ - channel_uuid.to_string(), - serde_json::to_string(&SendEvent::MessageEdit { - entity: message - .build( - &mut app_state.pool.get().await?, - &app_state.cache_pool, - ) - .await?, - })?, - ]) - .exec_async( - &mut app_state - .cache_pool - .get_multiplexed_tokio_connection() - .await?, - ) - .await?; - } - ReceiveEvent::MessageDelete { entity } => { - use messages::dsl; - let message: MessageBuilder = dsl::messages - .filter(dsl::uuid.eq(entity.uuid)) - .select(MessageBuilder::as_select()) - .get_result(&mut app_state.pool.get().await?) - .await?; - - if uuid != message.user_uuid { - redis::cmd("PUBLISH") - .arg(&[ - channel_uuid.to_string(), - serde_json::to_string(&SendEvent::Error { - entity: SendError { - message: "Not allowed".to_string(), - }, - })?, - ]) - .exec_async( - &mut app_state - .cache_pool - .get_multiplexed_tokio_connection() - .await?, - ) - .await?; - - continue; - } - - delete(messages::table) - .filter(dsl::uuid.eq(entity.uuid)) - .execute(&mut app_state.pool.get().await?) - .await?; - - redis::cmd("PUBLISH") - .arg(&[ - channel_uuid.to_string(), - serde_json::to_string(&SendEvent::MessageDelete { entity })?, - ]) - .exec_async( - &mut app_state - .cache_pool - .get_multiplexed_tokio_connection() - .await?, - ) - .await?; - } - } + redis::cmd("PUBLISH") + .arg(&[channel_uuid.to_string(), serde_json::to_string(&message)?]) + .exec_async( + &mut app_state + .cache_pool + .get_multiplexed_tokio_connection() + .await?, + ) + .await?; } } diff --git a/src/objects/mod.rs b/src/objects/mod.rs index 50fb4cb..5a013ca 100644 --- a/src/objects/mod.rs +++ b/src/objects/mod.rs @@ -15,7 +15,7 @@ mod guild; mod invite; mod me; mod member; -pub mod message; +mod message; mod password_reset_token; mod role; mod user;