From 64d5bdd9c5d73f23e121149b186e86548b17e124 Mon Sep 17 00:00:00 2001 From: antifallobst Date: Tue, 14 May 2024 14:01:54 +0200 Subject: [PATCH 1/8] feat: implemented the skeleton of the core CBS handler --- Cargo.lock | 307 ++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 21 ++- src/app/mod.rs | 10 +- src/cbs/connection.rs | 126 +++++++++++++++++ src/cbs/dummy.rs | 74 ++++++++++ src/cbs/manager.rs | 176 ++++++++++++++++++++++++ src/cbs/mod.rs | 10 ++ src/cbs/packet.rs | 117 ++++++++++++++++ src/main.rs | 5 +- 9 files changed, 839 insertions(+), 7 deletions(-) create mode 100644 src/cbs/connection.rs create mode 100644 src/cbs/dummy.rs create mode 100644 src/cbs/manager.rs create mode 100644 src/cbs/mod.rs create mode 100644 src/cbs/packet.rs diff --git a/Cargo.lock b/Cargo.lock index bd6f5a7..a48f766 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,42 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "aead" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0" +dependencies = [ + "crypto-common", + "generic-array", +] + +[[package]] +name = "aes" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + +[[package]] +name = "aes-gcm-siv" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae0784134ba9375416d469ec31e7c5f9fa94405049cf08c5ce5b4698be673e0d" +dependencies = [ + "aead", + "aes", + "cipher", + "ctr", + "polyval", + "subtle", + "zeroize", +] + [[package]] name = "ahash" version = "0.8.11" @@ -172,6 +208,12 @@ version = "3.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.6.0" @@ -219,6 +261,16 @@ dependencies = [ "windows-targets 0.52.5", ] +[[package]] +name = "cipher" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common", + "inout", +] + [[package]] name = "clap" version = "4.5.4" @@ -362,9 +414,46 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" dependencies = [ "generic-array", + "rand_core", "typenum", ] +[[package]] +name = "ctr" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835" +dependencies = [ + "cipher", +] + +[[package]] +name = "curve25519-dalek" +version = "4.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a677b8922c94e01bdbb12126b0bc852f00447528dee1782229af9c720c3f348" +dependencies = [ + "cfg-if", + "cpufeatures", + "curve25519-dalek-derive", + "fiat-crypto", + "platforms", + "rustc_version", + "subtle", + "zeroize", +] + +[[package]] +name = "curve25519-dalek-derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "diff" version = "0.1.13" @@ -417,6 +506,12 @@ dependencies = [ "serde", ] +[[package]] +name = "fiat-crypto" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" + [[package]] name = "file-size" version = "1.0.3" @@ -547,6 +642,29 @@ version = "2.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b248f5224d1d606005e02c97f5aa4e88eeb230488bcc03bc9ca4d7991399f2b5" +[[package]] +name = "inout" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" +dependencies = [ + "generic-array", +] + +[[package]] +name = "interprocess" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4d0250d41da118226e55b3d50ca3f0d9e0a0f6829b92f543ac0054aeea1572" +dependencies = [ + "futures-core", + "libc", + "recvmsg", + "tokio", + "widestring", + "windows-sys 0.52.0", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.0" @@ -730,6 +848,12 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "opaque-debug" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" + [[package]] name = "option-ext" version = "0.2.0" @@ -837,6 +961,30 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" +[[package]] +name = "platforms" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db23d408679286588f4d4644f965003d056e3dd5abcaaa938116871d7ce2fee7" + +[[package]] +name = "polyval" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d1fe60d06143b2430aa532c94cfe9e29783047f06c0d7fd359a9a51b729fa25" +dependencies = [ + "cfg-if", + "cpufeatures", + "opaque-debug", + "universal-hash", +] + +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + [[package]] name = "pretty_assertions" version = "1.4.0" @@ -903,6 +1051,36 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + [[package]] name = "ratatui" version = "0.26.2" @@ -923,6 +1101,12 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "recvmsg" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3edd4d5d42c92f0a659926464d4cce56b562761267ecf0f469d85b7de384175" + [[package]] name = "redox_syscall" version = "0.5.1" @@ -972,6 +1156,28 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" +[[package]] +name = "rmp" +version = "0.8.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "228ed7c16fa39782c3b3468e974aec2795e9089153cd08ee2e9aefb3613334c4" +dependencies = [ + "byteorder", + "num-traits", + "paste", +] + +[[package]] +name = "rmp-serde" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52e599a477cf9840e92f2cde9a7189e67b42c57532749bf90aea6ec10facd4db" +dependencies = [ + "byteorder", + "rmp", + "serde", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -984,6 +1190,15 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "rustversion" version = "1.0.16" @@ -1002,6 +1217,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "semver" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" + [[package]] name = "serde" version = "1.0.201" @@ -1088,6 +1309,16 @@ version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" +[[package]] +name = "socket2" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "stability" version = "0.2.0" @@ -1132,6 +1363,12 @@ dependencies = [ "syn", ] +[[package]] +name = "subtle" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" + [[package]] name = "syn" version = "2.0.61" @@ -1170,9 +1407,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" dependencies = [ "backtrace", + "bytes", + "libc", + "mio", "num_cpus", "pin-project-lite", + "socket2", "tokio-macros", + "windows-sys 0.48.0", ] [[package]] @@ -1203,22 +1445,30 @@ dependencies = [ name = "trinitrix" version = "0.1.0" dependencies = [ + "aes-gcm-siv", "anyhow", "clap", "cli-log", "crossterm 0.25.0", "directories", + "interprocess", "keymaps", "libloading", "mlua", "once_cell", "pretty_assertions", + "rand", "ratatui", + "rmp-serde", + "serde", + "strum", "tokio", "tokio-util", "trinitry", "trixy", "tui-textarea", + "uuid", + "x25519-dalek", ] [[package]] @@ -1299,12 +1549,31 @@ version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68f5e5f3158ecfd4b8ff6fe086db7c8467a2dfdac97fe420f2b7c4aa97af66d6" +[[package]] +name = "universal-hash" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc1de2c688dc15305988b563c3854064043356019f97a4b46276fe734c4f07ea" +dependencies = [ + "crypto-common", + "subtle", +] + [[package]] name = "utf8parse" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" +[[package]] +name = "uuid" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0" +dependencies = [ + "getrandom", +] + [[package]] name = "version_check" version = "0.9.4" @@ -1371,6 +1640,12 @@ version = "0.2.92" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" +[[package]] +name = "widestring" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7219d36b6eac893fa81e84ebe06485e7dcbb616177469b142df14f1f4deb1311" + [[package]] name = "winapi" version = "0.3.9" @@ -1541,6 +1816,18 @@ version = "0.52.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0" +[[package]] +name = "x25519-dalek" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7e468321c81fb07fa7f4c636c3972b9100f0346e5b6a9f2bd0603a52f7ed277" +dependencies = [ + "curve25519-dalek", + "rand_core", + "serde", + "zeroize", +] + [[package]] name = "yansi" version = "0.5.1" @@ -1566,3 +1853,23 @@ dependencies = [ "quote", "syn", ] + +[[package]] +name = "zeroize" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4756f7db3f7b5574938c3eb1c117038b8e07f95ee6718c0efad4ac21508f1efd" +dependencies = [ + "zeroize_derive", +] + +[[package]] +name = "zeroize_derive" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/Cargo.toml b/Cargo.toml index 857622b..1bc4095 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,18 +32,28 @@ clap = { version = "4.5.4", features = ["derive"] } cli-log = "2.0" anyhow = "1.0" tokio = { version = "1.37", features = [ - "macros", - "rt-multi-thread", - "fs", - "time", + "macros", + "rt-multi-thread", + "fs", + "time", + "io-util" ] } tokio-util = { version = "0.7.10" } +uuid = { version = "1.8.0", features = ["v4"] } +rand = "0.8.5" +serde = { version = "1.0.201", features = ["derive"] } +rmp-serde = "1.3.0" +strum = { version = "0.26.2", features = ["derive"] } # config trinitry = { version = "0.1.0" } keymaps = { version = "0.1.1", features = ["crossterm"] } directories = "5.0.1" +# crypto +x25519-dalek = "2.0.1" +aes-gcm-siv = { version = "0.11.1", features = ["aes"] } + # c api libloading = "0.8.3" trixy = { version = "0.1.1" } @@ -57,6 +67,9 @@ ratatui = "0.26.2" tui-textarea = { version = "0.4", features = ["crossterm"] } crossterm = { version = "0.25" } +# Trinitrx Backend API specific +interprocess = { version = "2.1.0", features = ["tokio"] } + [dev-dependencies] pretty_assertions = "1.4.0" diff --git a/src/app/mod.rs b/src/app/mod.rs index 46dfe91..33b2c7d 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -42,6 +42,7 @@ use crate::{ events::{Event, EventStatus}, status::{State, Status}, }, + cbs, ui::ui_trait::TrinitrixUi, }; @@ -49,6 +50,8 @@ pub struct App { ui: U, status: Status, + cbs_manager: cbs::Manager, + tx: mpsc::Sender, rx: mpsc::Receiver, @@ -63,17 +66,22 @@ pub struct App { pub static COMMAND_TRANSMITTER: OnceLock> = OnceLock::new(); impl App { - pub fn new(ui: U) -> Result { + pub async fn new(ui: U) -> Result { let (tx, rx) = mpsc::channel(256); COMMAND_TRANSMITTER .set(tx.clone()) .expect("The cell should always be empty at this point"); + let cbs_manager = cbs::Manager::new().await; + cbs_manager.spawn_cbs().await?; + Ok(Self { ui, status: Status::new(), + cbs_manager, + tx: tx.clone(), rx, input_listener_killer: CancellationToken::new(), diff --git a/src/cbs/connection.rs b/src/cbs/connection.rs new file mode 100644 index 0000000..489d1e9 --- /dev/null +++ b/src/cbs/connection.rs @@ -0,0 +1,126 @@ +use super::packet::{self, IdPool, Packet}; + +use aes_gcm_siv::{Aes256GcmSiv, Nonce}; +use anyhow::{anyhow, Result}; +use interprocess::local_socket::tokio::{RecvHalf, SendHalf}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + sync::mpsc, +}; +use tokio_util::sync::CancellationToken; +use uuid::Uuid; + +pub struct Connection { + tx: mpsc::UnboundedSender, +} + +impl Connection { + pub async fn send_packet(&self, body: packet::Body) -> Result<()> { + self.tx.send(body)?; + Ok(()) + } +} + +enum Event { + ToCBS(Packet), + FromCBS(Packet), +} + +pub struct UnstableConnection { + kill_token: CancellationToken, + id: Uuid, +} + +impl UnstableConnection { + pub fn new(kill_token: CancellationToken, id: Uuid) -> Self { + Self { kill_token, id } + } + + pub async fn stabilize( + &self, + cipher: Aes256GcmSiv, + nonce: Nonce, + mut sock_rx: RecvHalf, + mut sock_tx: SendHalf, + ) -> Result { + let (tx, mut rx) = mpsc::unbounded_channel(); + + let mut id_pool = IdPool::new(); + + let packet = Packet::recv(&mut sock_rx, &cipher, &nonce).await.unwrap(); + match packet.body() { + packet::Body::HandshakeUpgradeConnection => { + cli_log::info!( + "CBS {id}: upgraded connection to encrypted messagepack", + id = self.id + ); + } + body => { + return Err(anyhow!( + "expected cbs to send HandshakeUpgradeConnection but got {body}" + )) + } + } + + Packet::new(&mut id_pool, packet::Body::HandshakeSuccess) + .send(&mut sock_tx, &cipher, &nonce) + .await + .unwrap(); + + // Poll packets from socket + { + let cipher = cipher.clone(); + let nonce = nonce.clone(); + let tx = tx.clone(); + + tokio::spawn(async move { + loop { + let packet = Packet::recv(&mut sock_rx, &cipher, &nonce).await.unwrap(); + tx.send(Event::FromCBS(packet)).unwrap(); + } + }); + } + + let (core_tx, mut core_rx) = mpsc::unbounded_channel(); + + // Poll packets from core + tokio::spawn(async move { + loop { + let body = core_rx.recv().await.unwrap(); + + tx.send(Event::ToCBS(Packet::new(&mut id_pool, body))) + .unwrap(); + } + }); + + // Handle and route all packets + { + let cipher = cipher.clone(); + let nonce = nonce.clone(); + let kill_token = self.kill_token.clone(); + + tokio::spawn(async move { + loop { + let event = tokio::select! { + event = rx.recv() => event.unwrap(), + _ = kill_token.cancelled() => break, + }; + + match event { + Event::ToCBS(packet) => { + sock_tx + .write(packet.pack(&cipher, &nonce).unwrap().as_slice()) + .await + .unwrap(); + } + Event::FromCBS(packet) => { + cli_log::info!("Core received CBS packet: {packet:?}") + } + } + } + }); + } + + Ok(Connection { tx: core_tx }) + } +} diff --git a/src/cbs/dummy.rs b/src/cbs/dummy.rs new file mode 100644 index 0000000..9563d37 --- /dev/null +++ b/src/cbs/dummy.rs @@ -0,0 +1,74 @@ +use super::packet; +use crate::cbs::packet::Packet; +use aes_gcm_siv::{Aes256GcmSiv, KeyInit, Nonce}; +use anyhow::anyhow; +use interprocess::local_socket::{ + self, + tokio::{prelude::*, Stream}, +}; +use rand::thread_rng; +use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; +use uuid::Uuid; +use x25519_dalek::{EphemeralSecret, PublicKey}; + +pub async fn cbs(sock_name: local_socket::Name<'_>, id: Uuid) { + let stream = Stream::connect(sock_name).await.unwrap(); + + let (rx, mut tx) = stream.split(); + let mut rx = BufReader::new(rx); + + tx.write(id.as_bytes()).await.unwrap(); + + let dh_core_public = { + let mut buffer = [0u8; 32]; + rx.read(&mut buffer).await.unwrap(); + PublicKey::from(buffer) + }; + + let dh_own_secret = EphemeralSecret::random_from_rng(thread_rng()); + let dh_own_public = PublicKey::from(&dh_own_secret); + + tx.write(dh_own_public.as_bytes()).await.unwrap(); + + let shared_secret = dh_own_secret.diffie_hellman(&dh_core_public); + + let nonce = { + let mut buffer = [0u8; 12]; + rx.read(&mut buffer).await.unwrap(); + Nonce::from(buffer) + }; + + let cipher = Aes256GcmSiv::new(shared_secret.as_bytes().into()); + + let mut id_pool = packet::IdPool::new(); + + tx.write( + Packet::new(&mut id_pool, packet::Body::HandshakeUpgradeConnection) + .pack(&cipher, &nonce) + .unwrap() + .as_slice(), + ) + .await + .unwrap(); + + let packet = Packet::recv(&mut rx, &cipher, &nonce).await.unwrap(); + match packet.body() { + packet::Body::HandshakeSuccess => { + cli_log::info!("[DUMMY CBS] Connection to core stabilized"); + } + _ => { + cli_log::info!("[DUMMY CBS] Connection to core failed"); + return; + } + } + + Packet::new( + &mut id_pool, + packet::Body::Exit(packet::Error::Fatal("This is a test".to_string())), + ) + .send(&mut tx, &cipher, &nonce) + .await + .unwrap(); + + loop {} +} diff --git a/src/cbs/manager.rs b/src/cbs/manager.rs new file mode 100644 index 0000000..ac67cf3 --- /dev/null +++ b/src/cbs/manager.rs @@ -0,0 +1,176 @@ +use super::{packet, Connection, UnstableConnection}; + +use aes_gcm_siv::{Aes256GcmSiv, KeyInit, Nonce}; +use anyhow::{anyhow, Result}; +use interprocess::local_socket::{ + self, + tokio::{prelude::*, Stream}, + GenericFilePath, GenericNamespaced, ListenerOptions, +}; +use rand::{ + distributions::Alphanumeric, + {thread_rng, Rng}, +}; +use std::collections::HashMap; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt, BufReader}, + sync::{mpsc, oneshot}, +}; +use tokio_util::sync::CancellationToken; +use uuid::Uuid; +use x25519_dalek::{EphemeralSecret, PublicKey}; + +enum Event { + IncomingConnection(Stream), + SpawnCBS(oneshot::Sender>), + SendPacket(Uuid, packet::Body), +} + +async fn poll_socket( + sock_name: local_socket::Name<'_>, + tx: mpsc::UnboundedSender, +) -> Result<()> { + let listener = ListenerOptions::new() + .name(sock_name) + .create_tokio() + .unwrap(); + + loop { + let stream = listener.accept().await?; + tx.send(Event::IncomingConnection(stream)).unwrap(); + } +} + +async fn handle_incoming( + stream: Stream, + connections: &mut HashMap>)>, +) -> Result<(Uuid, Connection)> { + let (raw_rx, mut tx) = stream.split(); + let mut rx = BufReader::new(&raw_rx); + + let id = { + let mut buffer = [0u8; 16]; + rx.read(&mut buffer).await?; + Uuid::from_bytes(buffer) + }; + + cli_log::info!("Received CBS connection with id: {id}"); + + let conn = match connections.get(&id) { + Some((conn, _)) => conn, + None => { + return Err(anyhow!("The CBS id {id} is unknown.")); + } + }; + + let dh_own_secret = EphemeralSecret::random_from_rng(thread_rng()); + let dh_own_public = PublicKey::from(&dh_own_secret); + + tx.write(dh_own_public.as_bytes()).await?; + + let dh_cbs_public = { + let mut buffer = [0u8; 32]; + rx.read(&mut buffer).await?; + PublicKey::from(buffer) + }; + + let shared_secret = dh_own_secret.diffie_hellman(&dh_cbs_public); + + let nonce = Nonce::from(rand::random::<[u8; 12]>()); + tx.write(nonce.as_slice()).await?; + + let cipher = Aes256GcmSiv::new(shared_secret.as_bytes().into()); + + let conn = conn.stabilize(cipher, nonce, raw_rx, tx).await?; + + Ok((id, conn)) +} + +pub struct Manager { + tx: mpsc::UnboundedSender, +} + +impl Manager { + pub async fn new() -> Self { + let (tx, mut rx) = mpsc::unbounded_channel(); + + let sock_name = { + let suffix = thread_rng() + .sample_iter(&Alphanumeric) + .take(30) + .map(char::from) + .collect::(); + + if GenericNamespaced::is_supported() { + format!("example_{suffix}.sock") + .to_ns_name::() + .unwrap() + } else { + format!("/tmp/example_{suffix}.sock") + .to_fs_name::() + .unwrap() + } + }; + + // Start socket connection listener + tokio::spawn(poll_socket(sock_name.clone(), tx.clone())); + + tokio::spawn(async move { + let mut unstable_connections: HashMap< + Uuid, + (UnstableConnection, oneshot::Sender>), + > = HashMap::new(); + let mut connections: HashMap = HashMap::new(); + + loop { + match rx.recv().await.unwrap() { + Event::IncomingConnection(stream) => { + match handle_incoming(stream, &mut unstable_connections).await { + Ok((id, conn)) => { + let (_, tx) = unstable_connections.remove(&id).unwrap(); + connections.insert(id, conn); + + tx.send(Ok(id)).unwrap(); + cli_log::info!("CBS {id}: Connection stabilized.") + } + Err(e) => { + cli_log::error!("Handshake failed: {e}"); + } + } + } + Event::SpawnCBS(tx) => { + let id = Uuid::new_v4(); + unstable_connections.insert( + id, + (UnstableConnection::new(CancellationToken::new(), id), tx), + ); + + cli_log::info!("Spawned CBS with ID: {id}"); + + // Start dummy cbs + let sock_name = sock_name.clone(); + tokio::spawn(super::dummy::cbs(sock_name.clone(), id)); + } + Event::SendPacket(id, body) => { + let conn = connections.get(&id).unwrap(); + conn.send_packet(body).await.unwrap(); + } + } + } + }); + + Self { tx } + } + + pub async fn spawn_cbs(&self) -> Result { + let (tx, rx) = oneshot::channel(); + + self.tx.send(Event::SpawnCBS(tx))?; + rx.await? + } + + pub fn send_packet(&self, cbs: Uuid, body: packet::Body) -> Result<()> { + self.tx.send(Event::SendPacket(cbs, body))?; + Ok(()) + } +} diff --git a/src/cbs/mod.rs b/src/cbs/mod.rs new file mode 100644 index 0000000..44a3d35 --- /dev/null +++ b/src/cbs/mod.rs @@ -0,0 +1,10 @@ +pub mod connection; +mod dummy; +pub mod manager; +pub(self) mod packet; + +// TODO: This whole module uses hell a lot of unwraps, +// which need to be replaced by proper error handling. - antifallobst 2024-05-14 + +pub use connection::{Connection, UnstableConnection}; +pub use manager::Manager; diff --git a/src/cbs/packet.rs b/src/cbs/packet.rs new file mode 100644 index 0000000..4ba395f --- /dev/null +++ b/src/cbs/packet.rs @@ -0,0 +1,117 @@ +use aes_gcm_siv::{AeadInPlace, Aes256GcmSiv, Nonce}; +use anyhow::{anyhow, Result}; +use serde::{Deserialize, Serialize}; +use strum::Display; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; + +pub struct IdPool { + last: u64, +} + +impl IdPool { + pub fn new() -> Self { + Self { last: 0 } + } + + pub fn acquire(&mut self) -> u64 { + self.last += 1; + self.last + } +} + +#[derive(Debug, Copy, Clone, Serialize, Deserialize)] +pub struct Version(u32); + +impl Version { + /// Splits the version into its three parts: (major, minor, patch) + pub fn extract(self) -> (u8, u16, u8) { + let major = ((self.0 & 0xFF000000) >> 24) as u8; + let minor = ((self.0 & 0x00FFFF00) >> 8) as u16; + let patch = (self.0 & 0x000000FF) as u8; + + (major, minor, patch) + } +} + +#[repr(u8)] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Error { + Fatal(String), +} + +#[repr(u32)] +#[derive(Debug, Clone, Serialize, Deserialize, Display)] +pub enum Body { + // Misc + Exit(Error) = 0x000_00000, + + // Handshake + HandshakeUpgradeConnection = 0x001_00000, + HandshakeApiVersion(Version) = 0x001_00001, + HandshakeConfig = 0x001_00002, + HandshakeSuccess = 0x001_00003, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Packet { + id: u64, + body: Body, +} + +impl Packet { + pub fn new(id_pool: &mut IdPool, body: Body) -> Self { + Self { + id: id_pool.acquire(), + body, + } + } + + pub fn body(&self) -> &Body { + &self.body + } + + // Serializes the packet to MessagePack and encrypts it. + pub fn pack(&self, cipher: &Aes256GcmSiv, nonce: &Nonce) -> Result> { + let mut data = rmp_serde::to_vec(self)?; + cipher + .encrypt_in_place(&nonce, b"", &mut data) + .map_err(|e| anyhow!("while packing packet: {e}"))?; + + let mut vec = (data.len() as u32).to_le_bytes().to_vec(); + vec.append(&mut data); + + Ok(vec) + } + + // Deserializes a packet from encrypted MessagePack. + pub fn unpack(data: &mut Vec, cipher: &Aes256GcmSiv, nonce: &Nonce) -> Result { + cipher + .decrypt_in_place(&nonce, b"", data) + .map_err(|e| anyhow!("while unpacking packet: {e}"))?; + Ok(rmp_serde::from_slice::(data.as_slice())?) + } + + pub async fn recv( + rx: &mut T, + cipher: &Aes256GcmSiv, + nonce: &Nonce, + ) -> Result { + let size = rx.read_u32_le().await? as usize; + + let mut buffer = vec![0u8; size]; + rx.read(&mut buffer).await?; + + Packet::unpack(&mut buffer, &cipher, &nonce) + } + + pub async fn send( + &self, + tx: &mut T, + cipher: &Aes256GcmSiv, + nonce: &Nonce, + ) -> Result<()> { + let raw = self.pack(cipher, nonce)?; + tx.write(raw.as_slice()).await?; + Ok(()) + } +} diff --git a/src/main.rs b/src/main.rs index 4f2ee91..71fdb55 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,6 +20,7 @@ */ mod app; +mod cbs; mod cli; mod ui; @@ -38,13 +39,13 @@ async fn main() -> anyhow::Result<()> { let args = Args::parse(); match args.subcommand { Command::Tui {} => { - let mut app = app::App::new(Tui::new().context("Failed to setup tui")?)?; + let mut app = app::App::new(Tui::new().context("Failed to setup tui")?).await?; // NOTE(@soispha): The `None` here is temporary <2024-05-08> app.run(None, args.plugin_path).await?; } Command::Repl {} => { - let mut app = app::App::new(Repl::new().context("Failed to setup repl")?)?; + let mut app = app::App::new(Repl::new().context("Failed to setup repl")?).await?; // NOTE(@soispha): The `None` here is temporary <2024-05-03> app.run(None, args.plugin_path).await?; From 042da55f2869760426a90e7617391970548ba1a2 Mon Sep 17 00:00:00 2001 From: antifallobst Date: Tue, 14 May 2024 14:45:13 +0200 Subject: [PATCH 2/8] docs: documented the TriBA handshaking process --- docs/TrinitrixBackendAPI.md | 40 +++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 docs/TrinitrixBackendAPI.md diff --git a/docs/TrinitrixBackendAPI.md b/docs/TrinitrixBackendAPI.md new file mode 100644 index 0000000..15d2de2 --- /dev/null +++ b/docs/TrinitrixBackendAPI.md @@ -0,0 +1,40 @@ +# Trinitrix Backend API (TriBA) + +## Basic concept + +The core starts a CBS as its child process and gives it as first Arg a base64 encoded UUID. +The CBS then connects to the local fs (or namespaced) socket. +After performing a handshake, which includes exchange of encryption keys, all communication +between core and CBS is encrypted (AES256-GCM-SIV) and serialized using [MessagePack](https://msgpack.org) + +## Packets + +Post-Handshake communication is structured in packets, which have the following structure in their raw form: + +| Size (bytes) | Type | Content | +|--------------|-------------------|--------------------------------------------------------------------| +| 4 | uint32 | The size of the payload. | +| - | encrypted payload | The AES-GCM-SIV encrypted MessagePack serialization of the packet. | + +A decrypted and deserialized packet looks like this: + +| Size | Name | Type | Content | +|------|--------|--------|-------------------------------------------------------------------------------------------------------------------| +| 8 | `id` | uint64 | The ID of _this_ packet. Is expected to be an incrementing counter. | +| - | `body` | enum | The actual packet date. (this will be better documented, as soon, as I dive into the mPack serialization details) | + +## Handshake + +The handshaking process after connecting to the socket looks as follows: + +1. The CBS sends its ID as 16 raw bytes. +2. When the ID is not known to the core, it aborts the handshaking process by closing the connection. +3. The core sends its Public Key for this connection. Again just 32 raw bytes. +4. The CBS sends its Public Key for this connection. +5. The core sends a 12 byte nonce value. +6. __Connection Upgrade:__ From this point on, all communication is structured by packets. + The packet encryption key is calculated using x25519 Diffie-Hellman and the previously exchanged keys. + The nonce from step 5 will be used as nonce for all packets. +7. The CBS sends the `HandshakeUpgradeConnection` packet. +8. (In here there is going to happen API version information exchange etc.) +9. The Core responds with `HandshakeSuccess` \ No newline at end of file From 4519fe5db66249d6f085cefc91331847207cc4af Mon Sep 17 00:00:00 2001 From: antifallobst Date: Wed, 15 May 2024 16:51:45 +0200 Subject: [PATCH 3/8] refactor(cbs): moved the packet definitions into their own crate --- Cargo.lock | 16 ++++++ Cargo.toml | 1 + src/cbs/connection.rs | 3 +- src/cbs/dummy.rs | 6 +-- src/cbs/manager.rs | 3 +- src/cbs/mod.rs | 1 - src/cbs/packet.rs | 117 ------------------------------------------ 7 files changed, 22 insertions(+), 125 deletions(-) delete mode 100644 src/cbs/packet.rs diff --git a/Cargo.lock b/Cargo.lock index a48f766..0b5de4b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1441,6 +1441,21 @@ dependencies = [ "tokio", ] +[[package]] +name = "triba-packet" +version = "0.1.0" +dependencies = [ + "aes-gcm-siv", + "anyhow", + "interprocess", + "rmp-serde", + "serde", + "strum", + "tokio", + "uuid", + "x25519-dalek", +] + [[package]] name = "trinitrix" version = "0.1.0" @@ -1464,6 +1479,7 @@ dependencies = [ "strum", "tokio", "tokio-util", + "triba-packet", "trinitry", "trixy", "tui-textarea", diff --git a/Cargo.toml b/Cargo.toml index 1bc4095..c958b9b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,7 @@ crossterm = { version = "0.25" } # Trinitrx Backend API specific interprocess = { version = "2.1.0", features = ["tokio"] } +triba-packet = { path = "../triba-packet" } [dev-dependencies] pretty_assertions = "1.4.0" diff --git a/src/cbs/connection.rs b/src/cbs/connection.rs index 489d1e9..77bfd14 100644 --- a/src/cbs/connection.rs +++ b/src/cbs/connection.rs @@ -1,5 +1,3 @@ -use super::packet::{self, IdPool, Packet}; - use aes_gcm_siv::{Aes256GcmSiv, Nonce}; use anyhow::{anyhow, Result}; use interprocess::local_socket::tokio::{RecvHalf, SendHalf}; @@ -8,6 +6,7 @@ use tokio::{ sync::mpsc, }; use tokio_util::sync::CancellationToken; +use triba_packet::{self as packet, IdPool, Packet}; use uuid::Uuid; pub struct Connection { diff --git a/src/cbs/dummy.rs b/src/cbs/dummy.rs index 9563d37..33d7b82 100644 --- a/src/cbs/dummy.rs +++ b/src/cbs/dummy.rs @@ -1,13 +1,11 @@ -use super::packet; -use crate::cbs::packet::Packet; use aes_gcm_siv::{Aes256GcmSiv, KeyInit, Nonce}; -use anyhow::anyhow; use interprocess::local_socket::{ self, tokio::{prelude::*, Stream}, }; use rand::thread_rng; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; +use triba_packet::{self as packet, IdPool, Packet}; use uuid::Uuid; use x25519_dalek::{EphemeralSecret, PublicKey}; @@ -40,7 +38,7 @@ pub async fn cbs(sock_name: local_socket::Name<'_>, id: Uuid) { let cipher = Aes256GcmSiv::new(shared_secret.as_bytes().into()); - let mut id_pool = packet::IdPool::new(); + let mut id_pool = IdPool::new(); tx.write( Packet::new(&mut id_pool, packet::Body::HandshakeUpgradeConnection) diff --git a/src/cbs/manager.rs b/src/cbs/manager.rs index ac67cf3..1176034 100644 --- a/src/cbs/manager.rs +++ b/src/cbs/manager.rs @@ -1,4 +1,4 @@ -use super::{packet, Connection, UnstableConnection}; +use super::{Connection, UnstableConnection}; use aes_gcm_siv::{Aes256GcmSiv, KeyInit, Nonce}; use anyhow::{anyhow, Result}; @@ -17,6 +17,7 @@ use tokio::{ sync::{mpsc, oneshot}, }; use tokio_util::sync::CancellationToken; +use triba_packet as packet; use uuid::Uuid; use x25519_dalek::{EphemeralSecret, PublicKey}; diff --git a/src/cbs/mod.rs b/src/cbs/mod.rs index 44a3d35..7c576cf 100644 --- a/src/cbs/mod.rs +++ b/src/cbs/mod.rs @@ -1,7 +1,6 @@ pub mod connection; mod dummy; pub mod manager; -pub(self) mod packet; // TODO: This whole module uses hell a lot of unwraps, // which need to be replaced by proper error handling. - antifallobst 2024-05-14 diff --git a/src/cbs/packet.rs b/src/cbs/packet.rs deleted file mode 100644 index 4ba395f..0000000 --- a/src/cbs/packet.rs +++ /dev/null @@ -1,117 +0,0 @@ -use aes_gcm_siv::{AeadInPlace, Aes256GcmSiv, Nonce}; -use anyhow::{anyhow, Result}; -use serde::{Deserialize, Serialize}; -use strum::Display; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; - -pub struct IdPool { - last: u64, -} - -impl IdPool { - pub fn new() -> Self { - Self { last: 0 } - } - - pub fn acquire(&mut self) -> u64 { - self.last += 1; - self.last - } -} - -#[derive(Debug, Copy, Clone, Serialize, Deserialize)] -pub struct Version(u32); - -impl Version { - /// Splits the version into its three parts: (major, minor, patch) - pub fn extract(self) -> (u8, u16, u8) { - let major = ((self.0 & 0xFF000000) >> 24) as u8; - let minor = ((self.0 & 0x00FFFF00) >> 8) as u16; - let patch = (self.0 & 0x000000FF) as u8; - - (major, minor, patch) - } -} - -#[repr(u8)] -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum Error { - Fatal(String), -} - -#[repr(u32)] -#[derive(Debug, Clone, Serialize, Deserialize, Display)] -pub enum Body { - // Misc - Exit(Error) = 0x000_00000, - - // Handshake - HandshakeUpgradeConnection = 0x001_00000, - HandshakeApiVersion(Version) = 0x001_00001, - HandshakeConfig = 0x001_00002, - HandshakeSuccess = 0x001_00003, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Packet { - id: u64, - body: Body, -} - -impl Packet { - pub fn new(id_pool: &mut IdPool, body: Body) -> Self { - Self { - id: id_pool.acquire(), - body, - } - } - - pub fn body(&self) -> &Body { - &self.body - } - - // Serializes the packet to MessagePack and encrypts it. - pub fn pack(&self, cipher: &Aes256GcmSiv, nonce: &Nonce) -> Result> { - let mut data = rmp_serde::to_vec(self)?; - cipher - .encrypt_in_place(&nonce, b"", &mut data) - .map_err(|e| anyhow!("while packing packet: {e}"))?; - - let mut vec = (data.len() as u32).to_le_bytes().to_vec(); - vec.append(&mut data); - - Ok(vec) - } - - // Deserializes a packet from encrypted MessagePack. - pub fn unpack(data: &mut Vec, cipher: &Aes256GcmSiv, nonce: &Nonce) -> Result { - cipher - .decrypt_in_place(&nonce, b"", data) - .map_err(|e| anyhow!("while unpacking packet: {e}"))?; - Ok(rmp_serde::from_slice::(data.as_slice())?) - } - - pub async fn recv( - rx: &mut T, - cipher: &Aes256GcmSiv, - nonce: &Nonce, - ) -> Result { - let size = rx.read_u32_le().await? as usize; - - let mut buffer = vec![0u8; size]; - rx.read(&mut buffer).await?; - - Packet::unpack(&mut buffer, &cipher, &nonce) - } - - pub async fn send( - &self, - tx: &mut T, - cipher: &Aes256GcmSiv, - nonce: &Nonce, - ) -> Result<()> { - let raw = self.pack(cipher, nonce)?; - tx.write(raw.as_slice()).await?; - Ok(()) - } -} From f6a1b5200a96ad808698eb52593915a78683e3e1 Mon Sep 17 00:00:00 2001 From: antifallobst Date: Tue, 21 May 2024 04:20:40 +0200 Subject: [PATCH 4/8] refactor(cbs): added responses and dumped the dummy cbs code --- Cargo.lock | 20 +++++++++++- Cargo.toml | 1 + src/cbs/connection.rs | 76 +++++++++++++++++++++++++++++-------------- src/cbs/dummy.rs | 70 ++------------------------------------- src/cbs/manager.rs | 22 +++++++++---- 5 files changed, 90 insertions(+), 99 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0b5de4b..b14658d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1441,16 +1441,33 @@ dependencies = [ "tokio", ] +[[package]] +name = "triba" +version = "0.1.0" +dependencies = [ + "aes-gcm-siv", + "interprocess", + "rand", + "rmp-serde", + "serde", + "thiserror", + "tokio", + "tokio-util", + "triba-packet", + "uuid", + "x25519-dalek", +] + [[package]] name = "triba-packet" version = "0.1.0" dependencies = [ "aes-gcm-siv", - "anyhow", "interprocess", "rmp-serde", "serde", "strum", + "thiserror", "tokio", "uuid", "x25519-dalek", @@ -1479,6 +1496,7 @@ dependencies = [ "strum", "tokio", "tokio-util", + "triba", "triba-packet", "trinitry", "trixy", diff --git a/Cargo.toml b/Cargo.toml index c958b9b..4ca579b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,6 +70,7 @@ crossterm = { version = "0.25" } # Trinitrx Backend API specific interprocess = { version = "2.1.0", features = ["tokio"] } triba-packet = { path = "../triba-packet" } +triba = { path = "../triba" } [dev-dependencies] pretty_assertions = "1.4.0" diff --git a/src/cbs/connection.rs b/src/cbs/connection.rs index 77bfd14..a89c277 100644 --- a/src/cbs/connection.rs +++ b/src/cbs/connection.rs @@ -6,22 +6,29 @@ use tokio::{ sync::mpsc, }; use tokio_util::sync::CancellationToken; -use triba_packet::{self as packet, IdPool, Packet}; +use triba_packet::{IdPool, Packet, Request, Response}; use uuid::Uuid; pub struct Connection { - tx: mpsc::UnboundedSender, + req_tx: mpsc::UnboundedSender, + resp_tx: mpsc::UnboundedSender<(Response, u64)>, } impl Connection { - pub async fn send_packet(&self, body: packet::Body) -> Result<()> { - self.tx.send(body)?; + pub async fn send_request(&self, body: Request) -> Result<()> { + self.req_tx.send(body)?; + Ok(()) + } + + pub async fn send_response(&self, body: Response, receiver: u64) -> Result<()> { + self.resp_tx.send((body, receiver))?; Ok(()) } } enum Event { - ToCBS(Packet), + ToCBSReq(Request), + ToCBSResp(Response, u64), FromCBS(Packet), } @@ -46,9 +53,13 @@ impl UnstableConnection { let mut id_pool = IdPool::new(); - let packet = Packet::recv(&mut sock_rx, &cipher, &nonce).await.unwrap(); - match packet.body() { - packet::Body::HandshakeUpgradeConnection => { + let packet = Packet::recv(&mut sock_rx, &cipher, &nonce).await?; + match packet { + Packet::Request { id, body } => { + Packet::response(id_pool.acquire(), id, Response::Success) + .send(&mut sock_tx, &cipher, &nonce) + .await?; + cli_log::info!( "CBS {id}: upgraded connection to encrypted messagepack", id = self.id @@ -56,16 +67,11 @@ impl UnstableConnection { } body => { return Err(anyhow!( - "expected cbs to send HandshakeUpgradeConnection but got {body}" + "expected cbs to send: Request::HandshakeUpgradeConnection, but got: {body}" )) } } - Packet::new(&mut id_pool, packet::Body::HandshakeSuccess) - .send(&mut sock_tx, &cipher, &nonce) - .await - .unwrap(); - // Poll packets from socket { let cipher = cipher.clone(); @@ -80,15 +86,26 @@ impl UnstableConnection { }); } - let (core_tx, mut core_rx) = mpsc::unbounded_channel(); + let (core_req_tx, mut core_req_rx) = mpsc::unbounded_channel(); - // Poll packets from core + // Poll requests from core + { + let tx = tx.clone(); + tokio::spawn(async move { + loop { + let body = core_req_rx.recv().await.unwrap(); + tx.send(Event::ToCBSReq(body)).unwrap(); + } + }); + } + + let (core_resp_tx, mut core_resp_rx) = mpsc::unbounded_channel(); + + // Poll responses from core tokio::spawn(async move { loop { - let body = core_rx.recv().await.unwrap(); - - tx.send(Event::ToCBS(Packet::new(&mut id_pool, body))) - .unwrap(); + let (body, req) = core_resp_rx.recv().await.unwrap(); + tx.send(Event::ToCBSResp(body, req)).unwrap(); } }); @@ -106,20 +123,29 @@ impl UnstableConnection { }; match event { - Event::ToCBS(packet) => { - sock_tx - .write(packet.pack(&cipher, &nonce).unwrap().as_slice()) + Event::ToCBSReq(req) => { + Packet::request(id_pool.acquire(), req) + .send(&mut sock_tx, &cipher, &nonce) + .await + .unwrap(); + } + Event::ToCBSResp(resp, req) => { + Packet::response(id_pool.acquire(), req, resp) + .send(&mut sock_tx, &cipher, &nonce) .await .unwrap(); } Event::FromCBS(packet) => { - cli_log::info!("Core received CBS packet: {packet:?}") + cli_log::info!("Core received CBS packet: {packet:?}"); } } } }); } - Ok(Connection { tx: core_tx }) + Ok(Connection { + req_tx: core_req_tx, + resp_tx: core_resp_tx, + }) } } diff --git a/src/cbs/dummy.rs b/src/cbs/dummy.rs index 33d7b82..e5cb73d 100644 --- a/src/cbs/dummy.rs +++ b/src/cbs/dummy.rs @@ -1,72 +1,8 @@ -use aes_gcm_siv::{Aes256GcmSiv, KeyInit, Nonce}; -use interprocess::local_socket::{ - self, - tokio::{prelude::*, Stream}, -}; -use rand::thread_rng; -use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; -use triba_packet::{self as packet, IdPool, Packet}; +use interprocess::local_socket::Name; use uuid::Uuid; -use x25519_dalek::{EphemeralSecret, PublicKey}; -pub async fn cbs(sock_name: local_socket::Name<'_>, id: Uuid) { - let stream = Stream::connect(sock_name).await.unwrap(); - - let (rx, mut tx) = stream.split(); - let mut rx = BufReader::new(rx); - - tx.write(id.as_bytes()).await.unwrap(); - - let dh_core_public = { - let mut buffer = [0u8; 32]; - rx.read(&mut buffer).await.unwrap(); - PublicKey::from(buffer) - }; - - let dh_own_secret = EphemeralSecret::random_from_rng(thread_rng()); - let dh_own_public = PublicKey::from(&dh_own_secret); - - tx.write(dh_own_public.as_bytes()).await.unwrap(); - - let shared_secret = dh_own_secret.diffie_hellman(&dh_core_public); - - let nonce = { - let mut buffer = [0u8; 12]; - rx.read(&mut buffer).await.unwrap(); - Nonce::from(buffer) - }; - - let cipher = Aes256GcmSiv::new(shared_secret.as_bytes().into()); - - let mut id_pool = IdPool::new(); - - tx.write( - Packet::new(&mut id_pool, packet::Body::HandshakeUpgradeConnection) - .pack(&cipher, &nonce) - .unwrap() - .as_slice(), - ) - .await - .unwrap(); - - let packet = Packet::recv(&mut rx, &cipher, &nonce).await.unwrap(); - match packet.body() { - packet::Body::HandshakeSuccess => { - cli_log::info!("[DUMMY CBS] Connection to core stabilized"); - } - _ => { - cli_log::info!("[DUMMY CBS] Connection to core failed"); - return; - } - } - - Packet::new( - &mut id_pool, - packet::Body::Exit(packet::Error::Fatal("This is a test".to_string())), - ) - .send(&mut tx, &cipher, &nonce) - .await - .unwrap(); +pub async fn cbs(sock_name: Name<'_>, id: Uuid) { + let (session, rx) = triba::Session::new(id, sock_name).await.unwrap(); loop {} } diff --git a/src/cbs/manager.rs b/src/cbs/manager.rs index 1176034..81f1337 100644 --- a/src/cbs/manager.rs +++ b/src/cbs/manager.rs @@ -17,14 +17,15 @@ use tokio::{ sync::{mpsc, oneshot}, }; use tokio_util::sync::CancellationToken; -use triba_packet as packet; +use triba_packet::{Request, Response}; use uuid::Uuid; use x25519_dalek::{EphemeralSecret, PublicKey}; enum Event { IncomingConnection(Stream), SpawnCBS(oneshot::Sender>), - SendPacket(Uuid, packet::Body), + SendRequest(Uuid, Request), + SendResponse(Uuid, Response, u64), } async fn poll_socket( @@ -152,9 +153,13 @@ impl Manager { let sock_name = sock_name.clone(); tokio::spawn(super::dummy::cbs(sock_name.clone(), id)); } - Event::SendPacket(id, body) => { + Event::SendRequest(id, body) => { let conn = connections.get(&id).unwrap(); - conn.send_packet(body).await.unwrap(); + conn.send_request(body).await.unwrap(); + } + Event::SendResponse(id, body, req) => { + let conn = connections.get(&id).unwrap(); + conn.send_response(body, req).await.unwrap(); } } } @@ -170,8 +175,13 @@ impl Manager { rx.await? } - pub fn send_packet(&self, cbs: Uuid, body: packet::Body) -> Result<()> { - self.tx.send(Event::SendPacket(cbs, body))?; + pub fn send_request(&self, cbs: Uuid, body: Request) -> Result<()> { + self.tx.send(Event::SendRequest(cbs, body))?; + Ok(()) + } + + pub fn send_response(&self, cbs: Uuid, body: Response, receiver: u64) -> Result<()> { + self.tx.send(Event::SendResponse(cbs, body, receiver))?; Ok(()) } } From fc8efe50601d4b251ee3c9be8ed80ec2b97ddd02 Mon Sep 17 00:00:00 2001 From: antifallobst Date: Tue, 21 May 2024 04:59:23 +0200 Subject: [PATCH 5/8] fix(cbs): added missing parts of the handshake --- src/cbs/connection.rs | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/src/cbs/connection.rs b/src/cbs/connection.rs index a89c277..4397a48 100644 --- a/src/cbs/connection.rs +++ b/src/cbs/connection.rs @@ -56,14 +56,21 @@ impl UnstableConnection { let packet = Packet::recv(&mut sock_rx, &cipher, &nonce).await?; match packet { Packet::Request { id, body } => { - Packet::response(id_pool.acquire(), id, Response::Success) - .send(&mut sock_tx, &cipher, &nonce) - .await?; + match body { + Request::HandshakeUpgradeConnection => { + Packet::response(id_pool.acquire(), id, Response::Success) + .send(&mut sock_tx, &cipher, &nonce) + .await?; - cli_log::info!( - "CBS {id}: upgraded connection to encrypted messagepack", - id = self.id - ); + cli_log::info!( + "CBS {id}: upgraded connection to encrypted messagepack", + id = self.id + ); + } + req => return Err(anyhow!( + "expected cbs to send: Request::HandshakeUpgradeConnection, but got: Request::{req}" + )) + } } body => { return Err(anyhow!( @@ -72,6 +79,10 @@ impl UnstableConnection { } } + Packet::request(id_pool.acquire(), Request::HandshakeSuccess) + .send(&mut sock_tx, &cipher, &nonce) + .await?; + // Poll packets from socket { let cipher = cipher.clone(); From 916ea87537e2a46950160a69d90ebd2dedafc060 Mon Sep 17 00:00:00 2001 From: antifallobst Date: Tue, 21 May 2024 05:00:37 +0200 Subject: [PATCH 6/8] docs: updated the TriBA docs to match the response update --- docs/TrinitrixBackendAPI.md | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/docs/TrinitrixBackendAPI.md b/docs/TrinitrixBackendAPI.md index 15d2de2..8a5a1dc 100644 --- a/docs/TrinitrixBackendAPI.md +++ b/docs/TrinitrixBackendAPI.md @@ -1,5 +1,7 @@ # Trinitrix Backend API (TriBA) +**Disclaimer:** These docs are WIP and going to receive a lot of improvement. + ## Basic concept The core starts a CBS as its child process and gives it as first Arg a base64 encoded UUID. @@ -16,12 +18,29 @@ Post-Handshake communication is structured in packets, which have the following | 4 | uint32 | The size of the payload. | | - | encrypted payload | The AES-GCM-SIV encrypted MessagePack serialization of the packet. | -A decrypted and deserialized packet looks like this: +A decrypted and deserialized payload contains either a response or a request. +A request looks as follows: | Size | Name | Type | Content | |------|--------|--------|-------------------------------------------------------------------------------------------------------------------| -| 8 | `id` | uint64 | The ID of _this_ packet. Is expected to be an incrementing counter. | -| - | `body` | enum | The actual packet date. (this will be better documented, as soon, as I dive into the mPack serialization details) | +| 8 | `id` | uint64 | The ID of _this_ packet. | +| - | `body` | enum | The actual packet data. (this will be better documented, as soon, as I dive into the mPack serialization details) | + +A response looks like this: + +| Size | Name | Type | Content | +|------|--------|--------|-------------------------------------------------------| +| 8 | `id` | uint64 | The ID of _this_ packet. | +| 8 | `req` | uint64 | The ID of the request packet this response refers to. | +| - | `body` | enum | The actual packet data. | + +**Every request packet, that is sent over the socket, has to get a linked response packet.** + +### IDs + +Packet IDs are expected to be an incremental counter. +There is no difference between requests and responses originating from the same socket side when it comes to IDs. +So both - requests and responses - should share the same counter. ## Handshake @@ -35,6 +54,8 @@ The handshaking process after connecting to the socket looks as follows: 6. __Connection Upgrade:__ From this point on, all communication is structured by packets. The packet encryption key is calculated using x25519 Diffie-Hellman and the previously exchanged keys. The nonce from step 5 will be used as nonce for all packets. -7. The CBS sends the `HandshakeUpgradeConnection` packet. -8. (In here there is going to happen API version information exchange etc.) -9. The Core responds with `HandshakeSuccess` \ No newline at end of file +7. The CBS sends the `Request::HandshakeUpgradeConnection` packet. +8. The core responds with `Response::Success`. +9. (In here there is going to happen API version information exchange etc.) +10. The Core sends a `Request::HandshakeSuccess` +11. The CBS responds with `Response::Succcess` \ No newline at end of file From 762ea6067db3d63d480051ae563ca7b811699285 Mon Sep 17 00:00:00 2001 From: antifallobst Date: Tue, 21 May 2024 15:03:41 +0200 Subject: [PATCH 7/8] feat(cbs): hooked up the cbs handling system to the main message queue --- src/app/events/mod.rs | 7 +++++++ src/app/mod.rs | 4 ++-- src/cbs/connection.rs | 41 ++++++++++++++++++++++++++++++++--------- src/cbs/manager.rs | 12 ++++++++++-- 4 files changed, 51 insertions(+), 13 deletions(-) diff --git a/src/app/events/mod.rs b/src/app/events/mod.rs index 75e181e..7e3ab6a 100644 --- a/src/app/events/mod.rs +++ b/src/app/events/mod.rs @@ -31,10 +31,12 @@ use crate::{ use cli_log::{trace, warn}; use crossterm::event::Event as CrosstermEvent; use handlers::{command, input}; +use uuid::Uuid; #[derive(Debug)] pub enum Event { InputEvent(CrosstermEvent), + CBSPacket(Uuid, triba_packet::Packet), // FIXME(@soispha): The `String` here is just wrong <2024-05-03> CommandEvent(Commands, Option>), @@ -49,6 +51,11 @@ impl Event { .await .with_context(|| format!("Failed to handle command event: `{:#?}`", event)), + Event::CBSPacket(cbs, packet) => { + cli_log::info!("Received packet from cbs {cbs}: {packet:?}"); + Ok(EventStatus::Ok) + } + Event::LuaCommand(lua_code) => { warn!( "Got lua code to execute, but no exectuter is available:\n{}", diff --git a/src/app/mod.rs b/src/app/mod.rs index 33b2c7d..365b448 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -73,8 +73,8 @@ impl App { .set(tx.clone()) .expect("The cell should always be empty at this point"); - let cbs_manager = cbs::Manager::new().await; - cbs_manager.spawn_cbs().await?; + let cbs_manager = cbs::Manager::new(tx.clone()).await; + cbs_manager.spawn_cbs().await?; // TODO: remove, this is just a dummy - antifallobst <2024-05-21> Ok(Self { ui, diff --git a/src/cbs/connection.rs b/src/cbs/connection.rs index 4397a48..7a56b96 100644 --- a/src/cbs/connection.rs +++ b/src/cbs/connection.rs @@ -1,10 +1,9 @@ +use crate::app::events::Event as AppEvent; + use aes_gcm_siv::{Aes256GcmSiv, Nonce}; use anyhow::{anyhow, Result}; use interprocess::local_socket::tokio::{RecvHalf, SendHalf}; -use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - sync::mpsc, -}; +use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use triba_packet::{IdPool, Packet, Request, Response}; use uuid::Uuid; @@ -35,11 +34,16 @@ enum Event { pub struct UnstableConnection { kill_token: CancellationToken, id: Uuid, + main_tx: mpsc::Sender, } impl UnstableConnection { - pub fn new(kill_token: CancellationToken, id: Uuid) -> Self { - Self { kill_token, id } + pub fn new(kill_token: CancellationToken, id: Uuid, main_tx: mpsc::Sender) -> Self { + Self { + kill_token, + id, + main_tx, + } } pub async fn stabilize( @@ -83,6 +87,22 @@ impl UnstableConnection { .send(&mut sock_tx, &cipher, &nonce) .await?; + match Packet::recv(&mut sock_rx, &cipher, &nonce).await? { + Packet::Response { body, .. } => match body { + Response::Success => {} + req => { + return Err(anyhow!( + "expected cbs to send: Response::Success, but got: Request::{req}" + )) + } + }, + body => { + return Err(anyhow!( + "expected cbs to send: Request::Success, but got: {body}" + )) + } + } + // Poll packets from socket { let cipher = cipher.clone(); @@ -125,6 +145,8 @@ impl UnstableConnection { let cipher = cipher.clone(); let nonce = nonce.clone(); let kill_token = self.kill_token.clone(); + let main_tx = self.main_tx.clone(); + let id = self.id.clone(); tokio::spawn(async move { loop { @@ -146,9 +168,10 @@ impl UnstableConnection { .await .unwrap(); } - Event::FromCBS(packet) => { - cli_log::info!("Core received CBS packet: {packet:?}"); - } + Event::FromCBS(packet) => main_tx + .send(AppEvent::CBSPacket(id.clone(), packet)) + .await + .unwrap(), } } }); diff --git a/src/cbs/manager.rs b/src/cbs/manager.rs index 81f1337..5bda03f 100644 --- a/src/cbs/manager.rs +++ b/src/cbs/manager.rs @@ -1,4 +1,5 @@ use super::{Connection, UnstableConnection}; +use crate::app::events::Event as AppEvent; use aes_gcm_siv::{Aes256GcmSiv, KeyInit, Nonce}; use anyhow::{anyhow, Result}; @@ -93,7 +94,7 @@ pub struct Manager { } impl Manager { - pub async fn new() -> Self { + pub async fn new(main_tx: mpsc::Sender) -> Self { let (tx, mut rx) = mpsc::unbounded_channel(); let sock_name = { @@ -144,7 +145,14 @@ impl Manager { let id = Uuid::new_v4(); unstable_connections.insert( id, - (UnstableConnection::new(CancellationToken::new(), id), tx), + ( + UnstableConnection::new( + CancellationToken::new(), + id, + main_tx.clone(), + ), + tx, + ), ); cli_log::info!("Spawned CBS with ID: {id}"); From 77a545116b003e6fc35f45d994a127a1fb191c76 Mon Sep 17 00:00:00 2001 From: antifallobst Date: Tue, 21 May 2024 23:07:20 +0200 Subject: [PATCH 8/8] feat(cbs): implemented some basic error handling --- src/app/events/mod.rs | 23 +++++- src/cbs/connection.rs | 167 ++++++++++++++++++++++++------------------ src/cbs/dummy.rs | 4 +- 3 files changed, 119 insertions(+), 75 deletions(-) diff --git a/src/app/events/mod.rs b/src/app/events/mod.rs index 7e3ab6a..522ddf1 100644 --- a/src/app/events/mod.rs +++ b/src/app/events/mod.rs @@ -22,7 +22,7 @@ mod handlers; pub mod listeners; -use anyhow::{Context, Result}; +use anyhow::{Context, Error, Result}; use crate::{ app::{command_interface::Commands, App}, @@ -33,10 +33,29 @@ use crossterm::event::Event as CrosstermEvent; use handlers::{command, input}; use uuid::Uuid; +#[derive(Debug)] +pub enum ErrorEvent { + CBSCrash(Uuid, Error), +} + +impl ErrorEvent { + pub async fn handle(self, app: &mut App) -> Result { + match self { + Self::CBSCrash(cbs, err) => { + // TODO: Kill CBS process + // TODO: Kill CBS connection threads + cli_log::error!("The CBS handler for {cbs} crashed: {err}"); + Ok(EventStatus::Ok) + } + } + } +} + #[derive(Debug)] pub enum Event { InputEvent(CrosstermEvent), CBSPacket(Uuid, triba_packet::Packet), + Error(ErrorEvent), // FIXME(@soispha): The `String` here is just wrong <2024-05-03> CommandEvent(Commands, Option>), @@ -56,6 +75,8 @@ impl Event { Ok(EventStatus::Ok) } + Event::Error(err) => err.handle(app).await, + Event::LuaCommand(lua_code) => { warn!( "Got lua code to execute, but no exectuter is available:\n{}", diff --git a/src/cbs/connection.rs b/src/cbs/connection.rs index 7a56b96..b2df663 100644 --- a/src/cbs/connection.rs +++ b/src/cbs/connection.rs @@ -1,36 +1,35 @@ -use crate::app::events::Event as AppEvent; +use crate::app::events::{ErrorEvent as AppErrorEvent, Event as AppEvent}; use aes_gcm_siv::{Aes256GcmSiv, Nonce}; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; use interprocess::local_socket::tokio::{RecvHalf, SendHalf}; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use triba_packet::{IdPool, Packet, Request, Response}; use uuid::Uuid; -pub struct Connection { - req_tx: mpsc::UnboundedSender, - resp_tx: mpsc::UnboundedSender<(Response, u64)>, -} - -impl Connection { - pub async fn send_request(&self, body: Request) -> Result<()> { - self.req_tx.send(body)?; - Ok(()) - } - - pub async fn send_response(&self, body: Response, receiver: u64) -> Result<()> { - self.resp_tx.send((body, receiver))?; - Ok(()) - } -} - enum Event { ToCBSReq(Request), ToCBSResp(Response, u64), FromCBS(Packet), } +pub struct Connection { + tx: mpsc::UnboundedSender, +} + +impl Connection { + pub async fn send_request(&self, body: Request) -> Result<()> { + self.tx.send(Event::ToCBSReq(body))?; + Ok(()) + } + + pub async fn send_response(&self, body: Response, receiver: u64) -> Result<()> { + self.tx.send(Event::ToCBSResp(body, receiver))?; + Ok(()) + } +} + pub struct UnstableConnection { kill_token: CancellationToken, id: Uuid, @@ -109,37 +108,20 @@ impl UnstableConnection { let nonce = nonce.clone(); let tx = tx.clone(); + let id = self.id.clone(); + let main_tx = self.main_tx.clone(); + tokio::spawn(async move { - loop { - let packet = Packet::recv(&mut sock_rx, &cipher, &nonce).await.unwrap(); - tx.send(Event::FromCBS(packet)).unwrap(); + match poll_from_socket(&mut sock_rx, &cipher, &nonce, tx).await { + Err(e) => main_tx + .send(AppEvent::Error(AppErrorEvent::CBSCrash(id, e))) + .await + .expect("Failed to propagate error back to main queue."), + Ok(_) => (), } }); } - let (core_req_tx, mut core_req_rx) = mpsc::unbounded_channel(); - - // Poll requests from core - { - let tx = tx.clone(); - tokio::spawn(async move { - loop { - let body = core_req_rx.recv().await.unwrap(); - tx.send(Event::ToCBSReq(body)).unwrap(); - } - }); - } - - let (core_resp_tx, mut core_resp_rx) = mpsc::unbounded_channel(); - - // Poll responses from core - tokio::spawn(async move { - loop { - let (body, req) = core_resp_rx.recv().await.unwrap(); - tx.send(Event::ToCBSResp(body, req)).unwrap(); - } - }); - // Handle and route all packets { let cipher = cipher.clone(); @@ -149,37 +131,76 @@ impl UnstableConnection { let id = self.id.clone(); tokio::spawn(async move { - loop { - let event = tokio::select! { - event = rx.recv() => event.unwrap(), - _ = kill_token.cancelled() => break, - }; - - match event { - Event::ToCBSReq(req) => { - Packet::request(id_pool.acquire(), req) - .send(&mut sock_tx, &cipher, &nonce) - .await - .unwrap(); - } - Event::ToCBSResp(resp, req) => { - Packet::response(id_pool.acquire(), req, resp) - .send(&mut sock_tx, &cipher, &nonce) - .await - .unwrap(); - } - Event::FromCBS(packet) => main_tx - .send(AppEvent::CBSPacket(id.clone(), packet)) - .await - .unwrap(), - } + match route_packets( + rx, + kill_token, + id_pool, + &mut sock_tx, + &cipher, + &nonce, + main_tx.clone(), + id, + ) + .await + { + Err(e) => main_tx + .send(AppEvent::Error(AppErrorEvent::CBSCrash(id, e))) + .await + .expect("Failed to propagate error back to main queue."), + Ok(_) => (), } }); } - Ok(Connection { - req_tx: core_req_tx, - resp_tx: core_resp_tx, - }) + Ok(Connection { tx }) } } + +async fn poll_from_socket( + rx: &mut RecvHalf, + cipher: &Aes256GcmSiv, + nonce: &Nonce, + tx: mpsc::UnboundedSender, +) -> Result<()> { + loop { + let packet = Packet::recv(rx, cipher, nonce).await?; + tx.send(Event::FromCBS(packet))?; + } +} + +async fn route_packets( + mut rx: mpsc::UnboundedReceiver, + kill_token: CancellationToken, + mut id_pool: IdPool, + sock_tx: &mut SendHalf, + cipher: &Aes256GcmSiv, + nonce: &Nonce, + main_tx: mpsc::Sender, + id: Uuid, +) -> Result<()> { + loop { + let event = tokio::select! { + event = rx.recv() => event.context("The cbs event queue was closed unexpectedly.")?, + _ = kill_token.cancelled() => break, + }; + + match event { + Event::ToCBSReq(req) => { + Packet::request(id_pool.acquire(), req) + .send(sock_tx, cipher, nonce) + .await?; + } + Event::ToCBSResp(resp, req) => { + Packet::response(id_pool.acquire(), req, resp) + .send(sock_tx, cipher, nonce) + .await?; + } + Event::FromCBS(packet) => { + main_tx + .send(AppEvent::CBSPacket(id.clone(), packet)) + .await? + } + } + } + Ok(()) +} diff --git a/src/cbs/dummy.rs b/src/cbs/dummy.rs index e5cb73d..aeb93bd 100644 --- a/src/cbs/dummy.rs +++ b/src/cbs/dummy.rs @@ -4,5 +4,7 @@ use uuid::Uuid; pub async fn cbs(sock_name: Name<'_>, id: Uuid) { let (session, rx) = triba::Session::new(id, sock_name).await.unwrap(); - loop {} + loop { + tokio::task::yield_now().await; + } }