forked from trinitrix/core
1
0
Fork 0

Fix(lua_command::handle): Move lua_command handler to separate thread

This makes it possible to have lua code execute commands and receive
their output value, without risking a deadlock.
This commit is contained in:
Benedikt Peetz 2023-07-24 23:45:44 +02:00
parent c7a4d5a8ab
commit 1fe04ca5c6
Signed by: bpeetz
GPG Key ID: A5E94010C3A642AD
2 changed files with 69 additions and 57 deletions

View File

@ -1,38 +1,14 @@
use std::{sync::Arc, time::Duration}; use anyhow::Result;
use cli_log::trace;
use anyhow::{Context, Result};
use cli_log::{debug, info};
use tokio::{task, time::timeout};
use crate::app::{events::event_types::EventStatus, App}; use crate::app::{events::event_types::EventStatus, App};
// This function is here mainly to reserve this spot for further processing of the lua command.
// TODO(@Soispha): Move the lua executor thread code from app to this module
pub async fn handle(app: &mut App<'_>, command: String) -> Result<EventStatus> { pub async fn handle(app: &mut App<'_>, command: String) -> Result<EventStatus> {
info!("Recieved ci command: `{command}`; executing.."); trace!("Recieved ci command: `{command}`; executing..");
let local = task::LocalSet::new(); app.lua_command_tx.send(command).await?;
// Run the local task set.
let output = local
.run_until(async move {
let lua = Arc::clone(&app.lua);
debug!("before_handle");
let c_handle = task::spawn_local(async move {
lua.load(&command)
// FIXME this assumes string output only
.eval_async::<String>()
.await
.with_context(|| format!("Failed to execute: `{command}`"))
});
debug!("after_handle");
c_handle
})
.await;
debug!("after_thread");
let output = timeout(Duration::from_secs(10), output)
.await
.context("Failed to join lua command executor")???;
info!("Command returned: `{}`", output);
Ok(EventStatus::Ok) Ok(EventStatus::Ok)
} }

View File

@ -2,22 +2,26 @@ pub mod command_interface;
pub mod events; pub mod events;
pub mod status; pub mod status;
use std::path::Path; use std::{path::Path, thread};
use anyhow::{Context, Error, Result}; use anyhow::{Context, Error, Result};
use cli_log::info; use cli_log::{error, info};
use matrix_sdk::Client; use matrix_sdk::Client;
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use status::{State, Status}; use status::{State, Status};
use tokio::{ use tokio::{
runtime::Builder,
sync::{mpsc, Mutex}, sync::{mpsc, Mutex},
task::LocalSet, task::{self, LocalSet},
}; };
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use crate::{ use crate::{
accounts::{Account, AccountsManager}, accounts::{Account, AccountsManager},
app::{command_interface::generate_ci_functions, events::event_types::Event}, app::{
command_interface::{generate_ci_functions, Command},
events::event_types::Event,
},
ui::{central, setup}, ui::{central, setup},
}; };
@ -39,34 +43,66 @@ pub struct App<'ui> {
impl App<'_> { impl App<'_> {
pub fn new() -> Result<Self> { pub fn new() -> Result<Self> {
fn set_up_lua(tx: mpsc::Sender<Event>) -> mpsc::Sender<String> { fn set_up_lua(event_call_tx: mpsc::Sender<Event>) -> mpsc::Sender<String> {
async fn exec_lua_command(
command: &str,
event_call_tx: mpsc::Sender<Event>,
) -> Result<()> {
let second_event_call_tx = event_call_tx.clone();
let lua = LUA
.get_or_init(|| {
Mutex::new(generate_ci_functions(
mlua::Lua::new(),
second_event_call_tx,
))
})
.lock()
.await;
info!("Recieved command to execute: `{}`", &command);
let output = lua
.load(command)
// FIXME this assumes string output only
.eval_async::<String>()
.await;
match output {
Ok(out) => {
info!("Function `{}` returned: `{}`", command, out);
}
Err(err) => {
error!("Function `{}` returned error: `{}`", command, err);
event_call_tx
.send(Event::CommandEvent(
Command::RaiseError(err.to_string()),
None,
))
.await?;
}
};
Ok(())
}
info!("Setting up Lua context.."); info!("Setting up Lua context..");
static LUA: OnceCell<Mutex<mlua::Lua>> = OnceCell::new(); static LUA: OnceCell<Mutex<mlua::Lua>> = OnceCell::new();
let (lua_command_tx, mut rx) = mpsc::channel(256); let (lua_command_tx, mut rx) = mpsc::channel::<String>(256);
let local_set = LocalSet::new(); thread::spawn(move || {
local_set.spawn_local(async move { let rt = Builder::new_current_thread().enable_all().build().expect(
let lua = LUA "Should always be able to build tokio runtime for lua command handling",
.get_or_init(|| Mutex::new(generate_ci_functions(mlua::Lua::new(), tx))) );
.lock() let local = LocalSet::new();
.await; local.spawn_local(async move {
info!("Initialized Lua context"); info!("Lua command handling initialized, waiting for commands..");
while let Some(command) = rx.recv().await {
info!("Recieved lua command: {}", &command);
let local_event_call_tx = event_call_tx.clone();
while let Some(command) = rx.recv().await { task::spawn_local(async move {
info!("Recieved command to execute: `{}`", &command); exec_lua_command(&command, local_event_call_tx).await.expect("This should return all relevent errors by other messages, this should never error");
let output = lua });
.load(&command) }
// FIXME this assumes string output only });
.eval_async::<String>() rt.block_on(local);
.await
.with_context(|| format!("Failed to execute: `{command}`"));
info!(
"Function `{}` returned: `{}`",
command,
output.unwrap_or("<returned error>".to_owned())
);
}
}); });
lua_command_tx lua_command_tx