Implement CBS API handling on the trinitrix core side. #23

Merged
antifallobst merged 8 commits from cbs-core into main 2024-05-21 21:09:48 +00:00
5 changed files with 90 additions and 99 deletions
Showing only changes of commit f6a1b5200a - Show all commits

20
Cargo.lock generated
View File

@ -1441,16 +1441,33 @@ dependencies = [
"tokio",
]
[[package]]
name = "triba"
version = "0.1.0"
dependencies = [
"aes-gcm-siv",
"interprocess",
"rand",
"rmp-serde",
"serde",
"thiserror",
"tokio",
"tokio-util",
"triba-packet",
"uuid",
"x25519-dalek",
]
[[package]]
name = "triba-packet"
version = "0.1.0"
dependencies = [
"aes-gcm-siv",
"anyhow",
"interprocess",
"rmp-serde",
"serde",
"strum",
"thiserror",
"tokio",
"uuid",
"x25519-dalek",
@ -1479,6 +1496,7 @@ dependencies = [
"strum",
"tokio",
"tokio-util",
"triba",
"triba-packet",
"trinitry",
"trixy",

View File

@ -70,6 +70,7 @@ crossterm = { version = "0.25" }
# Trinitrx Backend API specific
interprocess = { version = "2.1.0", features = ["tokio"] }
triba-packet = { path = "../triba-packet" }
triba = { path = "../triba" }
[dev-dependencies]
pretty_assertions = "1.4.0"

View File

@ -6,22 +6,29 @@ use tokio::{
sync::mpsc,
};
use tokio_util::sync::CancellationToken;
use triba_packet::{self as packet, IdPool, Packet};
use triba_packet::{IdPool, Packet, Request, Response};
use uuid::Uuid;
pub struct Connection {
tx: mpsc::UnboundedSender<packet::Body>,
req_tx: mpsc::UnboundedSender<Request>,
resp_tx: mpsc::UnboundedSender<(Response, u64)>,
}
impl Connection {
pub async fn send_packet(&self, body: packet::Body) -> Result<()> {
self.tx.send(body)?;
pub async fn send_request(&self, body: Request) -> Result<()> {
self.req_tx.send(body)?;
Ok(())
}
pub async fn send_response(&self, body: Response, receiver: u64) -> Result<()> {
self.resp_tx.send((body, receiver))?;
Ok(())
}
}
enum Event {
ToCBS(Packet),
ToCBSReq(Request),
ToCBSResp(Response, u64),
FromCBS(Packet),
}
@ -46,9 +53,13 @@ impl UnstableConnection {
let mut id_pool = IdPool::new();
let packet = Packet::recv(&mut sock_rx, &cipher, &nonce).await.unwrap();
match packet.body() {
packet::Body::HandshakeUpgradeConnection => {
let packet = Packet::recv(&mut sock_rx, &cipher, &nonce).await?;
match packet {
Packet::Request { id, body } => {
Packet::response(id_pool.acquire(), id, Response::Success)
.send(&mut sock_tx, &cipher, &nonce)
.await?;
cli_log::info!(
"CBS {id}: upgraded connection to encrypted messagepack",
id = self.id
@ -56,16 +67,11 @@ impl UnstableConnection {
}
body => {
return Err(anyhow!(
"expected cbs to send HandshakeUpgradeConnection but got {body}"
"expected cbs to send: Request::HandshakeUpgradeConnection, but got: {body}"
))
}
}
Packet::new(&mut id_pool, packet::Body::HandshakeSuccess)
.send(&mut sock_tx, &cipher, &nonce)
.await
.unwrap();
// Poll packets from socket
{
let cipher = cipher.clone();
@ -80,15 +86,26 @@ impl UnstableConnection {
});
}
let (core_tx, mut core_rx) = mpsc::unbounded_channel();
let (core_req_tx, mut core_req_rx) = mpsc::unbounded_channel();
// Poll packets from core
// Poll requests from core
{
let tx = tx.clone();
tokio::spawn(async move {
loop {
let body = core_req_rx.recv().await.unwrap();
tx.send(Event::ToCBSReq(body)).unwrap();
}
});
}
let (core_resp_tx, mut core_resp_rx) = mpsc::unbounded_channel();
// Poll responses from core
tokio::spawn(async move {
loop {
let body = core_rx.recv().await.unwrap();
tx.send(Event::ToCBS(Packet::new(&mut id_pool, body)))
.unwrap();
let (body, req) = core_resp_rx.recv().await.unwrap();
tx.send(Event::ToCBSResp(body, req)).unwrap();
}
});
@ -106,20 +123,29 @@ impl UnstableConnection {
};
match event {
Event::ToCBS(packet) => {
sock_tx
.write(packet.pack(&cipher, &nonce).unwrap().as_slice())
Event::ToCBSReq(req) => {
Packet::request(id_pool.acquire(), req)
.send(&mut sock_tx, &cipher, &nonce)
.await
.unwrap();
}
Event::ToCBSResp(resp, req) => {
Packet::response(id_pool.acquire(), req, resp)
.send(&mut sock_tx, &cipher, &nonce)
.await
.unwrap();
}
Event::FromCBS(packet) => {
cli_log::info!("Core received CBS packet: {packet:?}")
cli_log::info!("Core received CBS packet: {packet:?}");
}
}
}
});
}
Ok(Connection { tx: core_tx })
Ok(Connection {
req_tx: core_req_tx,
resp_tx: core_resp_tx,
})
}
}

View File

@ -1,72 +1,8 @@
use aes_gcm_siv::{Aes256GcmSiv, KeyInit, Nonce};
use interprocess::local_socket::{
self,
tokio::{prelude::*, Stream},
};
use rand::thread_rng;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use triba_packet::{self as packet, IdPool, Packet};
use interprocess::local_socket::Name;
use uuid::Uuid;
use x25519_dalek::{EphemeralSecret, PublicKey};
pub async fn cbs(sock_name: local_socket::Name<'_>, id: Uuid) {
let stream = Stream::connect(sock_name).await.unwrap();
let (rx, mut tx) = stream.split();
let mut rx = BufReader::new(rx);
tx.write(id.as_bytes()).await.unwrap();
let dh_core_public = {
let mut buffer = [0u8; 32];
rx.read(&mut buffer).await.unwrap();
PublicKey::from(buffer)
};
let dh_own_secret = EphemeralSecret::random_from_rng(thread_rng());
let dh_own_public = PublicKey::from(&dh_own_secret);
tx.write(dh_own_public.as_bytes()).await.unwrap();
let shared_secret = dh_own_secret.diffie_hellman(&dh_core_public);
let nonce = {
let mut buffer = [0u8; 12];
rx.read(&mut buffer).await.unwrap();
Nonce::from(buffer)
};
let cipher = Aes256GcmSiv::new(shared_secret.as_bytes().into());
let mut id_pool = IdPool::new();
tx.write(
Packet::new(&mut id_pool, packet::Body::HandshakeUpgradeConnection)
.pack(&cipher, &nonce)
.unwrap()
.as_slice(),
)
.await
.unwrap();
let packet = Packet::recv(&mut rx, &cipher, &nonce).await.unwrap();
match packet.body() {
packet::Body::HandshakeSuccess => {
cli_log::info!("[DUMMY CBS] Connection to core stabilized");
}
_ => {
cli_log::info!("[DUMMY CBS] Connection to core failed");
return;
}
}
Packet::new(
&mut id_pool,
packet::Body::Exit(packet::Error::Fatal("This is a test".to_string())),
)
.send(&mut tx, &cipher, &nonce)
.await
.unwrap();
pub async fn cbs(sock_name: Name<'_>, id: Uuid) {
let (session, rx) = triba::Session::new(id, sock_name).await.unwrap();
loop {}
}

View File

@ -17,14 +17,15 @@ use tokio::{
sync::{mpsc, oneshot},
};
use tokio_util::sync::CancellationToken;
use triba_packet as packet;
use triba_packet::{Request, Response};
use uuid::Uuid;
use x25519_dalek::{EphemeralSecret, PublicKey};
enum Event {
IncomingConnection(Stream),
SpawnCBS(oneshot::Sender<Result<Uuid>>),
SendPacket(Uuid, packet::Body),
SendRequest(Uuid, Request),
SendResponse(Uuid, Response, u64),
}
async fn poll_socket(
@ -152,9 +153,13 @@ impl Manager {
let sock_name = sock_name.clone();
tokio::spawn(super::dummy::cbs(sock_name.clone(), id));
}
Event::SendPacket(id, body) => {
Event::SendRequest(id, body) => {
let conn = connections.get(&id).unwrap();
conn.send_packet(body).await.unwrap();
conn.send_request(body).await.unwrap();
}
Event::SendResponse(id, body, req) => {
let conn = connections.get(&id).unwrap();
conn.send_response(body, req).await.unwrap();
}
}
}
@ -170,8 +175,13 @@ impl Manager {
rx.await?
}
pub fn send_packet(&self, cbs: Uuid, body: packet::Body) -> Result<()> {
self.tx.send(Event::SendPacket(cbs, body))?;
pub fn send_request(&self, cbs: Uuid, body: Request) -> Result<()> {
self.tx.send(Event::SendRequest(cbs, body))?;
Ok(())
}
pub fn send_response(&self, cbs: Uuid, body: Response, receiver: u64) -> Result<()> {
self.tx.send(Event::SendResponse(cbs, body, receiver))?;
Ok(())
}
}