feat(cbs): hooked up the cbs handling system to the main message queue

This commit is contained in:
antifallobst 2024-05-21 15:03:41 +02:00
parent 916ea87537
commit 762ea6067d
Signed by: antifallobst
GPG Key ID: 2B4F402172791BAF
4 changed files with 51 additions and 13 deletions

View File

@ -31,10 +31,12 @@ use crate::{
use cli_log::{trace, warn}; use cli_log::{trace, warn};
use crossterm::event::Event as CrosstermEvent; use crossterm::event::Event as CrosstermEvent;
use handlers::{command, input}; use handlers::{command, input};
use uuid::Uuid;
#[derive(Debug)] #[derive(Debug)]
pub enum Event { pub enum Event {
InputEvent(CrosstermEvent), InputEvent(CrosstermEvent),
CBSPacket(Uuid, triba_packet::Packet),
// FIXME(@soispha): The `String` here is just wrong <2024-05-03> // FIXME(@soispha): The `String` here is just wrong <2024-05-03>
CommandEvent(Commands, Option<trixy::oneshot::Sender<String>>), CommandEvent(Commands, Option<trixy::oneshot::Sender<String>>),
@ -49,6 +51,11 @@ impl Event {
.await .await
.with_context(|| format!("Failed to handle command event: `{:#?}`", event)), .with_context(|| format!("Failed to handle command event: `{:#?}`", event)),
Event::CBSPacket(cbs, packet) => {
cli_log::info!("Received packet from cbs {cbs}: {packet:?}");
Ok(EventStatus::Ok)
}
Event::LuaCommand(lua_code) => { Event::LuaCommand(lua_code) => {
warn!( warn!(
"Got lua code to execute, but no exectuter is available:\n{}", "Got lua code to execute, but no exectuter is available:\n{}",

View File

@ -73,8 +73,8 @@ impl<U: TrinitrixUi> App<U> {
.set(tx.clone()) .set(tx.clone())
.expect("The cell should always be empty at this point"); .expect("The cell should always be empty at this point");
let cbs_manager = cbs::Manager::new().await; let cbs_manager = cbs::Manager::new(tx.clone()).await;
cbs_manager.spawn_cbs().await?; cbs_manager.spawn_cbs().await?; // TODO: remove, this is just a dummy - antifallobst <2024-05-21>
Ok(Self { Ok(Self {
ui, ui,

View File

@ -1,10 +1,9 @@
use crate::app::events::Event as AppEvent;
use aes_gcm_siv::{Aes256GcmSiv, Nonce}; use aes_gcm_siv::{Aes256GcmSiv, Nonce};
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use interprocess::local_socket::tokio::{RecvHalf, SendHalf}; use interprocess::local_socket::tokio::{RecvHalf, SendHalf};
use tokio::{ use tokio::sync::mpsc;
io::{AsyncReadExt, AsyncWriteExt},
sync::mpsc,
};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use triba_packet::{IdPool, Packet, Request, Response}; use triba_packet::{IdPool, Packet, Request, Response};
use uuid::Uuid; use uuid::Uuid;
@ -35,11 +34,16 @@ enum Event {
pub struct UnstableConnection { pub struct UnstableConnection {
kill_token: CancellationToken, kill_token: CancellationToken,
id: Uuid, id: Uuid,
main_tx: mpsc::Sender<AppEvent>,
} }
impl UnstableConnection { impl UnstableConnection {
pub fn new(kill_token: CancellationToken, id: Uuid) -> Self { pub fn new(kill_token: CancellationToken, id: Uuid, main_tx: mpsc::Sender<AppEvent>) -> Self {
Self { kill_token, id } Self {
kill_token,
id,
main_tx,
}
} }
pub async fn stabilize( pub async fn stabilize(
@ -83,6 +87,22 @@ impl UnstableConnection {
.send(&mut sock_tx, &cipher, &nonce) .send(&mut sock_tx, &cipher, &nonce)
.await?; .await?;
match Packet::recv(&mut sock_rx, &cipher, &nonce).await? {
Packet::Response { body, .. } => match body {
Response::Success => {}
req => {
return Err(anyhow!(
"expected cbs to send: Response::Success, but got: Request::{req}"
))
}
},
body => {
return Err(anyhow!(
"expected cbs to send: Request::Success, but got: {body}"
))
}
}
// Poll packets from socket // Poll packets from socket
{ {
let cipher = cipher.clone(); let cipher = cipher.clone();
@ -125,6 +145,8 @@ impl UnstableConnection {
let cipher = cipher.clone(); let cipher = cipher.clone();
let nonce = nonce.clone(); let nonce = nonce.clone();
let kill_token = self.kill_token.clone(); let kill_token = self.kill_token.clone();
let main_tx = self.main_tx.clone();
let id = self.id.clone();
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
@ -146,9 +168,10 @@ impl UnstableConnection {
.await .await
.unwrap(); .unwrap();
} }
Event::FromCBS(packet) => { Event::FromCBS(packet) => main_tx
cli_log::info!("Core received CBS packet: {packet:?}"); .send(AppEvent::CBSPacket(id.clone(), packet))
} .await
.unwrap(),
} }
} }
}); });

View File

@ -1,4 +1,5 @@
use super::{Connection, UnstableConnection}; use super::{Connection, UnstableConnection};
use crate::app::events::Event as AppEvent;
use aes_gcm_siv::{Aes256GcmSiv, KeyInit, Nonce}; use aes_gcm_siv::{Aes256GcmSiv, KeyInit, Nonce};
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
@ -93,7 +94,7 @@ pub struct Manager {
} }
impl Manager { impl Manager {
pub async fn new() -> Self { pub async fn new(main_tx: mpsc::Sender<AppEvent>) -> Self {
let (tx, mut rx) = mpsc::unbounded_channel(); let (tx, mut rx) = mpsc::unbounded_channel();
let sock_name = { let sock_name = {
@ -144,7 +145,14 @@ impl Manager {
let id = Uuid::new_v4(); let id = Uuid::new_v4();
unstable_connections.insert( unstable_connections.insert(
id, id,
(UnstableConnection::new(CancellationToken::new(), id), tx), (
UnstableConnection::new(
CancellationToken::new(),
id,
main_tx.clone(),
),
tx,
),
); );
cli_log::info!("Spawned CBS with ID: {id}"); cli_log::info!("Spawned CBS with ID: {id}");