Skip to content

Commit

Permalink
feat(sdk): Subscribe to many rooms only via Sliding Sync.
Browse files Browse the repository at this point in the history
This patch changes the `SlidingSync::subscribe_to_room` method to
`subscribe_to_rooms`. Note the plural form. It's now mandatory to
subscribe to a set of rooms. The idea is to avoid calling this method
repeatedly. Why? Because each time the method is called, it sends a
`SlidingSyncInternalMessage` of kind `SyncLoopSkipOverCurrentIteration`,
i.e. it cancels the in-flight sliding sync request, to start over with
a new one (with the new room subscription). A problem arises when the
async runtime (here, Tokio) is busy: in this case, the internal message
channel can be filled pretty easily because its size is 8. Messages
are not consumed as fast as they are inserted. By changing this API:
subscribing to multiple rooms will result in a single internal message,
instead of one per room.

Consequently, the rest of the patch moves the `subscribe` method of
`room_list_service::Room` to `room_list_service::RoomListService`
because it now concerns multiple rooms instead of a single one.
  • Loading branch information
Hywan committed Aug 8, 2024
1 parent 57963dc commit ad01e64
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 60 deletions.
24 changes: 20 additions & 4 deletions bindings/matrix-sdk-ffi/src/room_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,26 @@ impl RoomListService {
}
})))
}

fn subscribe_to_rooms(
&self,
room_ids: Vec<String>,
settings: Option<RoomSubscription>,
) -> Result<(), RoomListError> {
let room_ids = room_ids
.into_iter()
.map(|room_id| {
RoomId::parse(&room_id).map_err(|_| RoomListError::InvalidRoomId { error: room_id })
})
.collect::<Result<Vec<_>, _>>()?;

self.inner.subscribe_to_rooms(
&room_ids.iter().map(AsRef::as_ref).collect::<Vec<_>>(),
settings.map(Into::into),
);

Ok(())
}
}

#[derive(uniffi::Object)]
Expand Down Expand Up @@ -649,10 +669,6 @@ impl RoomListItem {
self.inner.is_encrypted().await.unwrap_or(false)
}

fn subscribe(&self, settings: Option<RoomSubscription>) {
self.inner.subscribe(settings.map(Into::into));
}

