Skip to content

Commit

Permalink
sdk-ui: add fn Room::event_with_config
Browse files Browse the repository at this point in the history
This method works the same as `Room::event` but you can provide a custom `RequestConfig` to it.

It's especially useful for the pinned events timeline, since we need a max number of retries and a max number of concurrent requests. With this we can remove some unnecessary complexity.
  • Loading branch information
jmartinesp committed Aug 7, 2024
1 parent c3848ca commit 4dc071e
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 23 deletions.
44 changes: 23 additions & 21 deletions crates/matrix-sdk-ui/src/timeline/pinned_events_loader.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use std::{fmt::Formatter, sync::Arc};
use std::{fmt::Formatter, num::NonZeroUsize, sync::Arc};

use itertools::Itertools;
use matrix_sdk::{
event_cache::paginator::PaginatorError, pinned_events_cache::PinnedEventCache, Room,
SendOutsideWasm, SyncOutsideWasm,
config::RequestConfig, event_cache::paginator::PaginatorError,
pinned_events_cache::PinnedEventCache, Room, SendOutsideWasm, SyncOutsideWasm,
};
use matrix_sdk_base::deserialized_responses::SyncTimelineEvent;
use ruma::{EventId, MilliSecondsSinceUnixEpoch, OwnedEventId};
use thiserror::Error;
use tokio::sync::Semaphore;
use tracing::info;

const MAX_CONCURRENT_REQUESTS: usize = 10;
Expand Down Expand Up @@ -65,25 +64,23 @@ impl PinnedEventsLoader {
}

if !event_ids_to_request.is_empty() {
let semaphore = Arc::new(Semaphore::new(self.max_concurrent_requests));
let provider = Arc::new(self.room.clone());
let mut handles = Vec::new();

let config = Some(
RequestConfig::default()
.retry_limit(3)
.max_concurrent_requests(NonZeroUsize::new(self.max_concurrent_requests)),
);

for id in event_ids_to_request {
handles.push(tokio::spawn({
let semaphore = Arc::clone(&semaphore);
let provider = Arc::clone(&provider);
async move {
let permit = semaphore
.acquire()
.await
.map_err(|_| PinnedEventsLoaderError::SemaphoreNotAcquired)?;
let ret = provider
.event(&id)
provider
.event_with_config(&id, config)
.await
.map_err(|_| PinnedEventsLoaderError::EventNotFound(id.to_owned()));
drop(permit);
ret
.map_err(|_| PinnedEventsLoaderError::EventNotFound(id.to_owned()))
}
}));
}
Expand Down Expand Up @@ -152,7 +149,11 @@ impl PinnedEventsLoader {
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
pub trait PinnedEventsRoom: SendOutsideWasm + SyncOutsideWasm {
/// Load a single room event.
async fn event(&self, event_id: &EventId) -> Result<SyncTimelineEvent, PaginatorError>;
async fn event_with_config(
&self,
event_id: &EventId,
request_config: Option<RequestConfig>,
) -> Result<SyncTimelineEvent, PaginatorError>;

/// Get the pinned event ids for a room.
fn pinned_event_ids(&self) -> Vec<OwnedEventId>;
Expand All @@ -167,8 +168,12 @@ pub trait PinnedEventsRoom: SendOutsideWasm + SyncOutsideWasm {
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl PinnedEventsRoom for Room {
async fn event(&self, event_id: &EventId) -> Result<SyncTimelineEvent, PaginatorError> {
self.event(event_id)
async fn event_with_config(
&self,
event_id: &EventId,
request_config: Option<RequestConfig>,
) -> Result<SyncTimelineEvent, PaginatorError> {
self.event_with_config(event_id, request_config)
.await
.map(|e| e.into())
.map_err(|err| PaginatorError::SdkError(Box::new(err)))
Expand All @@ -195,9 +200,6 @@ impl std::fmt::Debug for PinnedEventsLoader {
/// Errors related to `PinnedEventsLoader` usage.
#[derive(Error, Debug)]
pub enum PinnedEventsLoaderError {
#[error("Semaphore for requests couldn't be acquired. It was probably aborted.")]
SemaphoreNotAcquired,

#[error("No event found for the given event id.")]
EventNotFound(OwnedEventId),

Expand Down
7 changes: 6 additions & 1 deletion crates/matrix-sdk-ui/src/timeline/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use futures_core::Stream;
use futures_util::{FutureExt, StreamExt};
use indexmap::IndexMap;
use matrix_sdk::{
config::RequestConfig,
deserialized_responses::{SyncTimelineEvent, TimelineEvent},
event_cache::paginator::{PaginableRoom, PaginatorError},
room::{EventWithContextResponse, Messages, MessagesOptions},
Expand Down Expand Up @@ -321,7 +322,11 @@ impl PaginableRoom for TestRoomDataProvider {
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
impl PinnedEventsRoom for TestRoomDataProvider {
async fn event(&self, _event_id: &EventId) -> Result<SyncTimelineEvent, PaginatorError> {
async fn event_with_config(
&self,
_event_id: &EventId,
_config: Option<RequestConfig>,
) -> Result<SyncTimelineEvent, PaginatorError> {
unimplemented!();
}

Expand Down
12 changes: 11 additions & 1 deletion crates/matrix-sdk/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,19 @@ impl Room {

/// Fetch the event with the given `EventId` in this room.
pub async fn event(&self, event_id: &EventId) -> Result<TimelineEvent> {
self.event_with_config(event_id, None).await
}

/// Fetch the event with the given `EventId` in this room, using the
/// provided `RequestConfig`.
pub async fn event_with_config(
&self,
event_id: &EventId,
request_config: Option<RequestConfig>,
) -> Result<TimelineEvent> {
let request =
get_room_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
let event = self.client.send(request, None).await?.event;
let event = self.client.send(request, request_config).await?.event;
self.try_decrypt_event(event).await
}

Expand Down

0 comments on commit 4dc071e

Please sign in to comment.