diff --git a/bindings_ffi/src/logger.rs b/bindings_ffi/src/logger.rs
index 7f5c45f9d..c3dd68922 100644
--- a/bindings_ffi/src/logger.rs
+++ b/bindings_ffi/src/logger.rs
@@ -20,7 +20,7 @@ impl log::Log for RustLogger {
         self.logger.lock().expect("Logger mutex is poisoned!").log(
             record.level() as u32,
             record.level().to_string(),
-            format!("[libxmtp] {}", record.args()),
+            format!("[libxmtp][t:{}] {}", thread_id::get(), record.args()),
         );
     }
 }
diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs
index 39fea18f6..d13cc477d 100644
--- a/bindings_ffi/src/mls.rs
+++ b/bindings_ffi/src/mls.rs
@@ -1507,6 +1507,10 @@ mod tests {
     }
 
     impl LocalWalletInboxOwner {
+        pub fn with_wallet(wallet: xmtp_cryptography::utils::LocalWallet) -> Self {
+            Self { wallet }
+        }
+
         pub fn new() -> Self {
             Self {
                 wallet: xmtp_cryptography::utils::LocalWallet::new(&mut rng()),
@@ -1532,7 +1536,7 @@ mod tests {
 
     impl FfiLogger for MockLogger {
         fn log(&self, _level: u32, level_label: String, message: String) {
-            println!("[{}][t:{}]: {}", level_label, thread_id::get(), message)
+            println!("[{}]{}", level_label, message)
         }
     }
 
@@ -1607,8 +1611,11 @@ mod tests {
         client.register_identity(signature_request).await.unwrap();
     }
 
-    async fn new_test_client() -> Arc<FfiXmtpClient> {
-        let ffi_inbox_owner = LocalWalletInboxOwner::new();
+    /// Create a new test client with a given wallet.
+    async fn new_test_client_with_wallet(
+        wallet: xmtp_cryptography::utils::LocalWallet,
+    ) -> Arc<FfiXmtpClient> {
+        let ffi_inbox_owner = LocalWalletInboxOwner::with_wallet(wallet);
         let nonce = 1;
         let inbox_id = generate_inbox_id(&ffi_inbox_owner.get_address(), &nonce);
 
@@ -1626,10 +1633,16 @@ mod tests {
         )
         .await
         .unwrap();
+
         register_client(&ffi_inbox_owner, &client).await;
         client
     }
 
+    async fn new_test_client() -> Arc<FfiXmtpClient> {
+        let wallet = xmtp_cryptography::utils::LocalWallet::new(&mut rng());
+        new_test_client_with_wallet(wallet).await
+    }
+
     #[tokio::test]
     async fn get_inbox_id() {
         let client = new_test_client().await;
@@ -2228,6 +2241,68 @@ mod tests {
         );
     }
 
+    #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
+    async fn test_create_new_installation_without_breaking_group() {
+        let wallet1_key = &mut rng();
+        let wallet1 = xmtp_cryptography::utils::LocalWallet::new(wallet1_key);
+        let wallet2_key = &mut rng();
+        let wallet2 = xmtp_cryptography::utils::LocalWallet::new(wallet2_key);
+
+        // Create clients
+        let client1 = new_test_client_with_wallet(wallet1).await;
+        let client2 = new_test_client_with_wallet(wallet2.clone()).await;
+
+        // Create a new group with client1 including wallet2
+        let group = client1
+            .conversations()
+            .create_group(
+                vec![client2.account_address.clone()],
+                FfiCreateGroupOptions::default(),
+            )
+            .await
+            .unwrap();
+
+        // Sync groups
+        client1.conversations().sync().await.unwrap();
+        client2.conversations().sync().await.unwrap();
+
+        // Find groups for both clients
+        let client1_group = client1.group(group.id()).unwrap();
+        let client2_group = client2.group(group.id()).unwrap();
+
+        // Sync both groups
+        client1_group.sync().await.unwrap();
+        client2_group.sync().await.unwrap();
+
+        // Assert both clients see 2 members
+        let client1_members = client1_group.list_members().unwrap();
+        assert_eq!(client1_members.len(), 2);
+
+        let client2_members = client2_group.list_members().unwrap();
+        assert_eq!(client2_members.len(), 2);
+
+        // Drop and delete local database for client2
+        client2.release_db_connection().unwrap();
+
+        // Recreate client2 (new installation)
+        let client2 = new_test_client_with_wallet(wallet2).await;
+
+        // Send a message that will break the group
+        client1_group
+            .send("This message will break the group".as_bytes().to_vec())
+            .await
+            .unwrap();
+
+        // Assert client1 still sees 2 members
+        let client1_members = client1_group.list_members().unwrap();
+        assert_eq!(client1_members.len(), 2);
+
+        client2.conversations().sync().await.unwrap();
+        let client2_group = client2.group(group.id()).unwrap();
+        let client2_members = client2_group.list_members().unwrap();
+        assert_eq!(client2_members.len(), 2);
+    }
+
     #[tokio::test(flavor = "multi_thread", worker_threads = 5)]
     async fn test_can_send_messages_when_epochs_behind() {
         let alix = new_test_client().await;
diff --git a/xmtp_mls/migrations/2024-05-11-004236_cache_association_state/up.sql b/xmtp_mls/migrations/2024-05-11-004236_cache_association_state/up.sql
index ab63aba77..825bd9a1e 100644
--- a/xmtp_mls/migrations/2024-05-11-004236_cache_association_state/up.sql
+++ b/xmtp_mls/migrations/2024-05-11-004236_cache_association_state/up.sql
@@ -1,3 +1,5 @@
+-- Caches the computed association state at a given sequence ID in an inbox log,
+-- so that we don't need to replay the whole log.
 CREATE TABLE association_state (
     "inbox_id" TEXT NOT NULL,
     "sequence_id" BIGINT NOT NULL,
diff --git a/xmtp_mls/migrations/2024-05-15-145138_new_schema/up.sql b/xmtp_mls/migrations/2024-05-15-145138_new_schema/up.sql
index 4c0cb3a57..8b4030ab3 100644
--- a/xmtp_mls/migrations/2024-05-15-145138_new_schema/up.sql
+++ b/xmtp_mls/migrations/2024-05-15-145138_new_schema/up.sql
@@ -87,6 +87,8 @@ CREATE TABLE group_intents(
 
 CREATE INDEX group_intents_group_id_state ON group_intents(group_id, state);
 
+-- Caches the identity update payload at a given sequence ID, so that API calls
+-- don't need to be repeated.
 CREATE TABLE identity_updates(
     -- The inbox_id the update refers to
     "inbox_id" text NOT NULL,
diff --git a/xmtp_mls/src/api/identity.rs b/xmtp_mls/src/api/identity.rs
index cfb685392..dbf3a1e62 100644
--- a/xmtp_mls/src/api/identity.rs
+++ b/xmtp_mls/src/api/identity.rs
@@ -17,6 +17,7 @@ use xmtp_proto::xmtp::identity::api::v1::{
 
 const GET_IDENTITY_UPDATES_CHUNK_SIZE: usize = 50;
 
+#[derive(Debug)]
 /// A filter for querying identity updates. `sequence_id` is the starting sequence, and only later updates will be returned.
 pub struct GetIdentityUpdatesV2Filter {
     pub inbox_id: InboxId,
diff --git a/xmtp_mls/src/groups/group_membership.rs b/xmtp_mls/src/groups/group_membership.rs
index e14a5ea23..a614a7ea2 100644
--- a/xmtp_mls/src/groups/group_membership.rs
+++ b/xmtp_mls/src/groups/group_membership.rs
@@ -105,6 +105,7 @@ impl From<&GroupMembership> for Vec<u8> {
     }
 }
 
+#[derive(Debug)]
 pub struct MembershipDiff<'inbox_id> {
     pub added_inboxes: Vec<&'inbox_id String>,
     pub removed_inboxes: Vec<&'inbox_id String>,
diff --git a/xmtp_mls/src/groups/members.rs b/xmtp_mls/src/groups/members.rs
index 4e9b073db..c25172bbb 100644
--- a/xmtp_mls/src/groups/members.rs
+++ b/xmtp_mls/src/groups/members.rs
@@ -40,14 +40,25 @@ impl MlsGroup {
             .members
             .into_iter()
             .map(|(inbox_id, sequence_id)| (inbox_id, sequence_id as i64))
+            .filter(|(_, sequence_id)| *sequence_id != 0) // Skip the initial state
             .collect::<Vec<_>>();
 
         let conn = provider.conn_ref();
-        let association_state_map = StoredAssociationState::batch_read_from_cache(conn, requests)?;
+        let association_states =
+            StoredAssociationState::batch_read_from_cache(conn, requests.clone())?;
         let mutable_metadata = self.mutable_metadata()?;
-        // TODO: Figure out what to do with missing members from the local DB. Do we go to the network? Load from identity updates?
-        // Right now I am just omitting them
-        let members = association_state_map
+        if association_states.len() != requests.len() {
+            // Cache miss - not expected to happen because:
+            // 1. We don't allow updates to the group metadata unless we have already validated the association state
+            // 2. When validating the association state, we must have written it to the cache
+            log::error!(
+                "Failed to load all members for group - metadata: {:?}, computed members: {:?}",
+                requests,
+                association_states
+            );
+            return Err(GroupError::InvalidGroupMembership);
+        }
+        let members = association_states
             .into_iter()
             .map(|association_state| {
                 let inbox_id_str = association_state.inbox_id().to_string();
diff --git a/xmtp_mls/src/identity_updates.rs b/xmtp_mls/src/identity_updates.rs
index dc8432fc9..14a4cd44a 100644
--- a/xmtp_mls/src/identity_updates.rs
+++ b/xmtp_mls/src/identity_updates.rs
@@ -109,7 +109,6 @@ where
         if let Some(association_state) =
             StoredAssociationState::read_from_cache(conn, inbox_id.to_string(), last_sequence_id)?
         {
-            log::debug!("Loaded association state from cache");
             return Ok(association_state);
         }
 
@@ -125,7 +124,6 @@ where
             last_sequence_id,
             association_state.clone(),
         )?;
-        log::debug!("Wrote association state to cache");
 
         Ok(association_state)
     }
@@ -137,6 +135,12 @@ where
        starting_sequence_id: Option<i64>,
        ending_sequence_id: Option<i64>,
    ) -> Result<AssociationStateDiff, ClientError> {
+        log::debug!(
+            "Computing diff for {:?} from {:?} to {:?}",
+            inbox_id.as_ref(),
+            starting_sequence_id,
+            ending_sequence_id
+        );
         if starting_sequence_id.is_none() {
             return Ok(self
                 .get_association_state(conn, inbox_id.as_ref(), ending_sequence_id)
@@ -148,8 +152,23 @@ where
             .get_association_state(conn, inbox_id.as_ref(), starting_sequence_id)
             .await?;
 
-        let incremental_updates = conn
-            .get_identity_updates(inbox_id, starting_sequence_id, ending_sequence_id)?
+        let incremental_updates =
+            conn.get_identity_updates(inbox_id.as_ref(), starting_sequence_id, ending_sequence_id)?;
+
+        let last_sequence_id = incremental_updates.last().map(|update| update.sequence_id);
+        if ending_sequence_id.is_some()
+            && last_sequence_id.is_some()
+            && last_sequence_id != ending_sequence_id
+        {
+            log::error!(
+                "Did not find the expected last sequence id. Expected: {:?}, Found: {:?}",
+                ending_sequence_id,
+                last_sequence_id
+            );
+            return Err(AssociationError::MissingIdentityUpdate.into());
+        }
+
+        let incremental_updates = incremental_updates
             .into_iter()
             .map(|update| update.try_into())
             .collect::<Result<Vec<IdentityUpdate>, AssociationError>>()?;
@@ -159,6 +178,16 @@ where
             final_state = apply_update(final_state, update).await?;
         }
 
+        log::debug!("Final state at {:?}: {:?}", last_sequence_id, final_state);
+        if last_sequence_id.is_some() {
+            StoredAssociationState::write_to_cache(
+                conn,
+                inbox_id.as_ref().to_string(),
+                last_sequence_id.unwrap(),
+                final_state.clone(),
+            )?;
+        }
+
         Ok(initial_state.diff(&final_state))
     }
 
@@ -335,6 +364,7 @@ pub async fn load_identity_updates(
     if inbox_ids.is_empty() {
         return Ok(HashMap::new());
     }
+    log::debug!("Fetching identity updates for: {:?}", inbox_ids);
     let existing_sequence_ids = conn.get_latest_sequence_id(&inbox_ids)?;
 
     let filters: Vec<GetIdentityUpdatesV2Filter> = inbox_ids
diff --git a/xmtp_mls/src/storage/encrypted_store/association_state.rs b/xmtp_mls/src/storage/encrypted_store/association_state.rs
index 9ff51cca1..210009853 100644
--- a/xmtp_mls/src/storage/encrypted_store/association_state.rs
+++ b/xmtp_mls/src/storage/encrypted_store/association_state.rs
@@ -40,12 +40,22 @@ impl StoredAssociationState {
         state: AssociationState,
     ) -> Result<(), StorageError> {
         let state_proto: AssociationStateProto = state.into();
-        StoredAssociationState {
-            inbox_id,
+        let result = StoredAssociationState {
+            inbox_id: inbox_id.clone(),
             sequence_id,
             state: state_proto.encode_to_vec(),
         }
-        .store_or_ignore(conn)
+        .store_or_ignore(conn);
+
+        if result.is_ok() {
+            log::debug!(
+                "Wrote association state to cache: {} {}",
+                inbox_id,
+                sequence_id
+            );
+        }
+
+        result
     }
 
     pub fn read_from_cache(
@@ -56,7 +66,7 @@ impl StoredAssociationState {
         let stored_state: Option<StoredAssociationState> =
             conn.fetch(&(inbox_id.to_string(), sequence_id))?;
 
-        stored_state
+        let result = stored_state
             .map(|stored_state| {
                 stored_state
                     .try_into()
                     .map_err(|err| {
                         StorageError::Deserialization(format!(
                         ))
                     })
             })
-            .transpose()
+            .transpose();
+
+        if result.is_ok() && result.as_ref().unwrap().is_some() {
+            log::debug!(
+                "Loaded association state from cache: {} {}",
+                inbox_id,
+                sequence_id
+            );
+        }
+
+        result
     }
 
     pub fn batch_read_from_cache(
         conn: &DbConnection,
         identifiers: Vec<(InboxId, i64)>,
     ) -> Result<Vec<AssociationState>, StorageError> {
-        // If no identifier provided, return empty hash map
         if identifiers.is_empty() {
             return Ok(vec![]);
         }
diff --git a/xmtp_mls/src/storage/encrypted_store/mod.rs b/xmtp_mls/src/storage/encrypted_store/mod.rs
index 2b3eb7138..3242c5b32 100644
--- a/xmtp_mls/src/storage/encrypted_store/mod.rs
+++ b/xmtp_mls/src/storage/encrypted_store/mod.rs
@@ -184,7 +184,7 @@ impl EncryptedMessageStore {
             .as_ref()
             .ok_or(StorageError::PoolNeedsConnection)?;
 
-        log::info!(
+        log::debug!(
             "Pulling connection from pool, idle_connections={}, total_connections={}",
             pool.state().idle_connections,
             pool.state().connections