async fn latest_event(&self) -> Option<Arc<EventTimelineItem>> {
self.inner.latest_event().await.map(EventTimelineItem).map(Arc::new)
}
Expand Down
4 changes: 2 additions & 2 deletions crates/matrix-sdk-ui/src/notification_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,8 @@ impl NotificationClient {
.build()
.await?;

sync.subscribe_to_room(
room_id.to_owned(),
sync.subscribe_to_rooms(
&[room_id],
Some(assign!(http::request::RoomSubscription::default(), {
required_state,
timeline_limit: Some(uint!(16))
Expand Down
24 changes: 24 additions & 0 deletions crates/matrix-sdk-ui/src/room_list_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,30 @@ impl RoomListService {
))
}

/// Subscribe to rooms.
///
/// It means that all events from these rooms will be received every time,
/// no matter how the `RoomList` is configured.
pub fn subscribe_to_rooms(
&self,
room_ids: &[&RoomId],
settings: Option<http::request::RoomSubscription>,
) {
let mut settings = settings.unwrap_or_default();

// Make sure to always include the room creation event in the required state
// events, to know what the room version is.
if !settings
.required_state
.iter()
.any(|(event_type, _state_key)| *event_type == StateEventType::RoomCreate)
{
settings.required_state.push((StateEventType::RoomCreate, "".to_owned()));
}

self.sliding_sync.subscribe_to_rooms(room_ids, Some(settings))
}

#[cfg(test)]
pub fn sliding_sync(&self) -> &SlidingSync {
&self.sliding_sync
Expand Down
25 changes: 1 addition & 24 deletions crates/matrix-sdk-ui/src/room_list_service/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use std::{ops::Deref, sync::Arc};

use async_once_cell::OnceCell as AsyncOnceCell;
use matrix_sdk::SlidingSync;
use matrix_sdk_base::sliding_sync::http;
use ruma::{events::StateEventType, RoomId};
use ruma::RoomId;

use super::Error;
use crate::{
Expand Down Expand Up @@ -88,28 +87,6 @@ impl Room {
&self.inner.room
}

/// Subscribe to this room.
///
/// It means that all events from this room will be received every time, no
/// matter how the `RoomList` is configured.
pub fn subscribe(&self, settings: Option<http::request::RoomSubscription>) {
let mut settings = settings.unwrap_or_default();

// Make sure to always include the room creation event in the required state
// events, to know what the room version is.
if !settings
.required_state
.iter()
.any(|(event_type, _state_key)| *event_type == StateEventType::RoomCreate)
{
settings.required_state.push((StateEventType::RoomCreate, "".to_owned()));
}

self.inner
.sliding_sync
.subscribe_to_room(self.inner.room.room_id().to_owned(), Some(settings))
}

/// Get the timeline of the room if one exists.
pub fn timeline(&self) -> Option<Arc<Timeline>> {
self.inner.timeline.get().cloned()
Expand Down
23 changes: 12 additions & 11 deletions crates/matrix-sdk-ui/tests/integration/room_list_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2128,19 +2128,20 @@ async fn test_room_subscription() -> Result<(), Error> {
},
};

let room1 = room_list.room(room_id_1).unwrap();

// Subscribe.

room1.subscribe(Some(assign!(RoomSubscription::default(), {
required_state: vec![
(StateEventType::RoomName, "".to_owned()),
(StateEventType::RoomTopic, "".to_owned()),
(StateEventType::RoomAvatar, "".to_owned()),
(StateEventType::RoomCanonicalAlias, "".to_owned()),
],
timeline_limit: Some(uint!(30)),
})));
room_list.subscribe_to_rooms(
&[room_id_1],
Some(assign!(RoomSubscription::default(), {
required_state: vec![
(StateEventType::RoomName, "".to_owned()),
(StateEventType::RoomTopic, "".to_owned()),
(StateEventType::RoomAvatar, "".to_owned()),
(StateEventType::RoomCanonicalAlias, "".to_owned()),
],
timeline_limit: Some(uint!(30)),
})),
);

sync_then_assert_request_and_fake_response! {
[server, room_list, sync]
Expand Down
2 changes: 1 addition & 1 deletion crates/matrix-sdk/src/sliding_sync/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ Notably, this map only knows about the rooms that have come down [Sliding
Sync protocol][MSC] and if the given room isn't in any active list range, it
may be stale. Additionally to selecting the room data via the room lists,
the [Sliding Sync protocol][MSC] allows to subscribe to specific rooms via
the [`subscribe_to_room()`](SlidingSync::subscribe_to_room). Any room subscribed
the [`subscribe_to_rooms()`](SlidingSync::subscribe_to_rooms). Any room subscribed
to will receive updates (with the given settings) regardless of whether they are
visible in any list. The most common case for using this API is when the user
enters a room - as we want to receive the incoming new messages regardless of
Expand Down
33 changes: 16 additions & 17 deletions crates/matrix-sdk/src/sliding_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,26 +143,26 @@ impl SlidingSync {
SlidingSyncBuilder::new(id, client)
}

/// Subscribe to a given room.
/// Subscribe to many rooms.
///
/// If the associated `Room` exists, it will be marked as
/// If the associated `Room`s exist, it will be marked as
/// members are missing, so that it ensures to re-fetch all members.
pub fn subscribe_to_room(
pub fn subscribe_to_rooms(
&self,
room_id: OwnedRoomId,
room_ids: &[&RoomId],
settings: Option<http::request::RoomSubscription>,
) {
if let Some(room) = self.inner.client.get_room(&room_id) {
room.mark_members_missing();
}
let settings = settings.unwrap_or_default();
let mut sticky = self.inner.sticky.write().unwrap();
let room_subscriptions = &mut sticky.data_mut().room_subscriptions;

self.inner
.sticky
.write()
.unwrap()
.data_mut()
.room_subscriptions
.insert(room_id, settings.unwrap_or_default());
for room_id in room_ids {
if let Some(room) = self.inner.client.get_room(room_id) {
room.mark_members_missing();
}

room_subscriptions.insert((*room_id).to_owned(), settings.clone());
}

self.inner.internal_channel_send_if_possible(
SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
Expand Down Expand Up @@ -1081,7 +1081,7 @@ mod tests {
}

#[async_test]
async fn test_subscribe_to_room() -> Result<()> {
async fn test_subscribe_to_rooms() -> Result<()> {
let (server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
.sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
.await?;
Expand Down Expand Up @@ -1150,8 +1150,7 @@ mod tests {
// Members are now synced! We can start subscribing and see how it goes.
assert!(room0.are_members_synced());

sliding_sync.subscribe_to_room(room_id_0.to_owned(), None);
sliding_sync.subscribe_to_room(room_id_1.to_owned(), None);
sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None);

// OK, we have subscribed to some rooms. Let's check on `room0` if members are
// now marked as not synced.
Expand Down
2 changes: 1 addition & 1 deletion labs/multiverse/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ impl App {
.get_selected_room_id(Some(selected))
.and_then(|room_id| self.ui_rooms.lock().unwrap().get(&room_id).cloned())
{
room.subscribe(None);
self.sync_service.room_list_service().subscribe_to_rooms(&[room.room_id()], None);
self.current_room_subscription = Some(room);
}
}
Expand Down

0 comments on commit ad01e64

Please sign in to comment.