Implement CBS API handling on the trinitrix core side. #23

Merged
antifallobst merged 8 commits from cbs-core into main 2024-05-21 21:09:48 +00:00
9 changed files with 839 additions and 7 deletions
Showing only changes of commit 64d5bdd9c5 - Show all commits

307
Cargo.lock generated
View File

@ -17,6 +17,42 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "aead"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0"
dependencies = [
"crypto-common",
"generic-array",
]
[[package]]
name = "aes"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0"
dependencies = [
"cfg-if",
"cipher",
"cpufeatures",
]
[[package]]
name = "aes-gcm-siv"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae0784134ba9375416d469ec31e7c5f9fa94405049cf08c5ce5b4698be673e0d"
dependencies = [
"aead",
"aes",
"cipher",
"ctr",
"polyval",
"subtle",
"zeroize",
]
[[package]]
name = "ahash"
version = "0.8.11"
@ -172,6 +208,12 @@ version = "3.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c"
[[package]]
name = "byteorder"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.6.0"
@ -219,6 +261,16 @@ dependencies = [
"windows-targets 0.52.5",
]
[[package]]
name = "cipher"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad"
dependencies = [
"crypto-common",
"inout",
]
[[package]]
name = "clap"
version = "4.5.4"
@ -362,9 +414,46 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
dependencies = [
"generic-array",
"rand_core",
"typenum",
]
[[package]]
name = "ctr"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835"
dependencies = [
"cipher",
]
[[package]]
name = "curve25519-dalek"
version = "4.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a677b8922c94e01bdbb12126b0bc852f00447528dee1782229af9c720c3f348"
dependencies = [
"cfg-if",
"cpufeatures",
"curve25519-dalek-derive",
"fiat-crypto",
"platforms",
"rustc_version",
"subtle",
"zeroize",
]
[[package]]
name = "curve25519-dalek-derive"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "diff"
version = "0.1.13"
@ -417,6 +506,12 @@ dependencies = [
"serde",
]
[[package]]
name = "fiat-crypto"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d"
[[package]]
name = "file-size"
version = "1.0.3"
@ -547,6 +642,29 @@ version = "2.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b248f5224d1d606005e02c97f5aa4e88eeb230488bcc03bc9ca4d7991399f2b5"
[[package]]
name = "inout"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5"
dependencies = [
"generic-array",
]
[[package]]
name = "interprocess"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b4d0250d41da118226e55b3d50ca3f0d9e0a0f6829b92f543ac0054aeea1572"
dependencies = [
"futures-core",
"libc",
"recvmsg",
"tokio",
"widestring",
"windows-sys 0.52.0",
]
[[package]]
name = "is_terminal_polyfill"
version = "1.70.0"
@ -730,6 +848,12 @@ version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "opaque-debug"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381"
[[package]]
name = "option-ext"
version = "0.2.0"
@ -837,6 +961,30 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec"
[[package]]
name = "platforms"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db23d408679286588f4d4644f965003d056e3dd5abcaaa938116871d7ce2fee7"
[[package]]
name = "polyval"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d1fe60d06143b2430aa532c94cfe9e29783047f06c0d7fd359a9a51b729fa25"
dependencies = [
"cfg-if",
"cpufeatures",
"opaque-debug",
"universal-hash",
]
[[package]]
name = "ppv-lite86"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "pretty_assertions"
version = "1.4.0"
@ -903,6 +1051,36 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom",
]
[[package]]
name = "ratatui"
version = "0.26.2"
@ -923,6 +1101,12 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "recvmsg"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3edd4d5d42c92f0a659926464d4cce56b562761267ecf0f469d85b7de384175"
[[package]]
name = "redox_syscall"
version = "0.5.1"
@ -972,6 +1156,28 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56"
[[package]]
name = "rmp"
version = "0.8.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "228ed7c16fa39782c3b3468e974aec2795e9089153cd08ee2e9aefb3613334c4"
dependencies = [
"byteorder",
"num-traits",
"paste",
]
[[package]]
name = "rmp-serde"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52e599a477cf9840e92f2cde9a7189e67b42c57532749bf90aea6ec10facd4db"
dependencies = [
"byteorder",
"rmp",
"serde",
]
[[package]]
name = "rustc-demangle"
version = "0.1.24"
@ -984,6 +1190,15 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc_version"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
dependencies = [
"semver",
]
[[package]]
name = "rustversion"
version = "1.0.16"
@ -1002,6 +1217,12 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "semver"
version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b"
[[package]]
name = "serde"
version = "1.0.201"
@ -1088,6 +1309,16 @@ version = "1.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
[[package]]
name = "socket2"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "stability"
version = "0.2.0"
@ -1132,6 +1363,12 @@ dependencies = [
"syn",
]
[[package]]
name = "subtle"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]]
name = "syn"
version = "2.0.61"
@ -1170,9 +1407,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787"
dependencies = [
"backtrace",
"bytes",
"libc",
"mio",
"num_cpus",
"pin-project-lite",
"socket2",
"tokio-macros",
"windows-sys 0.48.0",
]
[[package]]
@ -1203,22 +1445,30 @@ dependencies = [
name = "trinitrix"
version = "0.1.0"
dependencies = [
"aes-gcm-siv",
"anyhow",
"clap",
"cli-log",
"crossterm 0.25.0",
"directories",
"interprocess",
"keymaps",
"libloading",
"mlua",
"once_cell",
"pretty_assertions",
"rand",
"ratatui",
"rmp-serde",
"serde",
"strum",
"tokio",
"tokio-util",
"trinitry",
"trixy",
"tui-textarea",
"uuid",
"x25519-dalek",
]
[[package]]
@ -1299,12 +1549,31 @@ version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68f5e5f3158ecfd4b8ff6fe086db7c8467a2dfdac97fe420f2b7c4aa97af66d6"
[[package]]
name = "universal-hash"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc1de2c688dc15305988b563c3854064043356019f97a4b46276fe734c4f07ea"
dependencies = [
"crypto-common",
"subtle",
]
[[package]]
name = "utf8parse"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]]
name = "uuid"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0"
dependencies = [
"getrandom",
]
[[package]]
name = "version_check"
version = "0.9.4"
@ -1371,6 +1640,12 @@ version = "0.2.92"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96"
[[package]]
name = "widestring"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7219d36b6eac893fa81e84ebe06485e7dcbb616177469b142df14f1f4deb1311"
[[package]]
name = "winapi"
version = "0.3.9"
@ -1541,6 +1816,18 @@ version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0"
[[package]]
name = "x25519-dalek"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7e468321c81fb07fa7f4c636c3972b9100f0346e5b6a9f2bd0603a52f7ed277"
dependencies = [
"curve25519-dalek",
"rand_core",
"serde",
"zeroize",
]
[[package]]
name = "yansi"
version = "0.5.1"
@ -1566,3 +1853,23 @@ dependencies = [
"quote",
"syn",
]
[[package]]
name = "zeroize"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4756f7db3f7b5574938c3eb1c117038b8e07f95ee6718c0efad4ac21508f1efd"
dependencies = [
"zeroize_derive",
]
[[package]]
name = "zeroize_derive"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

