forked from trinitrix/core
1
0
Fork 0

feature (events): added a matrix event listener that writes to the global event stream

This commit is contained in:
antifallobst 2023-07-06 15:13:41 +02:00
parent dfc87ff937
commit b1c0007098
3 changed files with 75 additions and 48 deletions

View File

@ -69,6 +69,11 @@ impl AccountsManager {
} }
} }
pub async fn restore(&mut self) -> Result<()> {
self.login(self.current_account).await?;
Ok(())
}
pub async fn add(&mut self, homeserver: &String, username: &String, password: &String) -> Result<u32> { pub async fn add(&mut self, homeserver: &String, username: &String, password: &String) -> Result<u32> {
let id = self.num_accounts; let id = self.num_accounts;
self.num_accounts += 1; self.num_accounts += 1;
@ -123,11 +128,12 @@ impl AccountsManager {
.build() .build()
.await?; .await?;
client.restore_login(account.session.clone()).await?; client.restore_login(account.session.clone()).await?;
self.clients.insert(account_id as usize, Some(client));
} else { } else {
info!("Using cached client for account: '{}'", &account.session.user_id); info!("Using cached client for account: '{}'", &account.session.user_id);
}; };
info!("Restored account: '{}' device ID: {}", &account.session.user_id, &account.session.device_id); info!("Restored account");
self.current_account = account_id; self.current_account = account_id;
@ -172,10 +178,13 @@ impl AccountsManager {
self.get(self.current_account) self.get(self.current_account)
} }
pub fn client(&self) -> &Option<Client> { pub fn client(&self) -> Option<&Client> {
match self.clients.get(self.current_account as usize) { match self.clients.get(self.current_account as usize) {
None => &None, None => None,
Some(c) => c, Some(oc) => match oc {
None => None,
Some(c) => Some(c),
},
} }
} }

View File

