Skip to content

Commit

Permalink
When a new installation is added the members list becomes incorrect (#937)

Browse files Browse the repository at this point in the history

* lock stuff

* reproduce the member list issue in test

* Fix missing members due to cache inconsistency (#944)

Depends on #937

We currently write the association state cache whenever we compute an association state from scratch. Also, whenever we read an association state, we check the cache, and if it's not there, we go ahead and re-compute it. However, there are two codepaths that behave a little differently, causing us to underreport the members in a group:
1. One codepath computes incremental updates to an existing state, and does not update the cache.
2. One codepath batch reads multiple association states from the cache, and silently omits anything it doesn't find rather than re-computing it.

I've done a few things in this PR:

1. When computing incremental updates, make sure we also write to the cache
2. When batch reading from the cache, bake in a hard assumption that the cache will be up-to-date, and throw errors if we don't find what we expect
3. Some small improvements to our logging

---------

Co-authored-by: Richard Hua <[email protected]>
  • Loading branch information
nplasterer and richardhuaaa authored Aug 7, 2024
1 parent f6c861d commit 7d41461
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 19 deletions.
2 changes: 1 addition & 1 deletion bindings_ffi/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl log::Log for RustLogger {
self.logger.lock().expect("Logger mutex is poisoned!").log(
record.level() as u32,
record.level().to_string(),
format!("[libxmtp] {}", record.args()),
format!("[libxmtp][t:{}] {}", thread_id::get(), record.args()),
);
}
}
Expand Down
81 changes: 78 additions & 3 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1507,6 +1507,10 @@ mod tests {
}

impl LocalWalletInboxOwner {
/// Builds an inbox owner from an existing wallet, so the same wallet can be
/// reused across multiple client instances (e.g. to simulate a second
/// installation of the same identity in tests).
pub fn with_wallet(wallet: xmtp_cryptography::utils::LocalWallet) -> Self {
Self { wallet }
}

pub fn new() -> Self {
Self {
wallet: xmtp_cryptography::utils::LocalWallet::new(&mut rng()),
Expand All @@ -1532,7 +1536,7 @@ mod tests {

impl FfiLogger for MockLogger {
fn log(&self, _level: u32, level_label: String, message: String) {
println!("[{}][t:{}]: {}", level_label, thread_id::get(), message)
println!("[{}]{}", level_label, message)
}
}

Expand Down Expand Up @@ -1607,8 +1611,11 @@ mod tests {
client.register_identity(signature_request).await.unwrap();
}

async fn new_test_client() -> Arc<FfiXmtpClient> {
let ffi_inbox_owner = LocalWalletInboxOwner::new();
/// Create a new test client with a given wallet.
async fn new_test_client_with_wallet(
wallet: xmtp_cryptography::utils::LocalWallet,
) -> Arc<FfiXmtpClient> {
let ffi_inbox_owner = LocalWalletInboxOwner::with_wallet(wallet);
let nonce = 1;
let inbox_id = generate_inbox_id(&ffi_inbox_owner.get_address(), &nonce);

Expand All @@ -1626,10 +1633,16 @@ mod tests {
)
.await
.unwrap();

register_client(&ffi_inbox_owner, &client).await;
client
}

/// Create a new test client backed by a freshly generated local wallet.
/// Thin convenience wrapper over `new_test_client_with_wallet`.
async fn new_test_client() -> Arc<FfiXmtpClient> {
let wallet = xmtp_cryptography::utils::LocalWallet::new(&mut rng());
new_test_client_with_wallet(wallet).await
}

#[tokio::test]
async fn get_inbox_id() {
let client = new_test_client().await;
Expand Down Expand Up @@ -2228,6 +2241,68 @@ mod tests {
);
}

// Regression test for #937 / #944: when a member's local database is wiped and
// the client is recreated (a new installation under the same wallet), sending
// a message in an existing group must not corrupt either client's member list.
#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_create_new_installation_without_breaking_group() {
let wallet1_key = &mut rng();
let wallet1 = xmtp_cryptography::utils::LocalWallet::new(wallet1_key);
let wallet2_key = &mut rng();
let wallet2 = xmtp_cryptography::utils::LocalWallet::new(wallet2_key);

// Create clients; wallet2 is cloned so the same identity can be
// re-registered later as a second installation.
let client1 = new_test_client_with_wallet(wallet1).await;
let client2 = new_test_client_with_wallet(wallet2.clone()).await;
// Create a new group with client1 including wallet2

let group = client1
.conversations()
.create_group(
vec![client2.account_address.clone()],
FfiCreateGroupOptions::default(),
)
.await
.unwrap();

// Sync groups
client1.conversations().sync().await.unwrap();
client2.conversations().sync().await.unwrap();

// Find groups for both clients
let client1_group = client1.group(group.id()).unwrap();
let client2_group = client2.group(group.id()).unwrap();

// Sync both groups
client1_group.sync().await.unwrap();
client2_group.sync().await.unwrap();

// Baseline: both clients see 2 members before the new installation exists
let client1_members = client1_group.list_members().unwrap();
assert_eq!(client1_members.len(), 2);

let client2_members = client2_group.list_members().unwrap();
assert_eq!(client2_members.len(), 2);

// Drop and delete local database for client2
client2.release_db_connection().unwrap();

// Recreate client2 from the same wallet — this registers a brand-new
// installation for wallet2's inbox.
let client2 = new_test_client_with_wallet(wallet2).await;

// Send a message after the new installation was added; before the
// cache-consistency fix (#944) this scenario underreported group members.
client1_group
.send("This message will break the group".as_bytes().to_vec())
.await
.unwrap();

// Assert client1 still sees 2 members
let client1_members = client1_group.list_members().unwrap();
assert_eq!(client1_members.len(), 2);

// The recreated client2 must also converge on the full member list
client2.conversations().sync().await.unwrap();
let client2_group = client2.group(group.id()).unwrap();
let client2_members = client2_group.list_members().unwrap();
assert_eq!(client2_members.len(), 2);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_can_send_messages_when_epochs_behind() {
let alix = new_test_client().await;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
-- Caches the computed association state at a given sequence ID in an inbox log,
-- so that we don't need to replay the whole log.
CREATE TABLE association_state (
"inbox_id" TEXT NOT NULL,
"sequence_id" BIGINT NOT NULL,
Expand Down
2 changes: 2 additions & 0 deletions xmtp_mls/migrations/2024-05-15-145138_new_schema/up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ CREATE TABLE group_intents(

CREATE INDEX group_intents_group_id_state ON group_intents(group_id, state);

-- Caches the identity update payload at a given sequence ID, so that API calls
-- don't need to be repeated.
CREATE TABLE identity_updates(
-- The inbox_id the update refers to
"inbox_id" text NOT NULL,
Expand Down
1 change: 1 addition & 0 deletions xmtp_mls/src/api/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use xmtp_proto::xmtp::identity::api::v1::{

const GET_IDENTITY_UPDATES_CHUNK_SIZE: usize = 50;

#[derive(Debug)]
/// A filter for querying identity updates. `sequence_id` is the starting sequence, and only later updates will be returned.
pub struct GetIdentityUpdatesV2Filter {
pub inbox_id: InboxId,
Expand Down
1 change: 1 addition & 0 deletions xmtp_mls/src/groups/group_membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl From<&GroupMembership> for Vec<u8> {
}
}

#[derive(Debug)]
pub struct MembershipDiff<'inbox_id> {
pub added_inboxes: Vec<&'inbox_id String>,
pub removed_inboxes: Vec<&'inbox_id String>,
Expand Down
19 changes: 15 additions & 4 deletions xmtp_mls/src/groups/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,25 @@ impl MlsGroup {
.members
.into_iter()
.map(|(inbox_id, sequence_id)| (inbox_id, sequence_id as i64))
.filter(|(_, sequence_id)| *sequence_id != 0) // Skip the initial state
.collect::<Vec<_>>();

let conn = provider.conn_ref();
let association_state_map = StoredAssociationState::batch_read_from_cache(conn, requests)?;
let association_states =
StoredAssociationState::batch_read_from_cache(conn, requests.clone())?;
let mutable_metadata = self.mutable_metadata()?;
// TODO: Figure out what to do with missing members from the local DB. Do we go to the network? Load from identity updates?
// Right now I am just omitting them
let members = association_state_map
if association_states.len() != requests.len() {
// Cache miss - not expected to happen because:
// 1. We don't allow updates to the group metadata unless we have already validated the association state
// 2. When validating the association state, we must have written it to the cache
log::error!(
"Failed to load all members for group - metadata: {:?}, computed members: {:?}",
requests,
association_states
);
return Err(GroupError::InvalidGroupMembership);
}
let members = association_states
.into_iter()
.map(|association_state| {
let inbox_id_str = association_state.inbox_id().to_string();
Expand Down
38 changes: 34 additions & 4 deletions xmtp_mls/src/identity_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ where
if let Some(association_state) =
StoredAssociationState::read_from_cache(conn, inbox_id.to_string(), last_sequence_id)?
{
log::debug!("Loaded association state from cache");
return Ok(association_state);
}

Expand All @@ -125,7 +124,6 @@ where
last_sequence_id,
association_state.clone(),
)?;
log::debug!("Wrote association state to cache");

Ok(association_state)
}
Expand All @@ -137,6 +135,12 @@ where
starting_sequence_id: Option<i64>,
ending_sequence_id: Option<i64>,
) -> Result<AssociationStateDiff, ClientError> {
log::debug!(
"Computing diff for {:?} from {:?} to {:?}",
inbox_id.as_ref(),
starting_sequence_id,
ending_sequence_id
);
if starting_sequence_id.is_none() {
return Ok(self
.get_association_state(conn, inbox_id.as_ref(), ending_sequence_id)
Expand All @@ -148,8 +152,23 @@ where
.get_association_state(conn, inbox_id.as_ref(), starting_sequence_id)
.await?;

let incremental_updates = conn
.get_identity_updates(inbox_id, starting_sequence_id, ending_sequence_id)?
let incremental_updates =
conn.get_identity_updates(inbox_id.as_ref(), starting_sequence_id, ending_sequence_id)?;

let last_sequence_id = incremental_updates.last().map(|update| update.sequence_id);
if ending_sequence_id.is_some()
&& last_sequence_id.is_some()
&& last_sequence_id != ending_sequence_id
{
log::error!(
"Did not find the expected last sequence id. Expected: {:?}, Found: {:?}",
ending_sequence_id,
last_sequence_id
);
return Err(AssociationError::MissingIdentityUpdate.into());
}

let incremental_updates = incremental_updates
.into_iter()
.map(|update| update.try_into())
.collect::<Result<Vec<IdentityUpdate>, AssociationError>>()?;
Expand All @@ -159,6 +178,16 @@ where
final_state = apply_update(final_state, update).await?;
}

log::debug!("Final state at {:?}: {:?}", last_sequence_id, final_state);
if last_sequence_id.is_some() {
StoredAssociationState::write_to_cache(
conn,
inbox_id.as_ref().to_string(),
last_sequence_id.unwrap(),
final_state.clone(),
)?;
}

Ok(initial_state.diff(&final_state))
}

Expand Down Expand Up @@ -335,6 +364,7 @@ pub async fn load_identity_updates<ApiClient: XmtpApi>(
if inbox_ids.is_empty() {
return Ok(HashMap::new());
}
log::debug!("Fetching identity updates for: {:?}", inbox_ids);

let existing_sequence_ids = conn.get_latest_sequence_id(&inbox_ids)?;
let filters: Vec<GetIdentityUpdatesV2Filter> = inbox_ids
Expand Down
31 changes: 25 additions & 6 deletions xmtp_mls/src/storage/encrypted_store/association_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,22 @@ impl StoredAssociationState {
state: AssociationState,
) -> Result<(), StorageError> {
let state_proto: AssociationStateProto = state.into();
StoredAssociationState {
inbox_id,
let result = StoredAssociationState {
inbox_id: inbox_id.clone(),
sequence_id,
state: state_proto.encode_to_vec(),
}
.store_or_ignore(conn)
.store_or_ignore(conn);

if result.is_ok() {
log::debug!(
"Wrote association state to cache: {} {}",
inbox_id,
sequence_id
);
}

result
}

pub fn read_from_cache(
Expand All @@ -56,7 +66,7 @@ impl StoredAssociationState {
let stored_state: Option<StoredAssociationState> =
conn.fetch(&(inbox_id.to_string(), sequence_id))?;

stored_state
let result = stored_state
.map(|stored_state| {
stored_state
.try_into()
Expand All @@ -66,14 +76,23 @@ impl StoredAssociationState {
))
})
})
.transpose()
.transpose();

if result.is_ok() && result.as_ref().unwrap().is_some() {
log::debug!(
"Loaded association state from cache: {} {}",
inbox_id,
sequence_id
);
}

result
}

pub fn batch_read_from_cache(
conn: &DbConnection,
identifiers: Vec<(InboxId, i64)>,
) -> Result<Vec<AssociationState>, StorageError> {
// If no identifier provided, return empty hash map
if identifiers.is_empty() {
return Ok(vec![]);
}
Expand Down
2 changes: 1 addition & 1 deletion xmtp_mls/src/storage/encrypted_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl EncryptedMessageStore {
.as_ref()
.ok_or(StorageError::PoolNeedsConnection)?;

log::info!(
log::debug!(
"Pulling connection from pool, idle_connections={}, total_connections={}",
pool.state().idle_connections,
pool.state().connections
Expand Down

0 comments on commit 7d41461

Please sign in to comment.