From 95964e6fecd2fb88a3027bd3b50460c8839f6218 Mon Sep 17 00:00:00 2001 From: Radical Date: Thu, 15 May 2025 23:43:39 +0200 Subject: [PATCH] feat: add rough message sending Doesnt get stored in psql and is currently done without any error handling --- .../v1/servers/uuid/channels/uuid/socket.rs | 27 ++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/src/api/v1/servers/uuid/channels/uuid/socket.rs b/src/api/v1/servers/uuid/channels/uuid/socket.rs index 2d1cdd8..8f8a15c 100644 --- a/src/api/v1/servers/uuid/channels/uuid/socket.rs +++ b/src/api/v1/servers/uuid/channels/uuid/socket.rs @@ -63,31 +63,50 @@ pub async fn echo(req: HttpRequest, path: web::Path<(Uuid, Uuid)>, stream: web:: } } - let (res, mut session, stream) = actix_ws::handle(&req, stream)?; + let (res, mut session_1, stream) = actix_ws::handle(&req, stream)?; let mut stream = stream .aggregate_continuations() // aggregate continuation frames up to 1MiB .max_continuation_size(2_usize.pow(20)); + let pubsub_result = data.cache_pool.get_async_pubsub().await; + + if let Err(error) = pubsub_result { + error!("{}", error); + return Ok(HttpResponse::InternalServerError().finish()) + } + + let mut session_2 = session_1.clone(); + + rt::spawn(async move { + let mut pubsub = pubsub_result.unwrap(); + pubsub.subscribe(channel_uuid.to_string()).await.unwrap(); + while let Some(msg) = pubsub.on_message().next().await { + let payload: String = msg.get_payload().unwrap(); + session_1.text(payload).await.unwrap(); + } + }); + // start task but don't wait for it rt::spawn(async move { + let mut conn = data.cache_pool.get_multiplexed_tokio_connection().await.unwrap(); // receive messages from websocket while let Some(msg) = stream.next().await { match msg { Ok(AggregatedMessage::Text(text)) => { // echo text message - session.text(text).await.unwrap(); + redis::cmd("PUBLISH").arg(&[channel_uuid.to_string(), text.to_string()]).exec_async(&mut conn).await.unwrap(); } Ok(AggregatedMessage::Binary(bin)) => { // echo binary message - session.binary(bin).await.unwrap(); + session_2.binary(bin).await.unwrap(); } Ok(AggregatedMessage::Ping(msg)) => { // respond to PING frame with PONG frame - session.pong(&msg).await.unwrap(); + session_2.pong(&msg).await.unwrap(); } _ => {}