@ -4,16 +4,11 @@ use tokio::time::Duration;
use tokio::sync::{mpsc, broadcast}; use tokio::sync::{mpsc, broadcast};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use anyhow::{Result, Error}; use anyhow::{Result, Error};
use matrix_sdk::{ use matrix_sdk::{Client, room::{Room}, config::SyncSettings, ruma::events::room::{
Client, member::StrippedRoomMemberEvent,
room::{Room}, message::{MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent},
config::SyncSettings, }, LoopCtrl};
ruma::events::room::{ use cli_log::{error, warn, info};
member::StrippedRoomMemberEvent,
message::{MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent},
},
event_handler::Ctx
};
#[derive(Debug)] #[derive(Debug)]
pub enum EventStatus { pub enum EventStatus {
@ -25,6 +20,7 @@ pub enum EventStatus {
#[derive(Debug)] #[derive(Debug)]
pub struct Event { pub struct Event {
input_event: Option<crossterm::event::Event>, input_event: Option<crossterm::event::Event>,
matrix_event: Option<matrix_sdk::deserialized_responses::SyncResponse>
} }
pub struct EventBuilder { pub struct EventBuilder {
@ -35,6 +31,7 @@ impl Default for Event {
fn default() -> Self { fn default() -> Self {
Self { Self {
input_event: None, input_event: None,
matrix_event: None,
} }
} }
} }
@ -54,9 +51,15 @@ impl EventBuilder {
self self
} }
fn matrix_event(&mut self, matrix_event: matrix_sdk::deserialized_responses::SyncResponse) -> &Self {
self.event.matrix_event = Some(matrix_event);
self
}
fn build(&self) -> Event { fn build(&self) -> Event {
Event { Event {
input_event: self.event.input_event.clone(), input_event: self.event.input_event.clone(),
matrix_event: self.event.matrix_event.clone(),
} }
} }
} }
@ -64,6 +67,11 @@ impl EventBuilder {
impl Event { impl Event {
pub async fn handle(&self, app: &mut App<'_>) -> Result<EventStatus> { pub async fn handle(&self, app: &mut App<'_>) -> Result<EventStatus> {
if self.matrix_event.is_some() {
info!("Matrix Event: {:#?}", self.matrix_event.clone().unwrap());
return Ok(EventStatus::Ok);
}
let status = match app.status.state() { let status = match app.status.state() {
State::None => EventStatus::Ok, State::None => EventStatus::Ok,
State::Main => self.handle_main(app).await?, State::Main => self.handle_main(app).await?,
@ -152,8 +160,6 @@ impl Event {
async fn poll_input_events_stage_2(channel: mpsc::Sender<Event>) -> Result<()> { async fn poll_input_events_stage_2(channel: mpsc::Sender<Event>) -> Result<()> {
loop { loop {
if crossterm::event::poll(Duration::from_millis(100))? { if crossterm::event::poll(Duration::from_millis(100))? {
// It's guaranteed that `read` won't block, because `poll` returned
// `Ok(true)`.
let event = EventBuilder::default() let event = EventBuilder::default()
.input_event(crossterm::event::read()?) .input_event(crossterm::event::read()?)
.build(); .build();
@ -172,36 +178,30 @@ pub async fn poll_input_events(channel: mpsc::Sender<Event>, kill: CancellationT
} }
} }
pub async fn poll_matrix_events_stage_2(channel: mpsc::Sender<Event>, app: &App<'_>) -> Result<()> { async fn poll_matrix_events_stage_2(channel: mpsc::Sender<Event>, client: Client) -> Result<()> {
app.accounts_manager.client(); let sync_settings = SyncSettings::default();
// .token(sync_token)
// .timeout(Duration::from_secs(30));
let client = match app.client(){ let tx = &channel;
Some(c) => c,
None => return Err(Error::msg("Failed to fetch current client")),
};
client.add_event_handler_context(channel.clone()); client.sync_with_callback(sync_settings, |response| async move {
client.add_event_handler(on_stripped_state_member); let event = EventBuilder::default()
.matrix_event(response)
.build();
match tx.send(event).await {
Ok(_) => LoopCtrl::Continue,
Err(_) => LoopCtrl::Break,
}
}).await?;
Ok(()) Ok(())
} }
pub async fn poll_matrix_events(channel: mpsc::Sender<Event>, kill: CancellationToken, app: &App<'_>) -> Result<()> { pub async fn poll_matrix_events(channel: mpsc::Sender<Event>, kill: CancellationToken, client: Client) -> Result<()> {
tokio::select! { tokio::select! {
output = poll_matrix_events_stage_2(channel, app) => output, output = poll_matrix_events_stage_2(channel, client) => output,
_ = kill.cancelled() => Err(Error::msg("received kill signal")) _ = kill.cancelled() => Err(Error::msg("received kill signal")),
} }
} }
async fn on_stripped_state_member(
room_member: StrippedRoomMemberEvent,
client: Client,
room: Room,
context: Ctx<mpsc::Sender<Event>>
) -> Result<()> {
let event = EventBuilder::default()
.build();
context.send(event).await?;
Ok(())
}

View File

@ -28,7 +28,10 @@ pub struct App<'a> {
accounts_manager: accounts::AccountsManager, accounts_manager: accounts::AccountsManager,
status: Status, status: Status,
channel_tx: mpsc::Sender<event::Event>,
channel_rx: mpsc::Receiver<event::Event>,
input_listener_killer: CancellationToken, input_listener_killer: CancellationToken,
matrix_listener_killer: CancellationToken,
} }
impl Drop for App<'_> { impl Drop for App<'_> {
@ -47,23 +50,31 @@ impl App<'_> {
None None
}; };
let (channel_tx, channel_rx) = mpsc::channel(256);
Self { Self {
ui: ui::UI::new(), ui: ui::UI::new(),
accounts_manager: AccountsManager::new(config), accounts_manager: AccountsManager::new(config),
status: Status::default(), status: Status::default(),
channel_tx,
channel_rx,
input_listener_killer: CancellationToken::new(), input_listener_killer: CancellationToken::new(),
matrix_listener_killer: CancellationToken::new(),
} }
} }
pub async fn run(&mut self) -> Result<()> { pub async fn run(&mut self) -> Result<()> {
let (channel_tx, mut channel_rx) = mpsc::channel(256);
// Spawn input event listener // Spawn input event listener
tokio::task::spawn(event::poll_input_events(channel_tx.clone(), self.input_listener_killer.clone())); tokio::task::spawn(event::poll_input_events(self.channel_tx.clone(), self.input_listener_killer.clone()));
if self.account().is_err() { if self.account().is_err() {
info!("No saved sessions found -> jumping into setup"); info!("No saved sessions found -> jumping into setup");
self.setup(&mut channel_rx).await?; self.setup().await?;
} else {
self.accounts_manager.restore().await?;
self.init_account().await?;
} }
@ -71,7 +82,7 @@ impl App<'_> {
self.status.set_state(State::Main); self.status.set_state(State::Main);
self.ui.update(&self.status).await?; self.ui.update(&self.status).await?;
let event: event::Event = match channel_rx.recv().await { let event: event::Event = match self.channel_rx.recv().await {
Some(e) => e, Some(e) => e,
None => return Err(Error::msg("Event channel has no senders")) None => return Err(Error::msg("Event channel has no senders"))
}; };
@ -87,14 +98,14 @@ impl App<'_> {
Ok(()) Ok(())
} }
async fn setup(&mut self, receiver: &mut mpsc::Receiver<event::Event>) -> Result<()> { async fn setup(&mut self) -> Result<()> {
self.ui.setup_ui = Some(ui::SetupUI::new()); self.ui.setup_ui = Some(ui::SetupUI::new());
loop { loop {
self.status.set_state(State::Setup); self.status.set_state(State::Setup);
self.ui.update_setup().await?; self.ui.update_setup().await?;
let event: event::Event = match receiver.recv().await { let event: event::Event = match self.channel_rx.recv().await {
Some(e) => e, Some(e) => e,
None => return Err(Error::msg("Event channel has no senders")) None => return Err(Error::msg("Event channel has no senders"))
}; };
@ -111,7 +122,14 @@ impl App<'_> {
let client = match self.client() { let client = match self.client() {
Some(c) => c, Some(c) => c,
None => return Err(Error::msg("failed to get current client")) None => return Err(Error::msg("failed to get current client"))
}; }.clone();
if !self.matrix_listener_killer.is_cancelled() {
self.matrix_listener_killer.cancel();
self.matrix_listener_killer = CancellationToken::new();
}
tokio::task::spawn(event::poll_matrix_events(self.channel_tx.clone(), self.matrix_listener_killer.clone(), client));
info!("Initializing client for the current account"); info!("Initializing client for the current account");
@ -141,7 +159,7 @@ impl App<'_> {
} }
} }
pub fn client(&self) -> &Option<Client> { pub fn client(&self) -> Option<&Client> {
self.accounts_manager.client() self.accounts_manager.client()
} }
} }