messaging implementation using valkey pubsub and websockets #12

Merged
radical merged 7 commits from wip/messaging-wss into main 2025-05-18 18:14:41 +00:00
2 changed files with 22 additions and 0 deletions
Showing only changes of commit c5d14ac063 - Show all commits

View file

@ -97,6 +97,7 @@ pub async fn echo(req: HttpRequest, path: web::Path<(Uuid, Uuid)>, stream: web::
Ok(AggregatedMessage::Text(text)) => { Ok(AggregatedMessage::Text(text)) => {
// echo text message // echo text message
redis::cmd("PUBLISH").arg(&[channel_uuid.to_string(), text.to_string()]).exec_async(&mut conn).await.unwrap(); redis::cmd("PUBLISH").arg(&[channel_uuid.to_string(), text.to_string()]).exec_async(&mut conn).await.unwrap();
channel.new_message(&data.pool, uuid, text.to_string()).await.unwrap();
} }
Ok(AggregatedMessage::Binary(bin)) => { Ok(AggregatedMessage::Binary(bin)) => {

View file

@ -186,6 +186,27 @@ impl Channel {
Ok(message_builders.iter().map(|b| b.build()).collect()) Ok(message_builders.iter().map(|b| b.build()).collect())
} }
pub async fn new_message(&self, pool: &Pool<Postgres>, user_uuid: Uuid, message: String) -> Result<Message, HttpResponse> {
let message_uuid = Uuid::now_v7();
let row = sqlx::query(&format!("INSERT INTO messages (uuid, channel_uuid, user_uuid, message) VALUES ('{}', '{}', '{}', $1", message_uuid, self.uuid, user_uuid))
.bind(&message)
.execute(pool)
.await;
if let Err(error) = row {
error!("{}", error);
return Err(HttpResponse::InternalServerError().finish());
}
Ok(Message {
uuid: message_uuid,
channel_uuid: self.uuid,
user_uuid,
message,
})
}
} }
#[derive(Clone, Copy)] #[derive(Clone, Copy)]