From 9ef721d80c7aec6bd48ca64291bfd15a1660b643 Mon Sep 17 00:00:00 2001 From: antifallobst Date: Tue, 21 May 2024 15:07:13 +0200 Subject: [PATCH] feat: adapted the recent changes and additions (commit 2e84eca15abce541f03fcfdaeaf9cbd436c83351) in the triba crate --- src/lib.rs | 218 +++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 180 insertions(+), 38 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index c4d4c5c..db1e553 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,7 +9,7 @@ use interprocess::local_socket::tokio::{prelude::*, RecvHalf, Stream}; use rand::thread_rng; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::sync::{mpsc, oneshot}; -use triba_packet::{IdPool, Packet, Request, Response}; +use triba_packet::{FormField, IdPool, Message, Packet, Request, Response}; use uuid::Uuid; use x25519_dalek::{EphemeralSecret, PublicKey}; @@ -20,8 +20,7 @@ enum Event { } pub struct Session { - req_tx: mpsc::UnboundedSender<(Request, oneshot::Sender)>, - resp_tx: mpsc::UnboundedSender<(Response, u64)>, + tx: mpsc::UnboundedSender, } impl Session { @@ -85,32 +84,6 @@ impl Session { }); } - let (cbs_req_tx, mut cbs_req_rx) = mpsc::unbounded_channel(); - - { - let tx = tx.clone(); - - tokio::spawn(async move { - loop { - let (body, resp) = cbs_req_rx.recv().await.unwrap(); - tx.send(Event::ToCoreReq(body, resp)).unwrap(); - } - }); - } - - let (cbs_resp_tx, mut cbs_resp_rx) = mpsc::unbounded_channel(); - - { - let tx = tx.clone(); - - tokio::spawn(async move { - loop { - let (body, req) = cbs_resp_rx.recv().await.unwrap(); - tx.send(Event::ToCoreResp(body, req)).unwrap(); - } - }); - } - let (core_tx, mut core_rx) = mpsc::unbounded_channel(); { @@ -121,7 +94,7 @@ impl Session { loop { match rx.recv().await.unwrap() { Event::FromCore(p) => match p { - Packet::Response { id, req, body } => { + Packet::Response { req, body, .. } => { let resp: oneshot::Sender = responses.remove(&req).unwrap(); resp.send(body).unwrap(); @@ -152,10 +125,7 @@ impl Session { }); } - let session = Self { - req_tx: cbs_req_tx, - resp_tx: cbs_resp_tx, - }; + let session = Self { tx }; session.request_handshake_upgrade_connection().await?; @@ -185,8 +155,8 @@ impl Session { async fn request_handshake_upgrade_connection(&self) -> Result<(), Error> { let (tx, rx) = oneshot::channel(); - self.req_tx - .send((Request::HandshakeUpgradeConnection, tx)) + self.tx + .send(Event::ToCoreReq(Request::HandshakeUpgradeConnection, tx)) .map_err(|e| Error::External(e.to_string()))?; match rx.await.map_err(|e| Error::External(e.to_string()))? { @@ -198,9 +168,181 @@ impl Session { } } + pub async fn request_close(&self) -> Result<(), Error> { + let (tx, rx) = oneshot::channel(); + + self.tx + .send(Event::ToCoreReq(Request::Close, tx)) + .map_err(|e| Error::External(e.to_string()))?; + + match rx.await.map_err(|e| Error::External(e.to_string()))? { + Response::Ackknowledged => Ok(()), + resp => Err(Error::UnexpectedPacket( + Packet::response(0, 0, Response::Ackknowledged), + Packet::response(0, 0, resp), + )), + } + } + + pub async fn request_account_setup_select_option( + &self, + options: Vec, + ) -> Result { + let (tx, rx) = oneshot::channel(); + + self.tx + .send(Event::ToCoreReq( + Request::AccountSetupSelectOption(options), + tx, + )) + .map_err(|e| Error::External(e.to_string()))?; + + match rx.await.map_err(|e| Error::External(e.to_string()))? { + Response::AccountSetupSelectOption(option) => Ok(option), + resp => Err(Error::UnexpectedPacket( + Packet::response(0, 0, Response::AccountSetupSelectOption(0)), + Packet::response(0, 0, resp), + )), + } + } + + pub async fn request_account_setup_get_form( + &self, + form: HashMap, + ) -> Result, Error> { + let (tx, rx) = oneshot::channel(); + + self.tx + .send(Event::ToCoreReq(Request::AccountSetupGetForm(form), tx)) + .map_err(|e| Error::External(e.to_string()))?; + + match rx.await.map_err(|e| Error::External(e.to_string()))? { + Response::AccountSetupFilledForm(form) => Ok(form), + resp => Err(Error::UnexpectedPacket( + Packet::response(0, 0, Response::AccountSetupFilledForm(HashMap::new())), + Packet::response(0, 0, resp), + )), + } + } + + pub async fn request_account_setup_success(&self) -> Result<(), Error> { + let (tx, rx) = oneshot::channel(); + + self.tx + .send(Event::ToCoreReq(Request::AccountSetupSuccess, tx)) + .map_err(|e| Error::External(e.to_string()))?; + + match rx.await.map_err(|e| Error::External(e.to_string()))? { + Response::Ackknowledged => Ok(()), + resp => Err(Error::UnexpectedPacket( + Packet::response(0, 0, Response::Ackknowledged), + Packet::response(0, 0, resp), + )), + } + } + + pub async fn request_account_setup_failure(&self, error: String) -> Result<(), Error> { + let (tx, rx) = oneshot::channel(); + + self.tx + .send(Event::ToCoreReq(Request::AccountSetupFailure(error), tx)) + .map_err(|e| Error::External(e.to_string()))?; + + match rx.await.map_err(|e| Error::External(e.to_string()))? { + Response::Ackknowledged => Ok(()), + resp => Err(Error::UnexpectedPacket( + Packet::response(0, 0, Response::Ackknowledged), + Packet::response(0, 0, resp), + )), + } + } + + pub async fn request_initial_sync_ready(&self) -> Result<(), Error> { + let (tx, rx) = oneshot::channel(); + + self.tx + .send(Event::ToCoreReq(Request::InitialSyncReady, tx)) + .map_err(|e| Error::External(e.to_string()))?; + + match rx.await.map_err(|e| Error::External(e.to_string()))? { + Response::Ackknowledged => Ok(()), + resp => Err(Error::UnexpectedPacket( + Packet::response(0, 0, Response::Ackknowledged), + Packet::response(0, 0, resp), + )), + } + } + + pub async fn request_chat_received_message( + &self, + chat: String, + message: Message, + ) -> Result<(), Error> { + let (tx, rx) = oneshot::channel(); + + self.tx + .send(Event::ToCoreReq( + Request::ChatReceivedMessage(chat, message), + tx, + )) + .map_err(|e| Error::External(e.to_string()))?; + + match rx.await.map_err(|e| Error::External(e.to_string()))? { + Response::Ackknowledged => Ok(()), + resp => Err(Error::UnexpectedPacket( + Packet::response(0, 0, Response::Ackknowledged), + Packet::response(0, 0, resp), + )), + } + } + + pub async fn request_chat_received_event( + &self, + chat: String, + event: String, + ) -> Result<(), Error> { + let (tx, rx) = oneshot::channel(); + + self.tx + .send(Event::ToCoreReq( + Request::ChatReceivedEvent(chat, event), + tx, + )) + .map_err(|e| Error::External(e.to_string()))?; + + match rx.await.map_err(|e| Error::External(e.to_string()))? { + Response::Ackknowledged => Ok(()), + resp => Err(Error::UnexpectedPacket( + Packet::response(0, 0, Response::Ackknowledged), + Packet::response(0, 0, resp), + )), + } + } + pub fn response_success(&self, receiver: u64) -> Result<(), Error> { - self.resp_tx - .send((Response::Success, receiver)) + self.tx + .send(Event::ToCoreResp(Response::Success, receiver)) + .map_err(|e| Error::External(e.to_string()))?; + Ok(()) + } + + pub fn response_ackknowledged(&self, receiver: u64) -> Result<(), Error> { + self.tx + .send(Event::ToCoreResp(Response::Ackknowledged, receiver)) + .map_err(|e| Error::External(e.to_string()))?; + Ok(()) + } + + pub fn response_unexpected_request(&self, receiver: u64) -> Result<(), Error> { + self.tx + .send(Event::ToCoreResp(Response::UnexpectedRequest, receiver)) + .map_err(|e| Error::External(e.to_string()))?; + Ok(()) + } + + pub fn response_ressource_timeout(&self, receiver: u64) -> Result<(), Error> { + self.tx + .send(Event::ToCoreResp(Response::RessourceTimeout, receiver)) .map_err(|e| Error::External(e.to_string()))?; Ok(()) }