From ad01e645451bcb1340b64d5241cfa5dff4caaf0e Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Thu, 8 Aug 2024 17:19:31 +0200 Subject: [PATCH] feat(sdk): Subscribe to many rooms only via Sliding Sync. This patch changes the `SlidingSync::subscribe_to_room` method to `subscribe_to_rooms`. Note the plural form. It's now mandatory to subscribe to a set of rooms. The idea is to avoid calling this method repeatedly. Why? Because each time the method is called, it sends a `SlidingSyncInternalMessage` of kind `SyncLoopSkipOverCurrentIteration`, i.e. it cancels the in-flight sliding sync request, to start over with a new one (with the new room subscription). A problem arises when the async runtime (here, Tokio) is busy: in this case, the internal message channel can be filled pretty easily because its size is 8. Messages are not consumed as fast as they are inserted. By changing this API: subscribing to multiple rooms will result in a single internal message, instead of one per room. Consequently, the rest of the patch moves the `subscribe` method of `room_list_service::Room` to `room_list_service::RoomListService` because it now concerns multiple rooms instead of a single one. --- bindings/matrix-sdk-ffi/src/room_list.rs | 24 +++++++++++--- .../matrix-sdk-ui/src/notification_client.rs | 4 +-- .../src/room_list_service/mod.rs | 24 ++++++++++++++ .../src/room_list_service/room.rs | 25 +------------- .../tests/integration/room_list_service.rs | 23 ++++++------- crates/matrix-sdk/src/sliding_sync/README.md | 2 +- crates/matrix-sdk/src/sliding_sync/mod.rs | 33 +++++++++---------- labs/multiverse/src/main.rs | 2 +- 8 files changed, 77 insertions(+), 60 deletions(-) diff --git a/bindings/matrix-sdk-ffi/src/room_list.rs b/bindings/matrix-sdk-ffi/src/room_list.rs index f0e2f5ba607..8b61352b2ad 100644 --- a/bindings/matrix-sdk-ffi/src/room_list.rs +++ b/bindings/matrix-sdk-ffi/src/room_list.rs @@ -132,6 +132,26 @@ impl RoomListService { } }))) } + + fn subscribe_to_rooms( + &self, + room_ids: Vec, + settings: Option, + ) -> Result<(), RoomListError> { + let room_ids = room_ids + .into_iter() + .map(|room_id| { + RoomId::parse(&room_id).map_err(|_| RoomListError::InvalidRoomId { error: room_id }) + }) + .collect::, _>>()?; + + self.inner.subscribe_to_rooms( + &room_ids.iter().map(AsRef::as_ref).collect::>(), + settings.map(Into::into), + ); + + Ok(()) + } } #[derive(uniffi::Object)] @@ -649,10 +669,6 @@ impl RoomListItem { self.inner.is_encrypted().await.unwrap_or(false) } - fn subscribe(&self, settings: Option) { - self.inner.subscribe(settings.map(Into::into)); - } - async fn latest_event(&self) -> Option> { self.inner.latest_event().await.map(EventTimelineItem).map(Arc::new) } diff --git a/crates/matrix-sdk-ui/src/notification_client.rs b/crates/matrix-sdk-ui/src/notification_client.rs index 4b323f9f14c..2760d235415 100644 --- a/crates/matrix-sdk-ui/src/notification_client.rs +++ b/crates/matrix-sdk-ui/src/notification_client.rs @@ -373,8 +373,8 @@ impl NotificationClient { .build() .await?; - sync.subscribe_to_room( - room_id.to_owned(), + sync.subscribe_to_rooms( + &[room_id], Some(assign!(http::request::RoomSubscription::default(), { required_state, timeline_limit: Some(uint!(16)) diff --git a/crates/matrix-sdk-ui/src/room_list_service/mod.rs b/crates/matrix-sdk-ui/src/room_list_service/mod.rs index 41f298a69fb..2a76e410f76 100644 --- a/crates/matrix-sdk-ui/src/room_list_service/mod.rs +++ b/crates/matrix-sdk-ui/src/room_list_service/mod.rs @@ -365,6 +365,30 @@ impl RoomListService { )) } + /// Subscribe to rooms. + /// + /// It means that all events from these rooms will be received every time, + /// no matter how the `RoomList` is configured. + pub fn subscribe_to_rooms( + &self, + room_ids: &[&RoomId], + settings: Option, + ) { + let mut settings = settings.unwrap_or_default(); + + // Make sure to always include the room creation event in the required state + // events, to know what the room version is. + if !settings + .required_state + .iter() + .any(|(event_type, _state_key)| *event_type == StateEventType::RoomCreate) + { + settings.required_state.push((StateEventType::RoomCreate, "".to_owned())); + } + + self.sliding_sync.subscribe_to_rooms(room_ids, Some(settings)) + } + #[cfg(test)] pub fn sliding_sync(&self) -> &SlidingSync { &self.sliding_sync diff --git a/crates/matrix-sdk-ui/src/room_list_service/room.rs b/crates/matrix-sdk-ui/src/room_list_service/room.rs index 946a8c399ff..cad3e845e84 100644 --- a/crates/matrix-sdk-ui/src/room_list_service/room.rs +++ b/crates/matrix-sdk-ui/src/room_list_service/room.rs @@ -19,8 +19,7 @@ use std::{ops::Deref, sync::Arc}; use async_once_cell::OnceCell as AsyncOnceCell; use matrix_sdk::SlidingSync; -use matrix_sdk_base::sliding_sync::http; -use ruma::{events::StateEventType, RoomId}; +use ruma::RoomId; use super::Error; use crate::{ @@ -88,28 +87,6 @@ impl Room { &self.inner.room } - /// Subscribe to this room. - /// - /// It means that all events from this room will be received every time, no - /// matter how the `RoomList` is configured. - pub fn subscribe(&self, settings: Option) { - let mut settings = settings.unwrap_or_default(); - - // Make sure to always include the room creation event in the required state - // events, to know what the room version is. - if !settings - .required_state - .iter() - .any(|(event_type, _state_key)| *event_type == StateEventType::RoomCreate) - { - settings.required_state.push((StateEventType::RoomCreate, "".to_owned())); - } - - self.inner - .sliding_sync - .subscribe_to_room(self.inner.room.room_id().to_owned(), Some(settings)) - } - /// Get the timeline of the room if one exists. pub fn timeline(&self) -> Option> { self.inner.timeline.get().cloned() diff --git a/crates/matrix-sdk-ui/tests/integration/room_list_service.rs b/crates/matrix-sdk-ui/tests/integration/room_list_service.rs index 8917390b5c3..e4dbc4bb661 100644 --- a/crates/matrix-sdk-ui/tests/integration/room_list_service.rs +++ b/crates/matrix-sdk-ui/tests/integration/room_list_service.rs @@ -2128,19 +2128,20 @@ async fn test_room_subscription() -> Result<(), Error> { }, }; - let room1 = room_list.room(room_id_1).unwrap(); - // Subscribe. - room1.subscribe(Some(assign!(RoomSubscription::default(), { - required_state: vec![ - (StateEventType::RoomName, "".to_owned()), - (StateEventType::RoomTopic, "".to_owned()), - (StateEventType::RoomAvatar, "".to_owned()), - (StateEventType::RoomCanonicalAlias, "".to_owned()), - ], - timeline_limit: Some(uint!(30)), - }))); + room_list.subscribe_to_rooms( + &[room_id_1], + Some(assign!(RoomSubscription::default(), { + required_state: vec![ + (StateEventType::RoomName, "".to_owned()), + (StateEventType::RoomTopic, "".to_owned()), + (StateEventType::RoomAvatar, "".to_owned()), + (StateEventType::RoomCanonicalAlias, "".to_owned()), + ], + timeline_limit: Some(uint!(30)), + })), + ); sync_then_assert_request_and_fake_response! { [server, room_list, sync] diff --git a/crates/matrix-sdk/src/sliding_sync/README.md b/crates/matrix-sdk/src/sliding_sync/README.md index df3ec5a2cb1..504a8168382 100644 --- a/crates/matrix-sdk/src/sliding_sync/README.md +++ b/crates/matrix-sdk/src/sliding_sync/README.md @@ -139,7 +139,7 @@ Notably, this map only knows about the rooms that have come down [Sliding Sync protocol][MSC] and if the given room isn't in any active list range, it may be stale. Additionally to selecting the room data via the room lists, the [Sliding Sync protocol][MSC] allows to subscribe to specific rooms via -the [`subscribe_to_room()`](SlidingSync::subscribe_to_room). Any room subscribed +the [`subscribe_to_rooms()`](SlidingSync::subscribe_to_rooms). Any room subscribed to will receive updates (with the given settings) regardless of whether they are visible in any list. The most common case for using this API is when the user enters a room - as we want to receive the incoming new messages regardless of diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index c3748b1d408..953f34a2947 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -143,26 +143,26 @@ impl SlidingSync { SlidingSyncBuilder::new(id, client) } - /// Subscribe to a given room. + /// Subscribe to many rooms. /// - /// If the associated `Room` exists, it will be marked as + /// If the associated `Room`s exist, it will be marked as /// members are missing, so that it ensures to re-fetch all members. - pub fn subscribe_to_room( + pub fn subscribe_to_rooms( &self, - room_id: OwnedRoomId, + room_ids: &[&RoomId], settings: Option, ) { - if let Some(room) = self.inner.client.get_room(&room_id) { - room.mark_members_missing(); - } + let settings = settings.unwrap_or_default(); + let mut sticky = self.inner.sticky.write().unwrap(); + let room_subscriptions = &mut sticky.data_mut().room_subscriptions; - self.inner - .sticky - .write() - .unwrap() - .data_mut() - .room_subscriptions - .insert(room_id, settings.unwrap_or_default()); + for room_id in room_ids { + if let Some(room) = self.inner.client.get_room(room_id) { + room.mark_members_missing(); + } + + room_subscriptions.insert((*room_id).to_owned(), settings.clone()); + } self.inner.internal_channel_send_if_possible( SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration, @@ -1081,7 +1081,7 @@ mod tests { } #[async_test] - async fn test_subscribe_to_room() -> Result<()> { + async fn test_subscribe_to_rooms() -> Result<()> { let (server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo") .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))]) .await?; @@ -1150,8 +1150,7 @@ mod tests { // Members are now synced! We can start subscribing and see how it goes. assert!(room0.are_members_synced()); - sliding_sync.subscribe_to_room(room_id_0.to_owned(), None); - sliding_sync.subscribe_to_room(room_id_1.to_owned(), None); + sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None); // OK, we have subscribed to some rooms. Let's check on `room0` if members are // now marked as not synced. diff --git a/labs/multiverse/src/main.rs b/labs/multiverse/src/main.rs index db7c765b8c9..f7a00a5a5ba 100644 --- a/labs/multiverse/src/main.rs +++ b/labs/multiverse/src/main.rs @@ -398,7 +398,7 @@ impl App { .get_selected_room_id(Some(selected)) .and_then(|room_id| self.ui_rooms.lock().unwrap().get(&room_id).cloned()) { - room.subscribe(None); + self.sync_service.room_list_service().subscribe_to_rooms(&[room.room_id()], None); self.current_room_subscription = Some(room); } }