View File

@ -32,18 +32,28 @@ clap = { version = "4.5.4", features = ["derive"] }
cli-log = "2.0"
anyhow = "1.0"
tokio = { version = "1.37", features = [
"macros",
"rt-multi-thread",
"fs",
"time",
"macros",
"rt-multi-thread",
"fs",
"time",
"io-util"
] }
tokio-util = { version = "0.7.10" }
uuid = { version = "1.8.0", features = ["v4"] }
rand = "0.8.5"
serde = { version = "1.0.201", features = ["derive"] }
rmp-serde = "1.3.0"
strum = { version = "0.26.2", features = ["derive"] }
# config
trinitry = { version = "0.1.0" }
keymaps = { version = "0.1.1", features = ["crossterm"] }
directories = "5.0.1"
# crypto
x25519-dalek = "2.0.1"
aes-gcm-siv = { version = "0.11.1", features = ["aes"] }
# c api
libloading = "0.8.3"
trixy = { version = "0.1.1" }
@ -57,6 +67,9 @@ ratatui = "0.26.2"
tui-textarea = { version = "0.4", features = ["crossterm"] }
crossterm = { version = "0.25" }
# Trinitrx Backend API specific
interprocess = { version = "2.1.0", features = ["tokio"] }
[dev-dependencies]
pretty_assertions = "1.4.0"

