feat(relays): implemented user public key fetching and restructured runtime relay manager, so it uses a message queue in order to get shared mutable state
This commit is contained in:
parent
d4a89cb0f8
commit
28d3d03661
|
@ -30,7 +30,7 @@ dependencies = [
|
||||||
"actix-service",
|
"actix-service",
|
||||||
"actix-utils",
|
"actix-utils",
|
||||||
"ahash",
|
"ahash",
|
||||||
"base64",
|
"base64 0.21.7",
|
||||||
"bitflags 2.4.2",
|
"bitflags 2.4.2",
|
||||||
"brotli",
|
"brotli",
|
||||||
"bytes",
|
"bytes",
|
||||||
|
@ -189,7 +189,7 @@ checksum = "1d613edf08a42ccc6864c941d30fe14e1b676a77d16f1dbadc1174d065a0a775"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"actix-utils",
|
"actix-utils",
|
||||||
"actix-web",
|
"actix-web",
|
||||||
"base64",
|
"base64 0.21.7",
|
||||||
"futures-core",
|
"futures-core",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"log",
|
"log",
|
||||||
|
@ -371,6 +371,22 @@ version = "0.21.7"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
|
checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "base64"
|
||||||
|
version = "0.22.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "9475866fec1451be56a3c2400fd081ff546538961565ccb5b7142cbd22bc7a51"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "base64-serde"
|
||||||
|
version = "0.7.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "ba368df5de76a5bea49aaf0cf1b39ccfbbef176924d1ba5db3e4135216cbe3c7"
|
||||||
|
dependencies = [
|
||||||
|
"base64 0.21.7",
|
||||||
|
"serde",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "base64ct"
|
name = "base64ct"
|
||||||
version = "1.6.0"
|
version = "1.6.0"
|
||||||
|
@ -1029,6 +1045,8 @@ dependencies = [
|
||||||
"actix-web-httpauth",
|
"actix-web-httpauth",
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"argon2",
|
"argon2",
|
||||||
|
"base64 0.22.0",
|
||||||
|
"base64-serde",
|
||||||
"chrono",
|
"chrono",
|
||||||
"compile-time-run",
|
"compile-time-run",
|
||||||
"dotenvy",
|
"dotenvy",
|
||||||
|
@ -1038,6 +1056,7 @@ dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
"sqlx",
|
"sqlx",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
|
"tokio",
|
||||||
"uuid",
|
"uuid",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -1895,7 +1914,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "1ed31390216d20e538e447a7a9b959e06ed9fc51c37b514b46eb758016ecd418"
|
checksum = "1ed31390216d20e538e447a7a9b959e06ed9fc51c37b514b46eb758016ecd418"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"atoi",
|
"atoi",
|
||||||
"base64",
|
"base64 0.21.7",
|
||||||
"bitflags 2.4.2",
|
"bitflags 2.4.2",
|
||||||
"byteorder",
|
"byteorder",
|
||||||
"bytes",
|
"bytes",
|
||||||
|
@ -1938,7 +1957,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "7c824eb80b894f926f89a0b9da0c7f435d27cdd35b8c655b114e58223918577e"
|
checksum = "7c824eb80b894f926f89a0b9da0c7f435d27cdd35b8c655b114e58223918577e"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"atoi",
|
"atoi",
|
||||||
"base64",
|
"base64 0.21.7",
|
||||||
"bitflags 2.4.2",
|
"bitflags 2.4.2",
|
||||||
"byteorder",
|
"byteorder",
|
||||||
"chrono",
|
"chrono",
|
||||||
|
|
|
@ -10,6 +10,8 @@ license = "MIT"
|
||||||
[dependencies]
|
[dependencies]
|
||||||
anyhow = "1.0.81"
|
anyhow = "1.0.81"
|
||||||
argon2 = "0.5.3"
|
argon2 = "0.5.3"
|
||||||
|
base64 = "0.22.0"
|
||||||
|
base64-serde = "0.7.0"
|
||||||
chrono = "0.4.35"
|
chrono = "0.4.35"
|
||||||
compile-time-run = "0.2.12"
|
compile-time-run = "0.2.12"
|
||||||
dotenvy = "0.15.7"
|
dotenvy = "0.15.7"
|
||||||
|
@ -19,6 +21,7 @@ rand = "0.8.5"
|
||||||
serde = { version = "1.0.197", features = ["default"] }
|
serde = { version = "1.0.197", features = ["default"] }
|
||||||
thiserror = "1.0.58"
|
thiserror = "1.0.58"
|
||||||
uuid = { version = "1.7.0", features = ["v4"] }
|
uuid = { version = "1.7.0", features = ["v4"] }
|
||||||
|
tokio = { version = "1.36.0", features = ["sync"] }
|
||||||
|
|
||||||
actix-web = "4.5.1"
|
actix-web = "4.5.1"
|
||||||
actix-web-httpauth = "0.8.1"
|
actix-web-httpauth = "0.8.1"
|
||||||
|
|
|
@ -2,7 +2,6 @@ pub mod invite;
|
||||||
|
|
||||||
use crate::backend::Backend;
|
use crate::backend::Backend;
|
||||||
use actix_web::{post, web, HttpResponse, Responder};
|
use actix_web::{post, web, HttpResponse, Responder};
|
||||||
use log::error;
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
@ -59,7 +58,7 @@ pub async fn auth(backend: web::Data<Backend>, body: web::Json<AuthRequest>) ->
|
||||||
|
|
||||||
match backend.authenticate(userid, body.password).await {
|
match backend.authenticate(userid, body.password).await {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("{e}");
|
log::error!("{e}");
|
||||||
HttpResponse::InternalServerError().finish()
|
HttpResponse::InternalServerError().finish()
|
||||||
}
|
}
|
||||||
Ok(res) => match res {
|
Ok(res) => match res {
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
pub mod relay_id;
|
||||||
|
|
||||||
use crate::backend::Backend;
|
use crate::backend::Backend;
|
||||||
use actix_web::{post, web, HttpResponse, Responder};
|
use actix_web::{post, web, HttpResponse, Responder};
|
||||||
use actix_web_httpauth::extractors::bearer::BearerAuth;
|
use actix_web_httpauth::extractors::bearer::BearerAuth;
|
|
@ -0,0 +1 @@
|
||||||
|
pub mod user;
|
|
@ -0,0 +1 @@
|
||||||
|
pub mod user_id;
|
|
@ -0,0 +1,43 @@
|
||||||
|
use crate::{backend::Backend, Base64Encoding};
|
||||||
|
use actix_web::{get, web, HttpResponse, Responder};
|
||||||
|
use actix_web_httpauth::extractors::bearer::BearerAuth;
|
||||||
|
use log::error;
|
||||||
|
use serde::Serialize;
|
||||||
|
use std::str::FromStr;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
struct PublicKeyResponse {
|
||||||
|
#[serde(with = "Base64Encoding")]
|
||||||
|
public_key: [u8; 32],
|
||||||
|
}
|
||||||
|
|
||||||
|
#[get("/relay/{relay_id}/user/{user_id}/public_key")]
|
||||||
|
pub async fn public_key(
|
||||||
|
backend: web::Data<Backend>,
|
||||||
|
auth: BearerAuth,
|
||||||
|
path: web::Path<(String, String)>,
|
||||||
|
) -> impl Responder {
|
||||||
|
let relay_id = match Uuid::from_str(path.0.as_str()) {
|
||||||
|
Err(_) => return HttpResponse::BadRequest().finish(),
|
||||||
|
Ok(uuid) => uuid,
|
||||||
|
};
|
||||||
|
let user_id = match Uuid::from_str(path.1.as_str()) {
|
||||||
|
Err(_) => return HttpResponse::BadRequest().finish(),
|
||||||
|
Ok(uuid) => uuid,
|
||||||
|
};
|
||||||
|
|
||||||
|
match backend
|
||||||
|
.get_public_key_relay(auth.token(), relay_id, user_id)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Err(e) => {
|
||||||
|
error!("{e}");
|
||||||
|
HttpResponse::InternalServerError().finish()
|
||||||
|
}
|
||||||
|
Ok(res) => match res {
|
||||||
|
Err(e) => e.into(),
|
||||||
|
Ok(public_key) => HttpResponse::Ok().json(PublicKeyResponse { public_key }),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,7 +1,6 @@
|
||||||
use crate::{backend::Backend, config::Config};
|
use crate::{backend::Backend, config::Config};
|
||||||
use actix_web::{web, App, HttpServer};
|
use actix_web::{web, App, HttpServer};
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use log::info;
|
|
||||||
|
|
||||||
mod endpoints;
|
mod endpoints;
|
||||||
|
|
||||||
|
@ -14,10 +13,11 @@ pub async fn start(config: &Config, backend: Backend) -> Result<()> {
|
||||||
.service(endpoints::account::auth)
|
.service(endpoints::account::auth)
|
||||||
.service(endpoints::account::invite::new)
|
.service(endpoints::account::invite::new)
|
||||||
.service(endpoints::relay::create)
|
.service(endpoints::relay::create)
|
||||||
|
.service(endpoints::relay::relay_id::user::user_id::public_key)
|
||||||
})
|
})
|
||||||
.bind((config.addr.as_str(), config.port))?;
|
.bind((config.addr.as_str(), config.port))?;
|
||||||
|
|
||||||
info!("API starting");
|
log::info!("API starting");
|
||||||
if let Some(threads) = config.threads {
|
if let Some(threads) = config.threads {
|
||||||
server.workers(threads as usize).run().await?;
|
server.workers(threads as usize).run().await?;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use sqlx::types::chrono::NaiveDateTime;
|
use sqlx::{types::chrono::NaiveDateTime, FromRow};
|
||||||
|
|
||||||
pub struct InviteTokensRow {
|
pub struct InviteTokensRow {
|
||||||
pub token: String,
|
pub token: String,
|
||||||
|
@ -22,9 +22,10 @@ pub struct RelaysRow {
|
||||||
pub secret: String,
|
pub secret: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(FromRow)]
|
||||||
pub struct RelayIndexRow {
|
pub struct RelayIndexRow {
|
||||||
|
pub private_id: String,
|
||||||
pub public_id: String,
|
pub public_id: String,
|
||||||
pub public_key: [u8; 32],
|
pub public_key: Vec<u8>,
|
||||||
pub auth: String,
|
|
||||||
pub name: String,
|
pub name: String,
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,17 +5,18 @@ mod relay;
|
||||||
mod tokens;
|
mod tokens;
|
||||||
mod user;
|
mod user;
|
||||||
|
|
||||||
use crate::{backend::relay::Relay, config::Config};
|
use crate::config::Config;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use log::info;
|
|
||||||
use sqlx::MySqlPool;
|
use sqlx::MySqlPool;
|
||||||
use std::collections::HashMap;
|
use tokio::sync::{mpsc, oneshot};
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Backend {
|
pub struct Backend {
|
||||||
pub pool: MySqlPool,
|
pub pool: MySqlPool,
|
||||||
pub relays: HashMap<Uuid, Relay>,
|
pub relays_manager: mpsc::UnboundedSender<(
|
||||||
|
relay::manager::Request,
|
||||||
|
oneshot::Sender<relay::manager::Response>,
|
||||||
|
)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Backend {
|
impl Backend {
|
||||||
|
@ -60,11 +61,13 @@ impl Backend {
|
||||||
.execute(&pool)
|
.execute(&pool)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
info!("Backend initialized");
|
log::info!("Database initialized");
|
||||||
|
|
||||||
|
let tx = relay::manager::start(pool.to_owned()).await;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
pool,
|
pool,
|
||||||
relays: HashMap::new(),
|
relays_manager: tx,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,154 @@
|
||||||
|
use crate::backend::{
|
||||||
|
db_structures::{RelayIndexRow, RelaysRow},
|
||||||
|
error::Error,
|
||||||
|
relay::{Relay, UserIndex},
|
||||||
|
};
|
||||||
|
use anyhow::{anyhow, bail, Result};
|
||||||
|
use sqlx::MySqlPool;
|
||||||
|
use std::{
|
||||||
|
collections::{HashMap, VecDeque},
|
||||||
|
fmt::{Display, Formatter},
|
||||||
|
str::FromStr,
|
||||||
|
};
|
||||||
|
use tokio::sync::{
|
||||||
|
mpsc,
|
||||||
|
mpsc::{UnboundedReceiver, UnboundedSender},
|
||||||
|
oneshot,
|
||||||
|
};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
pub enum Request {
|
||||||
|
FetchUserPublicKey {
|
||||||
|
requester: Uuid,
|
||||||
|
relay: Uuid,
|
||||||
|
user: Uuid,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum Response {
|
||||||
|
FetchUserPublicKey(Result<Result<[u8; 32], Error>>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Display for Response {
|
||||||
|
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||||
|
write!(
|
||||||
|
f,
|
||||||
|
"Response::{}",
|
||||||
|
match self {
|
||||||
|
Response::FetchUserPublicKey(_) => "FetchUserPublicKey",
|
||||||
|
}
|
||||||
|
)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Manager {
|
||||||
|
pub pool: MySqlPool,
|
||||||
|
pub relays: HashMap<Uuid, Relay>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Manager {
|
||||||
|
/// Returns a reference to a relay. If the relay is not loaded into the
|
||||||
|
/// runtime manager, it will be loaded by this function.
|
||||||
|
pub async fn get_relay(&mut self, relay_id: Uuid) -> Result<Result<&Relay, Error>> {
|
||||||
|
if self.relays.get(&relay_id).is_none() {
|
||||||
|
match sqlx::query_as!(
|
||||||
|
RelaysRow,
|
||||||
|
r#"SELECT * FROM Relays WHERE id = ? LIMIT 1;"#,
|
||||||
|
relay_id.as_bytes().as_slice()
|
||||||
|
)
|
||||||
|
.fetch_one(&self.pool)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Err(sqlx::Error::RowNotFound) => return Ok(Err(Error::RelayNotFound)),
|
||||||
|
Err(e) => return Err(e.into()),
|
||||||
|
Ok(_) => {
|
||||||
|
let mut user_index = HashMap::new();
|
||||||
|
let mut user_map = HashMap::new();
|
||||||
|
|
||||||
|
let rows: Vec<RelayIndexRow> =
|
||||||
|
sqlx::query_as(&format!("SELECT * FROM RelayIndex_{};", relay_id.simple()))
|
||||||
|
.fetch_all(&self.pool)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
for row in rows {
|
||||||
|
let public_id = Uuid::from_str(&row.public_id)?;
|
||||||
|
|
||||||
|
user_map.insert(Uuid::from_str(&row.private_id)?, public_id);
|
||||||
|
user_index.insert(
|
||||||
|
public_id,
|
||||||
|
UserIndex {
|
||||||
|
public_key: row.public_key.as_slice().try_into()?,
|
||||||
|
name: row.name,
|
||||||
|
messages: VecDeque::new(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
let relay = Relay {
|
||||||
|
user_index,
|
||||||
|
user_map,
|
||||||
|
};
|
||||||
|
|
||||||
|
self.relays.insert(relay_id, relay);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(relay) = self.relays.get(&relay_id) {
|
||||||
|
Ok(Ok(relay))
|
||||||
|
} else {
|
||||||
|
bail!("!!! This should never happen !!! relay not found in runtime manager, even tho it was there before.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_request(
|
||||||
|
&mut self,
|
||||||
|
request: Request,
|
||||||
|
tx: oneshot::Sender<Response>,
|
||||||
|
) -> Result<()> {
|
||||||
|
match request {
|
||||||
|
Request::FetchUserPublicKey {
|
||||||
|
requester,
|
||||||
|
relay,
|
||||||
|
user,
|
||||||
|
} => tx
|
||||||
|
.send(Response::FetchUserPublicKey(
|
||||||
|
self.get_public_key(requester, relay, user).await,
|
||||||
|
))
|
||||||
|
.map_err(|_| anyhow!("Failed to send response back to oneshot"))?,
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn manager(pool: MySqlPool, mut rx: UnboundedReceiver<(Request, oneshot::Sender<Response>)>) {
|
||||||
|
let mut manager = Manager {
|
||||||
|
pool,
|
||||||
|
relays: HashMap::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
log::info!("Relays runtime manager initialized");
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let (request, tx) = match rx.recv().await {
|
||||||
|
Some(msg) => msg,
|
||||||
|
None => panic!("The relays runtime manager message queue lost all senders!"),
|
||||||
|
};
|
||||||
|
|
||||||
|
match manager.handle_request(request, tx).await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(e) => log::error!("Relays runtime manager: {e}"),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn start(pool: MySqlPool) -> UnboundedSender<(Request, oneshot::Sender<Response>)> {
|
||||||
|
let (tx, rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
tokio::task::spawn(manager(pool, rx));
|
||||||
|
|
||||||
|
tx
|
||||||
|
}
|
|
@ -1,9 +1,7 @@
|
||||||
use crate::backend::{
|
pub mod manager;
|
||||||
db_structures::{RelayIndexRow, RelaysRow},
|
pub mod user;
|
||||||
error::Error,
|
|
||||||
user::IntoUser,
|
use crate::backend::{error::Error, relay, user::IntoUser, Backend};
|
||||||
Backend,
|
|
||||||
};
|
|
||||||
use anyhow::{bail, Result};
|
use anyhow::{bail, Result};
|
||||||
use argon2::{
|
use argon2::{
|
||||||
password_hash::{rand_core::OsRng, PasswordHasher, SaltString},
|
password_hash::{rand_core::OsRng, PasswordHasher, SaltString},
|
||||||
|
@ -11,6 +9,7 @@ use argon2::{
|
||||||
};
|
};
|
||||||
use rand::distributions::DistString;
|
use rand::distributions::DistString;
|
||||||
use std::collections::{HashMap, VecDeque};
|
use std::collections::{HashMap, VecDeque};
|
||||||
|
use tokio::sync::oneshot;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
@ -32,13 +31,23 @@ pub struct Relay {
|
||||||
user_map: HashMap<Uuid, Uuid>,
|
user_map: HashMap<Uuid, Uuid>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Relay {
|
||||||
|
pub fn is_user_in_relay(&self, private_id: &Uuid) -> bool {
|
||||||
|
self.user_map.contains_key(private_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_user_index(&self, public_id: &Uuid) -> Result<&UserIndex, Error> {
|
||||||
|
match self.user_index.get(public_id) {
|
||||||
|
Some(index) => Ok(index),
|
||||||
|
None => Err(Error::UserNotFound),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Backend {
|
impl Backend {
|
||||||
/// Creates the structures for a new relay
|
/// Creates the structures for a new relay
|
||||||
pub async fn create_relay(
|
pub async fn create_relay(&self, user: impl IntoUser) -> Result<Result<(Uuid, String), Error>> {
|
||||||
&self,
|
let _user = match user.into_user(&self.pool).await? {
|
||||||
user: impl IntoUser,
|
|
||||||
) -> anyhow::Result<anyhow::Result<(Uuid, String), Error>> {
|
|
||||||
let _user = match user.into_user(&self).await? {
|
|
||||||
Ok(user) => user,
|
Ok(user) => user,
|
||||||
Err(e) => return Ok(Err(e)),
|
Err(e) => return Ok(Err(e)),
|
||||||
};
|
};
|
||||||
|
@ -64,8 +73,8 @@ impl Backend {
|
||||||
sqlx::query(&format!(
|
sqlx::query(&format!(
|
||||||
"CREATE TABLE RelayIndex_{} (
|
"CREATE TABLE RelayIndex_{} (
|
||||||
public_id UUID NOT NULL PRIMARY KEY,
|
public_id UUID NOT NULL PRIMARY KEY,
|
||||||
|
private_id UUID NOT NULL,
|
||||||
public_key BINARY(32) NOT NULL,
|
public_key BINARY(32) NOT NULL,
|
||||||
auth VARCHAR(128) NOT NULL,
|
|
||||||
name VARCHAR(32)
|
name VARCHAR(32)
|
||||||
);",
|
);",
|
||||||
relay_id.simple()
|
relay_id.simple()
|
||||||
|
@ -76,35 +85,31 @@ impl Backend {
|
||||||
Ok(Ok((relay_id, secret)))
|
Ok(Ok((relay_id, secret)))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_relay(&mut self, relay_id: Uuid) -> Result<Result<&Relay, Error>> {
|
pub async fn get_public_key_relay(
|
||||||
// can't use early return pattern, because of the borrow checker :/
|
&self,
|
||||||
if self.relays.get(&relay_id).is_none() {
|
requester: impl IntoUser,
|
||||||
match sqlx::query_as!(
|
relay: Uuid,
|
||||||
RelaysRow,
|
user: Uuid,
|
||||||
r#"SELECT * FROM Relays WHERE id = ? LIMIT 1;"#,
|
) -> Result<Result<[u8; 32], Error>> {
|
||||||
relay_id.as_bytes().as_slice()
|
let requester = match requester.into_user(&self.pool).await? {
|
||||||
)
|
Err(e) => return Ok(Err(e)),
|
||||||
.fetch_one(&self.pool)
|
Ok(user) => user,
|
||||||
.await
|
|
||||||
{
|
|
||||||
Err(sqlx::Error::RowNotFound) => return Ok(Err(Error::RelayNotFound)),
|
|
||||||
Err(e) => return Err(e.into()),
|
|
||||||
Ok(_) => {
|
|
||||||
// the hashmaps will be filled, as soon, as clients subscribe to the relay
|
|
||||||
let relay = Relay {
|
|
||||||
user_index: HashMap::new(),
|
|
||||||
user_map: HashMap::new(),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
self.relays.insert(relay_id, relay);
|
let (tx, rx) = oneshot::channel::<manager::Response>();
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(relay) = self.relays.get(&relay_id) {
|
self.relays_manager.send((
|
||||||
Ok(Ok(relay))
|
manager::Request::FetchUserPublicKey {
|
||||||
} else {
|
requester: requester.uuid,
|
||||||
bail!("!!! This should never happen !!! relay not found in runtime manager, even tho it was there before.")
|
relay,
|
||||||
|
user,
|
||||||
|
},
|
||||||
|
tx,
|
||||||
|
))?;
|
||||||
|
|
||||||
|
match rx.await? {
|
||||||
|
manager::Response::FetchUserPublicKey(response) => response,
|
||||||
|
resp => bail!("Relay runtime wrong response error: expected Response::FetchUserPublicKey, got: {resp}")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -0,0 +1,40 @@
|
||||||
|
use crate::backend::{
|
||||||
|
error::Error,
|
||||||
|
relay::manager::{self, Manager},
|
||||||
|
user::IntoUser,
|
||||||
|
Backend,
|
||||||
|
};
|
||||||
|
use anyhow::Result;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
impl Manager {
|
||||||
|
pub async fn get_public_key(
|
||||||
|
&mut self,
|
||||||
|
user: impl IntoUser,
|
||||||
|
relay_id: Uuid,
|
||||||
|
user_id: Uuid,
|
||||||
|
) -> Result<Result<[u8; 32], Error>> {
|
||||||
|
let requester = match user.into_user(&self.pool).await? {
|
||||||
|
Ok(user) => user,
|
||||||
|
Err(e) => return Ok(Err(e)),
|
||||||
|
};
|
||||||
|
|
||||||
|
let requested_relay = match self.get_relay(relay_id).await? {
|
||||||
|
Err(e) => return Ok(Err(e)),
|
||||||
|
Ok(relay) => relay,
|
||||||
|
};
|
||||||
|
|
||||||
|
if !requested_relay.is_user_in_relay(&requester.uuid) {
|
||||||
|
return Ok(Err(Error::PermissionDenied(
|
||||||
|
"You're not part of the relay.",
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
let requested_user = match requested_relay.get_user_index(&user_id) {
|
||||||
|
Err(e) => return Ok(Err(e)),
|
||||||
|
Ok(user) => user,
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Ok(requested_user.public_key))
|
||||||
|
}
|
||||||
|
}
|
|
@ -8,13 +8,13 @@ use crate::backend::{
|
||||||
use argon2::password_hash::rand_core::OsRng;
|
use argon2::password_hash::rand_core::OsRng;
|
||||||
use chrono::{Days, Utc};
|
use chrono::{Days, Utc};
|
||||||
use rand::distributions::DistString;
|
use rand::distributions::DistString;
|
||||||
|
use sqlx::MySqlPool;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
impl Backend {
|
|
||||||
/// Returns the UUID of the user who owns the auth token.
|
/// Returns the UUID of the user who owns the auth token.
|
||||||
pub async fn resolve_auth_token(
|
pub async fn resolve_auth_token(
|
||||||
&self,
|
pool: &MySqlPool,
|
||||||
token: &str,
|
token: &str,
|
||||||
) -> anyhow::Result<anyhow::Result<Uuid, Error>> {
|
) -> anyhow::Result<anyhow::Result<Uuid, Error>> {
|
||||||
match sqlx::query_as!(
|
match sqlx::query_as!(
|
||||||
|
@ -22,7 +22,7 @@ impl Backend {
|
||||||
r#"SELECT * FROM AuthTokens WHERE token = ? LIMIT 1;"#,
|
r#"SELECT * FROM AuthTokens WHERE token = ? LIMIT 1;"#,
|
||||||
token
|
token
|
||||||
)
|
)
|
||||||
.fetch_one(&self.pool)
|
.fetch_one(pool)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Err(e) => match e {
|
Err(e) => match e {
|
||||||
|
@ -34,7 +34,7 @@ impl Backend {
|
||||||
Ok(Ok(Uuid::from_str(&row.userid)?))
|
Ok(Ok(Uuid::from_str(&row.userid)?))
|
||||||
} else {
|
} else {
|
||||||
sqlx::query!(r#"DELETE FROM AuthTokens WHERE token = ?;"#, token)
|
sqlx::query!(r#"DELETE FROM AuthTokens WHERE token = ?;"#, token)
|
||||||
.execute(&self.pool)
|
.execute(pool)
|
||||||
.await?;
|
.await?;
|
||||||
Ok(Err(Error::TokenExpired))
|
Ok(Err(Error::TokenExpired))
|
||||||
}
|
}
|
||||||
|
@ -42,6 +42,7 @@ impl Backend {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Backend {
|
||||||
/// Check whether an invite-token is valid or not.
|
/// Check whether an invite-token is valid or not.
|
||||||
pub async fn check_invite_token(
|
pub async fn check_invite_token(
|
||||||
&self,
|
&self,
|
||||||
|
@ -77,7 +78,7 @@ impl Backend {
|
||||||
&self,
|
&self,
|
||||||
user: impl IntoUser,
|
user: impl IntoUser,
|
||||||
) -> anyhow::Result<anyhow::Result<String, Error>> {
|
) -> anyhow::Result<anyhow::Result<String, Error>> {
|
||||||
let user = match user.into_user(&self).await? {
|
let user = match user.into_user(&self.pool).await? {
|
||||||
Ok(user) => user,
|
Ok(user) => user,
|
||||||
Err(e) => return Ok(Err(e)),
|
Err(e) => return Ok(Err(e)),
|
||||||
};
|
};
|
||||||
|
|
|
@ -1,4 +1,7 @@
|
||||||
use crate::backend::{db_structures::UsersRow, error::Error, permissions::Permission, Backend};
|
use crate::backend::{
|
||||||
|
db_structures::UsersRow, error::Error, permissions::Permission, tokens::resolve_auth_token,
|
||||||
|
Backend,
|
||||||
|
};
|
||||||
use anyhow::{bail, Result};
|
use anyhow::{bail, Result};
|
||||||
use argon2::{
|
use argon2::{
|
||||||
password_hash::{rand_core::OsRng, SaltString},
|
password_hash::{rand_core::OsRng, SaltString},
|
||||||
|
@ -6,12 +9,12 @@ use argon2::{
|
||||||
};
|
};
|
||||||
use chrono::Days;
|
use chrono::Days;
|
||||||
use rand::distributions::DistString;
|
use rand::distributions::DistString;
|
||||||
use sqlx::types::chrono::Utc;
|
use sqlx::{types::chrono::Utc, MySqlPool};
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
pub struct User {
|
pub struct User {
|
||||||
uuid: Uuid,
|
pub uuid: Uuid,
|
||||||
password: String,
|
password: String,
|
||||||
permissions: u16,
|
permissions: u16,
|
||||||
}
|
}
|
||||||
|
@ -60,40 +63,39 @@ impl User {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait IntoUser {
|
pub trait IntoUser {
|
||||||
async fn into_user(self, backend: &Backend) -> Result<Result<User, Error>>;
|
async fn into_user(self, pool: &MySqlPool) -> Result<Result<User, Error>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl IntoUser for Uuid {
|
impl IntoUser for Uuid {
|
||||||
async fn into_user(self, backend: &Backend) -> Result<Result<User, Error>> {
|
async fn into_user(self, pool: &MySqlPool) -> Result<Result<User, Error>> {
|
||||||
backend.get_user(self).await
|
get_user(pool, self).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl IntoUser for &str {
|
impl IntoUser for &str {
|
||||||
async fn into_user(self, backend: &Backend) -> Result<Result<User, Error>> {
|
async fn into_user(self, pool: &MySqlPool) -> Result<Result<User, Error>> {
|
||||||
let userid = match backend.resolve_auth_token(self).await? {
|
let userid = match resolve_auth_token(pool, self).await? {
|
||||||
Ok(userid) => userid,
|
Ok(userid) => userid,
|
||||||
Err(e) => return Ok(Err(e)),
|
Err(e) => return Ok(Err(e)),
|
||||||
};
|
};
|
||||||
userid.into_user(backend).await
|
userid.into_user(pool).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl IntoUser for User {
|
impl IntoUser for User {
|
||||||
async fn into_user(self, _backend: &Backend) -> Result<Result<User, Error>> {
|
async fn into_user(self, _pool: &MySqlPool) -> Result<Result<User, Error>> {
|
||||||
Ok(Ok(self))
|
Ok(Ok(self))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Backend {
|
|
||||||
/// Returns detailed information about the user identified by the UUID.
|
/// Returns detailed information about the user identified by the UUID.
|
||||||
async fn get_user(&self, userid: Uuid) -> Result<Result<User, Error>> {
|
async fn get_user(pool: &MySqlPool, userid: Uuid) -> Result<Result<User, Error>> {
|
||||||
match sqlx::query_as!(
|
match sqlx::query_as!(
|
||||||
UsersRow,
|
UsersRow,
|
||||||
r#"SELECT * FROM Users WHERE userid = ? LIMIT 1;"#,
|
r#"SELECT * FROM Users WHERE userid = ? LIMIT 1;"#,
|
||||||
userid.as_bytes().as_slice()
|
userid.as_bytes().as_slice()
|
||||||
)
|
)
|
||||||
.fetch_one(&self.pool)
|
.fetch_one(pool)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Err(e) => match e {
|
Err(e) => match e {
|
||||||
|
@ -104,6 +106,7 @@ impl Backend {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Backend {
|
||||||
/// Creates a new account and returns its UUID.
|
/// Creates a new account and returns its UUID.
|
||||||
pub async fn account_register(
|
pub async fn account_register(
|
||||||
&self,
|
&self,
|
||||||
|
@ -140,7 +143,7 @@ impl Backend {
|
||||||
userid: Uuid,
|
userid: Uuid,
|
||||||
password: String,
|
password: String,
|
||||||
) -> Result<Result<String, Error>> {
|
) -> Result<Result<String, Error>> {
|
||||||
let user = match userid.into_user(&self).await? {
|
let user = match userid.into_user(&self.pool).await? {
|
||||||
Ok(user) => user,
|
Ok(user) => user,
|
||||||
Err(e) => return Ok(Err(e)),
|
Err(e) => return Ok(Err(e)),
|
||||||
};
|
};
|
||||||
|
@ -152,7 +155,7 @@ impl Backend {
|
||||||
let mut token = rand::distributions::Alphanumeric.sample_string(&mut OsRng, 48);
|
let mut token = rand::distributions::Alphanumeric.sample_string(&mut OsRng, 48);
|
||||||
// just for the case, that there's some duplication
|
// just for the case, that there's some duplication
|
||||||
loop {
|
loop {
|
||||||
match self.resolve_auth_token(&token).await? {
|
match resolve_auth_token(&self.pool, &token).await? {
|
||||||
Ok(_) => token = rand::distributions::Alphanumeric.sample_string(&mut OsRng, 48),
|
Ok(_) => token = rand::distributions::Alphanumeric.sample_string(&mut OsRng, 48),
|
||||||
Err(Error::InvalidToken) | Err(Error::TokenExpired) => break,
|
Err(Error::InvalidToken) | Err(Error::TokenExpired) => break,
|
||||||
Err(e) => bail!("!THIS ERROR SHOULDN'T BE HERE! -> {e}"),
|
Err(e) => bail!("!THIS ERROR SHOULDN'T BE HERE! -> {e}"),
|
||||||
|
|
|
@ -4,16 +4,18 @@ mod config;
|
||||||
|
|
||||||
use crate::backend::Backend;
|
use crate::backend::Backend;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
|
use base64_serde::base64_serde_type;
|
||||||
use compile_time_run::run_command_str;
|
use compile_time_run::run_command_str;
|
||||||
use config::Config;
|
use config::Config;
|
||||||
use log::info;
|
|
||||||
|
base64_serde_type!(pub Base64Encoding, base64::engine::general_purpose::STANDARD);
|
||||||
|
|
||||||
#[actix_web::main]
|
#[actix_web::main]
|
||||||
async fn main() -> Result<()> {
|
async fn main() -> Result<()> {
|
||||||
dotenvy::dotenv()?;
|
dotenvy::dotenv()?;
|
||||||
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
|
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
|
||||||
|
|
||||||
info!(
|
log::info!(
|
||||||
"Starting Cipher Relay (v{} - git: {}/{})",
|
"Starting Cipher Relay (v{} - git: {}/{})",
|
||||||
env!("CARGO_PKG_VERSION"),
|
env!("CARGO_PKG_VERSION"),
|
||||||
run_command_str!("git", "branch", "--show-current"),
|
run_command_str!("git", "branch", "--show-current"),
|
||||||
|
|
Loading…
Reference in New Issue