diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index a2c53e871..d01e31ff1 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -271,24 +271,27 @@ impl FfiXmtpClient { }) } - pub fn group(&self, group_id: Vec) -> Result { - let convo = self.inner_client.group(group_id)?; + pub fn conversation(&self, conversation_id: Vec) -> Result { + let convo = self.inner_client.group(conversation_id)?; - Ok(FfiGroup { + Ok(FfiConversation { inner_client: self.inner_client.clone(), - group_id: convo.group_id, + conversation_id: convo.group_id, created_at_ns: convo.created_at_ns, }) } - pub fn dm_group(&self, target_inbox_id: String) -> Result { + pub fn dm_conversation( + &self, + target_inbox_id: String, + ) -> Result { let convo = self .inner_client .dm_group_from_target_inbox(target_inbox_id)?; - Ok(FfiGroup { + Ok(FfiConversation { inner_client: self.inner_client.clone(), - group_id: convo.group_id, + conversation_id: convo.group_id, created_at_ns: convo.created_at_ns, }) } @@ -783,7 +786,7 @@ impl FfiConversations { &self, account_addresses: Vec, opts: FfiCreateGroupOptions, - ) -> Result, GenericError> { + ) -> Result, GenericError> { log::info!( "creating group with account addresses: {}", account_addresses.join(", ") @@ -829,23 +832,26 @@ impl FfiConversations { .await? }; - let out = Arc::new(FfiGroup { + let out = Arc::new(FfiConversation { inner_client: self.inner_client.clone(), - group_id: convo.group_id, + conversation_id: convo.group_id, created_at_ns: convo.created_at_ns, }); Ok(out) } - pub async fn create_dm(&self, account_address: String) -> Result, GenericError> { + pub async fn create_dm( + &self, + account_address: String, + ) -> Result, GenericError> { log::info!("creating dm with target address: {}", account_address); let convo = self.inner_client.create_dm(account_address).await?; - let out = Arc::new(FfiGroup { + let out = Arc::new(FfiConversation { inner_client: self.inner_client.clone(), - group_id: convo.group_id, + conversation_id: convo.group_id, created_at_ns: convo.created_at_ns, }); @@ -855,14 +861,14 @@ impl FfiConversations { pub async fn process_streamed_welcome_message( &self, envelope_bytes: Vec, - ) -> Result, GenericError> { + ) -> Result, GenericError> { let inner = self.inner_client.as_ref(); let group = inner .process_streamed_welcome_message(envelope_bytes) .await?; - let out = Arc::new(FfiGroup { + let out = Arc::new(FfiConversation { inner_client: self.inner_client.clone(), - group_id: group.group_id, + conversation_id: group.group_id, created_at_ns: group.created_at_ns, }); Ok(out) @@ -874,10 +880,10 @@ impl FfiConversations { Ok(()) } - pub async fn sync_all_groups(&self) -> Result { + pub async fn sync_all_conversations(&self) -> Result { let inner = self.inner_client.as_ref(); let groups = inner.find_groups(FindGroupParams { - include_dm_groups: true, + conversation_type: None, ..FindGroupParams::default() })?; @@ -902,21 +908,73 @@ impl FfiConversations { pub async fn list( &self, opts: FfiListConversationsOptions, - ) -> Result>, GenericError> { + ) -> Result>, GenericError> { + let inner = self.inner_client.as_ref(); + let convo_list: Vec> = inner + .find_groups(FindGroupParams { + allowed_states: None, + created_after_ns: opts.created_after_ns, + created_before_ns: opts.created_before_ns, + limit: opts.limit, + conversation_type: None, + })? + .into_iter() + .map(|group| { + Arc::new(FfiConversation { + inner_client: self.inner_client.clone(), + conversation_id: group.group_id, + created_at_ns: group.created_at_ns, + }) + }) + .collect(); + + Ok(convo_list) + } + + pub async fn list_groups( + &self, + opts: FfiListConversationsOptions, + ) -> Result>, GenericError> { + let inner = self.inner_client.as_ref(); + let convo_list: Vec> = inner + .find_groups(FindGroupParams { + allowed_states: None, + created_after_ns: opts.created_after_ns, + created_before_ns: opts.created_before_ns, + limit: opts.limit, + conversation_type: Some(ConversationType::Group), + })? + .into_iter() + .map(|group| { + Arc::new(FfiConversation { + inner_client: self.inner_client.clone(), + conversation_id: group.group_id, + created_at_ns: group.created_at_ns, + }) + }) + .collect(); + + Ok(convo_list) + } + + pub async fn list_dms( + &self, + opts: FfiListConversationsOptions, + ) -> Result>, GenericError> { let inner = self.inner_client.as_ref(); - let convo_list: Vec> = inner + let convo_list: Vec> = inner .find_groups(FindGroupParams { allowed_states: None, created_after_ns: opts.created_after_ns, created_before_ns: opts.created_before_ns, limit: opts.limit, - include_dm_groups: false, + conversation_type: Some(ConversationType::Dm), })? .into_iter() .map(|group| { - Arc::new(FfiGroup { + Arc::new(FfiConversation { inner_client: self.inner_client.clone(), - group_id: group.group_id, + conversation_id: group.group_id, created_at_ns: group.created_at_ns, }) }) @@ -925,18 +983,81 @@ impl FfiConversations { Ok(convo_list) } + pub async fn stream_groups( + &self, + callback: Box, + ) -> FfiStreamCloser { + let client = self.inner_client.clone(); + let handle = RustXmtpClient::stream_conversations_with_callback( + client.clone(), + move |convo| { + callback.on_conversation(Arc::new(FfiConversation { + inner_client: client.clone(), + conversation_id: convo.group_id, + created_at_ns: convo.created_at_ns, + })) + }, + Some(ConversationType::Group), + ); + + FfiStreamCloser::new(handle) + } + + pub async fn stream_dms(&self, callback: Box) -> FfiStreamCloser { + let client = self.inner_client.clone(); + let handle = RustXmtpClient::stream_conversations_with_callback( + client.clone(), + move |convo| { + callback.on_conversation(Arc::new(FfiConversation { + inner_client: client.clone(), + conversation_id: convo.group_id, + created_at_ns: convo.created_at_ns, + })) + }, + Some(ConversationType::Dm), + ); + + FfiStreamCloser::new(handle) + } + pub async fn stream(&self, callback: Box) -> FfiStreamCloser { let client = self.inner_client.clone(); let handle = RustXmtpClient::stream_conversations_with_callback( client.clone(), move |convo| { - callback.on_conversation(Arc::new(FfiGroup { + callback.on_conversation(Arc::new(FfiConversation { inner_client: client.clone(), - group_id: convo.group_id, + conversation_id: convo.group_id, created_at_ns: convo.created_at_ns, })) }, - false, + None, + ); + + FfiStreamCloser::new(handle) + } + + pub async fn stream_all_group_messages( + &self, + message_callback: Box, + ) -> FfiStreamCloser { + let handle = RustXmtpClient::stream_all_messages_with_callback( + self.inner_client.clone(), + move |message| message_callback.on_message(message.into()), + Some(ConversationType::Group), + ); + + FfiStreamCloser::new(handle) + } + + pub async fn stream_all_dm_messages( + &self, + message_callback: Box, + ) -> FfiStreamCloser { + let handle = RustXmtpClient::stream_all_messages_with_callback( + self.inner_client.clone(), + move |message| message_callback.on_message(message.into()), + Some(ConversationType::Dm), ); FfiStreamCloser::new(handle) @@ -949,6 +1070,7 @@ impl FfiConversations { let handle = RustXmtpClient::stream_all_messages_with_callback( self.inner_client.clone(), move |message| message_callback.on_message(message.into()), + None, ); FfiStreamCloser::new(handle) @@ -956,14 +1078,14 @@ impl FfiConversations { } #[derive(uniffi::Object)] -pub struct FfiGroup { +pub struct FfiConversation { inner_client: Arc, - group_id: Vec, + conversation_id: Vec, created_at_ns: i64, } #[derive(uniffi::Record)] -pub struct FfiGroupMember { +pub struct FfiConversationMember { pub inbox_id: String, pub account_addresses: Vec, pub installation_ids: Vec>, @@ -1007,7 +1129,7 @@ impl From for ConsentState { #[derive(uniffi::Enum)] pub enum FfiConsentEntityType { - GroupId, + ConversationId, InboxId, Address, } @@ -1015,7 +1137,7 @@ pub enum FfiConsentEntityType { impl From for ConsentType { fn from(entity_type: FfiConsentEntityType) -> Self { match entity_type { - FfiConsentEntityType::GroupId => ConsentType::GroupId, + FfiConsentEntityType::ConversationId => ConsentType::ConversationId, FfiConsentEntityType::InboxId => ConsentType::InboxId, FfiConsentEntityType::Address => ConsentType::Address, } @@ -1052,11 +1174,11 @@ impl FfiCreateGroupOptions { } #[uniffi::export(async_runtime = "tokio")] -impl FfiGroup { +impl FfiConversation { pub async fn send(&self, content_bytes: Vec) -> Result, GenericError> { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1070,7 +1192,7 @@ impl FfiGroup { pub fn send_optimistic(&self, content_bytes: Vec) -> Result, GenericError> { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1083,7 +1205,7 @@ impl FfiGroup { pub async fn publish_messages(&self) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); group.publish_messages(&self.inner_client).await?; @@ -1093,7 +1215,7 @@ impl FfiGroup { pub async fn sync(&self) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1108,7 +1230,7 @@ impl FfiGroup { ) -> Result, GenericError> { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1129,13 +1251,13 @@ impl FfiGroup { Ok(messages) } - pub async fn process_streamed_group_message( + pub async fn process_streamed_conversation_message( &self, envelope_bytes: Vec, ) -> Result { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); let message = group @@ -1146,18 +1268,18 @@ impl FfiGroup { Ok(ffi_message) } - pub async fn list_members(&self) -> Result, GenericError> { + pub async fn list_members(&self) -> Result, GenericError> { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); - let members: Vec = group + let members: Vec = group .members(&self.inner_client) .await? .into_iter() - .map(|member| FfiGroupMember { + .map(|member| FfiConversationMember { inbox_id: member.inbox_id, account_addresses: member.account_addresses, installation_ids: member.installation_ids, @@ -1178,7 +1300,7 @@ impl FfiGroup { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1197,7 +1319,7 @@ impl FfiGroup { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1211,7 +1333,7 @@ impl FfiGroup { pub async fn remove_members(&self, account_addresses: Vec) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1228,7 +1350,7 @@ impl FfiGroup { ) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1242,7 +1364,7 @@ impl FfiGroup { pub async fn update_group_name(&self, group_name: String) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1256,7 +1378,7 @@ impl FfiGroup { pub fn group_name(&self) -> Result { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1272,7 +1394,7 @@ impl FfiGroup { ) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1286,7 +1408,7 @@ impl FfiGroup { pub fn group_image_url_square(&self) -> Result { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1301,7 +1423,7 @@ impl FfiGroup { ) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1315,7 +1437,7 @@ impl FfiGroup { pub fn group_description(&self) -> Result { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1330,7 +1452,7 @@ impl FfiGroup { ) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1344,7 +1466,7 @@ impl FfiGroup { pub fn group_pinned_frame_url(&self) -> Result { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1356,7 +1478,7 @@ impl FfiGroup { pub fn admin_list(&self) -> Result, GenericError> { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1368,7 +1490,7 @@ impl FfiGroup { pub fn super_admin_list(&self) -> Result, GenericError> { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1390,7 +1512,7 @@ impl FfiGroup { pub async fn add_admin(&self, inbox_id: String) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); group @@ -1403,7 +1525,7 @@ impl FfiGroup { pub async fn remove_admin(&self, inbox_id: String) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); group @@ -1416,7 +1538,7 @@ impl FfiGroup { pub async fn add_super_admin(&self, inbox_id: String) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); group @@ -1429,7 +1551,7 @@ impl FfiGroup { pub async fn remove_super_admin(&self, inbox_id: String) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); group @@ -1446,7 +1568,7 @@ impl FfiGroup { pub fn group_permissions(&self) -> Result, GenericError> { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1464,7 +1586,7 @@ impl FfiGroup { ) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); group @@ -1483,7 +1605,7 @@ impl FfiGroup { let inner_client = Arc::clone(&self.inner_client); let handle = MlsGroup::stream_with_callback( inner_client, - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, move |message| message_callback.on_message(message.into()), ); @@ -1498,7 +1620,7 @@ impl FfiGroup { pub fn is_active(&self) -> Result { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1508,7 +1630,7 @@ impl FfiGroup { pub fn consent_state(&self) -> Result { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1520,7 +1642,7 @@ impl FfiGroup { pub fn update_consent_state(&self, state: FfiConsentState) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1532,45 +1654,45 @@ impl FfiGroup { pub fn added_by_inbox_id(&self) -> Result { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); Ok(group.added_by_inbox_id()?) } - pub fn group_metadata(&self) -> Result, GenericError> { + pub fn group_metadata(&self) -> Result, GenericError> { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); let metadata = group.metadata(group.mls_provider()?)?; - Ok(Arc::new(FfiGroupMetadata { + Ok(Arc::new(FfiConversationMetadata { inner: Arc::new(metadata), })) } } #[uniffi::export] -impl FfiGroup { +impl FfiConversation { pub fn id(&self) -> Vec { - self.group_id.clone() + self.conversation_id.clone() } } #[derive(uniffi::Enum, PartialEq)] -pub enum FfiGroupMessageKind { +pub enum FfiConversationMessageKind { Application, MembershipChange, } -impl From for FfiGroupMessageKind { +impl From for FfiConversationMessageKind { fn from(kind: GroupMessageKind) -> Self { match kind { - GroupMessageKind::Application => FfiGroupMessageKind::Application, - GroupMessageKind::MembershipChange => FfiGroupMessageKind::MembershipChange, + GroupMessageKind::Application => FfiConversationMessageKind::Application, + GroupMessageKind::MembershipChange => FfiConversationMessageKind::MembershipChange, } } } @@ -1609,7 +1731,7 @@ pub struct FfiMessage { pub convo_id: Vec, pub sender_inbox_id: String, pub content: Vec, - pub kind: FfiGroupMessageKind, + pub kind: FfiConversationMessageKind, pub delivery_status: FfiDeliveryStatus, } @@ -1715,16 +1837,16 @@ pub trait FfiMessageCallback: Send + Sync { #[uniffi::export(callback_interface)] pub trait FfiConversationCallback: Send + Sync { - fn on_conversation(&self, conversation: Arc); + fn on_conversation(&self, conversation: Arc); } #[derive(uniffi::Object)] -pub struct FfiGroupMetadata { +pub struct FfiConversationMetadata { inner: Arc, } #[uniffi::export] -impl FfiGroupMetadata { +impl FfiConversationMetadata { pub fn creator_inbox_id(&self) -> String { self.inner.creator_inbox_id.clone() } @@ -1784,10 +1906,10 @@ mod tests { use super::{create_client, FfiMessage, FfiMessageCallback, FfiXmtpClient}; use crate::{ get_inbox_id_for_address, inbox_owner::SigningError, logger::FfiLogger, FfiConsent, - FfiConsentEntityType, FfiConsentState, FfiConversationCallback, FfiCreateGroupOptions, - FfiGroup, FfiGroupMessageKind, FfiGroupPermissionsOptions, FfiInboxOwner, - FfiListConversationsOptions, FfiListMessagesOptions, FfiMetadataField, FfiPermissionPolicy, - FfiPermissionPolicySet, FfiPermissionUpdateType, + FfiConsentEntityType, FfiConsentState, FfiConversation, FfiConversationCallback, + FfiConversationMessageKind, FfiCreateGroupOptions, FfiGroupPermissionsOptions, + FfiInboxOwner, FfiListConversationsOptions, FfiListMessagesOptions, FfiMetadataField, + FfiPermissionPolicy, FfiPermissionPolicySet, FfiPermissionUpdateType, }; use ethers::utils::hex; use rand::distributions::{Alphanumeric, DistString}; @@ -1853,7 +1975,7 @@ mod tests { struct RustStreamCallback { num_messages: Arc, messages: Arc>>, - conversations: Arc>>>, + conversations: Arc>>>, notify: Arc, } @@ -1862,10 +1984,11 @@ mod tests { self.num_messages.load(Ordering::SeqCst) } - pub async fn wait_for_delivery(&self) -> Result<(), Elapsed> { - tokio::time::timeout(std::time::Duration::from_secs(60), async { - self.notify.notified().await - }) + pub async fn wait_for_delivery(&self, timeout_secs: Option) -> Result<(), Elapsed> { + tokio::time::timeout( + std::time::Duration::from_secs(timeout_secs.unwrap_or(60)), + async { self.notify.notified().await }, + ) .await?; Ok(()) } @@ -1885,7 +2008,7 @@ mod tests { } impl FfiConversationCallback for RustStreamCallback { - fn on_conversation(&self, group: Arc) { + fn on_conversation(&self, group: Arc) { log::debug!("received conversation"); let _ = self.num_messages.fetch_add(1, Ordering::SeqCst); let mut convos = self.conversations.lock().unwrap(); @@ -1952,12 +2075,12 @@ mod tests { new_test_client_with_wallet(wallet).await } - impl FfiGroup { + impl FfiConversation { #[cfg(test)] async fn update_installations(&self) -> Result<(), GroupError> { let group = MlsGroup::new( self.inner_client.context().clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -2455,7 +2578,7 @@ mod tests { .update_group_name("Old Name".to_string()) .await .unwrap(); - message_callbacks.wait_for_delivery().await.unwrap(); + message_callbacks.wait_for_delivery(None).await.unwrap(); let bo_groups = bo .conversations() @@ -2468,14 +2591,14 @@ mod tests { .update_group_name("Old Name2".to_string()) .await .unwrap(); - message_callbacks.wait_for_delivery().await.unwrap(); + message_callbacks.wait_for_delivery(None).await.unwrap(); // Uncomment the following lines to add more group name updates bo_group .update_group_name("Old Name3".to_string()) .await .unwrap(); - message_callbacks.wait_for_delivery().await.unwrap(); + message_callbacks.wait_for_delivery(None).await.unwrap(); assert_eq!(message_callbacks.message_count(), 3); @@ -2508,8 +2631,8 @@ mod tests { let alix_group1 = alix_groups[0].clone(); let alix_group5 = alix_groups[5].clone(); - let bo_group1 = bo.group(alix_group1.id()).unwrap(); - let bo_group5 = bo.group(alix_group5.id()).unwrap(); + let bo_group1 = bo.conversation(alix_group1.id()).unwrap(); + let bo_group5 = bo.conversation(alix_group5.id()).unwrap(); alix_group1.send("alix1".as_bytes().to_vec()).await.unwrap(); alix_group5.send("alix1".as_bytes().to_vec()).await.unwrap(); @@ -2523,7 +2646,7 @@ mod tests { assert_eq!(bo_messages1.len(), 0); assert_eq!(bo_messages5.len(), 0); - bo.conversations().sync_all_groups().await.unwrap(); + bo.conversations().sync_all_conversations().await.unwrap(); let bo_messages1 = bo_group1 .find_messages(FfiListMessagesOptions::default()) @@ -2551,7 +2674,7 @@ mod tests { .unwrap(); } bo.conversations().sync().await.unwrap(); - let num_groups_synced_1: u32 = bo.conversations().sync_all_groups().await.unwrap(); + let num_groups_synced_1: u32 = bo.conversations().sync_all_conversations().await.unwrap(); assert!(num_groups_synced_1 == 30); // Remove bo from all groups and sync @@ -2568,11 +2691,11 @@ mod tests { } // First sync after removal needs to process all groups and set them to inactive - let num_groups_synced_2: u32 = bo.conversations().sync_all_groups().await.unwrap(); + let num_groups_synced_2: u32 = bo.conversations().sync_all_conversations().await.unwrap(); assert!(num_groups_synced_2 == 30); // Second sync after removal will not process inactive groups - let num_groups_synced_3: u32 = bo.conversations().sync_all_groups().await.unwrap(); + let num_groups_synced_3: u32 = bo.conversations().sync_all_conversations().await.unwrap(); assert!(num_groups_synced_3 == 0); } @@ -2595,7 +2718,7 @@ mod tests { .unwrap(); bo.conversations().sync().await.unwrap(); - let bo_group = bo.group(alix_group.id()).unwrap(); + let bo_group = bo.conversation(alix_group.id()).unwrap(); bo_group.send("bo1".as_bytes().to_vec()).await.unwrap(); // Temporary workaround for OpenMLS issue - make sure Alix's epoch is up-to-date @@ -2677,8 +2800,8 @@ mod tests { client2.conversations().sync().await.unwrap(); // Find groups for both clients - let client1_group = client1.group(group.id()).unwrap(); - let client2_group = client2.group(group.id()).unwrap(); + let client1_group = client1.conversation(group.id()).unwrap(); + let client2_group = client2.conversation(group.id()).unwrap(); // Sync both groups client1_group.sync().await.unwrap(); @@ -2710,7 +2833,7 @@ mod tests { assert_eq!(client1_members.len(), 2); client2.conversations().sync().await.unwrap(); - let client2_group = client2.group(group.id()).unwrap(); + let client2_group = client2.conversation(group.id()).unwrap(); let client2_members = client2_group.list_members().await.unwrap(); assert_eq!(client2_members.len(), 2); } @@ -2749,9 +2872,9 @@ mod tests { caro.conversations().sync().await.unwrap(); // Alix and Caro find the group - let alix_group = alix.group(group.id()).unwrap(); - let bo_group = bo.group(group.id()).unwrap(); - let caro_group = caro.group(group.id()).unwrap(); + let alix_group = alix.conversation(group.id()).unwrap(); + let bo_group = bo.conversation(group.id()).unwrap(); + let caro_group = caro.conversation(group.id()).unwrap(); alix_group.update_installations().await.unwrap(); log::info!("Alix sending first message"); @@ -2791,7 +2914,7 @@ mod tests { // New installation of bo finds the group bo2.conversations().sync().await.unwrap(); - let bo2_group = bo2.group(group.id()).unwrap(); + let bo2_group = bo2.conversation(group.id()).unwrap(); log::info!("Bo sending fourth message"); // Bo sends a message to the group @@ -2858,7 +2981,7 @@ mod tests { bo.conversations().sync().await.unwrap(); - let bo_group = bo.group(alix_group.id()).unwrap(); + let bo_group = bo.conversation(alix_group.id()).unwrap(); // Move forward 4 epochs alix_group @@ -2929,7 +3052,7 @@ mod tests { .unwrap(); bo.conversations().sync().await.unwrap(); - let bo_group = bo.group(alix_group.id()).unwrap(); + let bo_group = bo.conversation(alix_group.id()).unwrap(); bo_group.send("bo1".as_bytes().to_vec()).await.unwrap(); alix_group.send("alix1".as_bytes().to_vec()).await.unwrap(); @@ -2985,7 +3108,7 @@ mod tests { .unwrap(); bo.conversations().sync().await.unwrap(); - let bo_group = bo.group(alix_group.id()).unwrap(); + let bo_group = bo.conversation(alix_group.id()).unwrap(); alix_group.sync().await.unwrap(); let alix_members = alix_group.list_members().await.unwrap(); @@ -3013,7 +3136,7 @@ mod tests { let bo_messages = bo_group .find_messages(FfiListMessagesOptions::default()) .unwrap(); - assert!(bo_messages.first().unwrap().kind == FfiGroupMessageKind::MembershipChange); + assert!(bo_messages.first().unwrap().kind == FfiConversationMessageKind::MembershipChange); assert_eq!(bo_messages.len(), 1); let bo_members = bo_group.list_members().await.unwrap(); @@ -3056,9 +3179,9 @@ mod tests { .update_group_name("hello".to_string()) .await .unwrap(); - message_callbacks.wait_for_delivery().await.unwrap(); + message_callbacks.wait_for_delivery(None).await.unwrap(); alix_group.send("hello1".as_bytes().to_vec()).await.unwrap(); - message_callbacks.wait_for_delivery().await.unwrap(); + message_callbacks.wait_for_delivery(None).await.unwrap(); let bo_groups = bo .conversations() @@ -3075,9 +3198,9 @@ mod tests { assert_eq!(bo_messages1.len(), first_msg_check); bo_group.send("hello2".as_bytes().to_vec()).await.unwrap(); - message_callbacks.wait_for_delivery().await.unwrap(); + message_callbacks.wait_for_delivery(None).await.unwrap(); bo_group.send("hello3".as_bytes().to_vec()).await.unwrap(); - message_callbacks.wait_for_delivery().await.unwrap(); + message_callbacks.wait_for_delivery(None).await.unwrap(); alix_group.sync().await.unwrap(); @@ -3087,7 +3210,7 @@ mod tests { assert_eq!(alix_messages.len(), second_msg_check); alix_group.send("hello4".as_bytes().to_vec()).await.unwrap(); - message_callbacks.wait_for_delivery().await.unwrap(); + message_callbacks.wait_for_delivery(None).await.unwrap(); bo_group.sync().await.unwrap(); let bo_messages2 = bo_group @@ -3120,7 +3243,7 @@ mod tests { .await .unwrap(); - stream_callback.wait_for_delivery().await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); assert_eq!(stream_callback.message_count(), 1); // Create another group and add bola @@ -3131,7 +3254,7 @@ mod tests { ) .await .unwrap(); - stream_callback.wait_for_delivery().await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); assert_eq!(stream_callback.message_count(), 2); @@ -3163,7 +3286,7 @@ mod tests { stream.wait_for_ready().await; alix_group.send("first".as_bytes().to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); let bo_group = bo .conversations() @@ -3176,11 +3299,11 @@ mod tests { let _ = caro.inner_client.sync_welcomes().await.unwrap(); bo_group.send("second".as_bytes().to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); alix_group.send("third".as_bytes().to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); bo_group.send("fourth".as_bytes().to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); assert_eq!(stream_callback.message_count(), 4); stream.end_and_wait().await.unwrap(); @@ -3192,7 +3315,7 @@ mod tests { let amal = new_test_client().await; let bola = new_test_client().await; - let amal_group: Arc = amal + let amal_group: Arc = amal .conversations() .create_group( vec![bola.account_address.clone()], @@ -3202,7 +3325,9 @@ mod tests { .unwrap(); bola.inner_client.sync_welcomes().await.unwrap(); - let bola_group = bola.group(amal_group.group_id.clone()).unwrap(); + let bola_group = bola + .conversation(amal_group.conversation_id.clone()) + .unwrap(); let stream_callback = RustStreamCallback::default(); let stream_closer = bola_group.stream(Box::new(stream_callback.clone())).await; @@ -3210,13 +3335,13 @@ mod tests { stream_closer.wait_for_ready().await; amal_group.send("hello".as_bytes().to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); amal_group .send("goodbye".as_bytes().to_vec()) .await .unwrap(); - stream_callback.wait_for_delivery().await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); assert_eq!(stream_callback.message_count(), 2); stream_closer.end_and_wait().await.unwrap(); @@ -3249,9 +3374,9 @@ mod tests { stream_closer.wait_for_ready().await; amal_group.send(b"hello1".to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); amal_group.send(b"hello2".to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); assert_eq!(stream_callback.message_count(), 2); assert!(!stream_closer.is_closed()); @@ -3260,7 +3385,7 @@ mod tests { .remove_members_by_inbox_id(vec![bola.inbox_id().clone()]) .await .unwrap(); - stream_callback.wait_for_delivery().await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); assert_eq!(stream_callback.message_count(), 3); // Member removal transcript message // amal_group.send(b"hello3".to_vec()).await.unwrap(); @@ -3279,7 +3404,7 @@ mod tests { assert_eq!(stream_callback.message_count(), 3); // Don't receive transcript messages while removed amal_group.send("hello4".as_bytes().to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); assert_eq!(stream_callback.message_count(), 4); // Receiving messages again assert!(!stream_closer.is_closed()); @@ -3360,10 +3485,10 @@ mod tests { ) .await .unwrap(); - group_callback.wait_for_delivery().await.unwrap(); + group_callback.wait_for_delivery(None).await.unwrap(); alix_group.send("hello1".as_bytes().to_vec()).await.unwrap(); - message_callback.wait_for_delivery().await.unwrap(); + message_callback.wait_for_delivery(None).await.unwrap(); assert_eq!(group_callback.message_count(), 1); assert_eq!(message_callback.message_count(), 1); @@ -3800,27 +3925,226 @@ mod tests { let alix_conversations = alix.conversations(); let bola_conversations = bola.conversations(); - let _alix_group = alix_conversations + let _alix_dm = alix_conversations .create_dm(bola.account_address.clone()) .await .unwrap(); - let alix_num_sync = alix_conversations.sync_all_groups().await.unwrap(); + let alix_num_sync = alix_conversations.sync_all_conversations().await.unwrap(); bola_conversations.sync().await.unwrap(); - let bola_num_sync = bola_conversations.sync_all_groups().await.unwrap(); + let bola_num_sync = bola_conversations.sync_all_conversations().await.unwrap(); assert_eq!(alix_num_sync, 1); assert_eq!(bola_num_sync, 1); let alix_groups = alix_conversations - .list(FfiListConversationsOptions::default()) + .list_groups(FfiListConversationsOptions::default()) .await .unwrap(); assert_eq!(alix_groups.len(), 0); let bola_groups = bola_conversations - .list(FfiListConversationsOptions::default()) + .list_groups(FfiListConversationsOptions::default()) .await .unwrap(); assert_eq!(bola_groups.len(), 0); + + let alix_dms = alix_conversations + .list_dms(FfiListConversationsOptions::default()) + .await + .unwrap(); + assert_eq!(alix_dms.len(), 1); + + let bola_dms = bola_conversations + .list_dms(FfiListConversationsOptions::default()) + .await + .unwrap(); + assert_eq!(bola_dms.len(), 1); + + let alix_conversations = alix_conversations + .list(FfiListConversationsOptions::default()) + .await + .unwrap(); + assert_eq!(alix_conversations.len(), 1); + + let bola_conversations = bola_conversations + .list(FfiListConversationsOptions::default()) + .await + .unwrap(); + assert_eq!(bola_conversations.len(), 1); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_dm_streaming() { + let alix = new_test_client().await; + let bo = new_test_client().await; + + // Stream all conversations + let stream_callback = RustStreamCallback::default(); + let stream = bo + .conversations() + .stream(Box::new(stream_callback.clone())) + .await; + + alix.conversations() + .create_group( + vec![bo.account_address.clone()], + FfiCreateGroupOptions::default(), + ) + .await + .unwrap(); + + stream_callback.wait_for_delivery(None).await.unwrap(); + + assert_eq!(stream_callback.message_count(), 1); + alix.conversations() + .create_dm(bo.account_address.clone()) + .await + .unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); + + assert_eq!(stream_callback.message_count(), 2); + + stream.end_and_wait().await.unwrap(); + assert!(stream.is_closed()); + + // Stream just groups + let stream_callback = RustStreamCallback::default(); + let stream = bo + .conversations() + .stream_groups(Box::new(stream_callback.clone())) + .await; + + alix.conversations() + .create_group( + vec![bo.account_address.clone()], + FfiCreateGroupOptions::default(), + ) + .await + .unwrap(); + + stream_callback.wait_for_delivery(None).await.unwrap(); + + assert_eq!(stream_callback.message_count(), 1); + alix.conversations() + .create_dm(bo.account_address.clone()) + .await + .unwrap(); + let result = stream_callback.wait_for_delivery(Some(2)).await; + assert!(result.is_err(), "Stream unexpectedly received a DM"); + assert_eq!(stream_callback.message_count(), 1); + + stream.end_and_wait().await.unwrap(); + assert!(stream.is_closed()); + + // Stream just dms + let stream_callback = RustStreamCallback::default(); + let stream = bo + .conversations() + .stream_dms(Box::new(stream_callback.clone())) + .await; + + alix.conversations() + .create_dm(bo.account_address.clone()) + .await + .unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); + assert_eq!(stream_callback.message_count(), 1); + + alix.conversations() + .create_group( + vec![bo.account_address.clone()], + FfiCreateGroupOptions::default(), + ) + .await + .unwrap(); + + let result = stream_callback.wait_for_delivery(Some(2)).await; + assert!(result.is_err(), "Stream unexpectedly received a Group"); + assert_eq!(stream_callback.message_count(), 1); + + stream.end_and_wait().await.unwrap(); + assert!(stream.is_closed()); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_stream_all_dm_messages() { + let alix = new_test_client().await; + let bo = new_test_client().await; + let alix_dm = alix + .conversations() + .create_dm(bo.account_address.clone()) + .await + .unwrap(); + + let alix_group = alix + .conversations() + .create_group( + vec![bo.account_address.clone()], + FfiCreateGroupOptions::default(), + ) + .await + .unwrap(); + + // Stream all conversations + let stream_callback = RustStreamCallback::default(); + let stream = bo + .conversations() + .stream_all_messages(Box::new(stream_callback.clone())) + .await; + stream.wait_for_ready().await; + + alix_group.send("first".as_bytes().to_vec()).await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); + assert_eq!(stream_callback.message_count(), 1); + + alix_dm.send("second".as_bytes().to_vec()).await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); + assert_eq!(stream_callback.message_count(), 2); + + stream.end_and_wait().await.unwrap(); + assert!(stream.is_closed()); + + // Stream just groups + let stream_callback = RustStreamCallback::default(); + let stream = bo + .conversations() + .stream_all_group_messages(Box::new(stream_callback.clone())) + .await; + stream.wait_for_ready().await; + + alix_group.send("first".as_bytes().to_vec()).await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); + assert_eq!(stream_callback.message_count(), 1); + + alix_dm.send("second".as_bytes().to_vec()).await.unwrap(); + let result = stream_callback.wait_for_delivery(Some(2)).await; + assert!(result.is_err(), "Stream unexpectedly received a DM message"); + assert_eq!(stream_callback.message_count(), 1); + + stream.end_and_wait().await.unwrap(); + assert!(stream.is_closed()); + + // Stream just dms + let stream_callback = RustStreamCallback::default(); + let stream = bo + .conversations() + .stream_all_dm_messages(Box::new(stream_callback.clone())) + .await; + stream.wait_for_ready().await; + + alix_dm.send("first".as_bytes().to_vec()).await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); + assert_eq!(stream_callback.message_count(), 1); + + alix_group.send("second".as_bytes().to_vec()).await.unwrap(); + let result = stream_callback.wait_for_delivery(Some(2)).await; + assert!( + result.is_err(), + "Stream unexpectedly received a Group message" + ); + assert_eq!(stream_callback.message_count(), 1); + + stream.end_and_wait().await.unwrap(); + assert!(stream.is_closed()); } #[tokio::test(flavor = "multi_thread", worker_threads = 5)] @@ -3841,7 +4165,7 @@ mod tests { assert_eq!(alix_initial_consent, FfiConsentState::Allowed); bo.conversations().sync().await.unwrap(); - let bo_group = bo.group(alix_group.id()).unwrap(); + let bo_group = bo.conversation(alix_group.id()).unwrap(); let bo_initial_consent = bo_group.consent_state().unwrap(); assert_eq!(bo_initial_consent, FfiConsentState::Unknown); @@ -3853,7 +4177,7 @@ mod tests { assert_eq!(alix_updated_consent, FfiConsentState::Denied); bo.set_consent_states(vec![FfiConsent { state: FfiConsentState::Allowed, - entity_type: FfiConsentEntityType::GroupId, + entity_type: FfiConsentEntityType::ConversationId, entity: hex::encode(bo_group.id()), }]) .await @@ -3862,6 +4186,42 @@ mod tests { assert_eq!(bo_updated_consent, FfiConsentState::Allowed); } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_set_and_get_dm_consent() { + let alix = new_test_client().await; + let bo = new_test_client().await; + + let alix_dm = alix + .conversations() + .create_dm(bo.account_address.clone()) + .await + .unwrap(); + + let alix_initial_consent = alix_dm.consent_state().unwrap(); + assert_eq!(alix_initial_consent, FfiConsentState::Allowed); + + bo.conversations().sync().await.unwrap(); + let bo_dm = bo.conversation(alix_dm.id()).unwrap(); + + let bo_initial_consent = bo_dm.consent_state().unwrap(); + assert_eq!(bo_initial_consent, FfiConsentState::Unknown); + + alix_dm + .update_consent_state(FfiConsentState::Denied) + .unwrap(); + let alix_updated_consent = alix_dm.consent_state().unwrap(); + assert_eq!(alix_updated_consent, FfiConsentState::Denied); + bo.set_consent_states(vec![FfiConsent { + state: FfiConsentState::Allowed, + entity_type: FfiConsentEntityType::ConversationId, + entity: hex::encode(bo_dm.id()), + }]) + .await + .unwrap(); + let bo_updated_consent = bo_dm.consent_state().unwrap(); + assert_eq!(bo_updated_consent, FfiConsentState::Allowed); + } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] async fn test_set_and_get_member_consent() { let alix = new_test_client().await; diff --git a/bindings_node/src/consent_state.rs b/bindings_node/src/consent_state.rs index ecc1ba8a7..a674f3090 100644 --- a/bindings_node/src/consent_state.rs +++ b/bindings_node/src/consent_state.rs @@ -38,7 +38,7 @@ pub enum NapiConsentEntityType { impl From for ConsentType { fn from(entity_type: NapiConsentEntityType) -> Self { match entity_type { - NapiConsentEntityType::GroupId => ConsentType::GroupId, + NapiConsentEntityType::GroupId => ConsentType::ConversationId, NapiConsentEntityType::InboxId => ConsentType::InboxId, NapiConsentEntityType::Address => ConsentType::Address, } diff --git a/bindings_node/src/conversations.rs b/bindings_node/src/conversations.rs index c3ae47033..a55aac2c7 100644 --- a/bindings_node/src/conversations.rs +++ b/bindings_node/src/conversations.rs @@ -7,6 +7,7 @@ use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFun use napi::JsFunction; use napi_derive::napi; use xmtp_mls::client::FindGroupParams; +use xmtp_mls::groups::group_metadata::ConversationType; use xmtp_mls::groups::{GroupMetadataOptions, PreconfiguredPolicies}; use crate::messages::NapiMessage; @@ -210,7 +211,7 @@ impl NapiConversations { ThreadsafeFunctionCallMode::Blocking, ); }, - false, + Some(ConversationType::Group), ); Ok(NapiStreamCloser::new(stream_closer)) @@ -225,6 +226,7 @@ impl NapiConversations { move |message| { tsfn.call(Ok(message.into()), ThreadsafeFunctionCallMode::Blocking); }, + Some(ConversationType::Group), ); Ok(NapiStreamCloser::new(stream_closer)) diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index 00e8f55a5..0bcebf766 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -41,8 +41,9 @@ use xmtp_proto::xmtp::mls::api::v1::{ use crate::{ api::ApiClientWrapper, groups::{ - group_permissions::PolicySet, validated_commit::CommitValidationError, GroupError, - GroupMetadataOptions, IntentError, MlsGroup, + group_metadata::ConversationType, group_permissions::PolicySet, + validated_commit::CommitValidationError, GroupError, GroupMetadataOptions, IntentError, + MlsGroup, }, identity::{parse_credential, Identity, IdentityError}, identity_updates::{load_identity_updates, IdentityUpdateError}, @@ -224,7 +225,7 @@ pub struct FindGroupParams { pub created_after_ns: Option, pub created_before_ns: Option, pub limit: Option, - pub include_dm_groups: bool, + pub conversation_type: Option, } /// Clients manage access to the network, identity, and data store @@ -660,7 +661,7 @@ where params.created_after_ns, params.created_before_ns, params.limit, - params.include_dm_groups, + params.conversation_type, )? .into_iter() .map(|stored_group| { diff --git a/xmtp_mls/src/groups/message_history.rs b/xmtp_mls/src/groups/message_history.rs index 8f09e1a32..52dcbd46e 100644 --- a/xmtp_mls/src/groups/message_history.rs +++ b/xmtp_mls/src/groups/message_history.rs @@ -25,6 +25,7 @@ use xmtp_proto::{ }, }; +use super::group_metadata::ConversationType; use super::{GroupError, MlsGroup}; use crate::XmtpApi; @@ -135,7 +136,7 @@ where pub async fn ensure_member_of_all_groups(&self, inbox_id: String) -> Result<(), GroupError> { let conn = self.store().conn()?; - let groups = conn.find_groups(None, None, None, None, false)?; + let groups = conn.find_groups(None, None, None, None, Some(ConversationType::Group))?; for group in groups { let group = self.group(group.id)?; Box::pin(group.add_members_by_inbox_id(self, vec![inbox_id.clone()])).await?; @@ -384,7 +385,7 @@ where self.sync_welcomes().await?; let conn = self.store().conn()?; - let groups = conn.find_groups(None, None, None, None, false)?; + let groups = conn.find_groups(None, None, None, None, Some(ConversationType::Group))?; for crate::storage::group::StoredGroup { id, .. } in groups.into_iter() { let group = self.group(id)?; Box::pin(group.sync(self)).await?; @@ -502,14 +503,14 @@ where async fn prepare_groups_to_sync(&self) -> Result, MessageHistoryError> { let conn = self.store().conn()?; - Ok(conn.find_groups(None, None, None, None, false)?) + Ok(conn.find_groups(None, None, None, None, Some(ConversationType::Group))?) } async fn prepare_messages_to_sync( &self, ) -> Result, MessageHistoryError> { let conn = self.store().conn()?; - let groups = conn.find_groups(None, None, None, None, false)?; + let groups = conn.find_groups(None, None, None, None, Some(ConversationType::Group))?; let mut all_messages: Vec = vec![]; for StoredGroup { id, .. } in groups.into_iter() { diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index c686e52bf..381e38e30 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -381,11 +381,11 @@ impl MlsGroup { ); stored_group.store(provider.conn_ref())?; - Ok(Self::new( - context.clone(), - group_id, - stored_group.created_at_ns, - )) + let new_group = Self::new(context.clone(), group_id, stored_group.created_at_ns); + + // Consent state defaults to allowed when the user creates the group + new_group.update_consent_state(ConsentState::Allowed)?; + Ok(new_group) } // Create a group from a decrypted and decoded welcome message @@ -1041,8 +1041,10 @@ impl MlsGroup { /// Find the `consent_state` of the group pub fn consent_state(&self) -> Result { let conn = self.context.store.conn()?; - let record = - conn.get_consent_record(hex::encode(self.group_id.clone()), ConsentType::GroupId)?; + let record = conn.get_consent_record( + hex::encode(self.group_id.clone()), + ConsentType::ConversationId, + )?; match record { Some(rec) => Ok(rec.state), @@ -1053,7 +1055,7 @@ impl MlsGroup { pub fn update_consent_state(&self, state: ConsentState) -> Result<(), GroupError> { let conn = self.context.store.conn()?; conn.insert_or_replace_consent_records(vec![StoredConsentRecord::new( - ConsentType::GroupId, + ConsentType::ConversationId, state, hex::encode(self.group_id.clone()), )])?; @@ -3229,7 +3231,7 @@ mod tests { let _ = bola.sync_welcomes().await; let bola_groups = bola .find_groups(FindGroupParams { - include_dm_groups: true, + conversation_type: None, ..FindGroupParams::default() }) .unwrap(); diff --git a/xmtp_mls/src/storage/encrypted_store/consent_record.rs b/xmtp_mls/src/storage/encrypted_store/consent_record.rs index 1a6cd5ef4..ac5bdbe01 100644 --- a/xmtp_mls/src/storage/encrypted_store/consent_record.rs +++ b/xmtp_mls/src/storage/encrypted_store/consent_record.rs @@ -21,7 +21,7 @@ use serde::{Deserialize, Serialize}; #[diesel(table_name = consent_records)] #[diesel(primary_key(entity_type, entity))] pub struct StoredConsentRecord { - /// Enum, [`ConsentType`] representing the type of consent (group_id inbox_id, etc..) + /// Enum, [`ConsentType`] representing the type of consent (conversation_id inbox_id, etc..) pub entity_type: ConsentType, /// Enum, [`ConsentState`] representing the state of consent (allowed, denied, etc..) pub state: ConsentState, @@ -85,8 +85,8 @@ impl DbConnection { #[diesel(sql_type = Integer)] /// Type of consent record stored pub enum ConsentType { - /// Consent is for a group - GroupId = 1, + /// Consent is for a conversation + ConversationId = 1, /// Consent is for an inbox InboxId = 2, /// Consent is for an address @@ -109,7 +109,7 @@ where { fn from_sql(bytes: ::RawValue<'_>) -> deserialize::Result { match i32::from_sql(bytes)? { - 1 => Ok(ConsentType::GroupId), + 1 => Ok(ConsentType::ConversationId), 2 => Ok(ConsentType::InboxId), 3 => Ok(ConsentType::Address), x => Err(format!("Unrecognized variant {}", x).into()), diff --git a/xmtp_mls/src/storage/encrypted_store/group.rs b/xmtp_mls/src/storage/encrypted_store/group.rs index 851e6bf78..465e9b113 100644 --- a/xmtp_mls/src/storage/encrypted_store/group.rs +++ b/xmtp_mls/src/storage/encrypted_store/group.rs @@ -15,7 +15,9 @@ use super::{ db_connection::DbConnection, schema::{groups, groups::dsl}, }; -use crate::{impl_fetch, impl_store, DuplicateItem, StorageError}; +use crate::{ + groups::group_metadata::ConversationType, impl_fetch, impl_store, DuplicateItem, StorageError, +}; /// The Group ID type. pub type ID = Vec; @@ -122,7 +124,7 @@ impl DbConnection { created_after_ns: Option, created_before_ns: Option, limit: Option, - include_dm_groups: bool, + conversation_type: Option, ) -> Result, StorageError> { let mut query = dsl::groups.order(dsl::created_at_ns.asc()).into_boxed(); @@ -142,8 +144,16 @@ impl DbConnection { query = query.limit(limit); } - if !include_dm_groups { - query = query.filter(dsl::dm_inbox_id.is_null()); + if let Some(conversation_type) = conversation_type { + match conversation_type { + ConversationType::Group => { + query = query.filter(dsl::dm_inbox_id.is_null()); + } + ConversationType::Dm => { + query = query.filter(dsl::dm_inbox_id.is_not_null()); + } + ConversationType::Sync => {} + } } query = query.filter(dsl::purpose.eq(Purpose::Conversation)); @@ -481,7 +491,9 @@ pub(crate) mod tests { let test_group_3 = generate_dm(Some(GroupMembershipState::Allowed)); test_group_3.store(conn).unwrap(); - let all_results = conn.find_groups(None, None, None, None, false).unwrap(); + let all_results = conn + .find_groups(None, None, None, None, Some(ConversationType::Group)) + .unwrap(); assert_eq!(all_results.len(), 2); let pending_results = conn @@ -490,19 +502,27 @@ pub(crate) mod tests { None, None, None, - false, + Some(ConversationType::Group), ) .unwrap(); assert_eq!(pending_results[0].id, test_group_1.id); assert_eq!(pending_results.len(), 1); // Offset and limit - let results_with_limit = conn.find_groups(None, None, None, Some(1), false).unwrap(); + let results_with_limit = conn + .find_groups(None, None, None, Some(1), Some(ConversationType::Group)) + .unwrap(); assert_eq!(results_with_limit.len(), 1); assert_eq!(results_with_limit[0].id, test_group_1.id); let results_with_created_at_ns_after = conn - .find_groups(None, Some(test_group_1.created_at_ns), None, Some(1), false) + .find_groups( + None, + Some(test_group_1.created_at_ns), + None, + Some(1), + Some(ConversationType::Group), + ) .unwrap(); assert_eq!(results_with_created_at_ns_after.len(), 1); assert_eq!(results_with_created_at_ns_after[0].id, test_group_2.id); @@ -512,9 +532,16 @@ pub(crate) mod tests { assert_eq!(synced_groups.len(), 0); // test that dm groups are included - let dm_results = conn.find_groups(None, None, None, None, true).unwrap(); + let dm_results = conn.find_groups(None, None, None, None, None).unwrap(); assert_eq!(dm_results.len(), 3); assert_eq!(dm_results[2].id, test_group_3.id); + + // test only dms are returned + let dm_results = conn + .find_groups(None, None, None, None, Some(ConversationType::Dm)) + .unwrap(); + assert_eq!(dm_results.len(), 1); + assert_eq!(dm_results[0].id, test_group_3.id); }) } diff --git a/xmtp_mls/src/subscriptions.rs b/xmtp_mls/src/subscriptions.rs index adda5d0d4..fabd6679e 100644 --- a/xmtp_mls/src/subscriptions.rs +++ b/xmtp_mls/src/subscriptions.rs @@ -132,36 +132,52 @@ where pub async fn stream_conversations( &self, - include_dm: bool, + conversation_type: Option, ) -> Result + '_, ClientError> { let provider = Arc::new(self.context.mls_provider()?); let event_queue = tokio_stream::wrappers::BroadcastStream::new(self.local_events.subscribe()); - // Helper function for filtering Dm groups - let filter_group = move |group: MlsGroup, provider: Arc| async move { - match group.metadata(provider.as_ref()) { - Ok(metadata) => { - if include_dm || metadata.conversation_type != ConversationType::Dm { - Some(group) - } else { + // Helper function for filtering based on conversation type + let filter_group = Arc::new(move |group: MlsGroup, provider: Arc| { + let conversation_type = conversation_type.clone(); + async move { + match group.metadata(provider.as_ref()) { + Ok(metadata) => match conversation_type { + Some(ConversationType::Dm) => { + if metadata.conversation_type == ConversationType::Dm { + Some(group) + } else { + None + } + } + Some(ConversationType::Group) => { + if metadata.conversation_type == ConversationType::Group { + Some(group) + } else { + None + } + } + None => Some(group), // Return all groups if conversation_type is None + _ => None, + }, + Err(err) => { + tracing::error!("Error processing group metadata: {:?}", err); None } } - Err(err) => { - tracing::error!("Error processing group metadata: {:?}", err); - None - } } - }; + }); let event_provider = Arc::clone(&provider); + let event_filter_group = Arc::clone(&filter_group); let event_queue = event_queue.filter_map(move |event| { let provider = Arc::clone(&event_provider); + let event_filter_group = Arc::clone(&event_filter_group); async move { match event { - Ok(LocalEvents::NewGroup(group)) => filter_group(group, provider).await, + Ok(LocalEvents::NewGroup(group)) => event_filter_group(group, provider).await, Err(BroadcastStreamRecvError::Lagged(missed)) => { tracing::warn!("Missed {missed} messages due to local event queue lagging"); None @@ -180,6 +196,7 @@ where .await?; let stream_provider = Arc::clone(&provider); + let stream_filter_group = Arc::clone(&filter_group); let stream = subscription .map(|welcome| async { tracing::info!("Received conversation streaming payload"); @@ -187,9 +204,10 @@ where }) .filter_map(move |res| { let provider = Arc::clone(&stream_provider); + let stream_filter_group = Arc::clone(&stream_filter_group); async move { match res.await { - Ok(group) => filter_group(group, provider).await, + Ok(group) => stream_filter_group(group, provider).await, Err(err) => { tracing::error!( "Error processing stream entry for conversation: {:?}", @@ -268,12 +286,12 @@ where pub fn stream_conversations_with_callback( client: Arc>, mut convo_callback: impl FnMut(MlsGroup) + Send + 'static, - include_dm: bool, + conversation_type: Option, ) -> StreamHandle> { let (tx, rx) = oneshot::channel(); let handle = tokio::spawn(async move { - let stream = client.stream_conversations(include_dm).await?; + let stream = client.stream_conversations(conversation_type).await?; futures::pin_mut!(stream); let _ = tx.send(()); while let Some(convo) = stream.next().await { @@ -317,13 +335,14 @@ where pub async fn stream_all_messages( &self, + conversation_type: Option, ) -> Result> + '_, ClientError> { self.sync_welcomes().await?; let mut group_id_to_info = self .store() .conn()? - .find_groups(None, None, None, None, false)? + .find_groups(None, None, None, None, conversation_type.clone())? .into_iter() .map(Into::into) .collect::, MessagesStreamInfo>>(); @@ -335,7 +354,7 @@ where futures::pin_mut!(messages_stream); tracing::info!("Setting up conversation stream in stream_all_messages"); - let convo_stream = self.stream_conversations(true).await?; + let convo_stream = self.stream_conversations(conversation_type.clone()).await?; futures::pin_mut!(convo_stream); @@ -398,11 +417,12 @@ where pub fn stream_all_messages_with_callback( client: Arc>, mut callback: impl FnMut(StoredGroupMessage) + Send + Sync + 'static, + conversation_type: Option, ) -> StreamHandle> { let (tx, rx) = oneshot::channel(); let handle = tokio::spawn(async move { - let stream = Self::stream_all_messages(&client).await?; + let stream = Self::stream_all_messages(&client, conversation_type).await?; let _ = tx.send(()); futures::pin_mut!(stream); while let Some(message) = stream.next().await { @@ -425,6 +445,7 @@ where #[cfg(test)] mod tests { use crate::client::FindGroupParams; + use crate::groups::group_metadata::ConversationType; use crate::utils::test::{Delivery, TestClient}; use crate::{ builder::ClientBuilder, groups::GroupMetadataOptions, @@ -456,7 +477,7 @@ mod tests { let mut stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx); let bob_ptr = bob.clone(); tokio::spawn(async move { - let bob_stream = bob_ptr.stream_conversations(true).await.unwrap(); + let bob_stream = bob_ptr.stream_conversations(None).await.unwrap(); futures::pin_mut!(bob_stream); while let Some(item) = bob_stream.next().await { let _ = tx.send(item); @@ -550,6 +571,7 @@ mod tests { (*messages_clone.lock()).push(message); notify_pointer.notify_one(); }, + None, ); handle.wait_for_ready().await; @@ -602,11 +624,14 @@ mod tests { let messages_clone = messages.clone(); let delivery = Delivery::new(None); let delivery_pointer = delivery.clone(); - let mut handle = - Client::::stream_all_messages_with_callback(caro.clone(), move |message| { + let mut handle = Client::::stream_all_messages_with_callback( + caro.clone(), + move |message| { delivery_pointer.notify_one(); (*messages_clone.lock()).push(message); - }); + }, + None, + ); handle.wait_for_ready().await; alix_group @@ -710,11 +735,14 @@ mod tests { let blocked = Arc::new(AtomicU64::new(55)); let blocked_pointer = blocked.clone(); - let mut handle = - Client::::stream_all_messages_with_callback(caro.clone(), move |message| { + let mut handle = Client::::stream_all_messages_with_callback( + caro.clone(), + move |message| { (*messages_clone.lock()).push(message); blocked_pointer.fetch_sub(1, Ordering::SeqCst); - }); + }, + None, + ); handle.wait_for_ready().await; let alix_group_pointer = alix_group.clone(); @@ -773,7 +801,7 @@ mod tests { groups.push(g); notify_pointer.notify_one(); }, - false, + Some(ConversationType::Group), ); alix.create_group(None, GroupMetadataOptions::default()) @@ -826,7 +854,7 @@ mod tests { let notify = Delivery::new(Some(std::time::Duration::from_secs(1))); let (notify_pointer, groups_pointer) = (notify.clone(), groups.clone()); - // Start a stream with enableDm set to false + // Start a stream with conversation_type Group let closer = Client::::stream_conversations_with_callback( alix.clone(), move |g| { @@ -834,7 +862,7 @@ mod tests { groups.push(g); notify_pointer.notify_one(); }, - false, + Some(ConversationType::Group), ); alix.create_dm_by_inbox_id(bo.inbox_id()).await.unwrap(); @@ -842,13 +870,69 @@ mod tests { let result = notify.wait_for_delivery().await; assert!(result.is_err(), "Stream unexpectedly received a DM group"); + let group = alix + .create_group(None, GroupMetadataOptions::default()) + .unwrap(); + group + .add_members_by_inbox_id(&alix, vec![bo.inbox_id()]) + .await + .unwrap(); + + notify.wait_for_delivery().await.unwrap(); + { + let grps = groups.lock(); + assert_eq!(grps.len(), 1); + } + + closer.handle.abort(); + + // Start a stream with only dms + let groups = Arc::new(Mutex::new(Vec::new())); + // Wait for 2 seconds for the group creation to be streamed + let notify = Delivery::new(Some(std::time::Duration::from_secs(1))); + let (notify_pointer, groups_pointer) = (notify.clone(), groups.clone()); + + // Start a stream with conversation_type DM + let closer = Client::::stream_conversations_with_callback( + alix.clone(), + move |g| { + let mut groups: parking_lot::lock_api::MutexGuard< + '_, + parking_lot::RawMutex, + Vec, + > = groups_pointer.lock(); + groups.push(g); + notify_pointer.notify_one(); + }, + Some(ConversationType::Dm), + ); + + let group = alix + .create_group(None, GroupMetadataOptions::default()) + .unwrap(); + group + .add_members_by_inbox_id(&alix, vec![bo.inbox_id()]) + .await + .unwrap(); + + let result = notify.wait_for_delivery().await; + assert!(result.is_err(), "Stream unexpectedly received a Group"); + + alix.create_dm_by_inbox_id(bo.inbox_id()).await.unwrap(); + notify.wait_for_delivery().await.unwrap(); + { + let grps = groups.lock(); + assert_eq!(grps.len(), 1); + } + closer.handle.abort(); - // Start a stream with enableDm set to true + // Start a stream with all conversations let groups = Arc::new(Mutex::new(Vec::new())); // Wait for 2 seconds for the group creation to be streamed let notify = Delivery::new(Some(std::time::Duration::from_secs(60))); let (notify_pointer, groups_pointer) = (notify.clone(), groups.clone()); + // Start a stream with conversation_type None let closer = Client::::stream_conversations_with_callback( alix.clone(), move |g| { @@ -856,7 +940,7 @@ mod tests { groups.push(g); notify_pointer.notify_one(); }, - true, + None, ); alix.create_dm_by_inbox_id(bo.inbox_id()).await.unwrap(); @@ -876,6 +960,168 @@ mod tests { assert_eq!(grps.len(), 2); } + let group = alix + .create_group(None, GroupMetadataOptions::default()) + .unwrap(); + group + .add_members_by_inbox_id(&alix, vec![bo.inbox_id()]) + .await + .unwrap(); + + notify.wait_for_delivery().await.unwrap(); + { + let grps = groups.lock(); + assert_eq!(grps.len(), 3); + } + + closer.handle.abort(); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_dm_stream_all_messages() { + let alix = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await); + let bo = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await); + + let alix_group = alix + .create_group(None, GroupMetadataOptions::default()) + .unwrap(); + alix_group + .add_members_by_inbox_id(&alix, vec![bo.inbox_id()]) + .await + .unwrap(); + + let alix_dm = alix.create_dm_by_inbox_id(bo.inbox_id()).await.unwrap(); + + // Start a stream with only groups + let messages: Arc>> = Arc::new(Mutex::new(Vec::new())); + // Wait for 2 seconds for the group creation to be streamed + let notify = Delivery::new(Some(std::time::Duration::from_secs(1))); + let (notify_pointer, messages_pointer) = (notify.clone(), messages.clone()); + + let mut closer = Client::::stream_all_messages_with_callback( + bo.clone(), + move |message| { + let mut messages: parking_lot::lock_api::MutexGuard< + '_, + parking_lot::RawMutex, + Vec, + > = messages_pointer.lock(); + messages.push(message); + notify_pointer.notify_one(); + }, + Some(ConversationType::Group), + ); + closer.wait_for_ready().await; + + alix_dm + .send_message("first".as_bytes(), &alix) + .await + .unwrap(); + + let result = notify.wait_for_delivery().await; + assert!(result.is_err(), "Stream unexpectedly received a DM message"); + + alix_group + .send_message("second".as_bytes(), &alix) + .await + .unwrap(); + + notify.wait_for_delivery().await.unwrap(); + { + let msgs = messages.lock(); + assert_eq!(msgs.len(), 1); + } + + closer.handle.abort(); + + // Start a stream with only dms + let messages: Arc>> = Arc::new(Mutex::new(Vec::new())); + // Wait for 2 seconds for the group creation to be streamed + let notify = Delivery::new(Some(std::time::Duration::from_secs(1))); + let (notify_pointer, messages_pointer) = (notify.clone(), messages.clone()); + + let mut closer = Client::::stream_all_messages_with_callback( + bo.clone(), + move |message| { + let mut messages: parking_lot::lock_api::MutexGuard< + '_, + parking_lot::RawMutex, + Vec, + > = messages_pointer.lock(); + messages.push(message); + notify_pointer.notify_one(); + }, + Some(ConversationType::Dm), + ); + closer.wait_for_ready().await; + + alix_group + .send_message("first".as_bytes(), &alix) + .await + .unwrap(); + + let result = notify.wait_for_delivery().await; + assert!( + result.is_err(), + "Stream unexpectedly received a Group message" + ); + + alix_dm + .send_message("second".as_bytes(), &alix) + .await + .unwrap(); + + notify.wait_for_delivery().await.unwrap(); + { + let msgs = messages.lock(); + assert_eq!(msgs.len(), 1); + } + + closer.handle.abort(); + + // Start a stream with all conversations + let messages: Arc>> = Arc::new(Mutex::new(Vec::new())); + // Wait for 2 seconds for the group creation to be streamed + let notify = Delivery::new(Some(std::time::Duration::from_secs(1))); + let (notify_pointer, messages_pointer) = (notify.clone(), messages.clone()); + + let mut closer = Client::::stream_all_messages_with_callback( + bo.clone(), + move |message| { + let mut messages: parking_lot::lock_api::MutexGuard< + '_, + parking_lot::RawMutex, + Vec, + > = messages_pointer.lock(); + messages.push(message); + notify_pointer.notify_one(); + }, + None, + ); + closer.wait_for_ready().await; + + alix_group + .send_message("first".as_bytes(), &alix) + .await + .unwrap(); + + notify.wait_for_delivery().await.unwrap(); + { + let msgs = messages.lock(); + assert_eq!(msgs.len(), 1); + } + + alix_dm + .send_message("second".as_bytes(), &alix) + .await + .unwrap(); + + notify.wait_for_delivery().await.unwrap(); + { + let msgs = messages.lock(); + assert_eq!(msgs.len(), 2); + } + closer.handle.abort(); } } diff --git a/xmtp_user_preferences/README.md b/xmtp_user_preferences/README.md index 35d10bc42..de0b132f0 100644 --- a/xmtp_user_preferences/README.md +++ b/xmtp_user_preferences/README.md @@ -1,3 +1,3 @@ -# XTMP Personal Private Portable Preferences +# XMTP Personal Private Portable Preferences A library for encrypting messages where the sender and recipient are the same (self-messaging).