From 5d26f94cdd7fea5fbe336637be635eb986d312cb Mon Sep 17 00:00:00 2001 From: Radical Date: Mon, 26 May 2025 19:17:36 +0200 Subject: [PATCH] style: use ? operator instead of unwrap in websockets --- .../v1/servers/uuid/channels/uuid/socket.rs | 32 ++++++++++--------- src/error.rs | 2 ++ 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/src/api/v1/servers/uuid/channels/uuid/socket.rs b/src/api/v1/servers/uuid/channels/uuid/socket.rs index 8938842..3300e6c 100644 --- a/src/api/v1/servers/uuid/channels/uuid/socket.rs +++ b/src/api/v1/servers/uuid/channels/uuid/socket.rs @@ -62,49 +62,51 @@ pub async fn echo( let mut session_2 = session_1.clone(); rt::spawn(async move { - pubsub.subscribe(channel_uuid.to_string()).await.unwrap(); + pubsub.subscribe(channel_uuid.to_string()).await?; while let Some(msg) = pubsub.on_message().next().await { - let payload: String = msg.get_payload().unwrap(); - session_1.text(payload).await.unwrap(); + let payload: String = msg.get_payload()?; + session_1.text(payload).await?; } + + Ok::<(), crate::error::Error>(()) }); // start task but don't wait for it rt::spawn(async move { - let mut conn = data - .cache_pool - .get_multiplexed_tokio_connection() - .await - .unwrap(); // receive messages from websocket while let Some(msg) = stream.next().await { match msg { Ok(AggregatedMessage::Text(text)) => { - // echo text message + let mut conn = data + .cache_pool + .get_multiplexed_tokio_connection() + .await?; + redis::cmd("PUBLISH") .arg(&[channel_uuid.to_string(), text.to_string()]) .exec_async(&mut conn) - .await - .unwrap(); + .await?; + channel .new_message(&mut data.pool.get().await.unwrap(), uuid, text.to_string()) - .await - .unwrap(); + .await?; } Ok(AggregatedMessage::Binary(bin)) => { // echo binary message - session_2.binary(bin).await.unwrap(); + session_2.binary(bin).await?; } Ok(AggregatedMessage::Ping(msg)) => { // respond to PING frame with PONG frame - session_2.pong(&msg).await.unwrap(); + session_2.pong(&msg).await?; } _ => {} } } + + Ok::<(), crate::error::Error>(()) }); // respond immediately with response connected to WS session diff --git a/src/error.rs b/src/error.rs index ce586ac..fa9524f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -52,6 +52,8 @@ pub enum Error { UrlParseError(#[from] url::ParseError), #[error(transparent)] PayloadError(#[from] PayloadError), + #[error(transparent)] + WsClosed(#[from] actix_ws::Closed), #[error("{0}")] PasswordHashError(String), #[error("{0}")]