View File

@ -42,6 +42,7 @@ use crate::{
events::{Event, EventStatus},
status::{State, Status},
},
cbs,
ui::ui_trait::TrinitrixUi,
};
@ -49,6 +50,8 @@ pub struct App<U: TrinitrixUi> {
ui: U,
status: Status,
cbs_manager: cbs::Manager,
tx: mpsc::Sender<Event>,
rx: mpsc::Receiver<Event>,
@ -63,17 +66,22 @@ pub struct App<U: TrinitrixUi> {
pub static COMMAND_TRANSMITTER: OnceLock<Sender<Event>> = OnceLock::new();
impl<U: TrinitrixUi> App<U> {
pub fn new(ui: U) -> Result<Self> {
pub async fn new(ui: U) -> Result<Self> {
let (tx, rx) = mpsc::channel(256);
COMMAND_TRANSMITTER
.set(tx.clone())
.expect("The cell should always be empty at this point");
let cbs_manager = cbs::Manager::new().await;
cbs_manager.spawn_cbs().await?;
Ok(Self {
ui,
status: Status::new(),
cbs_manager,
tx: tx.clone(),
rx,
input_listener_killer: CancellationToken::new(),

126
src/cbs/connection.rs Normal file
View File

@ -0,0 +1,126 @@
use super::packet::{self, IdPool, Packet};
use aes_gcm_siv::{Aes256GcmSiv, Nonce};
use anyhow::{anyhow, Result};
use interprocess::local_socket::tokio::{RecvHalf, SendHalf};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::mpsc,
};
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
pub struct Connection {
tx: mpsc::UnboundedSender<packet::Body>,
}
impl Connection {
pub async fn send_packet(&self, body: packet::Body) -> Result<()> {
self.tx.send(body)?;
Ok(())
}
}
enum Event {
ToCBS(Packet),
FromCBS(Packet),
}
pub struct UnstableConnection {
kill_token: CancellationToken,
id: Uuid,
}
impl UnstableConnection {
pub fn new(kill_token: CancellationToken, id: Uuid) -> Self {
Self { kill_token, id }
}
pub async fn stabilize(
&self,
cipher: Aes256GcmSiv,
nonce: Nonce,
mut sock_rx: RecvHalf,
mut sock_tx: SendHalf,
) -> Result<Connection> {
let (tx, mut rx) = mpsc::unbounded_channel();
let mut id_pool = IdPool::new();
let packet = Packet::recv(&mut sock_rx, &cipher, &nonce).await.unwrap();
match packet.body() {
packet::Body::HandshakeUpgradeConnection => {
cli_log::info!(
"CBS {id}: upgraded connection to encrypted messagepack",
id = self.id
);
}
body => {
return Err(anyhow!(
"expected cbs to send HandshakeUpgradeConnection but got {body}"
))
}
}
Packet::new(&mut id_pool, packet::Body::HandshakeSuccess)
.send(&mut sock_tx, &cipher, &nonce)
.await
.unwrap();
// Poll packets from socket
{
let cipher = cipher.clone();
let nonce = nonce.clone();
let tx = tx.clone();
tokio::spawn(async move {
loop {
let packet = Packet::recv(&mut sock_rx, &cipher, &nonce).await.unwrap();
tx.send(Event::FromCBS(packet)).unwrap();
}
});
}
let (core_tx, mut core_rx) = mpsc::unbounded_channel();
// Poll packets from core
tokio::spawn(async move {
loop {
let body = core_rx.recv().await.unwrap();
tx.send(Event::ToCBS(Packet::new(&mut id_pool, body)))
.unwrap();
}
});
// Handle and route all packets
{
let cipher = cipher.clone();
let nonce = nonce.clone();
let kill_token = self.kill_token.clone();
tokio::spawn(async move {
loop {
let event = tokio::select! {
event = rx.recv() => event.unwrap(),
_ = kill_token.cancelled() => break,
};
match event {
Event::ToCBS(packet) => {
sock_tx
.write(packet.pack(&cipher, &nonce).unwrap().as_slice())
.await
.unwrap();
}
Event::FromCBS(packet) => {
cli_log::info!("Core received CBS packet: {packet:?}")
}
}
}
});
}
Ok(Connection { tx: core_tx })
}
}

74
src/cbs/dummy.rs Normal file
View File

@ -0,0 +1,74 @@
use super::packet;
use crate::cbs::packet::Packet;
use aes_gcm_siv::{Aes256GcmSiv, KeyInit, Nonce};
use anyhow::anyhow;
use interprocess::local_socket::{
self,
tokio::{prelude::*, Stream},
};
use rand::thread_rng;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use uuid::Uuid;
use x25519_dalek::{EphemeralSecret, PublicKey};
pub async fn cbs(sock_name: local_socket::Name<'_>, id: Uuid) {
let stream = Stream::connect(sock_name).await.unwrap();
let (rx, mut tx) = stream.split();
let mut rx = BufReader::new(rx);
tx.write(id.as_bytes()).await.unwrap();
let dh_core_public = {
let mut buffer = [0u8; 32];
rx.read(&mut buffer).await.unwrap();
PublicKey::from(buffer)
};
let dh_own_secret = EphemeralSecret::random_from_rng(thread_rng());
let dh_own_public = PublicKey::from(&dh_own_secret);
tx.write(dh_own_public.as_bytes()).await.unwrap();
let shared_secret = dh_own_secret.diffie_hellman(&dh_core_public);
let nonce = {
let mut buffer = [0u8; 12];
rx.read(&mut buffer).await.unwrap();
Nonce::from(buffer)
};
let cipher = Aes256GcmSiv::new(shared_secret.as_bytes().into());
let mut id_pool = packet::IdPool::new();
tx.write(
Packet::new(&mut id_pool, packet::Body::HandshakeUpgradeConnection)
.pack(&cipher, &nonce)
.unwrap()
.as_slice(),
)
.await
.unwrap();
let packet = Packet::recv(&mut rx, &cipher, &nonce).await.unwrap();
match packet.body() {
packet::Body::HandshakeSuccess => {
cli_log::info!("[DUMMY CBS] Connection to core stabilized");
}
_ => {
cli_log::info!("[DUMMY CBS] Connection to core failed");
return;
}
}
Packet::new(
&mut id_pool,
packet::Body::Exit(packet::Error::Fatal("This is a test".to_string())),
)
.send(&mut tx, &cipher, &nonce)
.await
.unwrap();
loop {}
}

176
src/cbs/manager.rs Normal file
View File

@ -0,0 +1,176 @@
use super::{packet, Connection, UnstableConnection};
use aes_gcm_siv::{Aes256GcmSiv, KeyInit, Nonce};
use anyhow::{anyhow, Result};
use interprocess::local_socket::{
self,
tokio::{prelude::*, Stream},
GenericFilePath, GenericNamespaced, ListenerOptions,
};
use rand::{
distributions::Alphanumeric,
{thread_rng, Rng},
};
use std::collections::HashMap;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt, BufReader},
sync::{mpsc, oneshot},
};
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
use x25519_dalek::{EphemeralSecret, PublicKey};
enum Event {
IncomingConnection(Stream),
SpawnCBS(oneshot::Sender<Result<Uuid>>),
SendPacket(Uuid, packet::Body),
}
async fn poll_socket(
sock_name: local_socket::Name<'_>,
tx: mpsc::UnboundedSender<Event>,
) -> Result<()> {
let listener = ListenerOptions::new()
.name(sock_name)
.create_tokio()
.unwrap();
loop {
let stream = listener.accept().await?;
tx.send(Event::IncomingConnection(stream)).unwrap();
}
}
async fn handle_incoming(
stream: Stream,
connections: &mut HashMap<Uuid, (UnstableConnection, oneshot::Sender<Result<Uuid>>)>,
) -> Result<(Uuid, Connection)> {
let (raw_rx, mut tx) = stream.split();
let mut rx = BufReader::new(&raw_rx);
let id = {
let mut buffer = [0u8; 16];
rx.read(&mut buffer).await?;
Uuid::from_bytes(buffer)
};
cli_log::info!("Received CBS connection with id: {id}");
let conn = match connections.get(&id) {
Some((conn, _)) => conn,
None => {
return Err(anyhow!("The CBS id {id} is unknown."));
}
};
let dh_own_secret = EphemeralSecret::random_from_rng(thread_rng());
let dh_own_public = PublicKey::from(&dh_own_secret);
tx.write(dh_own_public.as_bytes()).await?;
let dh_cbs_public = {
let mut buffer = [0u8; 32];
rx.read(&mut buffer).await?;
PublicKey::from(buffer)
};
let shared_secret = dh_own_secret.diffie_hellman(&dh_cbs_public);
let nonce = Nonce::from(rand::random::<[u8; 12]>());
tx.write(nonce.as_slice()).await?;
let cipher = Aes256GcmSiv::new(shared_secret.as_bytes().into());
let conn = conn.stabilize(cipher, nonce, raw_rx, tx).await?;
Ok((id, conn))
}
pub struct Manager {
tx: mpsc::UnboundedSender<Event>,
}
impl Manager {
pub async fn new() -> Self {
let (tx, mut rx) = mpsc::unbounded_channel();
let sock_name = {
let suffix = thread_rng()
.sample_iter(&Alphanumeric)
.take(30)
.map(char::from)
.collect::<String>();
if GenericNamespaced::is_supported() {
format!("example_{suffix}.sock")
.to_ns_name::<GenericNamespaced>()
.unwrap()
} else {
format!("/tmp/example_{suffix}.sock")
.to_fs_name::<GenericFilePath>()
.unwrap()
}
};
// Start socket connection listener
tokio::spawn(poll_socket(sock_name.clone(), tx.clone()));
tokio::spawn(async move {
let mut unstable_connections: HashMap<
Uuid,
(UnstableConnection, oneshot::Sender<Result<Uuid>>),
> = HashMap::new();
let mut connections: HashMap<Uuid, Connection> = HashMap::new();
loop {
match rx.recv().await.unwrap() {
Event::IncomingConnection(stream) => {
match handle_incoming(stream, &mut unstable_connections).await {
Ok((id, conn)) => {
let (_, tx) = unstable_connections.remove(&id).unwrap();
connections.insert(id, conn);
tx.send(Ok(id)).unwrap();
cli_log::info!("CBS {id}: Connection stabilized.")
}
Err(e) => {
cli_log::error!("Handshake failed: {e}");
}
}
}
Event::SpawnCBS(tx) => {
let id = Uuid::new_v4();
unstable_connections.insert(
id,
(UnstableConnection::new(CancellationToken::new(), id), tx),
);
cli_log::info!("Spawned CBS with ID: {id}");
// Start dummy cbs
let sock_name = sock_name.clone();
tokio::spawn(super::dummy::cbs(sock_name.clone(), id));
}
Event::SendPacket(id, body) => {
let conn = connections.get(&id).unwrap();
conn.send_packet(body).await.unwrap();
}
}
}
});
Self { tx }
}
pub async fn spawn_cbs(&self) -> Result<Uuid> {
let (tx, rx) = oneshot::channel();
self.tx.send(Event::SpawnCBS(tx))?;
rx.await?
}
pub fn send_packet(&self, cbs: Uuid, body: packet::Body) -> Result<()> {
self.tx.send(Event::SendPacket(cbs, body))?;
Ok(())
}
}

10
src/cbs/mod.rs Normal file
View File

@ -0,0 +1,10 @@
pub mod connection;
mod dummy;
pub mod manager;
pub(self) mod packet;
// TODO: This whole module uses hell a lot of unwraps,
// which need to be replaced by proper error handling. - antifallobst 2024-05-14
pub use connection::{Connection, UnstableConnection};
pub use manager::Manager;

117
src/cbs/packet.rs Normal file
View File

@ -0,0 +1,117 @@
use aes_gcm_siv::{AeadInPlace, Aes256GcmSiv, Nonce};
use anyhow::{anyhow, Result};
use serde::{Deserialize, Serialize};
use strum::Display;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
pub struct IdPool {
last: u64,
}
impl IdPool {
pub fn new() -> Self {
Self { last: 0 }
}
pub fn acquire(&mut self) -> u64 {
self.last += 1;
self.last
}
}
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub struct Version(u32);
impl Version {
/// Splits the version into its three parts: (major, minor, patch)
pub fn extract(self) -> (u8, u16, u8) {
let major = ((self.0 & 0xFF000000) >> 24) as u8;
let minor = ((self.0 & 0x00FFFF00) >> 8) as u16;
let patch = (self.0 & 0x000000FF) as u8;
(major, minor, patch)
}
}
#[repr(u8)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Error {
Fatal(String),
}
#[repr(u32)]
#[derive(Debug, Clone, Serialize, Deserialize, Display)]
pub enum Body {
// Misc
Exit(Error) = 0x000_00000,
// Handshake
HandshakeUpgradeConnection = 0x001_00000,
HandshakeApiVersion(Version) = 0x001_00001,
HandshakeConfig = 0x001_00002,
HandshakeSuccess = 0x001_00003,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Packet {
id: u64,
body: Body,
}
impl Packet {
pub fn new(id_pool: &mut IdPool, body: Body) -> Self {
Self {
id: id_pool.acquire(),
body,
}
}
pub fn body(&self) -> &Body {
&self.body
}
// Serializes the packet to MessagePack and encrypts it.
pub fn pack(&self, cipher: &Aes256GcmSiv, nonce: &Nonce) -> Result<Vec<u8>> {
let mut data = rmp_serde::to_vec(self)?;
cipher
.encrypt_in_place(&nonce, b"", &mut data)
.map_err(|e| anyhow!("while packing packet: {e}"))?;
let mut vec = (data.len() as u32).to_le_bytes().to_vec();
vec.append(&mut data);
Ok(vec)
}
// Deserializes a packet from encrypted MessagePack.
pub fn unpack(data: &mut Vec<u8>, cipher: &Aes256GcmSiv, nonce: &Nonce) -> Result<Self> {
cipher
.decrypt_in_place(&nonce, b"", data)
.map_err(|e| anyhow!("while unpacking packet: {e}"))?;
Ok(rmp_serde::from_slice::<Packet>(data.as_slice())?)
}
pub async fn recv<T: AsyncRead + Unpin>(
rx: &mut T,
cipher: &Aes256GcmSiv,
nonce: &Nonce,
) -> Result<Self> {
let size = rx.read_u32_le().await? as usize;
let mut buffer = vec![0u8; size];
rx.read(&mut buffer).await?;
Packet::unpack(&mut buffer, &cipher, &nonce)
}
pub async fn send<T: AsyncWrite + Unpin>(
&self,
tx: &mut T,
cipher: &Aes256GcmSiv,
nonce: &Nonce,
) -> Result<()> {
let raw = self.pack(cipher, nonce)?;
tx.write(raw.as_slice()).await?;
Ok(())
}
}

View File

@ -20,6 +20,7 @@
*/
mod app;
mod cbs;
mod cli;
mod ui;
@ -38,13 +39,13 @@ async fn main() -> anyhow::Result<()> {
let args = Args::parse();
match args.subcommand {
Command::Tui {} => {
let mut app = app::App::new(Tui::new().context("Failed to setup tui")?)?;
let mut app = app::App::new(Tui::new().context("Failed to setup tui")?).await?;
// NOTE(@soispha): The `None` here is temporary <2024-05-08>
app.run(None, args.plugin_path).await?;
}
Command::Repl {} => {
let mut app = app::App::new(Repl::new().context("Failed to setup repl")?)?;
let mut app = app::App::new(Repl::new().context("Failed to setup repl")?).await?;
// NOTE(@soispha): The `None` here is temporary <2024-05-03>
app.run(None, args.plugin_path).await?;