From f6a1b5200a96ad808698eb52593915a78683e3e1 Mon Sep 17 00:00:00 2001 From: antifallobst Date: Tue, 21 May 2024 04:20:40 +0200 Subject: [PATCH] refactor(cbs): added responses and dumped the dummy cbs code --- Cargo.lock | 20 +++++++++++- Cargo.toml | 1 + src/cbs/connection.rs | 76 +++++++++++++++++++++++++++++-------------- src/cbs/dummy.rs | 70 ++------------------------------------- src/cbs/manager.rs | 22 +++++++++---- 5 files changed, 90 insertions(+), 99 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0b5de4b..b14658d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1441,16 +1441,33 @@ dependencies = [ "tokio", ] +[[package]] +name = "triba" +version = "0.1.0" +dependencies = [ + "aes-gcm-siv", + "interprocess", + "rand", + "rmp-serde", + "serde", + "thiserror", + "tokio", + "tokio-util", + "triba-packet", + "uuid", + "x25519-dalek", +] + [[package]] name = "triba-packet" version = "0.1.0" dependencies = [ "aes-gcm-siv", - "anyhow", "interprocess", "rmp-serde", "serde", "strum", + "thiserror", "tokio", "uuid", "x25519-dalek", @@ -1479,6 +1496,7 @@ dependencies = [ "strum", "tokio", "tokio-util", + "triba", "triba-packet", "trinitry", "trixy", diff --git a/Cargo.toml b/Cargo.toml index c958b9b..4ca579b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,6 +70,7 @@ crossterm = { version = "0.25" } # Trinitrx Backend API specific interprocess = { version = "2.1.0", features = ["tokio"] } triba-packet = { path = "../triba-packet" } +triba = { path = "../triba" } [dev-dependencies] pretty_assertions = "1.4.0" diff --git a/src/cbs/connection.rs b/src/cbs/connection.rs index 77bfd14..a89c277 100644 --- a/src/cbs/connection.rs +++ b/src/cbs/connection.rs @@ -6,22 +6,29 @@ use tokio::{ sync::mpsc, }; use tokio_util::sync::CancellationToken; -use triba_packet::{self as packet, IdPool, Packet}; +use triba_packet::{IdPool, Packet, Request, Response}; use uuid::Uuid; pub struct Connection { - tx: mpsc::UnboundedSender, + req_tx: mpsc::UnboundedSender, + resp_tx: mpsc::UnboundedSender<(Response, u64)>, } impl Connection { - pub async fn send_packet(&self, body: packet::Body) -> Result<()> { - self.tx.send(body)?; + pub async fn send_request(&self, body: Request) -> Result<()> { + self.req_tx.send(body)?; + Ok(()) + } + + pub async fn send_response(&self, body: Response, receiver: u64) -> Result<()> { + self.resp_tx.send((body, receiver))?; Ok(()) } } enum Event { - ToCBS(Packet), + ToCBSReq(Request), + ToCBSResp(Response, u64), FromCBS(Packet), } @@ -46,9 +53,13 @@ impl UnstableConnection { let mut id_pool = IdPool::new(); - let packet = Packet::recv(&mut sock_rx, &cipher, &nonce).await.unwrap(); - match packet.body() { - packet::Body::HandshakeUpgradeConnection => { + let packet = Packet::recv(&mut sock_rx, &cipher, &nonce).await?; + match packet { + Packet::Request { id, body } => { + Packet::response(id_pool.acquire(), id, Response::Success) + .send(&mut sock_tx, &cipher, &nonce) + .await?; + cli_log::info!( "CBS {id}: upgraded connection to encrypted messagepack", id = self.id @@ -56,16 +67,11 @@ impl UnstableConnection { } body => { return Err(anyhow!( - "expected cbs to send HandshakeUpgradeConnection but got {body}" + "expected cbs to send: Request::HandshakeUpgradeConnection, but got: {body}" )) } } - Packet::new(&mut id_pool, packet::Body::HandshakeSuccess) - .send(&mut sock_tx, &cipher, &nonce) - .await - .unwrap(); - // Poll packets from socket { let cipher = cipher.clone(); @@ -80,15 +86,26 @@ impl UnstableConnection { }); } - let (core_tx, mut core_rx) = mpsc::unbounded_channel(); + let (core_req_tx, mut core_req_rx) = mpsc::unbounded_channel(); - // Poll packets from core + // Poll requests from core + { + let tx = tx.clone(); + tokio::spawn(async move { + loop { + let body = core_req_rx.recv().await.unwrap(); + tx.send(Event::ToCBSReq(body)).unwrap(); + } + }); + } + + let (core_resp_tx, mut core_resp_rx) = mpsc::unbounded_channel(); + + // Poll responses from core tokio::spawn(async move { loop { - let body = core_rx.recv().await.unwrap(); - - tx.send(Event::ToCBS(Packet::new(&mut id_pool, body))) - .unwrap(); + let (body, req) = core_resp_rx.recv().await.unwrap(); + tx.send(Event::ToCBSResp(body, req)).unwrap(); } }); @@ -106,20 +123,29 @@ impl UnstableConnection { }; match event { - Event::ToCBS(packet) => { - sock_tx - .write(packet.pack(&cipher, &nonce).unwrap().as_slice()) + Event::ToCBSReq(req) => { + Packet::request(id_pool.acquire(), req) + .send(&mut sock_tx, &cipher, &nonce) + .await + .unwrap(); + } + Event::ToCBSResp(resp, req) => { + Packet::response(id_pool.acquire(), req, resp) + .send(&mut sock_tx, &cipher, &nonce) .await .unwrap(); } Event::FromCBS(packet) => { - cli_log::info!("Core received CBS packet: {packet:?}") + cli_log::info!("Core received CBS packet: {packet:?}"); } } } }); } - Ok(Connection { tx: core_tx }) + Ok(Connection { + req_tx: core_req_tx, + resp_tx: core_resp_tx, + }) } } diff --git a/src/cbs/dummy.rs b/src/cbs/dummy.rs index 33d7b82..e5cb73d 100644 --- a/src/cbs/dummy.rs +++ b/src/cbs/dummy.rs @@ -1,72 +1,8 @@ -use aes_gcm_siv::{Aes256GcmSiv, KeyInit, Nonce}; -use interprocess::local_socket::{ - self, - tokio::{prelude::*, Stream}, -}; -use rand::thread_rng; -use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; -use triba_packet::{self as packet, IdPool, Packet}; +use interprocess::local_socket::Name; use uuid::Uuid; -use x25519_dalek::{EphemeralSecret, PublicKey}; -pub async fn cbs(sock_name: local_socket::Name<'_>, id: Uuid) { - let stream = Stream::connect(sock_name).await.unwrap(); - - let (rx, mut tx) = stream.split(); - let mut rx = BufReader::new(rx); - - tx.write(id.as_bytes()).await.unwrap(); - - let dh_core_public = { - let mut buffer = [0u8; 32]; - rx.read(&mut buffer).await.unwrap(); - PublicKey::from(buffer) - }; - - let dh_own_secret = EphemeralSecret::random_from_rng(thread_rng()); - let dh_own_public = PublicKey::from(&dh_own_secret); - - tx.write(dh_own_public.as_bytes()).await.unwrap(); - - let shared_secret = dh_own_secret.diffie_hellman(&dh_core_public); - - let nonce = { - let mut buffer = [0u8; 12]; - rx.read(&mut buffer).await.unwrap(); - Nonce::from(buffer) - }; - - let cipher = Aes256GcmSiv::new(shared_secret.as_bytes().into()); - - let mut id_pool = IdPool::new(); - - tx.write( - Packet::new(&mut id_pool, packet::Body::HandshakeUpgradeConnection) - .pack(&cipher, &nonce) - .unwrap() - .as_slice(), - ) - .await - .unwrap(); - - let packet = Packet::recv(&mut rx, &cipher, &nonce).await.unwrap(); - match packet.body() { - packet::Body::HandshakeSuccess => { - cli_log::info!("[DUMMY CBS] Connection to core stabilized"); - } - _ => { - cli_log::info!("[DUMMY CBS] Connection to core failed"); - return; - } - } - - Packet::new( - &mut id_pool, - packet::Body::Exit(packet::Error::Fatal("This is a test".to_string())), - ) - .send(&mut tx, &cipher, &nonce) - .await - .unwrap(); +pub async fn cbs(sock_name: Name<'_>, id: Uuid) { + let (session, rx) = triba::Session::new(id, sock_name).await.unwrap(); loop {} } diff --git a/src/cbs/manager.rs b/src/cbs/manager.rs index 1176034..81f1337 100644 --- a/src/cbs/manager.rs +++ b/src/cbs/manager.rs @@ -17,14 +17,15 @@ use tokio::{ sync::{mpsc, oneshot}, }; use tokio_util::sync::CancellationToken; -use triba_packet as packet; +use triba_packet::{Request, Response}; use uuid::Uuid; use x25519_dalek::{EphemeralSecret, PublicKey}; enum Event { IncomingConnection(Stream), SpawnCBS(oneshot::Sender>), - SendPacket(Uuid, packet::Body), + SendRequest(Uuid, Request), + SendResponse(Uuid, Response, u64), } async fn poll_socket( @@ -152,9 +153,13 @@ impl Manager { let sock_name = sock_name.clone(); tokio::spawn(super::dummy::cbs(sock_name.clone(), id)); } - Event::SendPacket(id, body) => { + Event::SendRequest(id, body) => { let conn = connections.get(&id).unwrap(); - conn.send_packet(body).await.unwrap(); + conn.send_request(body).await.unwrap(); + } + Event::SendResponse(id, body, req) => { + let conn = connections.get(&id).unwrap(); + conn.send_response(body, req).await.unwrap(); } } } @@ -170,8 +175,13 @@ impl Manager { rx.await? } - pub fn send_packet(&self, cbs: Uuid, body: packet::Body) -> Result<()> { - self.tx.send(Event::SendPacket(cbs, body))?; + pub fn send_request(&self, cbs: Uuid, body: Request) -> Result<()> { + self.tx.send(Event::SendRequest(cbs, body))?; + Ok(()) + } + + pub fn send_response(&self, cbs: Uuid, body: Response, receiver: u64) -> Result<()> { + self.tx.send(Event::SendResponse(cbs, body, receiver))?; Ok(()) } }