From 79cfa258556d27e6ea56c7c28c10fd581ec81a9e Mon Sep 17 00:00:00 2001 From: SauceyRed Date: Thu, 15 May 2025 11:54:20 +0200 Subject: [PATCH 1/6] feat: add basic WebSocket server with echo handler --- Cargo.toml | 2 ++ src/main.rs | 86 ++++++++++++++++++++++++++++++----------------------- src/wss.rs | 40 +++++++++++++++++++++++++ 3 files changed, 91 insertions(+), 37 deletions(-) create mode 100644 src/wss.rs diff --git a/Cargo.toml b/Cargo.toml index 3209341..9510dac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,8 @@ toml = "0.8" url = { version = "2.5", features = ["serde"] } uuid = { version = "1.16", features = ["serde", "v7"] } random-string = "1.1" +actix-ws = "0.3.0" +futures-util = "0.3.31" [dependencies.tokio] version = "1.44" diff --git a/src/main.rs b/src/main.rs index 37011ed..a80ad0e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,16 @@ use actix_cors::Cors; -use actix_web::{App, HttpServer, web}; +use actix_web::{web, App, HttpServer}; use argon2::Argon2; use clap::Parser; +use futures::try_join; use simple_logger::SimpleLogger; use sqlx::{PgPool, Pool, Postgres}; +use wss::echo; use std::time::SystemTime; mod config; use config::{Config, ConfigBuilder}; mod api; +mod wss; pub mod utils; pub mod structs; @@ -169,43 +172,52 @@ async fn main() -> Result<(), Error> { argon2: Argon2::default(), start_time: SystemTime::now(), }; - - HttpServer::new(move || { - // Set CORS headers - let cors = Cors::default() - /* - Set Allowed-Control-Allow-Origin header to whatever - the request's Origin header is. Must be done like this - rather than setting it to "*" due to CORS not allowing - sending of credentials (cookies) with wildcard origin. - */ - .allowed_origin_fn(|_origin, _req_head| { - true - }) - /* - Allows any request method in CORS preflight requests. - This will be restricted to only ones actually in use later. - */ - .allow_any_method() - /* - Allows any header(s) in request in CORS preflight requests. - This wll be restricted to only ones actually in use later. - */ - .allow_any_header() - /* - Allows browser to include cookies in requests. - This is needed for receiving the secure HttpOnly refresh_token cookie. - */ - .supports_credentials(); + try_join!( + HttpServer::new(move || { + // Set CORS headers + let cors = Cors::default() + /* + Set Allowed-Control-Allow-Origin header to whatever + the request's Origin header is. Must be done like this + rather than setting it to "*" due to CORS not allowing + sending of credentials (cookies) with wildcard origin. + */ + .allowed_origin_fn(|_origin, _req_head| { + true + }) + /* + Allows any request method in CORS preflight requests. + This will be restricted to only ones actually in use later. + */ + .allow_any_method() + /* + Allows any header(s) in request in CORS preflight requests. + This wll be restricted to only ones actually in use later. + */ + .allow_any_header() + /* + Allows browser to include cookies in requests. + This is needed for receiving the secure HttpOnly refresh_token cookie. + */ + .supports_credentials(); + + App::new() + .app_data(web::Data::new(data.clone())) + .wrap(cors) + .service(api::web()) + }) + .bind((web.url, web.port))? + .run(), + + HttpServer::new(|| { + App::new() + .route("/servers/{server_id}/channels/{channel_id}", + web::get().to(echo)) + }) + .bind(("0.0.0.0", 4382))? + .run() + )?; - App::new() - .app_data(web::Data::new(data.clone())) - .wrap(cors) - .service(api::web()) - }) - .bind((web.url, web.port))? - .run() - .await?; Ok(()) } diff --git a/src/wss.rs b/src/wss.rs new file mode 100644 index 0000000..9f33f10 --- /dev/null +++ b/src/wss.rs @@ -0,0 +1,40 @@ +use actix_web::{rt, web, Error, HttpRequest, HttpResponse}; +use actix_ws::AggregatedMessage; +use futures_util::StreamExt as _; + +pub async fn echo(req: HttpRequest, stream: web::Payload) -> Result { + let (res, mut session, stream) = actix_ws::handle(&req, stream)?; + + let mut stream = stream + .aggregate_continuations() + // aggregate continuation frames up to 1MiB + .max_continuation_size(2_usize.pow(20)); + + // start task but don't wait for it + rt::spawn(async move { + // receive messages from websocket + while let Some(msg) = stream.next().await { + match msg { + Ok(AggregatedMessage::Text(text)) => { + // echo text message + session.text(text).await.unwrap(); + } + + Ok(AggregatedMessage::Binary(bin)) => { + // echo binary message + session.binary(bin).await.unwrap(); + } + + Ok(AggregatedMessage::Ping(msg)) => { + // respond to PING frame with PONG frame + session.pong(&msg).await.unwrap(); + } + + _ => {} + } + } + }); + + // respond immediately with response connected to WS session + Ok(res) +} \ No newline at end of file -- 2.47.2 From cb3c1ee6e41e0aff7b3e9b48db916e63bd52a741 Mon Sep 17 00:00:00 2001 From: Radical Date: Thu, 15 May 2025 11:57:47 +0000 Subject: [PATCH 2/6] refactor: :zap: move websocket into existing webserver and folder structure keeps things consistent and avoids having 2 webservers running under actix, can be reverted if its not desirable however i think this is the best option --- src/api/v1/servers/uuid/channels/uuid/mod.rs | 1 + .../v1/servers/uuid/channels/uuid/socket.rs} | 5 +- src/api/v1/servers/uuid/mod.rs | 1 + src/main.rs | 84 ++++++++----------- 4 files changed, 41 insertions(+), 50 deletions(-) rename src/{wss.rs => api/v1/servers/uuid/channels/uuid/socket.rs} (91%) diff --git a/src/api/v1/servers/uuid/channels/uuid/mod.rs b/src/api/v1/servers/uuid/channels/uuid/mod.rs index 277052c..81b4dfc 100644 --- a/src/api/v1/servers/uuid/channels/uuid/mod.rs +++ b/src/api/v1/servers/uuid/channels/uuid/mod.rs @@ -1,4 +1,5 @@ pub mod messages; +pub mod socket; use actix_web::{delete, get, web, Error, HttpRequest, HttpResponse}; use crate::{api::v1::auth::check_access_token, structs::{Channel, Member}, utils::get_auth_header, Data}; diff --git a/src/wss.rs b/src/api/v1/servers/uuid/channels/uuid/socket.rs similarity index 91% rename from src/wss.rs rename to src/api/v1/servers/uuid/channels/uuid/socket.rs index 9f33f10..64dcc44 100644 --- a/src/wss.rs +++ b/src/api/v1/servers/uuid/channels/uuid/socket.rs @@ -1,7 +1,8 @@ -use actix_web::{rt, web, Error, HttpRequest, HttpResponse}; +use actix_web::{get, rt, web, Error, HttpRequest, HttpResponse}; use actix_ws::AggregatedMessage; use futures_util::StreamExt as _; +#[get("{uuid}/channels/{channel_uuid}/socket")] pub async fn echo(req: HttpRequest, stream: web::Payload) -> Result { let (res, mut session, stream) = actix_ws::handle(&req, stream)?; @@ -37,4 +38,4 @@ pub async fn echo(req: HttpRequest, stream: web::Payload) -> Result Scope { .service(channels::uuid::get) .service(channels::uuid::delete) .service(channels::uuid::messages::get) + .service(channels::uuid::socket::echo) // Roles .service(roles::get) .service(roles::create) diff --git a/src/main.rs b/src/main.rs index a80ad0e..0ba2f69 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,15 +2,12 @@ use actix_cors::Cors; use actix_web::{web, App, HttpServer}; use argon2::Argon2; use clap::Parser; -use futures::try_join; use simple_logger::SimpleLogger; use sqlx::{PgPool, Pool, Postgres}; -use wss::echo; use std::time::SystemTime; mod config; use config::{Config, ConfigBuilder}; mod api; -mod wss; pub mod utils; pub mod structs; @@ -173,51 +170,42 @@ async fn main() -> Result<(), Error> { start_time: SystemTime::now(), }; - try_join!( - HttpServer::new(move || { - // Set CORS headers - let cors = Cors::default() - /* - Set Allowed-Control-Allow-Origin header to whatever - the request's Origin header is. Must be done like this - rather than setting it to "*" due to CORS not allowing - sending of credentials (cookies) with wildcard origin. - */ - .allowed_origin_fn(|_origin, _req_head| { - true - }) - /* - Allows any request method in CORS preflight requests. - This will be restricted to only ones actually in use later. - */ - .allow_any_method() - /* - Allows any header(s) in request in CORS preflight requests. - This wll be restricted to only ones actually in use later. - */ - .allow_any_header() - /* - Allows browser to include cookies in requests. - This is needed for receiving the secure HttpOnly refresh_token cookie. - */ - .supports_credentials(); - - App::new() - .app_data(web::Data::new(data.clone())) - .wrap(cors) - .service(api::web()) - }) - .bind((web.url, web.port))? - .run(), - - HttpServer::new(|| { - App::new() - .route("/servers/{server_id}/channels/{channel_id}", - web::get().to(echo)) - }) - .bind(("0.0.0.0", 4382))? - .run() - )?; + HttpServer::new(move || { + // Set CORS headers + let cors = Cors::default() + /* + Set Allowed-Control-Allow-Origin header to whatever + the request's Origin header is. Must be done like this + rather than setting it to "*" due to CORS not allowing + sending of credentials (cookies) with wildcard origin. + */ + .allowed_origin_fn(|_origin, _req_head| { + true + }) + /* + Allows any request method in CORS preflight requests. + This will be restricted to only ones actually in use later. + */ + .allow_any_method() + /* + Allows any header(s) in request in CORS preflight requests. + This wll be restricted to only ones actually in use later. + */ + .allow_any_header() + /* + Allows browser to include cookies in requests. + This is needed for receiving the secure HttpOnly refresh_token cookie. + */ + .supports_credentials(); + + App::new() + .app_data(web::Data::new(data.clone())) + .wrap(cors) + .service(api::web()) + }) + .bind((web.url, web.port))? + .run() + .await?; Ok(()) } -- 2.47.2 From b23783dda35babb26d8914fc721df4f9dc551b16 Mon Sep 17 00:00:00 2001 From: Radical Date: Thu, 15 May 2025 17:54:10 +0200 Subject: [PATCH 3/6] feat: add auth and check if server/channel exists before opening ws connection --- .../v1/servers/uuid/channels/uuid/socket.rs | 61 ++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/src/api/v1/servers/uuid/channels/uuid/socket.rs b/src/api/v1/servers/uuid/channels/uuid/socket.rs index 64dcc44..2d1cdd8 100644 --- a/src/api/v1/servers/uuid/channels/uuid/socket.rs +++ b/src/api/v1/servers/uuid/channels/uuid/socket.rs @@ -1,9 +1,68 @@ use actix_web::{get, rt, web, Error, HttpRequest, HttpResponse}; use actix_ws::AggregatedMessage; use futures_util::StreamExt as _; +use uuid::Uuid; +use log::error; + +use crate::{api::v1::auth::check_access_token, structs::{Channel, Member}, utils::get_auth_header, Data}; #[get("{uuid}/channels/{channel_uuid}/socket")] -pub async fn echo(req: HttpRequest, stream: web::Payload) -> Result { +pub async fn echo(req: HttpRequest, path: web::Path<(Uuid, Uuid)>, stream: web::Payload, data: web::Data) -> Result { + // Get all headers + let headers = req.headers(); + + // Retrieve auth header + let auth_header = get_auth_header(headers); + + if let Err(error) = auth_header { + return Ok(error) + } + + // Get uuids from path + let (guild_uuid, channel_uuid) = path.into_inner(); + + // Authorize client using auth header + let authorized = check_access_token(auth_header.unwrap(), &data.pool).await; + + if let Err(error) = authorized { + return Ok(error) + } + + // Unwrap user uuid from authorization + let uuid = authorized.unwrap(); + + // Get server member from psql + let member = Member::fetch_one(&data.pool, uuid, guild_uuid).await; + + if let Err(error) = member { + return Ok(error); + } + + // Get cache for channel + let cache_result = data.get_cache_key(format!("{}", channel_uuid)).await; + + let channel: Channel; + + // Return channel cache or result from psql as `channel` variable + if let Ok(cache_hit) = cache_result { + channel = serde_json::from_str(&cache_hit).unwrap() + } else { + let channel_result = Channel::fetch_one(&data.pool, guild_uuid, channel_uuid).await; + + if let Err(error) = channel_result { + return Ok(error) + } + + channel = channel_result.unwrap(); + + let cache_result = data.set_cache_key(format!("{}", channel_uuid), channel.clone(), 60).await; + + if let Err(error) = cache_result { + error!("{}", error); + return Ok(HttpResponse::InternalServerError().finish()); + } + } + let (res, mut session, stream) = actix_ws::handle(&req, stream)?; let mut stream = stream -- 2.47.2 From 95964e6fecd2fb88a3027bd3b50460c8839f6218 Mon Sep 17 00:00:00 2001 From: Radical Date: Thu, 15 May 2025 23:43:39 +0200 Subject: [PATCH 4/6] feat: add rough message sending Doesnt get stored in psql and is currently done without any error handling --- .../v1/servers/uuid/channels/uuid/socket.rs | 27 ++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/src/api/v1/servers/uuid/channels/uuid/socket.rs b/src/api/v1/servers/uuid/channels/uuid/socket.rs index 2d1cdd8..8f8a15c 100644 --- a/src/api/v1/servers/uuid/channels/uuid/socket.rs +++ b/src/api/v1/servers/uuid/channels/uuid/socket.rs @@ -63,31 +63,50 @@ pub async fn echo(req: HttpRequest, path: web::Path<(Uuid, Uuid)>, stream: web:: } } - let (res, mut session, stream) = actix_ws::handle(&req, stream)?; + let (res, mut session_1, stream) = actix_ws::handle(&req, stream)?; let mut stream = stream .aggregate_continuations() // aggregate continuation frames up to 1MiB .max_continuation_size(2_usize.pow(20)); + let pubsub_result = data.cache_pool.get_async_pubsub().await; + + if let Err(error) = pubsub_result { + error!("{}", error); + return Ok(HttpResponse::InternalServerError().finish()) + } + + let mut session_2 = session_1.clone(); + + rt::spawn(async move { + let mut pubsub = pubsub_result.unwrap(); + pubsub.subscribe(channel_uuid.to_string()).await.unwrap(); + while let Some(msg) = pubsub.on_message().next().await { + let payload: String = msg.get_payload().unwrap(); + session_1.text(payload).await.unwrap(); + } + }); + // start task but don't wait for it rt::spawn(async move { + let mut conn = data.cache_pool.get_multiplexed_tokio_connection().await.unwrap(); // receive messages from websocket while let Some(msg) = stream.next().await { match msg { Ok(AggregatedMessage::Text(text)) => { // echo text message - session.text(text).await.unwrap(); + redis::cmd("PUBLISH").arg(&[channel_uuid.to_string(), text.to_string()]).exec_async(&mut conn).await.unwrap(); } Ok(AggregatedMessage::Binary(bin)) => { // echo binary message - session.binary(bin).await.unwrap(); + session_2.binary(bin).await.unwrap(); } Ok(AggregatedMessage::Ping(msg)) => { // respond to PING frame with PONG frame - session.pong(&msg).await.unwrap(); + session_2.pong(&msg).await.unwrap(); } _ => {} -- 2.47.2 From c5d14ac063895feaa81f27c2af1d61bf53dc90fd Mon Sep 17 00:00:00 2001 From: Radical Date: Sat, 17 May 2025 11:04:48 +0200 Subject: [PATCH 5/6] feat: add message storing in DB UNTESTED! Should work but might be really slow --- .../v1/servers/uuid/channels/uuid/socket.rs | 1 + src/structs.rs | 21 +++++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/src/api/v1/servers/uuid/channels/uuid/socket.rs b/src/api/v1/servers/uuid/channels/uuid/socket.rs index 8f8a15c..5b9fa73 100644 --- a/src/api/v1/servers/uuid/channels/uuid/socket.rs +++ b/src/api/v1/servers/uuid/channels/uuid/socket.rs @@ -97,6 +97,7 @@ pub async fn echo(req: HttpRequest, path: web::Path<(Uuid, Uuid)>, stream: web:: Ok(AggregatedMessage::Text(text)) => { // echo text message redis::cmd("PUBLISH").arg(&[channel_uuid.to_string(), text.to_string()]).exec_async(&mut conn).await.unwrap(); + channel.new_message(&data.pool, uuid, text.to_string()).await.unwrap(); } Ok(AggregatedMessage::Binary(bin)) => { diff --git a/src/structs.rs b/src/structs.rs index ac34e9d..cde4f7e 100644 --- a/src/structs.rs +++ b/src/structs.rs @@ -186,6 +186,27 @@ impl Channel { Ok(message_builders.iter().map(|b| b.build()).collect()) } + + pub async fn new_message(&self, pool: &Pool, user_uuid: Uuid, message: String) -> Result { + let message_uuid = Uuid::now_v7(); + + let row = sqlx::query(&format!("INSERT INTO messages (uuid, channel_uuid, user_uuid, message) VALUES ('{}', '{}', '{}', $1", message_uuid, self.uuid, user_uuid)) + .bind(&message) + .execute(pool) + .await; + + if let Err(error) = row { + error!("{}", error); + return Err(HttpResponse::InternalServerError().finish()); + } + + Ok(Message { + uuid: message_uuid, + channel_uuid: self.uuid, + user_uuid, + message, + }) + } } #[derive(Clone, Copy)] -- 2.47.2 From cee8b55599884414bd65ee778734db3373c00f71 Mon Sep 17 00:00:00 2001 From: Radical Date: Sat, 17 May 2025 14:11:57 +0200 Subject: [PATCH 6/6] fix: make message storing/fetching work properly --- src/api/v1/servers/uuid/channels/uuid/messages.rs | 2 +- src/structs.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/api/v1/servers/uuid/channels/uuid/messages.rs b/src/api/v1/servers/uuid/channels/uuid/messages.rs index 3bfeaaa..928afbe 100644 --- a/src/api/v1/servers/uuid/channels/uuid/messages.rs +++ b/src/api/v1/servers/uuid/channels/uuid/messages.rs @@ -11,7 +11,7 @@ struct MessageRequest { } #[get("{uuid}/channels/{channel_uuid}/messages")] -pub async fn get(req: HttpRequest, path: web::Path<(Uuid, Uuid)>, message_request: web::Json, data: web::Data) -> Result { +pub async fn get(req: HttpRequest, path: web::Path<(Uuid, Uuid)>, message_request: web::Query, data: web::Data) -> Result { let headers = req.headers(); let auth_header = get_auth_header(headers); diff --git a/src/structs.rs b/src/structs.rs index cde4f7e..44862c0 100644 --- a/src/structs.rs +++ b/src/structs.rs @@ -171,7 +171,7 @@ impl Channel { } pub async fn fetch_messages(&self, pool: &Pool, amount: i64, offset: i64) -> Result, HttpResponse> { - let row = sqlx::query_as(&format!("SELECT uuid, user_uuid, message FROM channels WHERE channel_uuid = '{}' ORDER BY uuid LIMIT $1 OFFSET $2", self.uuid)) + let row = sqlx::query_as(&format!("SELECT CAST(uuid AS VARCHAR), CAST(user_uuid AS VARCHAR), CAST(channel_uuid AS VARCHAR), message FROM messages WHERE channel_uuid = '{}' ORDER BY uuid DESC LIMIT $1 OFFSET $2", self.uuid)) .bind(amount) .bind(offset) .fetch_all(pool) @@ -190,7 +190,7 @@ impl Channel { pub async fn new_message(&self, pool: &Pool, user_uuid: Uuid, message: String) -> Result { let message_uuid = Uuid::now_v7(); - let row = sqlx::query(&format!("INSERT INTO messages (uuid, channel_uuid, user_uuid, message) VALUES ('{}', '{}', '{}', $1", message_uuid, self.uuid, user_uuid)) + let row = sqlx::query(&format!("INSERT INTO messages (uuid, channel_uuid, user_uuid, message) VALUES ('{}', '{}', '{}', $1)", message_uuid, self.uuid, user_uuid)) .bind(&message) .execute(pool) .await; -- 2.47.2