feat: add event enum for sending and receiving messages on the socket #47
2 changed files with 173 additions and 24 deletions
|
@ -5,24 +5,60 @@ use axum::{
|
||||||
http::HeaderMap,
|
http::HeaderMap,
|
||||||
response::IntoResponse,
|
response::IntoResponse,
|
||||||
};
|
};
|
||||||
|
use diesel::{ExpressionMethods, QueryDsl, SelectableHelper, delete, update};
|
||||||
|
use diesel_async::RunQueryDsl;
|
||||||
use futures_util::{SinkExt, StreamExt};
|
use futures_util::{SinkExt, StreamExt};
|
||||||
use serde::Deserialize;
|
use serde::{Deserialize, Serialize};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
AppState,
|
AppState,
|
||||||
api::v1::auth::check_access_token,
|
api::v1::auth::check_access_token,
|
||||||
error::Error,
|
error::Error,
|
||||||
objects::{Channel, Member},
|
objects::{self, Channel, Member, message::MessageBuilder},
|
||||||
|
schema::messages,
|
||||||
utils::global_checks,
|
utils::global_checks,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
struct MessageBody {
|
#[serde(tag = "event")]
|
||||||
message: String,
|
enum ReceiveEvent {
|
||||||
|
MessageSend { entity: MessageSend },
|
||||||
|
MessageEdit { entity: MessageEdit },
|
||||||
|
MessageDelete { entity: MessageDelete },
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct MessageSend {
|
||||||
|
text: String,
|
||||||
reply_to: Option<Uuid>,
|
reply_to: Option<Uuid>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct MessageEdit {
|
||||||
|
uuid: Uuid,
|
||||||
|
text: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Serialize)]
|
||||||
|
struct MessageDelete {
|
||||||
|
uuid: Uuid,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize)]
|
||||||
|
#[serde(tag = "event")]
|
||||||
|
enum SendEvent {
|
||||||
|
MessageSend { entity: objects::Message },
|
||||||
|
MessageEdit { entity: objects::Message },
|
||||||
|
MessageDelete { entity: MessageDelete },
|
||||||
|
Error { entity: SendError },
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize)]
|
||||||
|
struct SendError {
|
||||||
|
message: String,
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn ws(
|
pub async fn ws(
|
||||||
ws: WebSocketUpgrade,
|
ws: WebSocketUpgrade,
|
||||||
State(app_state): State<Arc<AppState>>,
|
State(app_state): State<Arc<AppState>>,
|
||||||
|
@ -99,20 +135,27 @@ pub async fn ws(
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
while let Some(msg) = receiver.next().await {
|
while let Some(msg) = receiver.next().await {
|
||||||
if let Ok(Message::Text(text)) = msg {
|
if let Ok(Message::Text(text)) = msg {
|
||||||
let message_body: MessageBody = serde_json::from_str(&text)?;
|
let message_body: ReceiveEvent = serde_json::from_str(&text)?;
|
||||||
|
|
||||||
|
match message_body {
|
||||||
|
ReceiveEvent::MessageSend { entity } => {
|
||||||
let message = channel
|
let message = channel
|
||||||
.new_message(
|
.new_message(
|
||||||
&mut conn,
|
&mut app_state.pool.get().await?,
|
||||||
&app_state.cache_pool,
|
&app_state.cache_pool,
|
||||||
uuid,
|
uuid,
|
||||||
message_body.message,
|
entity.text,
|
||||||
message_body.reply_to,
|
entity.reply_to,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
redis::cmd("PUBLISH")
|
redis::cmd("PUBLISH")
|
||||||
.arg(&[channel_uuid.to_string(), serde_json::to_string(&message)?])
|
.arg(&[
|
||||||
|
channel_uuid.to_string(),
|
||||||
|
serde_json::to_string(&SendEvent::MessageSend {
|
||||||
|
entity: message,
|
||||||
|
})?,
|
||||||
|
])
|
||||||
.exec_async(
|
.exec_async(
|
||||||
&mut app_state
|
&mut app_state
|
||||||
.cache_pool
|
.cache_pool
|
||||||
|
@ -121,6 +164,112 @@ pub async fn ws(
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
|
ReceiveEvent::MessageEdit { entity } => {
|
||||||
|
use messages::dsl;
|
||||||
|
let mut message: MessageBuilder = dsl::messages
|
||||||
|
.filter(dsl::uuid.eq(entity.uuid))
|
||||||
|
.select(MessageBuilder::as_select())
|
||||||
|
.get_result(&mut app_state.pool.get().await?)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if uuid != message.user_uuid {
|
||||||
|
redis::cmd("PUBLISH")
|
||||||
|
.arg(&[
|
||||||
|
channel_uuid.to_string(),
|
||||||
|
serde_json::to_string(&SendEvent::Error {
|
||||||
|
entity: SendError {
|
||||||
|
message: "Not allowed".to_string(),
|
||||||
|
},
|
||||||
|
})?,
|
||||||
|
])
|
||||||
|
.exec_async(
|
||||||
|
&mut app_state
|
||||||
|
.cache_pool
|
||||||
|
.get_multiplexed_tokio_connection()
|
||||||
|
.await?,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
update(messages::table)
|
||||||
|
.filter(dsl::uuid.eq(entity.uuid))
|
||||||
|
.set(dsl::message.eq(&entity.text))
|
||||||
|
.execute(&mut app_state.pool.get().await?)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
message.message = entity.text;
|
||||||
|
|
||||||
|
redis::cmd("PUBLISH")
|
||||||
|
.arg(&[
|
||||||
|
channel_uuid.to_string(),
|
||||||
|
serde_json::to_string(&SendEvent::MessageEdit {
|
||||||
|
entity: message
|
||||||
|
.build(
|
||||||
|
&mut app_state.pool.get().await?,
|
||||||
|
&app_state.cache_pool,
|
||||||
|
)
|
||||||
|
.await?,
|
||||||
|
})?,
|
||||||
|
])
|
||||||
|
.exec_async(
|
||||||
|
&mut app_state
|
||||||
|
.cache_pool
|
||||||
|
.get_multiplexed_tokio_connection()
|
||||||
|
.await?,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
ReceiveEvent::MessageDelete { entity } => {
|
||||||
|
use messages::dsl;
|
||||||
|
let message: MessageBuilder = dsl::messages
|
||||||
|
.filter(dsl::uuid.eq(entity.uuid))
|
||||||
|
.select(MessageBuilder::as_select())
|
||||||
|
.get_result(&mut app_state.pool.get().await?)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if uuid != message.user_uuid {
|
||||||
|
redis::cmd("PUBLISH")
|
||||||
|
.arg(&[
|
||||||
|
channel_uuid.to_string(),
|
||||||
|
serde_json::to_string(&SendEvent::Error {
|
||||||
|
entity: SendError {
|
||||||
|
message: "Not allowed".to_string(),
|
||||||
|
},
|
||||||
|
})?,
|
||||||
|
])
|
||||||
|
.exec_async(
|
||||||
|
&mut app_state
|
||||||
|
.cache_pool
|
||||||
|
.get_multiplexed_tokio_connection()
|
||||||
|
.await?,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(messages::table)
|
||||||
|
.filter(dsl::uuid.eq(entity.uuid))
|
||||||
|
.execute(&mut app_state.pool.get().await?)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
redis::cmd("PUBLISH")
|
||||||
|
.arg(&[
|
||||||
|
channel_uuid.to_string(),
|
||||||
|
serde_json::to_string(&SendEvent::MessageDelete { entity })?,
|
||||||
|
])
|
||||||
|
.exec_async(
|
||||||
|
&mut app_state
|
||||||
|
.cache_pool
|
||||||
|
.get_multiplexed_tokio_connection()
|
||||||
|
.await?,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok::<(), crate::error::Error>(())
|
Ok::<(), crate::error::Error>(())
|
||||||
|
|
|
@ -15,7 +15,7 @@ mod guild;
|
||||||
mod invite;
|
mod invite;
|
||||||
mod me;
|
mod me;
|
||||||
mod member;
|
mod member;
|
||||||
mod message;
|
pub mod message;
|
||||||
mod password_reset_token;
|
mod password_reset_token;
|
||||||
mod role;
|
mod role;
|
||||||
mod user;
|
mod user;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue