Skip to content

Commit

Permalink
add cancellation tokens in stead of channels for shutting down (#136)
Browse files Browse the repository at this point in the history
* add cancelation token for the notifications task

now, in stead of a channel, the task recieves a cancelation token, monitoring that one. Not much of a change from the previous model, but at least the intent is clearer
untill all tasks gain the ability to use the cancelation token, the sigint signal handler will have to support both options of shutting down

add tokio-utils to workspace dependencies and make tts use cancelation tokens

fix clippy warnings and add cancellation token to the at-spi event processor

add cancellation tokens to another at-spi processor

add cancellation tokens to the input processing handler

add cancellation tokens to the screenreader events processor, the one which handles input events

fix formatting problems reported by check

remove the last use of the shutdown channel from the codebase, the sigint watcher now  only cancels that token

Update ssip-client-async version

Move Receives insated of mutably borrinwg them for the duration of the running of Odilia

Update Cargo.lock

Cargo format

Fix clippy issues

Update cargo lock

* timeout all the things

this introduces a timeout of hardcoded 500 milliseconds to the future which awaits all tasks to complete.
If, for example, a task is stuck somewhere and doesn't answer to cancelation tokens, we will exit the program forcefully, with an error code and the error will be displayed before exitting.
I believe around 500 milliseconds is enough for all tasks to get unstuck, but this will eventually be changeable within the configuration file.
For now though, the odilia is hanging on exit problem should probably be relatively well fixed with this
  • Loading branch information
albertotirla committed Mar 11, 2024
1 parent dc84bd9 commit b77a72d
Show file tree
Hide file tree
Showing 10 changed files with 265 additions and 195 deletions.
296 changes: 184 additions & 112 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ eyre = "0.6.8"
nix = "0.26.2"
serde_json = "1.0.89"
serde = { version = "1.0.194", features = ["derive"] }
ssip-client-async = { default-features = false, features = ["tokio"], version = "0.10.0" }
ssip-client-async = { default-features = false, features = ["tokio"], version = "0.12.0" }
tokio = { version = "^1.22.0", default-features = false, features = ["sync", "macros", "rt", "signal", "tracing"] }
tokio-util = { version = "0.7.10", features = ["rt"] }
tracing = "^0.1.37"
tracing-log = "^0.1.3"
tracing-subscriber = { version = "0.3.16", default-features = false, features = ["env-filter", "parking_lot"] }
Expand Down
1 change: 1 addition & 0 deletions input/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ once_cell = "1.16.0"
serde_json.workspace = true
sysinfo = { version = "0.26.8", default_features = false }
tokio.workspace = true
tokio-util.workspace=true
tracing.workspace = true
7 changes: 4 additions & 3 deletions input/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use std::{
time::{SystemTime, UNIX_EPOCH},
};
use sysinfo::{ProcessExt, System, SystemExt};
use tokio::{fs, io::AsyncReadExt, net::UnixListener, sync::broadcast, sync::mpsc::Sender};
use tokio::{fs, io::AsyncReadExt, net::UnixListener, sync::mpsc::Sender};
use tokio_util::sync::CancellationToken;

fn get_log_file_name() -> String {
let time = if let Ok(n) = SystemTime::now().duration_since(UNIX_EPOCH) {
Expand Down Expand Up @@ -54,7 +55,7 @@ fn get_log_file_name() -> String {
/// If there is no way to get access to the directory, then this function will call `exit(1)`; TODO: should probably return a result instead.
pub async fn sr_event_receiver(
event_sender: Sender<ScreenReaderEvent>,
shutdown_rx: &mut broadcast::Receiver<i32>,
shutdown: CancellationToken,
) -> eyre::Result<()> {
let (pid_file_path, sock_file_path) = get_file_paths();
let log_file_name = get_log_file_name();
Expand Down Expand Up @@ -153,7 +154,7 @@ pub async fn sr_event_receiver(
}
continue;
}
_ = shutdown_rx.recv() => {
() = shutdown.cancelled() => {
tracing::debug!("Shutting down input socker.");
break;
}
Expand Down
1 change: 1 addition & 0 deletions odilia/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ xdg = "2.4.1"
zbus.workspace = true
odilia-notify = { version = "0.1.0", path = "../odilia-notify" }
clap = { version = "4.5.1", features = ["derive"] }
tokio-util.workspace=true

[dev-dependencies]
lazy_static = "1.4.0"
Expand Down
22 changes: 10 additions & 12 deletions odilia/src/events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ mod object;
use std::{collections::HashMap, sync::Arc};

use futures::stream::StreamExt;
use tokio::sync::{
broadcast,
mpsc::{Receiver, Sender},
};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio_util::sync::CancellationToken;

use crate::state::ScreenReaderState;
use atspi_client::{accessible_ext::AccessibleExt, convertable::Convertable};
Expand Down Expand Up @@ -66,8 +64,8 @@ pub async fn structural_navigation(

pub async fn sr_event(
state: Arc<ScreenReaderState>,
sr_events: &mut Receiver<ScreenReaderEvent>,
shutdown_rx: &mut broadcast::Receiver<i32>,
mut sr_events: Receiver<ScreenReaderEvent>,
shutdown: CancellationToken,
) -> eyre::Result<()> {
loop {
tokio::select! {
Expand All @@ -94,7 +92,7 @@ pub async fn sr_event(
};
continue;
}
_ = shutdown_rx.recv() => {
() = shutdown.cancelled() => {
tracing::debug!("sr_event cancelled");
break;
}
Expand All @@ -107,7 +105,7 @@ pub async fn sr_event(
pub async fn receive(
state: Arc<ScreenReaderState>,
tx: Sender<Event>,
shutdown_rx: &mut broadcast::Receiver<i32>,
shutdown: CancellationToken,
) {
let events = state.atspi.event_stream();
tokio::pin!(events);
Expand All @@ -123,7 +121,7 @@ pub async fn receive(
}
continue;
}
_ = shutdown_rx.recv() => {
() = shutdown.cancelled() => {
tracing::debug!("receive function is done");
break;
}
Expand All @@ -134,8 +132,8 @@ pub async fn receive(
//#[tracing::instrument(level = "debug")]
pub async fn process(
state: Arc<ScreenReaderState>,
rx: &mut Receiver<Event>,
shutdown_rx: &mut broadcast::Receiver<i32>,
mut rx: Receiver<Event>,
shutdown: CancellationToken,
) {
loop {
tokio::select! {
Expand All @@ -153,7 +151,7 @@ pub async fn process(
};
continue;
}
_ = shutdown_rx.recv() => {
() = shutdown.cancelled() => {
tracing::debug!("process function is done");
break;
}
Expand Down
114 changes: 55 additions & 59 deletions odilia/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,27 @@ mod events;
mod logging;
mod state;

use std::{process::exit, sync::Arc};
use std::{process::exit, sync::Arc, time::Duration};

use crate::cli::Args;
use crate::state::ScreenReaderState;
use clap::Parser;
use eyre::WrapErr;
use futures::{future::FutureExt, StreamExt};
use odilia_input::sr_event_receiver;
use odilia_notify::listen_to_dbus_notifications;
use ssip_client_async::Priority;
use tokio::{
signal::unix::{signal, SignalKind},
sync::broadcast::{self, error::SendError},
sync::mpsc,
time::timeout,
};

use crate::cli::Args;
use crate::state::ScreenReaderState;
use odilia_input::sr_event_receiver;
use odilia_notify::listen_to_dbus_notifications;
use ssip_client_async::Priority;
use tokio_util::{sync::CancellationToken, task::TaskTracker};

use atspi_common::events::{document, object};
async fn notifications_monitor(
state: Arc<ScreenReaderState>,
shutdown_rx: &mut broadcast::Receiver<i32>,
shutdown: CancellationToken,
) -> eyre::Result<()> {
let mut stream = listen_to_dbus_notifications().await?;
loop {
Expand All @@ -43,44 +43,58 @@ async fn notifications_monitor(
format!("new notification: {}, {}.", notification.title, notification.body);
state.say(Priority::Important, notification_message).await;
},
_ = shutdown_rx.recv() => {
() = shutdown.cancelled() => {
tracing::debug!("Shutting down notification task.");
break;
},
}
}
Ok(())
}
async fn sigterm_signal_watcher(shutdown_tx: broadcast::Sender<i32>) -> eyre::Result<()> {
async fn sigterm_signal_watcher(
token: CancellationToken,
tracker: TaskTracker,
) -> eyre::Result<()> {
let timeout_duration = Duration::from_millis(500); //todo: perhaps take this from the configuration file at some point
let mut c = signal(SignalKind::interrupt())?;
tracing::debug!("Watching for Ctrl+C");
c.recv().await;
tracing::debug!("Asking all processes to stop.");
let _: Result<usize, SendError<i32>> = shutdown_tx.send(0);
tracing::debug!("cancelling all tokens");
token.cancel();
tracing::debug!(?timeout_duration, "waiting for all tasks to finish");
timeout(timeout_duration, tracker.wait()).await?;
tracing::debug!("All listeners have stopped.");
tracing::debug!("Goodbye, Odilia!");
Ok(())
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> eyre::Result<()> {
let args = Args::parse();

logging::init();
// Make sure applications with dynamic accessibility supprt do expose their AT-SPI2 interfaces.

//initialize the primary token for task cancelation
let token = CancellationToken::new();

//initialize a task tracker, which will allow us to wait for all tasks to finish
let tracker = TaskTracker::new();

// Make sure applications with dynamic accessibility support do expose their AT-SPI2 interfaces.
if let Err(e) = atspi_connection::set_session_accessibility(true).await {
tracing::debug!("Could not set AT-SPI2 IsEnabled property because: {}", e);
}
let (shutdown_tx, _) = broadcast::channel(1);
let (sr_event_tx, mut sr_event_rx) = mpsc::channel(128);
let (sr_event_tx, sr_event_rx) = mpsc::channel(128);
// this channel must NEVER fill up; it will cause the thread receiving events to deadlock due to a zbus design choice.
// If you need to make it bigger, then make it bigger, but do NOT let it ever fill up.
let (atspi_event_tx, mut atspi_event_rx) = mpsc::channel(128);
let (atspi_event_tx, atspi_event_rx) = mpsc::channel(128);
// this is the chanel which handles all SSIP commands. If SSIP is not allowed to operate on a separate task, then wdaiting for the receiving message can block other long-running operations like structural navigation.
// Although in the future, this may possibly be remidied through a proper cache, I think it still makes sense to separate SSIP's IO operations to a separate task.
// Like the channel above, it is very important that this is *never* full, since it can cause deadlocking if the other task sending the request is working with zbus.
let (ssip_req_tx, ssip_req_rx) = mpsc::channel::<ssip_client_async::tokio::Request>(128);
let (ssip_req_tx, ssip_req_rx) = mpsc::channel::<ssip_client_async::Request>(128);
// Initialize state
let state = Arc::new(ScreenReaderState::new(ssip_req_tx, args.config.as_deref()).await?);
let mut ssip = odilia_tts::create_ssip_client().await?;
let ssip = odilia_tts::create_ssip_client().await?;

if state.say(Priority::Message, "Welcome to Odilia!".to_string()).await {
tracing::debug!("Welcome message spoken.");
Expand All @@ -100,50 +114,32 @@ async fn main() -> eyre::Result<()> {
state.add_cache_match_rule(),
)?;

let mut shutdown_rx_ssip_recv = shutdown_tx.subscribe();
let ssip_event_receiver = odilia_tts::handle_ssip_commands(
&mut ssip,
ssip_req_rx,
&mut shutdown_rx_ssip_recv,
)
.map(|r| r.wrap_err("Could no process SSIP request"));
let mut shutdown_rx_atspi_recv = shutdown_tx.subscribe();
let ssip_event_receiver =
odilia_tts::handle_ssip_commands(ssip, ssip_req_rx, token.clone())
.map(|r| r.wrap_err("Could no process SSIP request"));
let atspi_event_receiver =
events::receive(Arc::clone(&state), atspi_event_tx, &mut shutdown_rx_atspi_recv)
events::receive(Arc::clone(&state), atspi_event_tx, token.clone())
.map(|()| Ok::<_, eyre::Report>(()));
let mut shutdown_rx_atspi_proc_recv = shutdown_tx.subscribe();
let atspi_event_processor = events::process(
Arc::clone(&state),
&mut atspi_event_rx,
&mut shutdown_rx_atspi_proc_recv,
)
.map(|()| Ok::<_, eyre::Report>(()));
let mut shutdown_rx_odilia_recv = shutdown_tx.subscribe();
let odilia_event_receiver = sr_event_receiver(sr_event_tx, &mut shutdown_rx_odilia_recv)
let atspi_event_processor =
events::process(Arc::clone(&state), atspi_event_rx, token.clone())
.map(|()| Ok::<_, eyre::Report>(()));
let odilia_event_receiver = sr_event_receiver(sr_event_tx, token.clone())
.map(|r| r.wrap_err("Could not process Odilia events"));
let mut shutdown_rx_odilia_proc_recv = shutdown_tx.subscribe();
let odilia_event_processor = events::sr_event(
Arc::clone(&state),
&mut sr_event_rx,
&mut shutdown_rx_odilia_proc_recv,
)
.map(|r| r.wrap_err("Could not process Odilia event"));
let mut shutdown_rx_notif = shutdown_tx.subscribe();
let notification_task = notifications_monitor(Arc::clone(&state), &mut shutdown_rx_notif)
.map(|r| r.wrap_err("Could not process signal shutdown."));
let signal_receiver = sigterm_signal_watcher(shutdown_tx)
let odilia_event_processor =
events::sr_event(Arc::clone(&state), sr_event_rx, token.clone())
.map(|r| r.wrap_err("Could not process Odilia event"));
let notification_task = notifications_monitor(Arc::clone(&state), token.clone())
.map(|r| r.wrap_err("Could not process signal shutdown."));

tokio::try_join!(
signal_receiver,
atspi_event_receiver,
atspi_event_processor,
odilia_event_receiver,
odilia_event_processor,
ssip_event_receiver,
notification_task,
)?;
tracing::debug!("All listeners have stopped.");
tracing::debug!("Goodbye, Odilia!");
tracker.spawn(atspi_event_receiver);
tracker.spawn(atspi_event_processor);
tracker.spawn(odilia_event_receiver);
tracker.spawn(odilia_event_processor);
tracker.spawn(ssip_event_receiver);
tracker.spawn(notification_task);
tracker.close();
let _ = sigterm_signal_watcher(token, tracker)
.await
.wrap_err("can not process interrupt signal");
Ok(())
}
2 changes: 1 addition & 1 deletion odilia/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{fs, sync::atomic::AtomicI32};

use circular_queue::CircularQueue;
use eyre::WrapErr;
use ssip_client_async::{tokio::Request as SSIPRequest, MessageScope, Priority};
use ssip_client_async::{MessageScope, Priority, Request as SSIPRequest};
use tokio::sync::{mpsc::Sender, Mutex};
use tracing::debug;
use zbus::{fdo::DBusProxy, names::UniqueName, zvariant::ObjectPath, MatchRule, MessageType};
Expand Down
1 change: 1 addition & 0 deletions tts/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ edition = "2021"
[dependencies]
ssip-client-async.workspace = true
tokio.workspace = true
tokio-util.workspace=true
tracing.workspace = true
eyre.workspace = true
13 changes: 6 additions & 7 deletions tts/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@

use eyre::Context;
use ssip_client_async::{
fifo::asynchronous_tokio::Builder,
tokio::{AsyncClient, Request},
ClientName,
fifo::asynchronous_tokio::Builder, tokio::AsyncClient, ClientName, Request,
};
use std::{
io::ErrorKind,
Expand All @@ -22,8 +20,9 @@ use std::{
use tokio::{
io::{BufReader, BufWriter},
net::unix::{OwnedReadHalf, OwnedWriteHalf},
sync::{broadcast, mpsc::Receiver},
sync::mpsc::Receiver,
};
use tokio_util::sync::CancellationToken;

/// Creates a new async SSIP client which can be sent commends, and can await responses to.
/// # Errors
Expand Down Expand Up @@ -75,9 +74,9 @@ pub async fn create_ssip_client(
/// Errors may also be returned during cleanup via the `shutdown_tx` parameter, since shutting down the connection to speech dispatcher can also potentially error.
/// Any of these failures will result in this function exiting with an `Err(_)` variant.
pub async fn handle_ssip_commands(
client: &mut AsyncClient<BufReader<OwnedReadHalf>, BufWriter<OwnedWriteHalf>>,
mut client: AsyncClient<BufReader<OwnedReadHalf>, BufWriter<OwnedWriteHalf>>,
requests: Receiver<Request>,
shutdown_tx: &mut broadcast::Receiver<i32>,
shutdown: CancellationToken,
) -> eyre::Result<()> {
tokio::pin!(requests);
loop {
Expand All @@ -91,7 +90,7 @@ pub async fn handle_ssip_commands(
tracing::debug!("Response from server: {:#?}", response);
}
}
_ = shutdown_tx.recv() => {
() = shutdown.cancelled() => {
tracing::debug!("Saying goodbye message.");
client
.send(Request::Speak).await?
Expand Down

0 comments on commit b77a72d

Please sign in to comment.