diff --git a/.github/workflows/release-node-bindings.yml b/.github/workflows/release-node-bindings.yml index 2c38c822e..39318fbe6 100644 --- a/.github/workflows/release-node-bindings.yml +++ b/.github/workflows/release-node-bindings.yml @@ -196,6 +196,11 @@ jobs: . bindings_node + - name: Disable usePackageExitCodes feature + uses: crazy-max/ghaction-chocolatey@v3 + with: + args: feature disable --name="'usePackageExitCodes'" + - name: Upgrade Visual Studio 2022 enterprise uses: crazy-max/ghaction-chocolatey@v3 with: diff --git a/Cargo.lock b/Cargo.lock index a5caa0e37..c32202839 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14,13 +14,19 @@ dependencies = [ [[package]] name = "addr2line" -version = "0.24.2" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" +checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" dependencies = [ "gimli", ] +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "adler2" version = "2.0.0" @@ -357,17 +363,17 @@ dependencies = [ [[package]] name = "backtrace" -version = "0.3.74" +version = "0.3.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" +checksum = "26b05800d2e817c8b3b4b54abd461726265fa9789ae34330622f2db9ee696f9d" dependencies = [ "addr2line", + "cc", "cfg-if", "libc", - "miniz_oxide", + "miniz_oxide 0.7.4", "object", "rustc-demangle", - "windows-targets 0.52.6", ] [[package]] @@ -821,6 +827,33 @@ dependencies = [ "thiserror", ] +[[package]] +name = "color-eyre" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55146f5e46f237f7423d74111267d4597b59b0dad0ffaf7303bce9945d843ad5" +dependencies = [ + "backtrace", + "color-spantrace", + "eyre", + "indenter", + "once_cell", + "owo-colors", + "tracing-error", +] + +[[package]] +name = "color-spantrace" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd6be1b2a7e382e2b98b43b2adcca6bb0e465af0bdd38123873ae61eb17a72c2" +dependencies = [ + "once_cell", + "owo-colors", + "tracing-core", + "tracing-error", +] + [[package]] name = "colorchoice" version = "1.0.2" @@ -1185,6 +1218,7 @@ dependencies = [ [[package]] name = "diesel-wasm-sqlite" version = "0.0.1" +source = "git+https://github.com/xmtp/diesel-wasm-sqlite?branch=main#eac6c9064f346b32e226379d60c624d9424e08bb" dependencies = [ "diesel", "diesel_derives", @@ -1899,7 +1933,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1b589b4dc103969ad3cf85c950899926ec64300a1a46d76c03a6072957036f0" dependencies = [ "crc32fast", - "miniz_oxide", + "miniz_oxide 0.8.0", ] [[package]] @@ -2132,9 +2166,9 @@ dependencies = [ [[package]] name = "gimli" -version = "0.31.1" +version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" +checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" [[package]] name = "glob" @@ -2477,9 +2511,9 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "0.14.30" +version = "0.14.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a152ddd61dfaec7273fe8419ab357f33aee0d914c5f4efbf0d96fa749eea5ec9" +checksum = "8c08302e8fa335b151b788c775ff56e7a03ae64ff85c548ee820fecb70356e85" dependencies = [ "bytes", "futures-channel", @@ -2501,9 +2535,9 @@ dependencies = [ [[package]] name = "hyper" -version = "1.4.1" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" +checksum = "bbbff0a806a4728c99295b254c8838933b5b082d75e3cb70c8dab21fdfbcfa9a" dependencies = [ "bytes", "futures-channel", @@ -2528,7 +2562,7 @@ checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", "http 0.2.12", - "hyper 0.14.30", + "hyper 0.14.31", "rustls 0.21.12", "tokio", "tokio-rustls 0.24.1", @@ -2542,7 +2576,7 @@ checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" dependencies = [ "futures-util", "http 1.1.0", - "hyper 1.4.1", + "hyper 1.5.0", "hyper-util", "rustls 0.23.14", "rustls-pki-types", @@ -2557,7 +2591,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" dependencies = [ - "hyper 1.4.1", + "hyper 1.5.0", "hyper-util", "pin-project-lite", "tokio", @@ -2571,7 +2605,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ "bytes", - "hyper 0.14.30", + "hyper 0.14.31", "native-tls", "tokio", "tokio-native-tls", @@ -2585,7 +2619,7 @@ checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" dependencies = [ "bytes", "http-body-util", - "hyper 1.4.1", + "hyper 1.5.0", "hyper-util", "native-tls", "tokio", @@ -2604,7 +2638,7 @@ dependencies = [ "futures-util", "http 1.1.0", "http-body 1.0.1", - "hyper 1.4.1", + "hyper 1.5.0", "pin-project-lite", "socket2", "tokio", @@ -2941,7 +2975,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -3128,6 +3162,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" +[[package]] +name = "miniz_oxide" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08" +dependencies = [ + "adler", +] + [[package]] name = "miniz_oxide" version = "0.8.0" @@ -3215,7 +3258,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "http-body-util", - "hyper 1.4.1", + "hyper 1.5.0", "hyper-util", "log", "rand", @@ -3425,9 +3468,9 @@ checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" [[package]] name = "object" -version = "0.36.5" +version = "0.32.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e" +checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" dependencies = [ "memchr", ] @@ -3640,6 +3683,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "owo-colors" +version = "3.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" + [[package]] name = "p256" version = "0.13.2" @@ -4373,7 +4422,7 @@ dependencies = [ "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", - "hyper 0.14.30", + "hyper 0.14.31", "hyper-rustls 0.24.2", "hyper-tls 0.5.0", "ipnet", @@ -4418,7 +4467,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "http-body-util", - "hyper 1.4.1", + "hyper 1.5.0", "hyper-rustls 0.27.3", "hyper-tls 0.6.0", "hyper-util", @@ -5732,7 +5781,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "http-body-util", - "hyper 1.4.1", + "hyper 1.5.0", "hyper-timeout", "hyper-util", "percent-encoding", @@ -5827,6 +5876,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-error" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d686ec1c0f384b1277f097b2f279a2ecc11afe8c133c1aabf036a27cb4cd206e" +dependencies = [ + "tracing", + "tracing-subscriber", ] [[package]] @@ -6211,6 +6271,12 @@ dependencies = [ "rand", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "value-bag" version = "1.9.0" @@ -6289,7 +6355,7 @@ dependencies = [ "futures-util", "headers", "http 0.2.12", - "hyper 0.14.30", + "hyper 0.14.31", "log", "mime", "mime_guess", @@ -6986,6 +7052,31 @@ dependencies = [ "xmtp_v2", ] +[[package]] +name = "xshell" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6db0ab86eae739efd1b054a8d3d16041914030ac4e01cd1dca0cf252fd8b6437" +dependencies = [ + "xshell-macros", +] + +[[package]] +name = "xshell-macros" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d422e8e38ec76e2f06ee439ccc765e9c6a9638b9e7c9f2e8255e4d41e8bd852" + +[[package]] +name = "xtask" +version = "0.0.1" +dependencies = [ + "color-eyre", + "tracing", + "tracing-subscriber", + "xshell", +] + [[package]] name = "yansi" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index 22cf9869e..700f2c267 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ members = [ "bindings_wasm", "bindings_node", "bindings_ffi", + "xtask" ] # Make the feature resolver explicit. diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index de12a89e3..18409adc3 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -251,11 +251,11 @@ impl FfiXmtpClient { }) } - pub fn group(&self, group_id: Vec) -> Result { - let convo = self.inner_client.group(group_id)?; - Ok(FfiGroup { + pub fn conversation(&self, conversation_id: Vec) -> Result { + let convo = self.inner_client.group(conversation_id)?; + Ok(FfiConversation { inner_client: self.inner_client.clone(), - group_id: convo.group_id, + conversation_id: convo.group_id, created_at_ns: convo.created_at_ns, }) } @@ -738,7 +738,7 @@ impl FfiConversations { &self, account_addresses: Vec, opts: FfiCreateGroupOptions, - ) -> Result, GenericError> { + ) -> Result, GenericError> { log::info!( "creating group with account addresses: {}", account_addresses.join(", ") @@ -784,23 +784,26 @@ impl FfiConversations { .await? }; - let out = Arc::new(FfiGroup { + let out = Arc::new(FfiConversation { inner_client: self.inner_client.clone(), - group_id: convo.group_id, + conversation_id: convo.group_id, created_at_ns: convo.created_at_ns, }); Ok(out) } - pub async fn create_dm(&self, account_address: String) -> Result, GenericError> { + pub async fn create_dm( + &self, + account_address: String, + ) -> Result, GenericError> { log::info!("creating dm with target address: {}", account_address); let convo = self.inner_client.create_dm(account_address).await?; - let out = Arc::new(FfiGroup { + let out = Arc::new(FfiConversation { inner_client: self.inner_client.clone(), - group_id: convo.group_id, + conversation_id: convo.group_id, created_at_ns: convo.created_at_ns, }); @@ -810,14 +813,14 @@ impl FfiConversations { pub async fn process_streamed_welcome_message( &self, envelope_bytes: Vec, - ) -> Result, GenericError> { + ) -> Result, GenericError> { let inner = self.inner_client.as_ref(); let group = inner .process_streamed_welcome_message(envelope_bytes) .await?; - let out = Arc::new(FfiGroup { + let out = Arc::new(FfiConversation { inner_client: self.inner_client.clone(), - group_id: group.group_id, + conversation_id: group.group_id, created_at_ns: group.created_at_ns, }); Ok(out) @@ -829,10 +832,10 @@ impl FfiConversations { Ok(()) } - pub async fn sync_all_groups(&self) -> Result { + pub async fn sync_all_conversations(&self) -> Result { let inner = self.inner_client.as_ref(); let groups = inner.find_groups(FindGroupParams { - include_dm_groups: true, + conversation_type: None, ..FindGroupParams::default() })?; @@ -857,21 +860,73 @@ impl FfiConversations { pub async fn list( &self, opts: FfiListConversationsOptions, - ) -> Result>, GenericError> { + ) -> Result>, GenericError> { + let inner = self.inner_client.as_ref(); + let convo_list: Vec> = inner + .find_groups(FindGroupParams { + allowed_states: None, + created_after_ns: opts.created_after_ns, + created_before_ns: opts.created_before_ns, + limit: opts.limit, + conversation_type: None, + })? + .into_iter() + .map(|group| { + Arc::new(FfiConversation { + inner_client: self.inner_client.clone(), + conversation_id: group.group_id, + created_at_ns: group.created_at_ns, + }) + }) + .collect(); + + Ok(convo_list) + } + + pub async fn list_groups( + &self, + opts: FfiListConversationsOptions, + ) -> Result>, GenericError> { + let inner = self.inner_client.as_ref(); + let convo_list: Vec> = inner + .find_groups(FindGroupParams { + allowed_states: None, + created_after_ns: opts.created_after_ns, + created_before_ns: opts.created_before_ns, + limit: opts.limit, + conversation_type: Some(ConversationType::Group), + })? + .into_iter() + .map(|group| { + Arc::new(FfiConversation { + inner_client: self.inner_client.clone(), + conversation_id: group.group_id, + created_at_ns: group.created_at_ns, + }) + }) + .collect(); + + Ok(convo_list) + } + + pub async fn list_dms( + &self, + opts: FfiListConversationsOptions, + ) -> Result>, GenericError> { let inner = self.inner_client.as_ref(); - let convo_list: Vec> = inner + let convo_list: Vec> = inner .find_groups(FindGroupParams { allowed_states: None, created_after_ns: opts.created_after_ns, created_before_ns: opts.created_before_ns, limit: opts.limit, - include_dm_groups: false, + conversation_type: Some(ConversationType::Dm), })? .into_iter() .map(|group| { - Arc::new(FfiGroup { + Arc::new(FfiConversation { inner_client: self.inner_client.clone(), - group_id: group.group_id, + conversation_id: group.group_id, created_at_ns: group.created_at_ns, }) }) @@ -880,18 +935,81 @@ impl FfiConversations { Ok(convo_list) } + pub async fn stream_groups( + &self, + callback: Box, + ) -> FfiStreamCloser { + let client = self.inner_client.clone(); + let handle = RustXmtpClient::stream_conversations_with_callback( + client.clone(), + Some(ConversationType::Group), + move |convo| { + callback.on_conversation(Arc::new(FfiConversation { + inner_client: client.clone(), + conversation_id: convo.group_id, + created_at_ns: convo.created_at_ns, + })) + }, + ); + + FfiStreamCloser::new(handle) + } + + pub async fn stream_dms(&self, callback: Box) -> FfiStreamCloser { + let client = self.inner_client.clone(); + let handle = RustXmtpClient::stream_conversations_with_callback( + client.clone(), + Some(ConversationType::Dm), + move |convo| { + callback.on_conversation(Arc::new(FfiConversation { + inner_client: client.clone(), + conversation_id: convo.group_id, + created_at_ns: convo.created_at_ns, + })) + }, + ); + + FfiStreamCloser::new(handle) + } + pub async fn stream(&self, callback: Box) -> FfiStreamCloser { let client = self.inner_client.clone(); let handle = RustXmtpClient::stream_conversations_with_callback( client.clone(), + None, move |convo| { - callback.on_conversation(Arc::new(FfiGroup { + callback.on_conversation(Arc::new(FfiConversation { inner_client: client.clone(), - group_id: convo.group_id, + conversation_id: convo.group_id, created_at_ns: convo.created_at_ns, })) }, - false, + ); + + FfiStreamCloser::new(handle) + } + + pub async fn stream_all_group_messages( + &self, + message_callback: Box, + ) -> FfiStreamCloser { + let handle = RustXmtpClient::stream_all_messages_with_callback( + self.inner_client.clone(), + Some(ConversationType::Group), + move |message| message_callback.on_message(message.into()), + ); + + FfiStreamCloser::new(handle) + } + + pub async fn stream_all_dm_messages( + &self, + message_callback: Box, + ) -> FfiStreamCloser { + let handle = RustXmtpClient::stream_all_messages_with_callback( + self.inner_client.clone(), + Some(ConversationType::Dm), + move |message| message_callback.on_message(message.into()), ); FfiStreamCloser::new(handle) @@ -903,6 +1021,7 @@ impl FfiConversations { ) -> FfiStreamCloser { let handle = RustXmtpClient::stream_all_messages_with_callback( self.inner_client.clone(), + None, move |message| message_callback.on_message(message.into()), ); @@ -911,14 +1030,14 @@ impl FfiConversations { } #[derive(uniffi::Object)] -pub struct FfiGroup { +pub struct FfiConversation { inner_client: Arc, - group_id: Vec, + conversation_id: Vec, created_at_ns: i64, } #[derive(uniffi::Record)] -pub struct FfiGroupMember { +pub struct FfiConversationMember { pub inbox_id: String, pub account_addresses: Vec, pub installation_ids: Vec>, @@ -962,7 +1081,7 @@ impl From for ConsentState { #[derive(uniffi::Enum)] pub enum FfiConsentEntityType { - GroupId, + ConversationId, InboxId, Address, } @@ -970,7 +1089,7 @@ pub enum FfiConsentEntityType { impl From for ConsentType { fn from(entity_type: FfiConsentEntityType) -> Self { match entity_type { - FfiConsentEntityType::GroupId => ConsentType::GroupId, + FfiConsentEntityType::ConversationId => ConsentType::ConversationId, FfiConsentEntityType::InboxId => ConsentType::InboxId, FfiConsentEntityType::Address => ConsentType::Address, } @@ -1007,11 +1126,11 @@ impl FfiCreateGroupOptions { } #[uniffi::export(async_runtime = "tokio")] -impl FfiGroup { +impl FfiConversation { pub async fn send(&self, content_bytes: Vec) -> Result, GenericError> { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1023,7 +1142,7 @@ impl FfiGroup { pub fn send_optimistic(&self, content_bytes: Vec) -> Result, GenericError> { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1036,7 +1155,7 @@ impl FfiGroup { pub async fn publish_messages(&self) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); group.publish_messages().await?; @@ -1046,7 +1165,7 @@ impl FfiGroup { pub async fn sync(&self) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1061,7 +1180,7 @@ impl FfiGroup { ) -> Result, GenericError> { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1082,13 +1201,13 @@ impl FfiGroup { Ok(messages) } - pub async fn process_streamed_group_message( + pub async fn process_streamed_conversation_message( &self, envelope_bytes: Vec, ) -> Result { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); let message = group.process_streamed_group_message(envelope_bytes).await?; @@ -1097,18 +1216,18 @@ impl FfiGroup { Ok(ffi_message) } - pub async fn list_members(&self) -> Result, GenericError> { + pub async fn list_members(&self) -> Result, GenericError> { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); - let members: Vec = group + let members: Vec = group .members() .await? .into_iter() - .map(|member| FfiGroupMember { + .map(|member| FfiConversationMember { inbox_id: member.inbox_id, account_addresses: member.account_addresses, installation_ids: member.installation_ids, @@ -1129,7 +1248,7 @@ impl FfiGroup { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1146,7 +1265,7 @@ impl FfiGroup { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1158,7 +1277,7 @@ impl FfiGroup { pub async fn remove_members(&self, account_addresses: Vec) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1173,7 +1292,7 @@ impl FfiGroup { ) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1185,7 +1304,7 @@ impl FfiGroup { pub async fn update_group_name(&self, group_name: String) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1197,7 +1316,7 @@ impl FfiGroup { pub fn group_name(&self) -> Result { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1213,7 +1332,7 @@ impl FfiGroup { ) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1227,7 +1346,7 @@ impl FfiGroup { pub fn group_image_url_square(&self) -> Result { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1242,7 +1361,7 @@ impl FfiGroup { ) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1254,7 +1373,7 @@ impl FfiGroup { pub fn group_description(&self) -> Result { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1269,7 +1388,7 @@ impl FfiGroup { ) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1283,7 +1402,7 @@ impl FfiGroup { pub fn group_pinned_frame_url(&self) -> Result { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1295,7 +1414,7 @@ impl FfiGroup { pub fn admin_list(&self) -> Result, GenericError> { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1307,7 +1426,7 @@ impl FfiGroup { pub fn super_admin_list(&self) -> Result, GenericError> { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1329,7 +1448,7 @@ impl FfiGroup { pub async fn add_admin(&self, inbox_id: String) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); group @@ -1342,7 +1461,7 @@ impl FfiGroup { pub async fn remove_admin(&self, inbox_id: String) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); group @@ -1355,7 +1474,7 @@ impl FfiGroup { pub async fn add_super_admin(&self, inbox_id: String) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); group @@ -1368,7 +1487,7 @@ impl FfiGroup { pub async fn remove_super_admin(&self, inbox_id: String) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); group @@ -1381,7 +1500,7 @@ impl FfiGroup { pub fn group_permissions(&self) -> Result, GenericError> { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1399,7 +1518,7 @@ impl FfiGroup { ) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); group @@ -1417,7 +1536,7 @@ impl FfiGroup { let inner_client = Arc::clone(&self.inner_client); let handle = MlsGroup::stream_with_callback( inner_client, - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, move |message| message_callback.on_message(message.into()), ); @@ -1432,7 +1551,7 @@ impl FfiGroup { pub fn is_active(&self) -> Result { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1442,7 +1561,7 @@ impl FfiGroup { pub fn consent_state(&self) -> Result { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1454,7 +1573,7 @@ impl FfiGroup { pub fn update_consent_state(&self, state: FfiConsentState) -> Result<(), GenericError> { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -1466,45 +1585,45 @@ impl FfiGroup { pub fn added_by_inbox_id(&self) -> Result { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); Ok(group.added_by_inbox_id()?) } - pub fn group_metadata(&self) -> Result, GenericError> { + pub fn group_metadata(&self) -> Result, GenericError> { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); let metadata = group.metadata(group.mls_provider()?)?; - Ok(Arc::new(FfiGroupMetadata { + Ok(Arc::new(FfiConversationMetadata { inner: Arc::new(metadata), })) } } #[uniffi::export] -impl FfiGroup { +impl FfiConversation { pub fn id(&self) -> Vec { - self.group_id.clone() + self.conversation_id.clone() } } #[derive(uniffi::Enum, PartialEq)] -pub enum FfiGroupMessageKind { +pub enum FfiConversationMessageKind { Application, MembershipChange, } -impl From for FfiGroupMessageKind { +impl From for FfiConversationMessageKind { fn from(kind: GroupMessageKind) -> Self { match kind { - GroupMessageKind::Application => FfiGroupMessageKind::Application, - GroupMessageKind::MembershipChange => FfiGroupMessageKind::MembershipChange, + GroupMessageKind::Application => FfiConversationMessageKind::Application, + GroupMessageKind::MembershipChange => FfiConversationMessageKind::MembershipChange, } } } @@ -1543,7 +1662,7 @@ pub struct FfiMessage { pub convo_id: Vec, pub sender_inbox_id: String, pub content: Vec, - pub kind: FfiGroupMessageKind, + pub kind: FfiConversationMessageKind, pub delivery_status: FfiDeliveryStatus, } @@ -1651,16 +1770,16 @@ pub trait FfiMessageCallback: Send + Sync { #[uniffi::export(callback_interface)] pub trait FfiConversationCallback: Send + Sync { - fn on_conversation(&self, conversation: Arc); + fn on_conversation(&self, conversation: Arc); } #[derive(uniffi::Object)] -pub struct FfiGroupMetadata { +pub struct FfiConversationMetadata { inner: Arc, } #[uniffi::export] -impl FfiGroupMetadata { +impl FfiConversationMetadata { pub fn creator_inbox_id(&self) -> String { self.inner.creator_inbox_id.clone() } @@ -1720,10 +1839,10 @@ mod tests { use super::{create_client, FfiMessage, FfiMessageCallback, FfiXmtpClient}; use crate::{ get_inbox_id_for_address, inbox_owner::SigningError, logger::FfiLogger, FfiConsent, - FfiConsentEntityType, FfiConsentState, FfiConversationCallback, FfiCreateGroupOptions, - FfiGroup, FfiGroupMessageKind, FfiGroupPermissionsOptions, FfiInboxOwner, - FfiListConversationsOptions, FfiListMessagesOptions, FfiMetadataField, FfiPermissionPolicy, - FfiPermissionPolicySet, FfiPermissionUpdateType, + FfiConsentEntityType, FfiConsentState, FfiConversation, FfiConversationCallback, + FfiConversationMessageKind, FfiCreateGroupOptions, FfiGroupPermissionsOptions, + FfiInboxOwner, FfiListConversationsOptions, FfiListMessagesOptions, FfiMetadataField, + FfiPermissionPolicy, FfiPermissionPolicySet, FfiPermissionUpdateType, }; use ethers::utils::hex; use rand::distributions::{Alphanumeric, DistString}; @@ -1789,7 +1908,7 @@ mod tests { struct RustStreamCallback { num_messages: Arc, messages: Arc>>, - conversations: Arc>>>, + conversations: Arc>>>, notify: Arc, } @@ -1798,10 +1917,11 @@ mod tests { self.num_messages.load(Ordering::SeqCst) } - pub async fn wait_for_delivery(&self) -> Result<(), Elapsed> { - tokio::time::timeout(std::time::Duration::from_secs(60), async { - self.notify.notified().await - }) + pub async fn wait_for_delivery(&self, timeout_secs: Option) -> Result<(), Elapsed> { + tokio::time::timeout( + std::time::Duration::from_secs(timeout_secs.unwrap_or(60)), + async { self.notify.notified().await }, + ) .await?; Ok(()) } @@ -1821,7 +1941,7 @@ mod tests { } impl FfiConversationCallback for RustStreamCallback { - fn on_conversation(&self, group: Arc) { + fn on_conversation(&self, group: Arc) { log::debug!("received conversation"); let _ = self.num_messages.fetch_add(1, Ordering::SeqCst); let mut convos = self.conversations.lock().unwrap(); @@ -1888,12 +2008,12 @@ mod tests { new_test_client_with_wallet(wallet).await } - impl FfiGroup { + impl FfiConversation { #[cfg(test)] async fn update_installations(&self) -> Result<(), GroupError> { let group = MlsGroup::new( self.inner_client.clone(), - self.group_id.clone(), + self.conversation_id.clone(), self.created_at_ns, ); @@ -2391,7 +2511,7 @@ mod tests { .update_group_name("Old Name".to_string()) .await .unwrap(); - message_callbacks.wait_for_delivery().await.unwrap(); + message_callbacks.wait_for_delivery(None).await.unwrap(); let bo_groups = bo .conversations() @@ -2404,14 +2524,14 @@ mod tests { .update_group_name("Old Name2".to_string()) .await .unwrap(); - message_callbacks.wait_for_delivery().await.unwrap(); + message_callbacks.wait_for_delivery(None).await.unwrap(); // Uncomment the following lines to add more group name updates bo_group .update_group_name("Old Name3".to_string()) .await .unwrap(); - message_callbacks.wait_for_delivery().await.unwrap(); + message_callbacks.wait_for_delivery(None).await.unwrap(); assert_eq!(message_callbacks.message_count(), 3); @@ -2444,8 +2564,8 @@ mod tests { let alix_group1 = alix_groups[0].clone(); let alix_group5 = alix_groups[5].clone(); - let bo_group1 = bo.group(alix_group1.id()).unwrap(); - let bo_group5 = bo.group(alix_group5.id()).unwrap(); + let bo_group1 = bo.conversation(alix_group1.id()).unwrap(); + let bo_group5 = bo.conversation(alix_group5.id()).unwrap(); alix_group1.send("alix1".as_bytes().to_vec()).await.unwrap(); alix_group5.send("alix1".as_bytes().to_vec()).await.unwrap(); @@ -2459,7 +2579,7 @@ mod tests { assert_eq!(bo_messages1.len(), 0); assert_eq!(bo_messages5.len(), 0); - bo.conversations().sync_all_groups().await.unwrap(); + bo.conversations().sync_all_conversations().await.unwrap(); let bo_messages1 = bo_group1 .find_messages(FfiListMessagesOptions::default()) @@ -2487,7 +2607,7 @@ mod tests { .unwrap(); } bo.conversations().sync().await.unwrap(); - let num_groups_synced_1: u32 = bo.conversations().sync_all_groups().await.unwrap(); + let num_groups_synced_1: u32 = bo.conversations().sync_all_conversations().await.unwrap(); assert!(num_groups_synced_1 == 30); // Remove bo from all groups and sync @@ -2504,11 +2624,11 @@ mod tests { } // First sync after removal needs to process all groups and set them to inactive - let num_groups_synced_2: u32 = bo.conversations().sync_all_groups().await.unwrap(); + let num_groups_synced_2: u32 = bo.conversations().sync_all_conversations().await.unwrap(); assert!(num_groups_synced_2 == 30); // Second sync after removal will not process inactive groups - let num_groups_synced_3: u32 = bo.conversations().sync_all_groups().await.unwrap(); + let num_groups_synced_3: u32 = bo.conversations().sync_all_conversations().await.unwrap(); assert!(num_groups_synced_3 == 0); } @@ -2531,7 +2651,7 @@ mod tests { .unwrap(); bo.conversations().sync().await.unwrap(); - let bo_group = bo.group(alix_group.id()).unwrap(); + let bo_group = bo.conversation(alix_group.id()).unwrap(); bo_group.send("bo1".as_bytes().to_vec()).await.unwrap(); // Temporary workaround for OpenMLS issue - make sure Alix's epoch is up-to-date @@ -2613,8 +2733,8 @@ mod tests { client2.conversations().sync().await.unwrap(); // Find groups for both clients - let client1_group = client1.group(group.id()).unwrap(); - let client2_group = client2.group(group.id()).unwrap(); + let client1_group = client1.conversation(group.id()).unwrap(); + let client2_group = client2.conversation(group.id()).unwrap(); // Sync both groups client1_group.sync().await.unwrap(); @@ -2646,7 +2766,7 @@ mod tests { assert_eq!(client1_members.len(), 2); client2.conversations().sync().await.unwrap(); - let client2_group = client2.group(group.id()).unwrap(); + let client2_group = client2.conversation(group.id()).unwrap(); let client2_members = client2_group.list_members().await.unwrap(); assert_eq!(client2_members.len(), 2); } @@ -2685,9 +2805,9 @@ mod tests { caro.conversations().sync().await.unwrap(); // Alix and Caro find the group - let alix_group = alix.group(group.id()).unwrap(); - let bo_group = bo.group(group.id()).unwrap(); - let caro_group = caro.group(group.id()).unwrap(); + let alix_group = alix.conversation(group.id()).unwrap(); + let bo_group = bo.conversation(group.id()).unwrap(); + let caro_group = caro.conversation(group.id()).unwrap(); alix_group.update_installations().await.unwrap(); log::info!("Alix sending first message"); @@ -2727,7 +2847,7 @@ mod tests { // New installation of bo finds the group bo2.conversations().sync().await.unwrap(); - let bo2_group = bo2.group(group.id()).unwrap(); + let bo2_group = bo2.conversation(group.id()).unwrap(); log::info!("Bo sending fourth message"); // Bo sends a message to the group @@ -2794,7 +2914,7 @@ mod tests { bo.conversations().sync().await.unwrap(); - let bo_group = bo.group(alix_group.id()).unwrap(); + let bo_group = bo.conversation(alix_group.id()).unwrap(); // Move forward 4 epochs alix_group @@ -2865,7 +2985,7 @@ mod tests { .unwrap(); bo.conversations().sync().await.unwrap(); - let bo_group = bo.group(alix_group.id()).unwrap(); + let bo_group = bo.conversation(alix_group.id()).unwrap(); bo_group.send("bo1".as_bytes().to_vec()).await.unwrap(); alix_group.send("alix1".as_bytes().to_vec()).await.unwrap(); @@ -2921,7 +3041,7 @@ mod tests { .unwrap(); bo.conversations().sync().await.unwrap(); - let bo_group = bo.group(alix_group.id()).unwrap(); + let bo_group = bo.conversation(alix_group.id()).unwrap(); alix_group.sync().await.unwrap(); let alix_members = alix_group.list_members().await.unwrap(); @@ -2949,7 +3069,7 @@ mod tests { let bo_messages = bo_group .find_messages(FfiListMessagesOptions::default()) .unwrap(); - assert!(bo_messages.first().unwrap().kind == FfiGroupMessageKind::MembershipChange); + assert!(bo_messages.first().unwrap().kind == FfiConversationMessageKind::MembershipChange); assert_eq!(bo_messages.len(), 1); let bo_members = bo_group.list_members().await.unwrap(); @@ -2992,9 +3112,9 @@ mod tests { .update_group_name("hello".to_string()) .await .unwrap(); - message_callbacks.wait_for_delivery().await.unwrap(); + message_callbacks.wait_for_delivery(None).await.unwrap(); alix_group.send("hello1".as_bytes().to_vec()).await.unwrap(); - message_callbacks.wait_for_delivery().await.unwrap(); + message_callbacks.wait_for_delivery(None).await.unwrap(); let bo_groups = bo .conversations() @@ -3011,9 +3131,9 @@ mod tests { assert_eq!(bo_messages1.len(), first_msg_check); bo_group.send("hello2".as_bytes().to_vec()).await.unwrap(); - message_callbacks.wait_for_delivery().await.unwrap(); + message_callbacks.wait_for_delivery(None).await.unwrap(); bo_group.send("hello3".as_bytes().to_vec()).await.unwrap(); - message_callbacks.wait_for_delivery().await.unwrap(); + message_callbacks.wait_for_delivery(None).await.unwrap(); alix_group.sync().await.unwrap(); @@ -3023,7 +3143,7 @@ mod tests { assert_eq!(alix_messages.len(), second_msg_check); alix_group.send("hello4".as_bytes().to_vec()).await.unwrap(); - message_callbacks.wait_for_delivery().await.unwrap(); + message_callbacks.wait_for_delivery(None).await.unwrap(); bo_group.sync().await.unwrap(); let bo_messages2 = bo_group @@ -3056,7 +3176,7 @@ mod tests { .await .unwrap(); - stream_callback.wait_for_delivery().await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); assert_eq!(stream_callback.message_count(), 1); // Create another group and add bola @@ -3067,7 +3187,7 @@ mod tests { ) .await .unwrap(); - stream_callback.wait_for_delivery().await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); assert_eq!(stream_callback.message_count(), 2); @@ -3099,7 +3219,7 @@ mod tests { stream.wait_for_ready().await; alix_group.send("first".as_bytes().to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); let bo_group = bo .conversations() @@ -3112,11 +3232,11 @@ mod tests { let _ = caro.inner_client.sync_welcomes().await.unwrap(); bo_group.send("second".as_bytes().to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); alix_group.send("third".as_bytes().to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); bo_group.send("fourth".as_bytes().to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); assert_eq!(stream_callback.message_count(), 4); stream.end_and_wait().await.unwrap(); @@ -3128,7 +3248,7 @@ mod tests { let amal = new_test_client().await; let bola = new_test_client().await; - let amal_group: Arc = amal + let amal_group: Arc = amal .conversations() .create_group( vec![bola.account_address.clone()], @@ -3138,7 +3258,9 @@ mod tests { .unwrap(); bola.inner_client.sync_welcomes().await.unwrap(); - let bola_group = bola.group(amal_group.group_id.clone()).unwrap(); + let bola_group = bola + .conversation(amal_group.conversation_id.clone()) + .unwrap(); let stream_callback = RustStreamCallback::default(); let stream_closer = bola_group.stream(Box::new(stream_callback.clone())).await; @@ -3146,13 +3268,13 @@ mod tests { stream_closer.wait_for_ready().await; amal_group.send("hello".as_bytes().to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); amal_group .send("goodbye".as_bytes().to_vec()) .await .unwrap(); - stream_callback.wait_for_delivery().await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); assert_eq!(stream_callback.message_count(), 2); stream_closer.end_and_wait().await.unwrap(); @@ -3185,9 +3307,9 @@ mod tests { stream_closer.wait_for_ready().await; amal_group.send(b"hello1".to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); amal_group.send(b"hello2".to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); assert_eq!(stream_callback.message_count(), 2); assert!(!stream_closer.is_closed()); @@ -3196,7 +3318,7 @@ mod tests { .remove_members_by_inbox_id(vec![bola.inbox_id().clone()]) .await .unwrap(); - stream_callback.wait_for_delivery().await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); assert_eq!(stream_callback.message_count(), 3); // Member removal transcript message // amal_group.send(b"hello3".to_vec()).await.unwrap(); @@ -3215,7 +3337,7 @@ mod tests { assert_eq!(stream_callback.message_count(), 3); // Don't receive transcript messages while removed amal_group.send("hello4".as_bytes().to_vec()).await.unwrap(); - stream_callback.wait_for_delivery().await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); assert_eq!(stream_callback.message_count(), 4); // Receiving messages again assert!(!stream_closer.is_closed()); @@ -3296,10 +3418,10 @@ mod tests { ) .await .unwrap(); - group_callback.wait_for_delivery().await.unwrap(); + group_callback.wait_for_delivery(None).await.unwrap(); alix_group.send("hello1".as_bytes().to_vec()).await.unwrap(); - message_callback.wait_for_delivery().await.unwrap(); + message_callback.wait_for_delivery(None).await.unwrap(); assert_eq!(group_callback.message_count(), 1); assert_eq!(message_callback.message_count(), 1); @@ -3736,27 +3858,226 @@ mod tests { let alix_conversations = alix.conversations(); let bola_conversations = bola.conversations(); - let _alix_group = alix_conversations + let _alix_dm = alix_conversations .create_dm(bola.account_address.clone()) .await .unwrap(); - let alix_num_sync = alix_conversations.sync_all_groups().await.unwrap(); + let alix_num_sync = alix_conversations.sync_all_conversations().await.unwrap(); bola_conversations.sync().await.unwrap(); - let bola_num_sync = bola_conversations.sync_all_groups().await.unwrap(); + let bola_num_sync = bola_conversations.sync_all_conversations().await.unwrap(); assert_eq!(alix_num_sync, 1); assert_eq!(bola_num_sync, 1); let alix_groups = alix_conversations - .list(FfiListConversationsOptions::default()) + .list_groups(FfiListConversationsOptions::default()) .await .unwrap(); assert_eq!(alix_groups.len(), 0); let bola_groups = bola_conversations - .list(FfiListConversationsOptions::default()) + .list_groups(FfiListConversationsOptions::default()) .await .unwrap(); assert_eq!(bola_groups.len(), 0); + + let alix_dms = alix_conversations + .list_dms(FfiListConversationsOptions::default()) + .await + .unwrap(); + assert_eq!(alix_dms.len(), 1); + + let bola_dms = bola_conversations + .list_dms(FfiListConversationsOptions::default()) + .await + .unwrap(); + assert_eq!(bola_dms.len(), 1); + + let alix_conversations = alix_conversations + .list(FfiListConversationsOptions::default()) + .await + .unwrap(); + assert_eq!(alix_conversations.len(), 1); + + let bola_conversations = bola_conversations + .list(FfiListConversationsOptions::default()) + .await + .unwrap(); + assert_eq!(bola_conversations.len(), 1); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_dm_streaming() { + let alix = new_test_client().await; + let bo = new_test_client().await; + + // Stream all conversations + let stream_callback = RustStreamCallback::default(); + let stream = bo + .conversations() + .stream(Box::new(stream_callback.clone())) + .await; + + alix.conversations() + .create_group( + vec![bo.account_address.clone()], + FfiCreateGroupOptions::default(), + ) + .await + .unwrap(); + + stream_callback.wait_for_delivery(None).await.unwrap(); + + assert_eq!(stream_callback.message_count(), 1); + alix.conversations() + .create_dm(bo.account_address.clone()) + .await + .unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); + + assert_eq!(stream_callback.message_count(), 2); + + stream.end_and_wait().await.unwrap(); + assert!(stream.is_closed()); + + // Stream just groups + let stream_callback = RustStreamCallback::default(); + let stream = bo + .conversations() + .stream_groups(Box::new(stream_callback.clone())) + .await; + + alix.conversations() + .create_group( + vec![bo.account_address.clone()], + FfiCreateGroupOptions::default(), + ) + .await + .unwrap(); + + stream_callback.wait_for_delivery(None).await.unwrap(); + + assert_eq!(stream_callback.message_count(), 1); + alix.conversations() + .create_dm(bo.account_address.clone()) + .await + .unwrap(); + let result = stream_callback.wait_for_delivery(Some(2)).await; + assert!(result.is_err(), "Stream unexpectedly received a DM"); + assert_eq!(stream_callback.message_count(), 1); + + stream.end_and_wait().await.unwrap(); + assert!(stream.is_closed()); + + // Stream just dms + let stream_callback = RustStreamCallback::default(); + let stream = bo + .conversations() + .stream_dms(Box::new(stream_callback.clone())) + .await; + + alix.conversations() + .create_dm(bo.account_address.clone()) + .await + .unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); + assert_eq!(stream_callback.message_count(), 1); + + alix.conversations() + .create_group( + vec![bo.account_address.clone()], + FfiCreateGroupOptions::default(), + ) + .await + .unwrap(); + + let result = stream_callback.wait_for_delivery(Some(2)).await; + assert!(result.is_err(), "Stream unexpectedly received a Group"); + assert_eq!(stream_callback.message_count(), 1); + + stream.end_and_wait().await.unwrap(); + assert!(stream.is_closed()); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_stream_all_dm_messages() { + let alix = new_test_client().await; + let bo = new_test_client().await; + let alix_dm = alix + .conversations() + .create_dm(bo.account_address.clone()) + .await + .unwrap(); + + let alix_group = alix + .conversations() + .create_group( + vec![bo.account_address.clone()], + FfiCreateGroupOptions::default(), + ) + .await + .unwrap(); + + // Stream all conversations + let stream_callback = RustStreamCallback::default(); + let stream = bo + .conversations() + .stream_all_messages(Box::new(stream_callback.clone())) + .await; + stream.wait_for_ready().await; + + alix_group.send("first".as_bytes().to_vec()).await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); + assert_eq!(stream_callback.message_count(), 1); + + alix_dm.send("second".as_bytes().to_vec()).await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); + assert_eq!(stream_callback.message_count(), 2); + + stream.end_and_wait().await.unwrap(); + assert!(stream.is_closed()); + + // Stream just groups + let stream_callback = RustStreamCallback::default(); + let stream = bo + .conversations() + .stream_all_group_messages(Box::new(stream_callback.clone())) + .await; + stream.wait_for_ready().await; + + alix_group.send("first".as_bytes().to_vec()).await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); + assert_eq!(stream_callback.message_count(), 1); + + alix_dm.send("second".as_bytes().to_vec()).await.unwrap(); + let result = stream_callback.wait_for_delivery(Some(2)).await; + assert!(result.is_err(), "Stream unexpectedly received a DM message"); + assert_eq!(stream_callback.message_count(), 1); + + stream.end_and_wait().await.unwrap(); + assert!(stream.is_closed()); + + // Stream just dms + let stream_callback = RustStreamCallback::default(); + let stream = bo + .conversations() + .stream_all_dm_messages(Box::new(stream_callback.clone())) + .await; + stream.wait_for_ready().await; + + alix_dm.send("first".as_bytes().to_vec()).await.unwrap(); + stream_callback.wait_for_delivery(None).await.unwrap(); + assert_eq!(stream_callback.message_count(), 1); + + alix_group.send("second".as_bytes().to_vec()).await.unwrap(); + let result = stream_callback.wait_for_delivery(Some(2)).await; + assert!( + result.is_err(), + "Stream unexpectedly received a Group message" + ); + assert_eq!(stream_callback.message_count(), 1); + + stream.end_and_wait().await.unwrap(); + assert!(stream.is_closed()); } #[tokio::test(flavor = "multi_thread", worker_threads = 5)] @@ -3777,7 +4098,7 @@ mod tests { assert_eq!(alix_initial_consent, FfiConsentState::Allowed); bo.conversations().sync().await.unwrap(); - let bo_group = bo.group(alix_group.id()).unwrap(); + let bo_group = bo.conversation(alix_group.id()).unwrap(); let bo_initial_consent = bo_group.consent_state().unwrap(); assert_eq!(bo_initial_consent, FfiConsentState::Unknown); @@ -3789,7 +4110,7 @@ mod tests { assert_eq!(alix_updated_consent, FfiConsentState::Denied); bo.set_consent_states(vec![FfiConsent { state: FfiConsentState::Allowed, - entity_type: FfiConsentEntityType::GroupId, + entity_type: FfiConsentEntityType::ConversationId, entity: hex::encode(bo_group.id()), }]) .await @@ -3798,6 +4119,42 @@ mod tests { assert_eq!(bo_updated_consent, FfiConsentState::Allowed); } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_set_and_get_dm_consent() { + let alix = new_test_client().await; + let bo = new_test_client().await; + + let alix_dm = alix + .conversations() + .create_dm(bo.account_address.clone()) + .await + .unwrap(); + + let alix_initial_consent = alix_dm.consent_state().unwrap(); + assert_eq!(alix_initial_consent, FfiConsentState::Allowed); + + bo.conversations().sync().await.unwrap(); + let bo_dm = bo.conversation(alix_dm.id()).unwrap(); + + let bo_initial_consent = bo_dm.consent_state().unwrap(); + assert_eq!(bo_initial_consent, FfiConsentState::Unknown); + + alix_dm + .update_consent_state(FfiConsentState::Denied) + .unwrap(); + let alix_updated_consent = alix_dm.consent_state().unwrap(); + assert_eq!(alix_updated_consent, FfiConsentState::Denied); + bo.set_consent_states(vec![FfiConsent { + state: FfiConsentState::Allowed, + entity_type: FfiConsentEntityType::ConversationId, + entity: hex::encode(bo_dm.id()), + }]) + .await + .unwrap(); + let bo_updated_consent = bo_dm.consent_state().unwrap(); + assert_eq!(bo_updated_consent, FfiConsentState::Allowed); + } + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] async fn test_set_and_get_member_consent() { let alix = new_test_client().await; diff --git a/bindings_node/src/consent_state.rs b/bindings_node/src/consent_state.rs index ecc1ba8a7..a674f3090 100644 --- a/bindings_node/src/consent_state.rs +++ b/bindings_node/src/consent_state.rs @@ -38,7 +38,7 @@ pub enum NapiConsentEntityType { impl From for ConsentType { fn from(entity_type: NapiConsentEntityType) -> Self { match entity_type { - NapiConsentEntityType::GroupId => ConsentType::GroupId, + NapiConsentEntityType::GroupId => ConsentType::ConversationId, NapiConsentEntityType::InboxId => ConsentType::InboxId, NapiConsentEntityType::Address => ConsentType::Address, } diff --git a/bindings_node/src/conversations.rs b/bindings_node/src/conversations.rs index c3ae47033..bcdc4670e 100644 --- a/bindings_node/src/conversations.rs +++ b/bindings_node/src/conversations.rs @@ -7,6 +7,7 @@ use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFun use napi::JsFunction; use napi_derive::napi; use xmtp_mls::client::FindGroupParams; +use xmtp_mls::groups::group_metadata::ConversationType; use xmtp_mls::groups::{GroupMetadataOptions, PreconfiguredPolicies}; use crate::messages::NapiMessage; @@ -200,6 +201,7 @@ impl NapiConversations { let client = self.inner_client.clone(); let stream_closer = RustXmtpClient::stream_conversations_with_callback( client.clone(), + Some(ConversationType::Group), move |convo| { tsfn.call( Ok(NapiGroup::new( @@ -210,7 +212,6 @@ impl NapiConversations { ThreadsafeFunctionCallMode::Blocking, ); }, - false, ); Ok(NapiStreamCloser::new(stream_closer)) @@ -222,6 +223,7 @@ impl NapiConversations { callback.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?; let stream_closer = RustXmtpClient::stream_all_messages_with_callback( self.inner_client.clone(), + Some(ConversationType::Group), move |message| { tsfn.call(Ok(message.into()), ThreadsafeFunctionCallMode::Blocking); }, diff --git a/bindings_wasm/src/consent_state.rs b/bindings_wasm/src/consent_state.rs index 0b13f3df9..8c94d89c0 100644 --- a/bindings_wasm/src/consent_state.rs +++ b/bindings_wasm/src/consent_state.rs @@ -40,7 +40,7 @@ pub enum WasmConsentEntityType { impl From for ConsentType { fn from(entity_type: WasmConsentEntityType) -> Self { match entity_type { - WasmConsentEntityType::GroupId => ConsentType::GroupId, + WasmConsentEntityType::GroupId => ConsentType::ConversationId, WasmConsentEntityType::InboxId => ConsentType::InboxId, WasmConsentEntityType::Address => ConsentType::Address, } diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index d373d29e3..1dc3d690c 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -38,8 +38,9 @@ use xmtp_proto::xmtp::mls::api::v1::{ use crate::{ api::ApiClientWrapper, groups::{ - group_permissions::PolicySet, validated_commit::CommitValidationError, GroupError, - GroupMetadataOptions, IntentError, MlsGroup, + group_metadata::ConversationType, group_permissions::PolicySet, + validated_commit::CommitValidationError, GroupError, GroupMetadataOptions, IntentError, + MlsGroup, }, identity::{parse_credential, Identity, IdentityError}, identity_updates::{load_identity_updates, IdentityUpdateError}, @@ -222,7 +223,7 @@ pub struct FindGroupParams { pub created_after_ns: Option, pub created_before_ns: Option, pub limit: Option, - pub include_dm_groups: bool, + pub conversation_type: Option, } /// Clients manage access to the network, identity, and data store @@ -665,7 +666,7 @@ where params.created_after_ns, params.created_before_ns, params.limit, - params.include_dm_groups, + params.conversation_type, )? .into_iter() .map(|stored_group| { diff --git a/xmtp_mls/src/groups/group_metadata.rs b/xmtp_mls/src/groups/group_metadata.rs index 333125022..2fc55a9b0 100644 --- a/xmtp_mls/src/groups/group_metadata.rs +++ b/xmtp_mls/src/groups/group_metadata.rs @@ -50,7 +50,8 @@ impl TryFrom for Vec { type Error = GroupMetadataError; fn try_from(value: GroupMetadata) -> Result { - let conversation_type: ConversationTypeProto = value.conversation_type.clone().into(); + let conversation_type: ConversationTypeProto = value.conversation_type.into( + ); let proto_val = GroupMetadataProto { conversation_type: conversation_type as i32, creator_inbox_id: value.creator_inbox_id.clone(), @@ -100,11 +101,11 @@ impl TryFrom<&Extensions> for GroupMetadata { /** * XMTP supports the following types of conversation * - * Group: A conversation with 1->N members and complex permissions and roles - * DM: A conversation between 2 members with simplified permissions - * Sync: A conversation between all the devices of a single member with simplified permissions + * *Group*: A conversation with 1->N members and complex permissions and roles + * *DM*: A conversation between 2 members with simplified permissions + * *Sync*: A conversation between all the devices of a single member with simplified permissions */ -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Copy, Clone, PartialEq)] pub enum ConversationType { Group, Dm, diff --git a/xmtp_mls/src/groups/message_history.rs b/xmtp_mls/src/groups/message_history.rs index 1dc1c45b1..677afc244 100644 --- a/xmtp_mls/src/groups/message_history.rs +++ b/xmtp_mls/src/groups/message_history.rs @@ -26,6 +26,7 @@ use xmtp_proto::{ }, }; +use super::group_metadata::ConversationType; use super::{GroupError, MlsGroup}; use crate::XmtpApi; @@ -137,7 +138,7 @@ where pub async fn ensure_member_of_all_groups(&self, inbox_id: String) -> Result<(), GroupError> { let conn = self.store().conn()?; - let groups = conn.find_groups(None, None, None, None, false)?; + let groups = conn.find_groups(None, None, None, None, Some(ConversationType::Group))?; for group in groups { let group = self.group(group.id)?; Box::pin(group.add_members_by_inbox_id(vec![inbox_id.clone()])).await?; @@ -386,7 +387,7 @@ where self.sync_welcomes().await?; let conn = self.store().conn()?; - let groups = conn.find_groups(None, None, None, None, false)?; + let groups = conn.find_groups(None, None, None, None, Some(ConversationType::Group))?; for crate::storage::group::StoredGroup { id, .. } in groups.into_iter() { let group = self.group(id)?; Box::pin(group.sync()).await?; @@ -504,14 +505,14 @@ where async fn prepare_groups_to_sync(&self) -> Result, MessageHistoryError> { let conn = self.store().conn()?; - Ok(conn.find_groups(None, None, None, None, false)?) + Ok(conn.find_groups(None, None, None, None, Some(ConversationType::Group))?) } async fn prepare_messages_to_sync( &self, ) -> Result, MessageHistoryError> { let conn = self.store().conn()?; - let groups = conn.find_groups(None, None, None, None, false)?; + let groups = conn.find_groups(None, None, None, None, Some(ConversationType::Group))?; let mut all_messages: Vec = vec![]; for StoredGroup { id, .. } in groups.into_iter() { diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 5fcd77c38..c95feb0c8 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -394,11 +394,10 @@ impl MlsGroup { ); stored_group.store(provider.conn_ref())?; - Ok(Self::new_from_arc( - client.clone(), - group_id, - stored_group.created_at_ns, - )) + let new_group = Self::new_from_arc(client.clone(), group_id, stored_group.created_at_ns); + // Consent state defaults to allowed when the user creates the group + new_group.update_consent_state(ConsentState::Allowed)?; + Ok(new_group) } // Create a group from a decrypted and decoded welcome message @@ -998,8 +997,10 @@ impl MlsGroup { /// Find the `consent_state` of the group pub fn consent_state(&self) -> Result { let conn = self.context().store().conn()?; - let record = - conn.get_consent_record(hex::encode(self.group_id.clone()), ConsentType::GroupId)?; + let record = conn.get_consent_record( + hex::encode(self.group_id.clone()), + ConsentType::ConversationId, + )?; match record { Some(rec) => Ok(rec.state), @@ -1010,7 +1011,7 @@ impl MlsGroup { pub fn update_consent_state(&self, state: ConsentState) -> Result<(), GroupError> { let conn = self.context().store().conn()?; conn.insert_or_replace_consent_records(vec![StoredConsentRecord::new( - ConsentType::GroupId, + ConsentType::ConversationId, state, hex::encode(self.group_id.clone()), )])?; @@ -3196,7 +3197,7 @@ pub(crate) mod tests { let _ = bola.sync_welcomes().await; let bola_groups = bola .find_groups(FindGroupParams { - include_dm_groups: true, + conversation_type: None, ..FindGroupParams::default() }) .unwrap(); diff --git a/xmtp_mls/src/groups/subscriptions.rs b/xmtp_mls/src/groups/subscriptions.rs index 6005099db..84b3175d6 100644 --- a/xmtp_mls/src/groups/subscriptions.rs +++ b/xmtp_mls/src/groups/subscriptions.rs @@ -163,38 +163,28 @@ where .map(move |res| { let group_id_to_info = group_id_to_info.clone(); async move { - match res { - Ok(envelope) => { - tracing::info!("Received message streaming payload"); - let group_id = extract_group_id(&envelope)?; - tracing::info!("Extracted group id {}", hex::encode(&group_id)); - let stream_info = group_id_to_info.get(&group_id).ok_or( - ClientError::StreamInconsistency( - "Received message for a non-subscribed group".to_string(), - ), - )?; - let mls_group = MlsGroup::new( - client.clone(), - group_id, - stream_info.convo_created_at_ns, - ); - mls_group.process_stream_entry(envelope).await - } - Err(err) => Err(GroupError::Api(err)), - } + let envelope = res.map_err(GroupError::from)?; + tracing::info!("Received message streaming payload"); + let group_id = extract_group_id(&envelope)?; + tracing::info!("Extracted group id {}", hex::encode(&group_id)); + let stream_info = + group_id_to_info + .get(&group_id) + .ok_or(ClientError::StreamInconsistency( + "Received message for a non-subscribed group".to_string(), + ))?; + let mls_group = + MlsGroup::new(client.clone(), group_id, stream_info.convo_created_at_ns); + mls_group.process_stream_entry(envelope).await } }) .filter_map(|res| async { - match res.await { - Ok(Some(message)) => Some(message), - Ok(None) => { + match crate::optify!(res.await, "Error processing stream entry").flatten() { + Some(message) => Some(message), + None => { tracing::info!("Skipped message streaming payload"); None } - Err(err) => { - tracing::error!("Error processing stream entry: {:?}", err); - None - } } }); Ok(stream) @@ -231,7 +221,6 @@ pub(crate) mod tests { wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_dedicated_worker); use super::*; - use core::time::Duration; use tokio_stream::wrappers::UnboundedReceiverStream; use xmtp_cryptography::utils::generate_local_wallet; @@ -302,7 +291,7 @@ pub(crate) mod tests { let bola_group = Arc::new(bola_groups.first().unwrap().clone()); let bola_group_ptr = bola_group.clone(); - let notify = Delivery::new(Some(Duration::from_secs(10))); + let notify = Delivery::new(Some(10)); let notify_ptr = notify.clone(); let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); let mut stream = UnboundedReceiverStream::new(rx); @@ -388,7 +377,7 @@ pub(crate) mod tests { .unwrap(); let amal_group_ptr = amal_group.clone(); - let notify = Delivery::new(Some(Duration::from_secs(20))); + let notify = Delivery::new(Some(20)); let notify_ptr = notify.clone(); let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); diff --git a/xmtp_mls/src/lib.rs b/xmtp_mls/src/lib.rs index 0a1f45bb5..757da0351 100644 --- a/xmtp_mls/src/lib.rs +++ b/xmtp_mls/src/lib.rs @@ -71,6 +71,31 @@ pub async fn sleep(duration: core::time::Duration) { tokio::time::sleep(duration).await } +/// Turn the Result into an `Option`, logging the error with `tracing::error` and +/// returning `None` if the value matches on Result::Err(). +/// Optionally pass a message as the second argument. +#[macro_export] +macro_rules! optify { + ( $e: expr ) => { + match $e { + Ok(v) => Some(v), + Err(e) => { + tracing::error!("{:?}", e); + None + } + } + }; + ( $e: expr, $msg: tt ) => { + match $e { + Ok(v) => Some(v), + Err(e) => { + tracing::error!("{}: {:?}", $msg, e); + None + } + } + }; +} + #[cfg(test)] pub(crate) mod tests { // Execute once before any tests are run diff --git a/xmtp_mls/src/storage/encrypted_store/consent_record.rs b/xmtp_mls/src/storage/encrypted_store/consent_record.rs index eed989729..fba2063e5 100644 --- a/xmtp_mls/src/storage/encrypted_store/consent_record.rs +++ b/xmtp_mls/src/storage/encrypted_store/consent_record.rs @@ -21,7 +21,7 @@ use serde::{Deserialize, Serialize}; #[diesel(table_name = consent_records)] #[diesel(primary_key(entity_type, entity))] pub struct StoredConsentRecord { - /// Enum, [`ConsentType`] representing the type of consent (group_id inbox_id, etc..) + /// Enum, [`ConsentType`] representing the type of consent (conversation_id inbox_id, etc..) pub entity_type: ConsentType, /// Enum, [`ConsentState`] representing the state of consent (allowed, denied, etc..) pub state: ConsentState, @@ -85,8 +85,8 @@ impl DbConnection { #[diesel(sql_type = Integer)] /// Type of consent record stored pub enum ConsentType { - /// Consent is for a group - GroupId = 1, + /// Consent is for a conversation + ConversationId = 1, /// Consent is for an inbox InboxId = 2, /// Consent is for an address @@ -109,7 +109,7 @@ where { fn from_sql(bytes: ::RawValue<'_>) -> deserialize::Result { match i32::from_sql(bytes)? { - 1 => Ok(ConsentType::GroupId), + 1 => Ok(ConsentType::ConversationId), 2 => Ok(ConsentType::InboxId), 3 => Ok(ConsentType::Address), x => Err(format!("Unrecognized variant {}", x).into()), diff --git a/xmtp_mls/src/storage/encrypted_store/group.rs b/xmtp_mls/src/storage/encrypted_store/group.rs index f411875fe..942ee2e83 100644 --- a/xmtp_mls/src/storage/encrypted_store/group.rs +++ b/xmtp_mls/src/storage/encrypted_store/group.rs @@ -16,7 +16,9 @@ use super::{ schema::{groups, groups::dsl}, Sqlite, }; -use crate::{impl_fetch, impl_store, DuplicateItem, StorageError}; +use crate::{ + groups::group_metadata::ConversationType, impl_fetch, impl_store, DuplicateItem, StorageError, +}; /// The Group ID type. pub type ID = Vec; @@ -123,7 +125,7 @@ impl DbConnection { created_after_ns: Option, created_before_ns: Option, limit: Option, - include_dm_groups: bool, + conversation_type: Option, ) -> Result, StorageError> { let mut query = dsl::groups.order(dsl::created_at_ns.asc()).into_boxed(); @@ -143,8 +145,16 @@ impl DbConnection { query = query.limit(limit); } - if !include_dm_groups { - query = query.filter(dsl::dm_inbox_id.is_null()); + if let Some(conversation_type) = conversation_type { + match conversation_type { + ConversationType::Group => { + query = query.filter(dsl::dm_inbox_id.is_null()); + } + ConversationType::Dm => { + query = query.filter(dsl::dm_inbox_id.is_not_null()); + } + ConversationType::Sync => {} + } } query = query.filter(dsl::purpose.eq(Purpose::Conversation)); @@ -469,7 +479,9 @@ pub(crate) mod tests { let test_group_3 = generate_dm(Some(GroupMembershipState::Allowed)); test_group_3.store(conn).unwrap(); - let all_results = conn.find_groups(None, None, None, None, false).unwrap(); + let all_results = conn + .find_groups(None, None, None, None, Some(ConversationType::Group)) + .unwrap(); assert_eq!(all_results.len(), 2); let pending_results = conn @@ -478,19 +490,27 @@ pub(crate) mod tests { None, None, None, - false, + Some(ConversationType::Group), ) .unwrap(); assert_eq!(pending_results[0].id, test_group_1.id); assert_eq!(pending_results.len(), 1); // Offset and limit - let results_with_limit = conn.find_groups(None, None, None, Some(1), false).unwrap(); + let results_with_limit = conn + .find_groups(None, None, None, Some(1), Some(ConversationType::Group)) + .unwrap(); assert_eq!(results_with_limit.len(), 1); assert_eq!(results_with_limit[0].id, test_group_1.id); let results_with_created_at_ns_after = conn - .find_groups(None, Some(test_group_1.created_at_ns), None, Some(1), false) + .find_groups( + None, + Some(test_group_1.created_at_ns), + None, + Some(1), + Some(ConversationType::Group), + ) .unwrap(); assert_eq!(results_with_created_at_ns_after.len(), 1); assert_eq!(results_with_created_at_ns_after[0].id, test_group_2.id); @@ -500,9 +520,16 @@ pub(crate) mod tests { assert_eq!(synced_groups.len(), 0); // test that dm groups are included - let dm_results = conn.find_groups(None, None, None, None, true).unwrap(); + let dm_results = conn.find_groups(None, None, None, None, None).unwrap(); assert_eq!(dm_results.len(), 3); assert_eq!(dm_results[2].id, test_group_3.id); + + // test only dms are returned + let dm_results = conn + .find_groups(None, None, None, None, Some(ConversationType::Dm)) + .unwrap(); + assert_eq!(dm_results.len(), 1); + assert_eq!(dm_results[0].id, test_group_3.id); }) .await } diff --git a/xmtp_mls/src/subscriptions.rs b/xmtp_mls/src/subscriptions.rs index 8f34452ab..3408219b7 100644 --- a/xmtp_mls/src/subscriptions.rs +++ b/xmtp_mls/src/subscriptions.rs @@ -3,7 +3,6 @@ use std::{collections::HashMap, sync::Arc}; use futures::{FutureExt, Stream, StreamExt}; use prost::Message; use tokio::{sync::oneshot, task::JoinHandle}; -use tokio_stream::wrappers::errors::BroadcastStreamRecvError; use xmtp_id::scw_verifier::SmartContractSignatureVerifier; use xmtp_proto::xmtp::mls::api::v1::WelcomeMessage; @@ -31,6 +30,18 @@ pub(crate) enum LocalEvents { NewGroup(MlsGroup), } +impl LocalEvents { + fn group_filter(self) -> Option> { + use LocalEvents::*; + // this is just to protect against any future variants + #[allow(unreachable_patterns)] + match self { + NewGroup(c) => Some(c), + _ => None, + } + } +} + impl Clone for LocalEvents { fn clone(&self) -> LocalEvents { use LocalEvents::*; @@ -136,45 +147,34 @@ where pub async fn stream_conversations( &self, - include_dm: bool, + conversation_type: Option, ) -> Result> + '_, ClientError> { - let event_queue = - tokio_stream::wrappers::BroadcastStream::new(self.local_events.subscribe()); + let event_queue = tokio_stream::wrappers::BroadcastStream::new( + self.local_events.subscribe(), + ) + .filter_map(|event| async { + crate::optify!(event, "Missed messages due to event queue lag") + .map(|c| c.group_filter()) + .flatten() + }); // Helper function for filtering Dm groups let filter_group = move |group: MlsGroup| async move { - let provider = match group.client.context().mls_provider() { - Ok(p) => Some(p), - Err(e) => { - tracing::error!("{}", e); - None - } - }?; - match group.metadata(provider) { - Ok(metadata) => { - if include_dm || metadata.conversation_type != ConversationType::Dm { - Some(group) + let conversation_type = &conversation_type; + let provider = crate::optify!(group.client.context().mls_provider())?; + let metadata = + crate::optify!(group.metadata(provider), "error processing group metadata"); + metadata + .filter(|m| { + if &Some(m.conversation_type) == conversation_type { + true } else { - None + conversation_type.is_none() } - } - Err(err) => { - tracing::error!("Error processing group metadata: {:?}", err); - None - } - } + }) + .map(|_| group) }; - let event_queue = event_queue.filter_map(|event| async move { - match event { - Ok(LocalEvents::NewGroup(g)) => Some(g), - Err(BroadcastStreamRecvError::Lagged(missed)) => { - tracing::warn!("Missed {missed} messages due to local event queue lagging"); - None - } - } - }); - let installation_key = self.installation_public_key(); let id_cursor = 0; @@ -190,16 +190,10 @@ where self.process_streamed_welcome(welcome?).await }) .filter_map(move |res| async { - match res.await { - Ok(group) => Some(group), - Err(err) => { - tracing::error!( - "Error processing stream entry for conversation: {:?}", - err - ); - None - } - } + crate::optify!( + res.await, + "Error processing stream entry for conversation: " + ) }); Ok(futures::stream::select(stream, event_queue).filter_map(filter_group)) @@ -213,13 +207,13 @@ where { pub fn stream_conversations_with_callback( client: Arc>, + conversation_type: Option, mut convo_callback: impl FnMut(MlsGroup) + Send + 'static, - include_dm: bool, ) -> impl crate::StreamHandle> { let (tx, rx) = oneshot::channel(); crate::spawn(Some(rx), async move { - let stream = client.stream_conversations(include_dm).await?; + let stream = client.stream_conversations(conversation_type).await?; futures::pin_mut!(stream); let _ = tx.send(()); while let Some(convo) = stream.next().await { @@ -233,13 +227,14 @@ where pub async fn stream_all_messages( &self, + conversation_type: Option, ) -> Result> + '_, ClientError> { self.sync_welcomes().await?; let mut group_id_to_info = self .store() .conn()? - .find_groups(None, None, None, None, false)? + .find_groups(None, None, None, None, conversation_type)? .into_iter() .map(Into::into) .collect::, MessagesStreamInfo>>(); @@ -253,7 +248,7 @@ where futures::pin_mut!(messages_stream); tracing::info!("Setting up conversation stream in stream_all_messages"); - let convo_stream = self.stream_conversations(true).await?; + let convo_stream = self.stream_conversations(conversation_type).await?; futures::pin_mut!(convo_stream); @@ -318,12 +313,13 @@ where pub fn stream_all_messages_with_callback( client: Arc>, + conversation_type: Option, mut callback: impl FnMut(StoredGroupMessage) + Send + 'static, ) -> impl crate::StreamHandle> { let (tx, rx) = oneshot::channel(); crate::spawn(Some(rx), async move { - let stream = client.stream_all_messages().await?; + let stream = client.stream_all_messages(conversation_type).await?; futures::pin_mut!(stream); let _ = tx.send(()); while let Some(message) = stream.next().await { @@ -346,7 +342,7 @@ pub(crate) mod tests { use crate::{ builder::ClientBuilder, client::FindGroupParams, - groups::GroupMetadataOptions, + groups::{group_metadata::ConversationType, GroupMetadataOptions}, storage::group_message::StoredGroupMessage, utils::test::{Delivery, FullXmtpClient, TestClient}, Client, StreamHandle, @@ -378,7 +374,7 @@ pub(crate) mod tests { let mut stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx); let bob_ptr = bob.clone(); crate::spawn(None, async move { - let bob_stream = bob_ptr.stream_conversations(true).await.unwrap(); + let bob_stream = bob_ptr.stream_conversations(None).await.unwrap(); futures::pin_mut!(bob_stream); while let Some(item) = bob_stream.next().await { let _ = tx.send(item); @@ -476,6 +472,7 @@ pub(crate) mod tests { let notify_pointer = notify.clone(); let mut handle = Client::::stream_all_messages_with_callback( Arc::new(caro), + None, move |message| { (*messages_clone.lock()).push(message); notify_pointer.notify_one(); @@ -526,6 +523,7 @@ pub(crate) mod tests { let delivery_pointer = delivery.clone(); let mut handle = Client::::stream_all_messages_with_callback( caro.clone(), + None, move |message| { delivery_pointer.notify_one(); (*messages_clone.lock()).push(message); @@ -625,6 +623,7 @@ pub(crate) mod tests { let blocked_pointer = blocked.clone(); let mut handle = Client::::stream_all_messages_with_callback( caro.clone(), + None, move |message| { (*messages_clone.lock()).push(message); blocked_pointer.fetch_sub(1, Ordering::SeqCst); @@ -680,12 +679,12 @@ pub(crate) mod tests { let closer = Client::::stream_conversations_with_callback( alix.clone(), + Some(ConversationType::Group), move |g| { let mut groups = groups_pointer.lock(); groups.push(g); notify_pointer.notify_one(); }, - false, ); alix.create_group(None, GroupMetadataOptions::default()) @@ -736,18 +735,18 @@ pub(crate) mod tests { let groups = Arc::new(Mutex::new(Vec::new())); // Wait for 2 seconds for the group creation to be streamed - let notify = Delivery::new(Some(std::time::Duration::from_secs(1))); + let notify = Delivery::new(Some(1)); let (notify_pointer, groups_pointer) = (notify.clone(), groups.clone()); // Start a stream with enableDm set to false - let closer = Client::::stream_conversations_with_callback( + let mut closer = Client::::stream_conversations_with_callback( alix.clone(), + Some(ConversationType::Group), move |g| { let mut groups = groups_pointer.lock(); groups.push(g); notify_pointer.notify_one(); }, - false, ); alix.create_dm_by_inbox_id(bo.inbox_id()).await.unwrap(); @@ -755,23 +754,70 @@ pub(crate) mod tests { let result = notify.wait_for_delivery().await; assert!(result.is_err(), "Stream unexpectedly received a DM group"); - closer.end(); + let group = alix + .create_group(None, GroupMetadataOptions::default()) + .unwrap(); + group + .add_members_by_inbox_id(vec![bo.inbox_id()]) + .await + .unwrap(); + + notify.wait_for_delivery().await.unwrap(); + { + let grps = groups.lock(); + assert_eq!(grps.len(), 1); + } + + let _ = closer.end_and_wait().await; - // Start a stream with enableDm set to true + // Start a stream with only dms let groups = Arc::new(Mutex::new(Vec::new())); - // Wait for 2 seconds for the group creation to be streamed - let notify = Delivery::new(Some(std::time::Duration::from_secs(60))); + let notify = Delivery::new(Some(1)); let (notify_pointer, groups_pointer) = (notify.clone(), groups.clone()); - let closer = FullXmtpClient::stream_conversations_with_callback( + + // Start a stream with conversation_type DM + let closer = Client::::stream_conversations_with_callback( alix.clone(), + Some(ConversationType::Dm), move |g| { let mut groups = groups_pointer.lock(); groups.push(g); notify_pointer.notify_one(); }, - true, ); + let group = alix + .create_group(None, GroupMetadataOptions::default()) + .unwrap(); + group + .add_members_by_inbox_id(vec![bo.inbox_id()]) + .await + .unwrap(); + + let result = notify.wait_for_delivery().await; + assert!(result.is_err(), "Stream unexpectedly received a Group"); + + alix.create_dm_by_inbox_id(bo.inbox_id()).await.unwrap(); + notify.wait_for_delivery().await.unwrap(); + { + let grps = groups.lock(); + assert_eq!(grps.len(), 1); + } + + closer.end(); + + // Start a stream with all conversations + let groups = Arc::new(Mutex::new(Vec::new())); + // Wait for 2 seconds for the group creation to be streamed + let notify = Delivery::new(None); + let (notify_pointer, groups_pointer) = (notify.clone(), groups.clone()); + let closer = + FullXmtpClient::stream_conversations_with_callback(alix.clone(), None, move |g| { + let mut groups = groups_pointer.lock(); + groups.push(g); + notify_pointer.notify_one(); + }); + alix.create_dm_by_inbox_id(bo.inbox_id()).await.unwrap(); notify.wait_for_delivery().await.unwrap(); { @@ -789,6 +835,147 @@ pub(crate) mod tests { assert_eq!(grps.len(), 2); } + let group = alix + .create_group(None, GroupMetadataOptions::default()) + .unwrap(); + group + .add_members_by_inbox_id(vec![bo.inbox_id()]) + .await + .unwrap(); + + notify.wait_for_delivery().await.unwrap(); + { + let grps = groups.lock(); + assert_eq!(grps.len(), 3); + } + + closer.end(); + } + + #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)] + #[cfg_attr(not(target_arch = "wasm32"), tokio::test(flavor = "multi_thread"))] + async fn test_dm_stream_all_messages() { + let alix = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await); + let bo = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await); + + let alix_group = alix + .create_group(None, GroupMetadataOptions::default()) + .unwrap(); + alix_group + .add_members_by_inbox_id(vec![bo.inbox_id()]) + .await + .unwrap(); + + let alix_dm = alix.create_dm_by_inbox_id(bo.inbox_id()).await.unwrap(); + + // Start a stream with only groups + let messages: Arc>> = Arc::new(Mutex::new(Vec::new())); + // Wait for 2 seconds for the group creation to be streamed + let notify = Delivery::new(Some(1)); + let (notify_pointer, messages_pointer) = (notify.clone(), messages.clone()); + + let mut closer = Client::::stream_all_messages_with_callback( + bo.clone(), + Some(ConversationType::Group), + move |message| { + let mut messages: parking_lot::lock_api::MutexGuard< + '_, + parking_lot::RawMutex, + Vec, + > = messages_pointer.lock(); + messages.push(message); + notify_pointer.notify_one(); + }, + ); + closer.wait_for_ready().await; + + alix_dm.send_message("first".as_bytes()).await.unwrap(); + + let result = notify.wait_for_delivery().await; + assert!(result.is_err(), "Stream unexpectedly received a DM message"); + + alix_group.send_message("second".as_bytes()).await.unwrap(); + + notify.wait_for_delivery().await.unwrap(); + { + let msgs = messages.lock(); + assert_eq!(msgs.len(), 1); + } + + closer.end(); + + // Start a stream with only dms + let messages: Arc>> = Arc::new(Mutex::new(Vec::new())); + // Wait for 2 seconds for the group creation to be streamed + let notify = Delivery::new(Some(1)); + let (notify_pointer, messages_pointer) = (notify.clone(), messages.clone()); + + let mut closer = Client::::stream_all_messages_with_callback( + bo.clone(), + Some(ConversationType::Dm), + move |message| { + let mut messages: parking_lot::lock_api::MutexGuard< + '_, + parking_lot::RawMutex, + Vec, + > = messages_pointer.lock(); + messages.push(message); + notify_pointer.notify_one(); + }, + ); + closer.wait_for_ready().await; + + alix_group.send_message("first".as_bytes()).await.unwrap(); + + let result = notify.wait_for_delivery().await; + assert!( + result.is_err(), + "Stream unexpectedly received a Group message" + ); + + alix_dm.send_message("second".as_bytes()).await.unwrap(); + + notify.wait_for_delivery().await.unwrap(); + { + let msgs = messages.lock(); + assert_eq!(msgs.len(), 1); + } + + closer.end(); + + // Start a stream with all conversations + let messages: Arc>> = Arc::new(Mutex::new(Vec::new())); + // Wait for 2 seconds for the group creation to be streamed + let notify = Delivery::new(Some(1)); + let (notify_pointer, messages_pointer) = (notify.clone(), messages.clone()); + + let mut closer = Client::::stream_all_messages_with_callback( + bo.clone(), + None, + move |message| { + let mut messages = messages_pointer.lock(); + messages.push(message); + notify_pointer.notify_one(); + }, + ); + closer.wait_for_ready().await; + + alix_group.send_message("first".as_bytes()).await.unwrap(); + + notify.wait_for_delivery().await.unwrap(); + { + let msgs = messages.lock(); + assert_eq!(msgs.len(), 1); + } + + alix_dm.send_message("second".as_bytes()).await.unwrap(); + + notify.wait_for_delivery().await.unwrap(); + { + let msgs = messages.lock(); + assert_eq!(msgs.len(), 2); + } + closer.end(); } } diff --git a/xmtp_mls/src/utils/test/mod.rs b/xmtp_mls/src/utils/test/mod.rs index b519d34e1..734da0a8a 100755 --- a/xmtp_mls/src/utils/test/mod.rs +++ b/xmtp_mls/src/utils/test/mod.rs @@ -186,8 +186,8 @@ pub struct Delivery { } impl Delivery { - pub fn new(timeout: Option) -> Self { - let timeout = timeout.unwrap_or(core::time::Duration::from_secs(60)); + pub fn new(timeout: Option) -> Self { + let timeout = core::time::Duration::from_secs(timeout.unwrap_or(60)); Self { notify: Arc::new(Notify::new()), timeout, diff --git a/xmtp_user_preferences/README.md b/xmtp_user_preferences/README.md index 35d10bc42..de0b132f0 100644 --- a/xmtp_user_preferences/README.md +++ b/xmtp_user_preferences/README.md @@ -1,3 +1,3 @@ -# XTMP Personal Private Portable Preferences +# XMTP Personal Private Portable Preferences A library for encrypting messages where the sender and recipient are the same (self-messaging).