diff --git a/src/accounts/mod.rs b/src/accounts/mod.rs index 7f0a82a..2e2fc08 100644 --- a/src/accounts/mod.rs +++ b/src/accounts/mod.rs @@ -69,6 +69,11 @@ impl AccountsManager { } } + pub async fn restore(&mut self) -> Result<()> { + self.login(self.current_account).await?; + Ok(()) + } + pub async fn add(&mut self, homeserver: &String, username: &String, password: &String) -> Result { let id = self.num_accounts; self.num_accounts += 1; @@ -123,11 +128,12 @@ impl AccountsManager { .build() .await?; client.restore_login(account.session.clone()).await?; + self.clients.insert(account_id as usize, Some(client)); } else { info!("Using cached client for account: '{}'", &account.session.user_id); }; - info!("Restored account: '{}' device ID: {}", &account.session.user_id, &account.session.device_id); + info!("Restored account"); self.current_account = account_id; @@ -172,10 +178,13 @@ impl AccountsManager { self.get(self.current_account) } - pub fn client(&self) -> &Option { + pub fn client(&self) -> Option<&Client> { match self.clients.get(self.current_account as usize) { - None => &None, - Some(c) => c, + None => None, + Some(oc) => match oc { + None => None, + Some(c) => Some(c), + }, } } diff --git a/src/app/event.rs b/src/app/event.rs index 5ec1752..7248281 100644 --- a/src/app/event.rs +++ b/src/app/event.rs @@ -4,16 +4,11 @@ use tokio::time::Duration; use tokio::sync::{mpsc, broadcast}; use tokio_util::sync::CancellationToken; use anyhow::{Result, Error}; -use matrix_sdk::{ - Client, - room::{Room}, - config::SyncSettings, - ruma::events::room::{ - member::StrippedRoomMemberEvent, - message::{MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent}, - }, - event_handler::Ctx -}; +use matrix_sdk::{Client, room::{Room}, config::SyncSettings, ruma::events::room::{ + member::StrippedRoomMemberEvent, + message::{MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent}, +}, LoopCtrl}; +use cli_log::{error, warn, info}; #[derive(Debug)] pub enum EventStatus { @@ -25,6 +20,7 @@ pub enum EventStatus { #[derive(Debug)] pub struct Event { input_event: Option, + matrix_event: Option } pub struct EventBuilder { @@ -35,6 +31,7 @@ impl Default for Event { fn default() -> Self { Self { input_event: None, + matrix_event: None, } } } @@ -54,9 +51,15 @@ impl EventBuilder { self } + fn matrix_event(&mut self, matrix_event: matrix_sdk::deserialized_responses::SyncResponse) -> &Self { + self.event.matrix_event = Some(matrix_event); + self + } + fn build(&self) -> Event { Event { input_event: self.event.input_event.clone(), + matrix_event: self.event.matrix_event.clone(), } } } @@ -64,6 +67,11 @@ impl EventBuilder { impl Event { pub async fn handle(&self, app: &mut App<'_>) -> Result { + if self.matrix_event.is_some() { + info!("Matrix Event: {:#?}", self.matrix_event.clone().unwrap()); + return Ok(EventStatus::Ok); + } + let status = match app.status.state() { State::None => EventStatus::Ok, State::Main => self.handle_main(app).await?, @@ -152,8 +160,6 @@ impl Event { async fn poll_input_events_stage_2(channel: mpsc::Sender) -> Result<()> { loop { if crossterm::event::poll(Duration::from_millis(100))? { - // It's guaranteed that `read` won't block, because `poll` returned - // `Ok(true)`. let event = EventBuilder::default() .input_event(crossterm::event::read()?) .build(); @@ -172,36 +178,30 @@ pub async fn poll_input_events(channel: mpsc::Sender, kill: CancellationT } } -pub async fn poll_matrix_events_stage_2(channel: mpsc::Sender, app: &App<'_>) -> Result<()> { - app.accounts_manager.client(); +async fn poll_matrix_events_stage_2(channel: mpsc::Sender, client: Client) -> Result<()> { + let sync_settings = SyncSettings::default(); + // .token(sync_token) + // .timeout(Duration::from_secs(30)); - let client = match app.client(){ - Some(c) => c, - None => return Err(Error::msg("Failed to fetch current client")), - }; + let tx = &channel; - client.add_event_handler_context(channel.clone()); - client.add_event_handler(on_stripped_state_member); + client.sync_with_callback(sync_settings, |response| async move { + let event = EventBuilder::default() + .matrix_event(response) + .build(); + + match tx.send(event).await { + Ok(_) => LoopCtrl::Continue, + Err(_) => LoopCtrl::Break, + } + }).await?; Ok(()) } -pub async fn poll_matrix_events(channel: mpsc::Sender, kill: CancellationToken, app: &App<'_>) -> Result<()> { +pub async fn poll_matrix_events(channel: mpsc::Sender, kill: CancellationToken, client: Client) -> Result<()> { tokio::select! { - output = poll_matrix_events_stage_2(channel, app) => output, - _ = kill.cancelled() => Err(Error::msg("received kill signal")) + output = poll_matrix_events_stage_2(channel, client) => output, + _ = kill.cancelled() => Err(Error::msg("received kill signal")), } -} - -async fn on_stripped_state_member( - room_member: StrippedRoomMemberEvent, - client: Client, - room: Room, - context: Ctx> -) -> Result<()> { - let event = EventBuilder::default() - .build(); - - context.send(event).await?; - Ok(()) } \ No newline at end of file diff --git a/src/app/mod.rs b/src/app/mod.rs index 474d972..cc6193a 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -28,7 +28,10 @@ pub struct App<'a> { accounts_manager: accounts::AccountsManager, status: Status, + channel_tx: mpsc::Sender, + channel_rx: mpsc::Receiver, input_listener_killer: CancellationToken, + matrix_listener_killer: CancellationToken, } impl Drop for App<'_> { @@ -47,23 +50,31 @@ impl App<'_> { None }; + let (channel_tx, channel_rx) = mpsc::channel(256); + Self { ui: ui::UI::new(), accounts_manager: AccountsManager::new(config), status: Status::default(), + + channel_tx, + channel_rx, input_listener_killer: CancellationToken::new(), + matrix_listener_killer: CancellationToken::new(), } } pub async fn run(&mut self) -> Result<()> { - let (channel_tx, mut channel_rx) = mpsc::channel(256); // Spawn input event listener - tokio::task::spawn(event::poll_input_events(channel_tx.clone(), self.input_listener_killer.clone())); + tokio::task::spawn(event::poll_input_events(self.channel_tx.clone(), self.input_listener_killer.clone())); if self.account().is_err() { info!("No saved sessions found -> jumping into setup"); - self.setup(&mut channel_rx).await?; + self.setup().await?; + } else { + self.accounts_manager.restore().await?; + self.init_account().await?; } @@ -71,7 +82,7 @@ impl App<'_> { self.status.set_state(State::Main); self.ui.update(&self.status).await?; - let event: event::Event = match channel_rx.recv().await { + let event: event::Event = match self.channel_rx.recv().await { Some(e) => e, None => return Err(Error::msg("Event channel has no senders")) }; @@ -87,14 +98,14 @@ impl App<'_> { Ok(()) } - async fn setup(&mut self, receiver: &mut mpsc::Receiver) -> Result<()> { + async fn setup(&mut self) -> Result<()> { self.ui.setup_ui = Some(ui::SetupUI::new()); loop { self.status.set_state(State::Setup); self.ui.update_setup().await?; - let event: event::Event = match receiver.recv().await { + let event: event::Event = match self.channel_rx.recv().await { Some(e) => e, None => return Err(Error::msg("Event channel has no senders")) }; @@ -111,7 +122,14 @@ impl App<'_> { let client = match self.client() { Some(c) => c, None => return Err(Error::msg("failed to get current client")) - }; + }.clone(); + + if !self.matrix_listener_killer.is_cancelled() { + self.matrix_listener_killer.cancel(); + self.matrix_listener_killer = CancellationToken::new(); + } + + tokio::task::spawn(event::poll_matrix_events(self.channel_tx.clone(), self.matrix_listener_killer.clone(), client)); info!("Initializing client for the current account"); @@ -141,7 +159,7 @@ impl App<'_> { } } - pub fn client(&self) -> &Option { + pub fn client(&self) -> Option<&Client> { self.accounts_manager.client() } } \ No newline at end of file