feat: adapted the recent changes and additions (commit 2e84eca15abce541f03fcfdaeaf9cbd436c83351) in the triba-packet crate

This commit is contained in:
antifallobst 2024-05-21 15:07:13 +02:00
parent 184c6f39cf
commit 37d3a649ea
Signed by: antifallobst
GPG Key ID: 2B4F402172791BAF
1 changed files with 180 additions and 38 deletions

View File

@ -9,7 +9,7 @@ use interprocess::local_socket::tokio::{prelude::*, RecvHalf, Stream};
use rand::thread_rng;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::sync::{mpsc, oneshot};
use triba_packet::{IdPool, Packet, Request, Response};
use triba_packet::{FormField, IdPool, Message, Packet, Request, Response};
use uuid::Uuid;
use x25519_dalek::{EphemeralSecret, PublicKey};
@ -20,8 +20,7 @@ enum Event {
}
pub struct Session {
req_tx: mpsc::UnboundedSender<(Request, oneshot::Sender<Response>)>,
resp_tx: mpsc::UnboundedSender<(Response, u64)>,
tx: mpsc::UnboundedSender<Event>,
}
impl Session {
@ -85,32 +84,6 @@ impl Session {
});
}
let (cbs_req_tx, mut cbs_req_rx) = mpsc::unbounded_channel();
{
let tx = tx.clone();
tokio::spawn(async move {
loop {
let (body, resp) = cbs_req_rx.recv().await.unwrap();
tx.send(Event::ToCoreReq(body, resp)).unwrap();
}
});
}
let (cbs_resp_tx, mut cbs_resp_rx) = mpsc::unbounded_channel();
{
let tx = tx.clone();
tokio::spawn(async move {
loop {
let (body, req) = cbs_resp_rx.recv().await.unwrap();
tx.send(Event::ToCoreResp(body, req)).unwrap();
}
});
}
let (core_tx, mut core_rx) = mpsc::unbounded_channel();
{
@ -121,7 +94,7 @@ impl Session {
loop {
match rx.recv().await.unwrap() {
Event::FromCore(p) => match p {
Packet::Response { id, req, body } => {
Packet::Response { req, body, .. } => {
let resp: oneshot::Sender<Response> =
responses.remove(&req).unwrap();
resp.send(body).unwrap();
@ -152,10 +125,7 @@ impl Session {
});
}
let session = Self {
req_tx: cbs_req_tx,
resp_tx: cbs_resp_tx,
};
let session = Self { tx };
session.request_handshake_upgrade_connection().await?;
@ -185,8 +155,8 @@ impl Session {
async fn request_handshake_upgrade_connection(&self) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
self.req_tx
.send((Request::HandshakeUpgradeConnection, tx))
self.tx
.send(Event::ToCoreReq(Request::HandshakeUpgradeConnection, tx))
.map_err(|e| Error::External(e.to_string()))?;
match rx.await.map_err(|e| Error::External(e.to_string()))? {
@ -198,9 +168,181 @@ impl Session {
}
}
pub async fn request_close(&self) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
self.tx
.send(Event::ToCoreReq(Request::Close, tx))
.map_err(|e| Error::External(e.to_string()))?;
match rx.await.map_err(|e| Error::External(e.to_string()))? {
Response::Ackknowledged => Ok(()),
resp => Err(Error::UnexpectedPacket(
Packet::response(0, 0, Response::Ackknowledged),
Packet::response(0, 0, resp),
)),
}
}
pub async fn request_account_setup_select_option(
&self,
options: Vec<String>,
) -> Result<u64, Error> {
let (tx, rx) = oneshot::channel();
self.tx
.send(Event::ToCoreReq(
Request::AccountSetupSelectOption(options),
tx,
))
.map_err(|e| Error::External(e.to_string()))?;
match rx.await.map_err(|e| Error::External(e.to_string()))? {
Response::AccountSetupSelectOption(option) => Ok(option),
resp => Err(Error::UnexpectedPacket(
Packet::response(0, 0, Response::AccountSetupSelectOption(0)),
Packet::response(0, 0, resp),
)),
}
}
pub async fn request_account_setup_get_form(
&self,
form: HashMap<String, (FormField, bool)>,
) -> Result<HashMap<String, String>, Error> {
let (tx, rx) = oneshot::channel();
self.tx
.send(Event::ToCoreReq(Request::AccountSetupGetForm(form), tx))
.map_err(|e| Error::External(e.to_string()))?;
match rx.await.map_err(|e| Error::External(e.to_string()))? {
Response::AccountSetupFilledForm(form) => Ok(form),
resp => Err(Error::UnexpectedPacket(
Packet::response(0, 0, Response::AccountSetupFilledForm(HashMap::new())),
Packet::response(0, 0, resp),
)),
}
}
pub async fn request_account_setup_success(&self) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
self.tx
.send(Event::ToCoreReq(Request::AccountSetupSuccess, tx))
.map_err(|e| Error::External(e.to_string()))?;
match rx.await.map_err(|e| Error::External(e.to_string()))? {
Response::Ackknowledged => Ok(()),
resp => Err(Error::UnexpectedPacket(
Packet::response(0, 0, Response::Ackknowledged),
Packet::response(0, 0, resp),
)),
}
}
pub async fn request_account_setup_failure(&self, error: String) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
self.tx
.send(Event::ToCoreReq(Request::AccountSetupFailure(error), tx))
.map_err(|e| Error::External(e.to_string()))?;
match rx.await.map_err(|e| Error::External(e.to_string()))? {
Response::Ackknowledged => Ok(()),
resp => Err(Error::UnexpectedPacket(
Packet::response(0, 0, Response::Ackknowledged),
Packet::response(0, 0, resp),
)),
}
}
pub async fn request_initial_sync_ready(&self) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
self.tx
.send(Event::ToCoreReq(Request::InitialSyncReady, tx))
.map_err(|e| Error::External(e.to_string()))?;
match rx.await.map_err(|e| Error::External(e.to_string()))? {
Response::Ackknowledged => Ok(()),
resp => Err(Error::UnexpectedPacket(
Packet::response(0, 0, Response::Ackknowledged),
Packet::response(0, 0, resp),
)),
}
}
pub async fn request_chat_received_message(
&self,
chat: String,
message: Message,
) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
self.tx
.send(Event::ToCoreReq(
Request::ChatReceivedMessage(chat, message),
tx,
))
.map_err(|e| Error::External(e.to_string()))?;
match rx.await.map_err(|e| Error::External(e.to_string()))? {
Response::Ackknowledged => Ok(()),
resp => Err(Error::UnexpectedPacket(
Packet::response(0, 0, Response::Ackknowledged),
Packet::response(0, 0, resp),
)),
}
}
pub async fn request_chat_received_event(
&self,
chat: String,
event: String,
) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
self.tx
.send(Event::ToCoreReq(
Request::ChatReceivedEvent(chat, event),
tx,
))
.map_err(|e| Error::External(e.to_string()))?;
match rx.await.map_err(|e| Error::External(e.to_string()))? {
Response::Ackknowledged => Ok(()),
resp => Err(Error::UnexpectedPacket(
Packet::response(0, 0, Response::Ackknowledged),
Packet::response(0, 0, resp),
)),
}
}
pub fn response_success(&self, receiver: u64) -> Result<(), Error> {
self.resp_tx
.send((Response::Success, receiver))
self.tx
.send(Event::ToCoreResp(Response::Success, receiver))
.map_err(|e| Error::External(e.to_string()))?;
Ok(())
}
pub fn response_ackknowledged(&self, receiver: u64) -> Result<(), Error> {
self.tx
.send(Event::ToCoreResp(Response::Ackknowledged, receiver))
.map_err(|e| Error::External(e.to_string()))?;
Ok(())
}
pub fn response_unexpected_request(&self, receiver: u64) -> Result<(), Error> {
self.tx
.send(Event::ToCoreResp(Response::UnexpectedRequest, receiver))
.map_err(|e| Error::External(e.to_string()))?;
Ok(())
}
pub fn response_ressource_timeout(&self, receiver: u64) -> Result<(), Error> {
self.tx
.send(Event::ToCoreResp(Response::RessourceTimeout, receiver))
.map_err(|e| Error::External(e.to_string()))?;
Ok(())
}