Implement CBS API handling on the trinitrix core side. #23

Merged
antifallobst merged 8 commits from cbs-core into main 2024-05-21 21:09:48 +00:00
10 changed files with 882 additions and 8 deletions

341
Cargo.lock generated
View File

@ -17,6 +17,42 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "aead"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0"
dependencies = [
"crypto-common",
"generic-array",
]
[[package]]
name = "aes"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0"
dependencies = [
"cfg-if",
"cipher",
"cpufeatures",
]
[[package]]
name = "aes-gcm-siv"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae0784134ba9375416d469ec31e7c5f9fa94405049cf08c5ce5b4698be673e0d"
dependencies = [
"aead",
"aes",
"cipher",
"ctr",
"polyval",
"subtle",
"zeroize",
]
[[package]]
name = "ahash"
version = "0.8.11"
@ -172,6 +208,12 @@ version = "3.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c"
[[package]]
name = "byteorder"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "1.6.0"
@ -219,6 +261,16 @@ dependencies = [
"windows-targets 0.52.5",
]
[[package]]
name = "cipher"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad"
dependencies = [
"crypto-common",
"inout",
]
[[package]]
name = "clap"
version = "4.5.4"
@ -362,9 +414,46 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
dependencies = [
"generic-array",
"rand_core",
"typenum",
]
[[package]]
name = "ctr"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835"
dependencies = [
"cipher",
]
[[package]]
name = "curve25519-dalek"
version = "4.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a677b8922c94e01bdbb12126b0bc852f00447528dee1782229af9c720c3f348"
dependencies = [
"cfg-if",
"cpufeatures",
"curve25519-dalek-derive",
"fiat-crypto",
"platforms",
"rustc_version",
"subtle",
"zeroize",
]
[[package]]
name = "curve25519-dalek-derive"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "diff"
version = "0.1.13"
@ -417,6 +506,12 @@ dependencies = [
"serde",
]
[[package]]
name = "fiat-crypto"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d"
[[package]]
name = "file-size"
version = "1.0.3"
@ -547,6 +642,29 @@ version = "2.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b248f5224d1d606005e02c97f5aa4e88eeb230488bcc03bc9ca4d7991399f2b5"
[[package]]
name = "inout"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5"
dependencies = [
"generic-array",
]
[[package]]
name = "interprocess"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b4d0250d41da118226e55b3d50ca3f0d9e0a0f6829b92f543ac0054aeea1572"
dependencies = [
"futures-core",
"libc",
"recvmsg",
"tokio",
"widestring",
"windows-sys 0.52.0",
]
[[package]]
name = "is_terminal_polyfill"
version = "1.70.0"
@ -730,6 +848,12 @@ version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "opaque-debug"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381"
[[package]]
name = "option-ext"
version = "0.2.0"
@ -837,6 +961,30 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec"
[[package]]
name = "platforms"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db23d408679286588f4d4644f965003d056e3dd5abcaaa938116871d7ce2fee7"
[[package]]
name = "polyval"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d1fe60d06143b2430aa532c94cfe9e29783047f06c0d7fd359a9a51b729fa25"
dependencies = [
"cfg-if",
"cpufeatures",
"opaque-debug",
"universal-hash",
]
[[package]]
name = "ppv-lite86"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]]
name = "pretty_assertions"
version = "1.4.0"
@ -903,6 +1051,36 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom",
]
[[package]]
name = "ratatui"
version = "0.26.2"
@ -923,6 +1101,12 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "recvmsg"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3edd4d5d42c92f0a659926464d4cce56b562761267ecf0f469d85b7de384175"
[[package]]
name = "redox_syscall"
version = "0.5.1"
@ -972,6 +1156,28 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56"
[[package]]
name = "rmp"
version = "0.8.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "228ed7c16fa39782c3b3468e974aec2795e9089153cd08ee2e9aefb3613334c4"
dependencies = [
"byteorder",
"num-traits",
"paste",
]
[[package]]
name = "rmp-serde"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52e599a477cf9840e92f2cde9a7189e67b42c57532749bf90aea6ec10facd4db"
dependencies = [
"byteorder",
"rmp",
"serde",
]
[[package]]
name = "rustc-demangle"
version = "0.1.24"
@ -984,6 +1190,15 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc_version"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
dependencies = [
"semver",
]
[[package]]
name = "rustversion"
version = "1.0.16"
@ -1002,6 +1217,12 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "semver"
version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b"
[[package]]
name = "serde"
version = "1.0.201"
@ -1088,6 +1309,16 @@ version = "1.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
[[package]]
name = "socket2"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871"
dependencies = [
"libc",
"windows-sys 0.52.0",
]
[[package]]
name = "stability"
version = "0.2.0"
@ -1132,6 +1363,12 @@ dependencies = [
"syn",
]
[[package]]
name = "subtle"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]]
name = "syn"
version = "2.0.61"
@ -1170,9 +1407,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787"
dependencies = [
"backtrace",
"bytes",
"libc",
"mio",
"num_cpus",
"pin-project-lite",
"socket2",
"tokio-macros",
"windows-sys 0.48.0",
]
[[package]]
@ -1199,26 +1441,68 @@ dependencies = [
"tokio",
]
[[package]]
name = "triba"
version = "0.1.0"
dependencies = [
"aes-gcm-siv",
"interprocess",
"rand",
"rmp-serde",
"serde",
"thiserror",
"tokio",
"tokio-util",
"triba-packet",
"uuid",
"x25519-dalek",
]
[[package]]
name = "triba-packet"
version = "0.1.0"
dependencies = [
"aes-gcm-siv",
"interprocess",
"rmp-serde",
"serde",
"strum",
"thiserror",
"tokio",
"uuid",
"x25519-dalek",
]
[[package]]
name = "trinitrix"
version = "0.1.0"
dependencies = [
"aes-gcm-siv",
"anyhow",
"clap",
"cli-log",
"crossterm 0.25.0",
"directories",
"interprocess",
"keymaps",
"libloading",
"mlua",
"once_cell",
"pretty_assertions",
"rand",
"ratatui",
"rmp-serde",
"serde",
"strum",
"tokio",
"tokio-util",
"triba",
"triba-packet",
"trinitry",
"trixy",
"tui-textarea",
"uuid",
"x25519-dalek",
]
[[package]]
@ -1299,12 +1583,31 @@ version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68f5e5f3158ecfd4b8ff6fe086db7c8467a2dfdac97fe420f2b7c4aa97af66d6"
[[package]]
name = "universal-hash"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc1de2c688dc15305988b563c3854064043356019f97a4b46276fe734c4f07ea"
dependencies = [
"crypto-common",
"subtle",
]
[[package]]
name = "utf8parse"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]]
name = "uuid"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0"
dependencies = [
"getrandom",
]
[[package]]
name = "version_check"
version = "0.9.4"
@ -1371,6 +1674,12 @@ version = "0.2.92"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96"
[[package]]
name = "widestring"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7219d36b6eac893fa81e84ebe06485e7dcbb616177469b142df14f1f4deb1311"
[[package]]
name = "winapi"
version = "0.3.9"
@ -1541,6 +1850,18 @@ version = "0.52.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bec47e5bfd1bff0eeaf6d8b485cc1074891a197ab4225d504cb7a1ab88b02bf0"
[[package]]
name = "x25519-dalek"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7e468321c81fb07fa7f4c636c3972b9100f0346e5b6a9f2bd0603a52f7ed277"
dependencies = [
"curve25519-dalek",
"rand_core",
"serde",
"zeroize",
]
[[package]]
name = "yansi"
version = "0.5.1"
@ -1566,3 +1887,23 @@ dependencies = [
"quote",
"syn",
]
[[package]]
name = "zeroize"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4756f7db3f7b5574938c3eb1c117038b8e07f95ee6718c0efad4ac21508f1efd"
dependencies = [
"zeroize_derive",
]
[[package]]
name = "zeroize_derive"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

