messaging implementation using valkey pubsub and websockets #12

Merged
radical merged 7 commits from wip/messaging-wss into main 2025-05-18 18:14:41 +00:00
3 changed files with 91 additions and 37 deletions
Showing only changes of commit 79cfa25855 - Show all commits

View file

@ -28,6 +28,8 @@ toml = "0.8"
url = { version = "2.5", features = ["serde"] } url = { version = "2.5", features = ["serde"] }
uuid = { version = "1.16", features = ["serde", "v7"] } uuid = { version = "1.16", features = ["serde", "v7"] }
random-string = "1.1" random-string = "1.1"
actix-ws = "0.3.0"
futures-util = "0.3.31"
[dependencies.tokio] [dependencies.tokio]
version = "1.44" version = "1.44"

View file

@ -1,13 +1,16 @@
use actix_cors::Cors; use actix_cors::Cors;
use actix_web::{App, HttpServer, web}; use actix_web::{web, App, HttpServer};
use argon2::Argon2; use argon2::Argon2;
use clap::Parser; use clap::Parser;
use futures::try_join;
use simple_logger::SimpleLogger; use simple_logger::SimpleLogger;
use sqlx::{PgPool, Pool, Postgres}; use sqlx::{PgPool, Pool, Postgres};
use wss::echo;
use std::time::SystemTime; use std::time::SystemTime;
mod config; mod config;
use config::{Config, ConfigBuilder}; use config::{Config, ConfigBuilder};
mod api; mod api;
mod wss;
pub mod utils; pub mod utils;
pub mod structs; pub mod structs;
@ -170,7 +173,7 @@ async fn main() -> Result<(), Error> {
start_time: SystemTime::now(), start_time: SystemTime::now(),
}; };
try_join!(
HttpServer::new(move || { HttpServer::new(move || {
// Set CORS headers // Set CORS headers
let cors = Cors::default() let cors = Cors::default()
@ -205,7 +208,16 @@ async fn main() -> Result<(), Error> {
.service(api::web()) .service(api::web())
}) })
.bind((web.url, web.port))? .bind((web.url, web.port))?
.run(),
HttpServer::new(|| {
App::new()
.route("/servers/{server_id}/channels/{channel_id}",
web::get().to(echo))
})
.bind(("0.0.0.0", 4382))?
.run() .run()
.await?; )?;
Ok(()) Ok(())
} }

40
src/wss.rs Normal file
View file

@ -0,0 +1,40 @@
use actix_web::{rt, web, Error, HttpRequest, HttpResponse};
use actix_ws::AggregatedMessage;
use futures_util::StreamExt as _;
pub async fn echo(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
let (res, mut session, stream) = actix_ws::handle(&req, stream)?;
let mut stream = stream
.aggregate_continuations()
// aggregate continuation frames up to 1MiB
.max_continuation_size(2_usize.pow(20));
// start task but don't wait for it
rt::spawn(async move {
// receive messages from websocket
while let Some(msg) = stream.next().await {
match msg {
Ok(AggregatedMessage::Text(text)) => {
// echo text message
session.text(text).await.unwrap();
}
Ok(AggregatedMessage::Binary(bin)) => {
// echo binary message
session.binary(bin).await.unwrap();
}
Ok(AggregatedMessage::Ping(msg)) => {
// respond to PING frame with PONG frame
session.pong(&msg).await.unwrap();
}
_ => {}
}
}
});
// respond immediately with response connected to WS session
Ok(res)
}