Skip to content

Commit

Permalink
Abort if retry fails (#945)
Browse files Browse the repository at this point in the history
* Abort if retry fails

* Add error message

* Add test
  • Loading branch information
neekolas authored Aug 7, 2024
1 parent 7d41461 commit 340b0b6
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 3 deletions.
53 changes: 52 additions & 1 deletion xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1199,6 +1199,7 @@ fn build_group_join_config() -> MlsGroupJoinConfig {

#[cfg(test)]
mod tests {
use diesel::connection::SimpleConnection;
use openmls::prelude::{tls_codec::Serialize, Member, MlsGroup as OpenMlsGroup};
use prost::Message;
use std::sync::Arc;
Expand Down Expand Up @@ -1229,7 +1230,7 @@ mod tests {

use super::{
intents::{Installation, SendWelcomesAction},
MlsGroup,
GroupError, MlsGroup,
};

async fn receive_group_invite<ApiClient>(client: &Client<ApiClient>) -> MlsGroup
Expand Down Expand Up @@ -2754,4 +2755,54 @@ mod tests {
]
);
}

#[tokio::test(flavor = "multi_thread")]
async fn process_messages_abort_on_retryable_error() {
let alix = ClientBuilder::new_test_client(&generate_local_wallet()).await;
let bo = ClientBuilder::new_test_client(&generate_local_wallet()).await;

let alix_group = alix
.create_group(None, GroupMetadataOptions::default())
.unwrap();

alix_group
.add_members_by_inbox_id(&alix, vec![bo.inbox_id()])
.await
.unwrap();

// Create two commits
alix_group
.update_group_name(&alix, "foo".to_string())
.await
.unwrap();
alix_group
.update_group_name(&alix, "bar".to_string())
.await
.unwrap();

let bo_group = receive_group_invite(&bo).await;
// Get the group messages before we lock the DB, simulating an error that happens
// in the middle of a sync instead of the beginning
let bo_messages = bo
.query_group_messages(&bo_group.group_id, &bo.store().conn().unwrap())
.await
.unwrap();

let conn_1 = bo.store().conn().unwrap();
let mut conn_2 = bo.store().raw_conn().unwrap();

// Begin an exclusive transaction on a second connection to lock the database
conn_2.batch_execute("BEGIN EXCLUSIVE").unwrap();
let process_result = bo_group.process_messages(bo_messages, conn_1, &bo).await;
if let Some(GroupError::ReceiveErrors(errors)) = process_result.err() {
assert_eq!(errors.len(), 1);
assert!(errors
.first()
.unwrap()
.to_string()
.contains("database is locked"));
} else {
panic!("Expected error")
}
}
}
13 changes: 12 additions & 1 deletion xmtp_mls/src/groups/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::{
hpke::{encrypt_welcome, HpkeError},
identity::parse_credential,
identity_updates::load_identity_updates,
retry::Retry,
retry::{Retry, RetryableError},
retry_async,
storage::{
db_connection::DbConnection,
Expand Down Expand Up @@ -692,7 +692,18 @@ impl MlsGroup {
})
);
if let Err(e) = result {
let is_retryable = e.is_retryable();
let error_message = e.to_string();
receive_errors.push(e);
// If the error is retryable we cannot move on to the next message
// otherwise you can get into a forked group state.
if is_retryable {
log::error!(
"Aborting message processing for retryable error: {}",
error_message
);
break;
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion xmtp_mls/src/storage/encrypted_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ impl EncryptedMessageStore {
Ok(())
}

fn raw_conn(
pub(crate) fn raw_conn(
&self,
) -> Result<PooledConnection<ConnectionManager<SqliteConnection>>, StorageError> {
let pool_guard = self.pool.read();
Expand Down

0 comments on commit 340b0b6

Please sign in to comment.