From 762ea6067db3d63d480051ae563ca7b811699285 Mon Sep 17 00:00:00 2001 From: antifallobst Date: Tue, 21 May 2024 15:03:41 +0200 Subject: [PATCH] feat(cbs): hooked up the cbs handling system to the main message queue --- src/app/events/mod.rs | 7 +++++++ src/app/mod.rs | 4 ++-- src/cbs/connection.rs | 41 ++++++++++++++++++++++++++++++++--------- src/cbs/manager.rs | 12 ++++++++++-- 4 files changed, 51 insertions(+), 13 deletions(-) diff --git a/src/app/events/mod.rs b/src/app/events/mod.rs index 75e181e..7e3ab6a 100644 --- a/src/app/events/mod.rs +++ b/src/app/events/mod.rs @@ -31,10 +31,12 @@ use crate::{ use cli_log::{trace, warn}; use crossterm::event::Event as CrosstermEvent; use handlers::{command, input}; +use uuid::Uuid; #[derive(Debug)] pub enum Event { InputEvent(CrosstermEvent), + CBSPacket(Uuid, triba_packet::Packet), // FIXME(@soispha): The `String` here is just wrong <2024-05-03> CommandEvent(Commands, Option>), @@ -49,6 +51,11 @@ impl Event { .await .with_context(|| format!("Failed to handle command event: `{:#?}`", event)), + Event::CBSPacket(cbs, packet) => { + cli_log::info!("Received packet from cbs {cbs}: {packet:?}"); + Ok(EventStatus::Ok) + } + Event::LuaCommand(lua_code) => { warn!( "Got lua code to execute, but no exectuter is available:\n{}", diff --git a/src/app/mod.rs b/src/app/mod.rs index 33b2c7d..365b448 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -73,8 +73,8 @@ impl App { .set(tx.clone()) .expect("The cell should always be empty at this point"); - let cbs_manager = cbs::Manager::new().await; - cbs_manager.spawn_cbs().await?; + let cbs_manager = cbs::Manager::new(tx.clone()).await; + cbs_manager.spawn_cbs().await?; // TODO: remove, this is just a dummy - antifallobst <2024-05-21> Ok(Self { ui, diff --git a/src/cbs/connection.rs b/src/cbs/connection.rs index 4397a48..7a56b96 100644 --- a/src/cbs/connection.rs +++ b/src/cbs/connection.rs @@ -1,10 +1,9 @@ +use crate::app::events::Event as AppEvent; + use aes_gcm_siv::{Aes256GcmSiv, Nonce}; use anyhow::{anyhow, Result}; use interprocess::local_socket::tokio::{RecvHalf, SendHalf}; -use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - sync::mpsc, -}; +use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use triba_packet::{IdPool, Packet, Request, Response}; use uuid::Uuid; @@ -35,11 +34,16 @@ enum Event { pub struct UnstableConnection { kill_token: CancellationToken, id: Uuid, + main_tx: mpsc::Sender, } impl UnstableConnection { - pub fn new(kill_token: CancellationToken, id: Uuid) -> Self { - Self { kill_token, id } + pub fn new(kill_token: CancellationToken, id: Uuid, main_tx: mpsc::Sender) -> Self { + Self { + kill_token, + id, + main_tx, + } } pub async fn stabilize( @@ -83,6 +87,22 @@ impl UnstableConnection { .send(&mut sock_tx, &cipher, &nonce) .await?; + match Packet::recv(&mut sock_rx, &cipher, &nonce).await? { + Packet::Response { body, .. } => match body { + Response::Success => {} + req => { + return Err(anyhow!( + "expected cbs to send: Response::Success, but got: Request::{req}" + )) + } + }, + body => { + return Err(anyhow!( + "expected cbs to send: Request::Success, but got: {body}" + )) + } + } + // Poll packets from socket { let cipher = cipher.clone(); @@ -125,6 +145,8 @@ impl UnstableConnection { let cipher = cipher.clone(); let nonce = nonce.clone(); let kill_token = self.kill_token.clone(); + let main_tx = self.main_tx.clone(); + let id = self.id.clone(); tokio::spawn(async move { loop { @@ -146,9 +168,10 @@ impl UnstableConnection { .await .unwrap(); } - Event::FromCBS(packet) => { - cli_log::info!("Core received CBS packet: {packet:?}"); - } + Event::FromCBS(packet) => main_tx + .send(AppEvent::CBSPacket(id.clone(), packet)) + .await + .unwrap(), } } }); diff --git a/src/cbs/manager.rs b/src/cbs/manager.rs index 81f1337..5bda03f 100644 --- a/src/cbs/manager.rs +++ b/src/cbs/manager.rs @@ -1,4 +1,5 @@ use super::{Connection, UnstableConnection}; +use crate::app::events::Event as AppEvent; use aes_gcm_siv::{Aes256GcmSiv, KeyInit, Nonce}; use anyhow::{anyhow, Result}; @@ -93,7 +94,7 @@ pub struct Manager { } impl Manager { - pub async fn new() -> Self { + pub async fn new(main_tx: mpsc::Sender) -> Self { let (tx, mut rx) = mpsc::unbounded_channel(); let sock_name = { @@ -144,7 +145,14 @@ impl Manager { let id = Uuid::new_v4(); unstable_connections.insert( id, - (UnstableConnection::new(CancellationToken::new(), id), tx), + ( + UnstableConnection::new( + CancellationToken::new(), + id, + main_tx.clone(), + ), + tx, + ), ); cli_log::info!("Spawned CBS with ID: {id}");