View File

@ -32,18 +32,28 @@ clap = { version = "4.5.4", features = ["derive"] }
cli-log = "2.0"
anyhow = "1.0"
tokio = { version = "1.37", features = [
"macros",
"rt-multi-thread",
"fs",
"time",
"macros",
"rt-multi-thread",
"fs",
"time",
"io-util"
] }
tokio-util = { version = "0.7.10" }
uuid = { version = "1.8.0", features = ["v4"] }
rand = "0.8.5"
serde = { version = "1.0.201", features = ["derive"] }
rmp-serde = "1.3.0"
strum = { version = "0.26.2", features = ["derive"] }
# config
trinitry = { version = "0.1.0" }
keymaps = { version = "0.1.1", features = ["crossterm"] }
directories = "5.0.1"
# crypto
x25519-dalek = "2.0.1"
aes-gcm-siv = { version = "0.11.1", features = ["aes"] }
# c api
libloading = "0.8.3"
trixy = { version = "0.1.1" }
@ -57,6 +67,11 @@ ratatui = "0.26.2"
tui-textarea = { version = "0.4", features = ["crossterm"] }
crossterm = { version = "0.25" }
# Trinitrx Backend API specific
interprocess = { version = "2.1.0", features = ["tokio"] }
triba-packet = { path = "../triba-packet" }
triba = { path = "../triba" }
[dev-dependencies]
pretty_assertions = "1.4.0"

View File

@ -0,0 +1,61 @@
# Trinitrix Backend API (TriBA)
**Disclaimer:** These docs are WIP and going to receive a lot of improvement.
## Basic concept
The core starts a CBS as its child process and gives it as first Arg a base64 encoded UUID.
The CBS then connects to the local fs (or namespaced) socket.
After performing a handshake, which includes exchange of encryption keys, all communication
between core and CBS is encrypted (AES256-GCM-SIV) and serialized using [MessagePack](https://msgpack.org)
## Packets
Post-Handshake communication is structured in packets, which have the following structure in their raw form:
| Size (bytes) | Type | Content |
|--------------|-------------------|--------------------------------------------------------------------|
| 4 | uint32 | The size of the payload. |
| - | encrypted payload | The AES-GCM-SIV encrypted MessagePack serialization of the packet. |
A decrypted and deserialized payload contains either a response or a request.
A request looks as follows:
| Size | Name | Type | Content |
|------|--------|--------|-------------------------------------------------------------------------------------------------------------------|
| 8 | `id` | uint64 | The ID of _this_ packet. |
| - | `body` | enum | The actual packet data. (this will be better documented, as soon, as I dive into the mPack serialization details) |
A response looks like this:
| Size | Name | Type | Content |
|------|--------|--------|-------------------------------------------------------|
| 8 | `id` | uint64 | The ID of _this_ packet. |
| 8 | `req` | uint64 | The ID of the request packet this response refers to. |
| - | `body` | enum | The actual packet data. |
**Every request packet, that is sent over the socket, has to get a linked response packet.**
### IDs
Packet IDs are expected to be an incremental counter.
There is no difference between requests and responses originating from the same socket side when it comes to IDs.
So both - requests and responses - should share the same counter.
## Handshake
The handshaking process after connecting to the socket looks as follows:
1. The CBS sends its ID as 16 raw bytes.
2. When the ID is not known to the core, it aborts the handshaking process by closing the connection.
3. The core sends its Public Key for this connection. Again just 32 raw bytes.
4. The CBS sends its Public Key for this connection.
5. The core sends a 12 byte nonce value.
6. __Connection Upgrade:__ From this point on, all communication is structured by packets.
The packet encryption key is calculated using x25519 Diffie-Hellman and the previously exchanged keys.
The nonce from step 5 will be used as nonce for all packets.
7. The CBS sends the `Request::HandshakeUpgradeConnection` packet.
8. The core responds with `Response::Success`.
9. (In here there is going to happen API version information exchange etc.)
10. The Core sends a `Request::HandshakeSuccess`
11. The CBS responds with `Response::Succcess`

View File

@ -22,7 +22,7 @@
mod handlers;
pub mod listeners;
use anyhow::{Context, Result};
use anyhow::{Context, Error, Result};
use crate::{
app::{command_interface::Commands, App},
@ -31,10 +31,31 @@ use crate::{
use cli_log::{trace, warn};
use crossterm::event::Event as CrosstermEvent;
use handlers::{command, input};
use uuid::Uuid;
#[derive(Debug)]
pub enum ErrorEvent {
CBSCrash(Uuid, Error),
}
impl ErrorEvent {
pub async fn handle<U: TrinitrixUi>(self, app: &mut App<U>) -> Result<EventStatus> {
match self {
Self::CBSCrash(cbs, err) => {
// TODO: Kill CBS process
// TODO: Kill CBS connection threads
cli_log::error!("The CBS handler for {cbs} crashed: {err}");
Ok(EventStatus::Ok)
}
}
}
}
#[derive(Debug)]
pub enum Event {
InputEvent(CrosstermEvent),
CBSPacket(Uuid, triba_packet::Packet),
Error(ErrorEvent),
// FIXME(@soispha): The `String` here is just wrong <2024-05-03>
CommandEvent(Commands, Option<trixy::oneshot::Sender<String>>),
@ -49,6 +70,13 @@ impl Event {
.await
.with_context(|| format!("Failed to handle command event: `{:#?}`", event)),
Event::CBSPacket(cbs, packet) => {
cli_log::info!("Received packet from cbs {cbs}: {packet:?}");
Ok(EventStatus::Ok)
}
Event::Error(err) => err.handle(app).await,
Event::LuaCommand(lua_code) => {
warn!(
"Got lua code to execute, but no exectuter is available:\n{}",

View File

@ -42,6 +42,7 @@ use crate::{
events::{Event, EventStatus},
status::{State, Status},
},
cbs,
ui::ui_trait::TrinitrixUi,
};
@ -49,6 +50,8 @@ pub struct App<U: TrinitrixUi> {
ui: U,
status: Status,
cbs_manager: cbs::Manager,
tx: mpsc::Sender<Event>,
rx: mpsc::Receiver<Event>,
@ -63,17 +66,22 @@ pub struct App<U: TrinitrixUi> {
pub static COMMAND_TRANSMITTER: OnceLock<Sender<Event>> = OnceLock::new();
impl<U: TrinitrixUi> App<U> {
pub fn new(ui: U) -> Result<Self> {
pub async fn new(ui: U) -> Result<Self> {
let (tx, rx) = mpsc::channel(256);
COMMAND_TRANSMITTER
.set(tx.clone())
.expect("The cell should always be empty at this point");
let cbs_manager = cbs::Manager::new(tx.clone()).await;
cbs_manager.spawn_cbs().await?; // TODO: remove, this is just a dummy - antifallobst <2024-05-21>
Ok(Self {
ui,
status: Status::new(),
cbs_manager,
tx: tx.clone(),
rx,
input_listener_killer: CancellationToken::new(),

206
src/cbs/connection.rs Normal file
View File

@ -0,0 +1,206 @@
use crate::app::events::{ErrorEvent as AppErrorEvent, Event as AppEvent};
use aes_gcm_siv::{Aes256GcmSiv, Nonce};
use anyhow::{anyhow, Context, Result};
use interprocess::local_socket::tokio::{RecvHalf, SendHalf};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use triba_packet::{IdPool, Packet, Request, Response};
use uuid::Uuid;
enum Event {
ToCBSReq(Request),
ToCBSResp(Response, u64),
FromCBS(Packet),
}
pub struct Connection {
tx: mpsc::UnboundedSender<Event>,
}
impl Connection {
pub async fn send_request(&self, body: Request) -> Result<()> {
self.tx.send(Event::ToCBSReq(body))?;
Ok(())
}
pub async fn send_response(&self, body: Response, receiver: u64) -> Result<()> {
self.tx.send(Event::ToCBSResp(body, receiver))?;
Ok(())
}
}
pub struct UnstableConnection {
kill_token: CancellationToken,
id: Uuid,
main_tx: mpsc::Sender<AppEvent>,
}
impl UnstableConnection {
pub fn new(kill_token: CancellationToken, id: Uuid, main_tx: mpsc::Sender<AppEvent>) -> Self {
Self {
kill_token,
id,
main_tx,
}
}
pub async fn stabilize(
&self,
cipher: Aes256GcmSiv,
nonce: Nonce,
mut sock_rx: RecvHalf,
mut sock_tx: SendHalf,
) -> Result<Connection> {
let (tx, mut rx) = mpsc::unbounded_channel();
let mut id_pool = IdPool::new();
let packet = Packet::recv(&mut sock_rx, &cipher, &nonce).await?;
match packet {
Packet::Request { id, body } => {
match body {
Request::HandshakeUpgradeConnection => {
Packet::response(id_pool.acquire(), id, Response::Success)
.send(&mut sock_tx, &cipher, &nonce)
.await?;
cli_log::info!(
"CBS {id}: upgraded connection to encrypted messagepack",
id = self.id
);
}
req => return Err(anyhow!(
"expected cbs to send: Request::HandshakeUpgradeConnection, but got: Request::{req}"
))
}
}
body => {
return Err(anyhow!(
"expected cbs to send: Request::HandshakeUpgradeConnection, but got: {body}"
))
}
}
Packet::request(id_pool.acquire(), Request::HandshakeSuccess)
.send(&mut sock_tx, &cipher, &nonce)
.await?;
match Packet::recv(&mut sock_rx, &cipher, &nonce).await? {
Packet::Response { body, .. } => match body {
Response::Success => {}
req => {
return Err(anyhow!(
"expected cbs to send: Response::Success, but got: Request::{req}"
))
}
},
body => {
return Err(anyhow!(
"expected cbs to send: Request::Success, but got: {body}"
))
}
}
// Poll packets from socket
{
let cipher = cipher.clone();
let nonce = nonce.clone();
let tx = tx.clone();
let id = self.id.clone();
let main_tx = self.main_tx.clone();
tokio::spawn(async move {
match poll_from_socket(&mut sock_rx, &cipher, &nonce, tx).await {
Err(e) => main_tx
.send(AppEvent::Error(AppErrorEvent::CBSCrash(id, e)))
.await
.expect("Failed to propagate error back to main queue."),
Ok(_) => (),
}
});
}
// Handle and route all packets
{
let cipher = cipher.clone();
let nonce = nonce.clone();
let kill_token = self.kill_token.clone();
let main_tx = self.main_tx.clone();
let id = self.id.clone();
tokio::spawn(async move {
match route_packets(
rx,
kill_token,
id_pool,
&mut sock_tx,
&cipher,
&nonce,
main_tx.clone(),
id,
)
.await
{
Err(e) => main_tx
.send(AppEvent::Error(AppErrorEvent::CBSCrash(id, e)))
.await
.expect("Failed to propagate error back to main queue."),
Ok(_) => (),
}
});
}
Ok(Connection { tx })
}
}
async fn poll_from_socket(
rx: &mut RecvHalf,
cipher: &Aes256GcmSiv,
nonce: &Nonce,
tx: mpsc::UnboundedSender<Event>,
) -> Result<()> {
loop {
let packet = Packet::recv(rx, cipher, nonce).await?;
tx.send(Event::FromCBS(packet))?;
}
}
async fn route_packets(
mut rx: mpsc::UnboundedReceiver<Event>,
kill_token: CancellationToken,
mut id_pool: IdPool,
sock_tx: &mut SendHalf,
cipher: &Aes256GcmSiv,
nonce: &Nonce,
main_tx: mpsc::Sender<AppEvent>,
id: Uuid,
) -> Result<()> {
loop {
let event = tokio::select! {
event = rx.recv() => event.context("The cbs event queue was closed unexpectedly.")?,
_ = kill_token.cancelled() => break,
};
match event {
Event::ToCBSReq(req) => {
Packet::request(id_pool.acquire(), req)
.send(sock_tx, cipher, nonce)
.await?;
}
Event::ToCBSResp(resp, req) => {
Packet::response(id_pool.acquire(), req, resp)
.send(sock_tx, cipher, nonce)
.await?;
}
Event::FromCBS(packet) => {
main_tx
.send(AppEvent::CBSPacket(id.clone(), packet))
.await?
}
}
}
Ok(())
}

10
src/cbs/dummy.rs Normal file
View File

@ -0,0 +1,10 @@
use interprocess::local_socket::Name;
use uuid::Uuid;
pub async fn cbs(sock_name: Name<'_>, id: Uuid) {
let (session, rx) = triba::Session::new(id, sock_name).await.unwrap();
loop {
tokio::task::yield_now().await;
}
}

195
src/cbs/manager.rs Normal file
View File

@ -0,0 +1,195 @@
use super::{Connection, UnstableConnection};
use crate::app::events::Event as AppEvent;
use aes_gcm_siv::{Aes256GcmSiv, KeyInit, Nonce};
use anyhow::{anyhow, Result};
use interprocess::local_socket::{
self,
tokio::{prelude::*, Stream},
GenericFilePath, GenericNamespaced, ListenerOptions,
};
use rand::{
distributions::Alphanumeric,
{thread_rng, Rng},
};
use std::collections::HashMap;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt, BufReader},
sync::{mpsc, oneshot},
};
use tokio_util::sync::CancellationToken;
use triba_packet::{Request, Response};
use uuid::Uuid;
use x25519_dalek::{EphemeralSecret, PublicKey};
enum Event {
IncomingConnection(Stream),
SpawnCBS(oneshot::Sender<Result<Uuid>>),
SendRequest(Uuid, Request),
SendResponse(Uuid, Response, u64),
}
async fn poll_socket(
sock_name: local_socket::Name<'_>,
tx: mpsc::UnboundedSender<Event>,
) -> Result<()> {
let listener = ListenerOptions::new()
.name(sock_name)
.create_tokio()
.unwrap();
loop {
let stream = listener.accept().await?;
tx.send(Event::IncomingConnection(stream)).unwrap();
}
}
async fn handle_incoming(
stream: Stream,
connections: &mut HashMap<Uuid, (UnstableConnection, oneshot::Sender<Result<Uuid>>)>,
) -> Result<(Uuid, Connection)> {
let (raw_rx, mut tx) = stream.split();
let mut rx = BufReader::new(&raw_rx);
let id = {
let mut buffer = [0u8; 16];
rx.read(&mut buffer).await?;
Uuid::from_bytes(buffer)
};
cli_log::info!("Received CBS connection with id: {id}");
let conn = match connections.get(&id) {
Some((conn, _)) => conn,
None => {
return Err(anyhow!("The CBS id {id} is unknown."));
}
};
let dh_own_secret = EphemeralSecret::random_from_rng(thread_rng());
let dh_own_public = PublicKey::from(&dh_own_secret);
tx.write(dh_own_public.as_bytes()).await?;
let dh_cbs_public = {
let mut buffer = [0u8; 32];
rx.read(&mut buffer).await?;
PublicKey::from(buffer)
};
let shared_secret = dh_own_secret.diffie_hellman(&dh_cbs_public);
let nonce = Nonce::from(rand::random::<[u8; 12]>());
tx.write(nonce.as_slice()).await?;
let cipher = Aes256GcmSiv::new(shared_secret.as_bytes().into());
let conn = conn.stabilize(cipher, nonce, raw_rx, tx).await?;
Ok((id, conn))
}
pub struct Manager {
tx: mpsc::UnboundedSender<Event>,
}
impl Manager {
pub async fn new(main_tx: mpsc::Sender<AppEvent>) -> Self {
let (tx, mut rx) = mpsc::unbounded_channel();
let sock_name = {
let suffix = thread_rng()
.sample_iter(&Alphanumeric)
.take(30)
.map(char::from)
.collect::<String>();
if GenericNamespaced::is_supported() {
format!("example_{suffix}.sock")
.to_ns_name::<GenericNamespaced>()
.unwrap()
} else {
format!("/tmp/example_{suffix}.sock")
.to_fs_name::<GenericFilePath>()
.unwrap()
}
};
// Start socket connection listener
tokio::spawn(poll_socket(sock_name.clone(), tx.clone()));
tokio::spawn(async move {
let mut unstable_connections: HashMap<
Uuid,
(UnstableConnection, oneshot::Sender<Result<Uuid>>),
> = HashMap::new();
let mut connections: HashMap<Uuid, Connection> = HashMap::new();
loop {
match rx.recv().await.unwrap() {
Event::IncomingConnection(stream) => {
match handle_incoming(stream, &mut unstable_connections).await {
Ok((id, conn)) => {
let (_, tx) = unstable_connections.remove(&id).unwrap();
connections.insert(id, conn);
tx.send(Ok(id)).unwrap();
cli_log::info!("CBS {id}: Connection stabilized.")
}
Err(e) => {
cli_log::error!("Handshake failed: {e}");
}
}
}
Event::SpawnCBS(tx) => {
let id = Uuid::new_v4();
unstable_connections.insert(
id,
(
UnstableConnection::new(
CancellationToken::new(),
id,
main_tx.clone(),
),
tx,
),
);
cli_log::info!("Spawned CBS with ID: {id}");
// Start dummy cbs
let sock_name = sock_name.clone();
tokio::spawn(super::dummy::cbs(sock_name.clone(), id));
}
Event::SendRequest(id, body) => {
let conn = connections.get(&id).unwrap();
conn.send_request(body).await.unwrap();
}
Event::SendResponse(id, body, req) => {
let conn = connections.get(&id).unwrap();
conn.send_response(body, req).await.unwrap();
}
}
}
});
Self { tx }
}
pub async fn spawn_cbs(&self) -> Result<Uuid> {
let (tx, rx) = oneshot::channel();
self.tx.send(Event::SpawnCBS(tx))?;
rx.await?
}
pub fn send_request(&self, cbs: Uuid, body: Request) -> Result<()> {
self.tx.send(Event::SendRequest(cbs, body))?;
Ok(())
}
pub fn send_response(&self, cbs: Uuid, body: Response, receiver: u64) -> Result<()> {
self.tx.send(Event::SendResponse(cbs, body, receiver))?;
Ok(())
}
}

9
src/cbs/mod.rs Normal file
View File

@ -0,0 +1,9 @@
pub mod connection;
mod dummy;
pub mod manager;
// TODO: This whole module uses hell a lot of unwraps,
// which need to be replaced by proper error handling. - antifallobst 2024-05-14
pub use connection::{Connection, UnstableConnection};
pub use manager::Manager;

View File

@ -20,6 +20,7 @@
*/
mod app;
mod cbs;
mod cli;
mod ui;
@ -38,13 +39,13 @@ async fn main() -> anyhow::Result<()> {
let args = Args::parse();
match args.subcommand {
Command::Tui {} => {
let mut app = app::App::new(Tui::new().context("Failed to setup tui")?)?;
let mut app = app::App::new(Tui::new().context("Failed to setup tui")?).await?;
// NOTE(@soispha): The `None` here is temporary <2024-05-08>
app.run(None, args.plugin_path).await?;
}
Command::Repl {} => {
let mut app = app::App::new(Repl::new().context("Failed to setup repl")?)?;
let mut app = app::App::new(Repl::new().context("Failed to setup repl")?).await?;
// NOTE(@soispha): The `None` here is temporary <2024-05-03>
app.run(None, args.plugin_path).await?;