Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up empty intents #969

Merged
merged 6 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2303,6 +2303,112 @@ mod tests {
assert_eq!(client2_members.len(), 2);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_create_new_installations_does_not_fork_group() {
let bo_wallet_key = &mut rng();
let bo_wallet = xmtp_cryptography::utils::LocalWallet::new(bo_wallet_key);

// Create clients
let alix = new_test_client().await;
let bo = new_test_client_with_wallet(bo_wallet.clone()).await;
let caro = new_test_client().await;

// Alix begins a stream for all messages
let message_callbacks = RustStreamCallback::default();
let stream_messages = alix
.conversations()
.stream_all_messages(Box::new(message_callbacks.clone()))
.await;
stream_messages.wait_for_ready().await;

// Alix creates a group with Bo and Caro
let group = alix
.conversations()
.create_group(
vec![bo.account_address.clone(), caro.account_address.clone()],
FfiCreateGroupOptions::default(),
)
.await
.unwrap();

// Alix and Caro Sync groups
alix.conversations().sync().await.unwrap();
bo.conversations().sync().await.unwrap();
caro.conversations().sync().await.unwrap();

// Alix and Caro find the group
let alix_group = alix.group(group.id()).unwrap();
let bo_group = bo.group(group.id()).unwrap();
let caro_group = caro.group(group.id()).unwrap();

// Alix sends a message in the group
alix_group
.send("First message".as_bytes().to_vec())
.await
.unwrap();

// Caro sends a message in the group
caro_group
.send("Second message".as_bytes().to_vec())
.await
.unwrap();

// Bo logs back in with a new installation
let bo2 = new_test_client_with_wallet(bo_wallet).await;

// Bo begins a stream for all messages
let bo_message_callbacks = RustStreamCallback::default();
let bo_stream_messages = bo2
.conversations()
.stream_all_messages(Box::new(bo_message_callbacks.clone()))
.await;
bo_stream_messages.wait_for_ready().await;

// Alix sends a message to the group
alix_group
.send("Third message".as_bytes().to_vec())
.await
.unwrap();

// New installation of bo finds the group
bo2.conversations().sync().await.unwrap();
let bo2_group = bo2.group(group.id()).unwrap();

// Bo sends a message to the group
bo2_group
.send("Fourth message".as_bytes().to_vec())
.await
.unwrap();

// Caro sends a message in the group
caro_group
.send("Fifth message".as_bytes().to_vec())
.await
.unwrap();

// Get the message count for all the clients
let caro_messages = caro_group
.find_messages(FfiListMessagesOptions::default())
.unwrap();
let alix_messages = alix_group
.find_messages(FfiListMessagesOptions::default())
.unwrap();
bo_group.sync().await.unwrap();
let bo_messages = bo_group
.find_messages(FfiListMessagesOptions::default())
.unwrap();

let bo2_messages = bo2_group
.find_messages(FfiListMessagesOptions::default())
.unwrap();

assert_eq!(caro_messages.len(), 5);
assert_eq!(alix_messages.len(), 6);
assert_eq!(bo_messages.len(), 5);
// Bo 2 only sees three messages since it joined after the first 2 were sent
assert_eq!(bo2_messages.len(), 3);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 5)]
async fn test_can_send_messages_when_epochs_behind() {
let alix = new_test_client().await;
Expand Down
91 changes: 48 additions & 43 deletions xmtp_mls/src/groups/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,10 @@ impl MlsGroup {
state: IntentState::Error,
..
})) => {
log::warn!("not retrying intent ID {id}. since it is in state Error",);
log::warn!(
"not retrying intent ID {id}. since it is in state Error. {:?}",
last_err
);
return Err(last_err.unwrap_or(GroupError::Generic(
"Group intent could not be committed".to_string(),
)));
Expand Down Expand Up @@ -803,50 +806,52 @@ impl MlsGroup {
})
);

if let Err(err) = result {
log::error!("error getting publish intent data {:?}", err);
if (intent.publish_attempts + 1) as usize >= MAX_INTENT_PUBLISH_ATTEMPTS {
log::error!("intent {} has reached max publish attempts", intent.id);
// TODO: Eventually clean up errored attempts
provider
.conn()
.set_group_intent_error_and_fail_msg(&intent)?;
} else {
provider
.conn()
.increment_intent_publish_attempt_count(intent.id)?;
}

return Err(err);
}
match result {
Err(err) => {
log::error!("error getting publish intent data {:?}", err);
if (intent.publish_attempts + 1) as usize >= MAX_INTENT_PUBLISH_ATTEMPTS {
log::error!("intent {} has reached max publish attempts", intent.id);
// TODO: Eventually clean up errored attempts
provider
.conn()
.set_group_intent_error_and_fail_msg(&intent)?;
} else {
provider
.conn()
.increment_intent_publish_attempt_count(intent.id)?;
}

if let Some((payload, post_commit_data)) = result.expect("checked") {
let payload_slice = payload.as_slice();
return Err(err);
}
Ok(Some((payload, post_commit_data))) => {
let payload_slice = payload.as_slice();

client
.api_client
.send_group_messages(vec![payload_slice])
.await?;
log::info!(
"[{}] published intent [{}] of type [{}]",
client.inbox_id(),
intent.id,
intent.kind
);
provider.conn().set_group_intent_published(
intent.id,
sha256(payload_slice),
post_commit_data,
)?;
log::debug!(
"client [{}] set stored intent [{}] to state `published`",
client.inbox_id(),
intent.id
);
} else {
provider
.conn()
.set_group_intent_error_and_fail_msg(&intent)?;
client
.api_client
.send_group_messages(vec![payload_slice])
.await?;
log::info!(
"[{}] published intent [{}] of type [{}]",
client.inbox_id(),
intent.id,
intent.kind
);
provider.conn().set_group_intent_published(
intent.id,
sha256(payload_slice),
post_commit_data,
)?;
log::debug!(
"client [{}] set stored intent [{}] to state `published`",
client.inbox_id(),
intent.id
);
}
Ok(None) => {
log::info!("Skipping intent because no publish data returned");
let deleter: &dyn Delete<StoredGroupIntent, Key = i32> = &provider.conn();
deleter.delete(intent.id)?;
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion xmtp_mls/src/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ where
let (tx, rx) = oneshot::channel();

let handle = tokio::spawn(async move {
let mut stream = client.stream_conversations().await.unwrap();
let mut stream = client.stream_conversations().await?;
let _ = tx.send(());
while let Some(convo) = stream.next().await {
convo_callback(convo)
Expand Down
Loading