add test implementation for messaging
This commit is contained in:
parent
bcf857d6b2
commit
8fcfef59d3
6 changed files with 141 additions and 1 deletions
|
@ -1,2 +1,5 @@
|
|||
pub mod v1;
|
||||
pub mod versions;
|
||||
|
||||
// This is purely for testing, will be deleted at a later date
|
||||
pub mod v0;
|
61
src/api/v0/channel.rs
Normal file
61
src/api/v0/channel.rs
Normal file
|
@ -0,0 +1,61 @@
|
|||
use actix_web::{Error, HttpResponse, error, post, web};
|
||||
use futures::StreamExt;
|
||||
use log::error;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::prelude::FromRow;
|
||||
|
||||
use crate::{Data, api::v1::auth::check_access_token};
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct Request {
|
||||
access_token: String,
|
||||
start: i32,
|
||||
amount: i32,
|
||||
}
|
||||
|
||||
#[derive(Serialize, FromRow)]
|
||||
struct Response {
|
||||
uuid: String,
|
||||
message: String,
|
||||
}
|
||||
|
||||
const MAX_SIZE: usize = 262_144;
|
||||
|
||||
#[post("/channel")]
|
||||
pub async fn res(
|
||||
mut payload: web::Payload,
|
||||
data: web::Data<Data>,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
let mut body = web::BytesMut::new();
|
||||
while let Some(chunk) = payload.next().await {
|
||||
let chunk = chunk?;
|
||||
// limit max size of in-memory payload
|
||||
if (body.len() + chunk.len()) > MAX_SIZE {
|
||||
return Err(error::ErrorBadRequest("overflow"));
|
||||
}
|
||||
body.extend_from_slice(&chunk);
|
||||
}
|
||||
|
||||
let request = serde_json::from_slice::<Request>(&body)?;
|
||||
|
||||
let authorized = check_access_token(request.access_token, &data.pool).await;
|
||||
|
||||
if let Err(error) = authorized {
|
||||
return Ok(error);
|
||||
}
|
||||
|
||||
let row = sqlx::query_as("SELECT timestamp, (uuid AS VARCHAR), message FROM channel ORDERED BY timestamp DESC LIMIT $1 OFFSET $2")
|
||||
.bind(request.amount)
|
||||
.bind(request.start)
|
||||
.fetch_all(&data.pool)
|
||||
.await;
|
||||
|
||||
if let Err(error) = row {
|
||||
error!("{}", error);
|
||||
return Ok(HttpResponse::InternalServerError().finish());
|
||||
}
|
||||
|
||||
let messages: Vec<Response> = row.unwrap();
|
||||
|
||||
Ok(HttpResponse::Ok().json(messages))
|
||||
}
|
10
src/api/v0/mod.rs
Normal file
10
src/api/v0/mod.rs
Normal file
|
@ -0,0 +1,10 @@
|
|||
use actix_web::{Scope, web};
|
||||
|
||||
mod channel;
|
||||
mod send;
|
||||
|
||||
pub fn web() -> Scope {
|
||||
web::scope("/v1")
|
||||
.service(channel::res)
|
||||
.service(send::res)
|
||||
}
|
60
src/api/v0/send.rs
Normal file
60
src/api/v0/send.rs
Normal file
|
@ -0,0 +1,60 @@
|
|||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
use actix_web::{Error, HttpResponse, error, post, web};
|
||||
use futures::StreamExt;
|
||||
use log::error;
|
||||
use serde::Deserialize;
|
||||
|
||||
use crate::{Data, api::v1::auth::check_access_token};
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct Request {
|
||||
access_token: String,
|
||||
message: String,
|
||||
}
|
||||
|
||||
const MAX_SIZE: usize = 262_144;
|
||||
|
||||
#[post("/channel")]
|
||||
pub async fn res(
|
||||
mut payload: web::Payload,
|
||||
data: web::Data<Data>,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
let mut body = web::BytesMut::new();
|
||||
while let Some(chunk) = payload.next().await {
|
||||
let chunk = chunk?;
|
||||
// limit max size of in-memory payload
|
||||
if (body.len() + chunk.len()) > MAX_SIZE {
|
||||
return Err(error::ErrorBadRequest("overflow"));
|
||||
}
|
||||
body.extend_from_slice(&chunk);
|
||||
}
|
||||
|
||||
let request = serde_json::from_slice::<Request>(&body)?;
|
||||
|
||||
let authorized = check_access_token(request.access_token, &data.pool).await;
|
||||
|
||||
if let Err(error) = authorized {
|
||||
return Ok(error);
|
||||
}
|
||||
|
||||
let uuid = authorized.unwrap();
|
||||
|
||||
let current_time = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs() as i64;
|
||||
|
||||
let row = sqlx::query(&format!("INSERT INTO channel (timestamp, uuid, message) VALUES ($1, '{}', $2)", uuid))
|
||||
.bind(current_time)
|
||||
.bind(request.message)
|
||||
.execute(&data.pool)
|
||||
.await;
|
||||
|
||||
if let Err(error) = row {
|
||||
error!("{}", error);
|
||||
return Ok(HttpResponse::InternalServerError().finish());
|
||||
}
|
||||
|
||||
Ok(HttpResponse::Ok().finish())
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
use actix_web::{Scope, web};
|
||||
|
||||
mod auth;
|
||||
pub mod auth;
|
||||
mod stats;
|
||||
mod users;
|
||||
|
||||
|
|
|
@ -71,6 +71,11 @@ async fn main() -> Result<(), Error> {
|
|||
refresh_token varchar(64) UNIQUE NOT NULL REFERENCES refresh_tokens(token),
|
||||
uuid uuid NOT NULL REFERENCES users(uuid),
|
||||
created int8 NOT NULL
|
||||
);
|
||||
CREATE TABLE IF NOT EXISTS channel (
|
||||
timestamp int8 PRIMARY KEY NOT NULL,
|
||||
uuid uuid NOT NULL REFERENCES users(uuid),
|
||||
message varchar(2000) NOT NULL,
|
||||
)
|
||||
"#,
|
||||
)
|
||||
|
@ -90,6 +95,7 @@ async fn main() -> Result<(), Error> {
|
|||
.app_data(web::Data::new(data.clone()))
|
||||
.service(api::versions::res)
|
||||
.service(api::v1::web())
|
||||
.service(api::v0::web())
|
||||
})
|
||||
.bind((web.url, web.port))?
|
||||
.run()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue