Skip to content

Commit

Permalink
Cleanups and new inner services
Browse files Browse the repository at this point in the history
- Remove unused deps
- Use if let ... on loop { ... } instead of while let Some(...)
- Switch to using the ChoiceService model
- Remove attempt at "generic handlers"; handled by ChoiceService
  • Loading branch information
TTWNO committed Jun 17, 2024
1 parent beb33e6 commit 81f1d1c
Showing 1 changed file with 47 additions and 47 deletions.
94 changes: 47 additions & 47 deletions odilia/src/tower/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

use crate::state::ScreenReaderState;
use crate::tower::{
async_try::AsyncTryIntoLayer, from_state::TryFromState, service_set::ServiceSet,
state_svc::StateLayer, sync_try::TryIntoLayer, Handler,
async_try::AsyncTryIntoLayer, choice::ChoiceService, choice::Chooser,
from_state::TryFromState, service_set::ServiceSet, state_svc::StateLayer,
sync_try::TryIntoLayer, Handler,
};
use atspi::AtspiError;
use atspi::BusProperties;
Expand All @@ -15,7 +16,6 @@ use std::fmt::Debug;
use std::sync::Arc;

use futures::{Stream, StreamExt};
use std::collections::{BTreeMap, HashMap};

use tower::util::BoxCloneService;
use tower::Layer;
Expand Down Expand Up @@ -45,54 +45,50 @@ where
}
}

#[derive(Clone)]
pub struct GenericHandlers<Req, Res, Find, E> {
handlers: BTreeMap<Find, ServiceSet<BoxCloneService<Req, Res, E>>>,
}
impl<Req, Res, Find, E> GenericHandlers<Req, Res, Find, E> {
pub fn new() -> Self {
GenericHandlers {
handlers: BTreeMap::new(),
}
}
pub fn add_listener(mut self, svc: BoxCloneService<Req, Res, E>, find: Find) -> Self
where Find: Ord {
self.handlers.entry(find).or_default().push(svc);
self
}
}

type Response = Vec<Command>;
type Request = Event;
type Error = OdiliaError;

type AtspiHandler = BoxCloneService<Event, Vec<Command>, Error>;
type CommandHandler = BoxCloneService<Command, (), Error>;

impl Chooser<(&'static str, &'static str)> for Event {
fn identifier(&self) -> (&'static str, &'static str) {
(self.interface(), self.member())
}
}
impl Chooser<CommandDiscriminants> for Command {
fn identifier(&self) -> CommandDiscriminants {
self.ctype()
}
}

pub struct Handlers {
state: Arc<ScreenReaderState>,
atspi: HashMap<(&'static str, &'static str), ServiceSet<AtspiHandler>>,
command: BTreeMap<CommandDiscriminants, ServiceSet<CommandHandler>>,
atspi: ChoiceService<(&'static str, &'static str), ServiceSet<AtspiHandler>, Event>,
command: ChoiceService<CommandDiscriminants, ServiceSet<CommandHandler>, Command>,
}

impl Handlers {
pub fn new(state: Arc<ScreenReaderState>) -> Self {
Handlers { state, atspi: HashMap::new(), command: BTreeMap::new() }
Handlers { state, atspi: ChoiceService::new(), command: ChoiceService::new() }
}
pub async fn command_handler(mut self, mut commands: Receiver<Command>) {
while let Some(cmd) = commands.recv().await {
let dn = cmd.ctype();
loop {
let maybe_cmd = commands.recv().await;
let cmd = if let Some(cmd) = maybe_cmd {
cmd
} else {
tracing::error!("Error cmd: {maybe_cmd:?}");
continue;
};
// NOTE: Why not use join_all(...) ?
// Because this drives the futures concurrently, and we want ordered handlers.
// Otherwise, we cannot guarentee that the caching functions get run first.
// we could move caching to a separate, ordered system, then parallelize the other functions,
// if we determine this is a performance problem.
if let Some(hand_fn) = self.command.get_mut(&dn) {
if let Err(e) = hand_fn.call(cmd).await {
tracing::error!("{e:?}");
}
} else {
tracing::trace!("There are no associated handler functions for the command '{}'", cmd.ctype());
if let Err(e) = self.command.call(cmd).await {
tracing::error!("{e:?}");
}
}
}
Expand All @@ -102,25 +98,27 @@ impl Handlers {
R: Stream<Item = Result<Event, AtspiError>> + Unpin,
{
std::pin::pin!(&mut events);
while let Some(Ok(ev)) = events.next().await {
let dn = (ev.member(), ev.interface());
loop {
let maybe_ev = events.next().await;
let ev = if let Some(Ok(ev)) = maybe_ev {
ev
} else {
tracing::error!("Error in processing {maybe_ev:?}");
continue;
};
let (interface, member) = (ev.member(), ev.interface());
// NOTE: Why not use join_all(...) ?
// Because this drives the futures concurrently, and we want ordered handlers.
// Otherwise, we cannot guarentee that the caching functions get run first.
// we could move caching to a separate, ordered system, then parallelize the other functions,
// if we determine this is a performance problem.
let mut results = vec![];
match self.atspi.get_mut(&dn) {
Some(hand) => {
results = hand
.call(ev)
.await
.expect("ServiceSet failed to uphold its contract");
}
None => {
tracing::trace!("There are no associated handler functions for {}:{}", ev.interface(), ev.member());
}
}
let results =
if let Ok(res) = self.atspi.call(ev).await {
res
} else {
tracing::error!("There are no associated handler functions for {}:{}", interface, member);
continue;
};
for res in results {
match res {
Ok(oks) => {
Expand Down Expand Up @@ -162,7 +160,9 @@ impl Handlers {
// this is safe because we wrap the service in a Reuslt<R, Infallible> so that we can preserve
// any return type we want, including ones with no errors
// R -> Result<(), Error>
let hand_service = handler.into_service::<R>().map_result(|r| r.unwrap().into());
let hand_service = handler
.into_service::<R>()
.map_result(|r| r.expect("This should never fail").into());
// Service(<Arc<S>, C>) -> T
let params_service = params_layer.layer(hand_service);
// Service<C> -> (Arc<S>, C)
Expand Down Expand Up @@ -200,7 +200,7 @@ impl Handlers {
let ti_layer: TryIntoLayer<E, Request> = TryIntoLayer::new();
let ws = handler
.into_service::<R>()
.map_result(|r| r.unwrap().try_into_commands());
.map_result(|r| r.expect("This should never fail").try_into_commands());
let ie_serv = ie_layer.layer(ws);
let ch_serv = state_layer.layer(ie_serv);
let serv = ti_layer.layer(ch_serv);
Expand Down

0 comments on commit 81f1d1c

Please sign in to comment.