Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reproduce perf test in rust #958

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl XmtpMlsLocalContext {
self.identity.installation_keys.to_public_vec()
}

/// Get the account address of the blockchain account associated with this client
/// Get the inbox id associated with this client
pub fn inbox_id(&self) -> InboxId {
self.identity.inbox_id().clone()
}
Expand Down
88 changes: 87 additions & 1 deletion xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1202,7 +1202,10 @@ mod tests {
use diesel::connection::SimpleConnection;
use openmls::prelude::{tls_codec::Serialize, Member, MlsGroup as OpenMlsGroup};
use prost::Message;
use std::sync::Arc;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use tracing_test::traced_test;
use xmtp_cryptography::utils::generate_local_wallet;
use xmtp_proto::xmtp::mls::message_contents::EncodedContent;
Expand All @@ -1224,6 +1227,7 @@ mod tests {
group_intent::IntentState,
group_message::{GroupMessageKind, StoredGroupMessage},
},
utils::test,
xmtp_openmls_provider::XmtpOpenMlsProvider,
Client, InboxOwner, XmtpApi,
};
Expand Down Expand Up @@ -2805,4 +2809,86 @@ mod tests {
panic!("Expected error")
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
async fn group_intent_could_not_be_committed() {
let alix_wallet = generate_local_wallet();
let alix = Arc::new(ClientBuilder::new_test_client(&alix_wallet).await);
let peers = test::create_bulk_clients(20).await;
let bo = peers[0].clone();

let group_callbacks = Arc::new(AtomicUsize::new(0));
let msg_callbacks = Arc::new(AtomicUsize::new(0));
let bo_group_callbacks = Arc::new(AtomicUsize::new(0));
let bo_message_callbacks = Arc::new(AtomicUsize::new(0));

let mut handle = alix.clone().stream_conversations_with_callback({
let group_callbacks = group_callbacks.clone();
move |_convo| {
group_callbacks.fetch_add(1, Ordering::SeqCst);
}
});
handle.wait_for_ready().await;

let mut handle = alix.clone().stream_all_messages_with_callback({
let msg_callbacks = msg_callbacks.clone();
move |_message| {
msg_callbacks.fetch_add(1, Ordering::SeqCst);
}
});
handle.wait_for_ready().await;

let mut handle = bo.clone().stream_conversations_with_callback({
let bo_group_callbacks = bo_group_callbacks.clone();
move |_convo| {
bo_group_callbacks.fetch_add(1, Ordering::SeqCst);
}
});
handle.wait_for_ready().await;

let mut handle = bo.clone().stream_all_messages_with_callback({
let bo_message_callbacks = bo_message_callbacks.clone();
move |_message| {
bo_message_callbacks.fetch_add(1, Ordering::SeqCst);
}
});
handle.wait_for_ready().await;

let groups = test::create_groups(&alix, &peers, 1, 10).await.unwrap();

log::info!(
"Alix streamed {} groups (1)",
group_callbacks.load(Ordering::SeqCst)
);
log::info!(
"Alix streamed {} messages (10)",
msg_callbacks.load(Ordering::SeqCst)
);

let alix_group = groups[0].clone();
let bo_group = bo.group(alix_group.group_id.clone()).unwrap();

test::create_messages(&bo_group, &bo, 10, "Bo")
.await
.unwrap();
test::create_messages(&alix_group, &alix, 10, "Alix")
.await
.unwrap();

log::info!(
"Alix Streamed {} groups (1)",
group_callbacks.load(Ordering::SeqCst)
);
log::info!(
"Alix Streamed {} messages (30)",
msg_callbacks.load(Ordering::SeqCst)
);
log::info!(
"Bo Streamed {} groups (1)",
bo_group_callbacks.load(Ordering::SeqCst),
);
log::info!(
"Bo Streamed {} messages (30)",
bo_message_callbacks.load(Ordering::SeqCst)
);
}
}
22 changes: 11 additions & 11 deletions xmtp_mls/src/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,13 @@ where
ApiClient: XmtpApi,
{
pub fn stream_conversations_with_callback(
client: Arc<Client<ApiClient>>,
self: Arc<Self>,
mut convo_callback: impl FnMut(MlsGroup) + Send + 'static,
) -> StreamHandle<Result<(), ClientError>> {
let (tx, rx) = oneshot::channel();

let handle = tokio::spawn(async move {
let mut stream = client.stream_conversations().await.unwrap();
let mut stream = self.stream_conversations().await.unwrap();
let _ = tx.send(());
while let Some(convo) = stream.next().await {
convo_callback(convo)
Expand All @@ -252,14 +252,14 @@ where
}

pub(crate) fn stream_messages_with_callback(
client: Arc<Client<ApiClient>>,
self: Arc<Self>,
group_id_to_info: HashMap<Vec<u8>, MessagesStreamInfo>,
mut callback: impl FnMut(StoredGroupMessage) + Send + 'static,
) -> StreamHandle<Result<(), ClientError>> {
let (tx, rx) = oneshot::channel();

let handle = tokio::spawn(async move {
let mut stream = Self::stream_messages(client, group_id_to_info).await?;
let mut stream = Self::stream_messages(self, group_id_to_info).await?;
let _ = tx.send(());
while let Some(message) = stream.next().await {
callback(message)
Expand All @@ -274,11 +274,11 @@ where
}

pub async fn stream_all_messages(
client: Arc<Client<ApiClient>>,
self: Arc<Self>,
) -> Result<impl Stream<Item = Result<StoredGroupMessage, ClientError>>, ClientError> {
client.sync_welcomes().await?;
self.sync_welcomes().await?;

let mut group_id_to_info = client
let mut group_id_to_info = self
.store()
.conn()?
.find_groups(None, None, None, None)?
Expand All @@ -287,8 +287,8 @@ where
.collect::<HashMap<Vec<u8>, MessagesStreamInfo>>();

let stream = async_stream::stream! {
let client = client.clone();
let mut messages_stream = client
let client = self.clone();
let mut messages_stream = self
.clone()
.stream_messages(group_id_to_info.clone())
.await?;
Expand Down Expand Up @@ -351,13 +351,13 @@ where
}

pub fn stream_all_messages_with_callback(
client: Arc<Client<ApiClient>>,
self: Arc<Self>,
mut callback: impl FnMut(StoredGroupMessage) + Send + Sync + 'static,
) -> StreamHandle<Result<(), ClientError>> {
let (tx, rx) = oneshot::channel();

let handle = tokio::spawn(async move {
let mut stream = Self::stream_all_messages(client).await?;
let mut stream = Self::stream_all_messages(self).await?;
let _ = tx.send(());
while let Some(message) = stream.next().await {
match message {
Expand Down
61 changes: 61 additions & 0 deletions xmtp_mls/src/utils/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ use rand::{
use std::sync::Arc;
use tokio::{sync::Notify, time::error::Elapsed};
use xmtp_api_grpc::grpc_api_helper::Client as GrpcClient;
use xmtp_cryptography::utils::generate_local_wallet;
use xmtp_id::associations::{generate_inbox_id, RecoverableEcdsaSignature};

use crate::{
builder::ClientBuilder,
groups::{GroupMetadataOptions, MlsGroup},
identity::IdentityStrategy,
storage::{EncryptedMessageStore, StorageOption},
types::Address,
Expand Down Expand Up @@ -48,6 +50,65 @@ pub fn rand_time() -> i64 {
rng.gen_range(0..1_000_000_000)
}

/// Create a bunch of random clients
pub async fn create_bulk_clients(num: usize) -> Vec<Arc<Client<TestClient>>> {
let mut futures = vec![];
for _ in 0..num {
futures.push(async move {
let local = generate_local_wallet();
Arc::new(ClientBuilder::new_test_client(&local).await)
});
}
futures::future::join_all(futures).await
}

pub async fn create_groups(
client: &Client<TestClient>,
peers: &[Arc<Client<TestClient>>],
num_groups: usize,
num_msgs: usize,
) -> Result<Vec<MlsGroup>, anyhow::Error> {
let mut groups = vec![];
let ids = peers.iter().map(|p| p.inbox_id()).collect::<Vec<String>>();

for index in 0..num_groups {
let group = client.create_group(
None,
GroupMetadataOptions {
name: Some(format!("group {index}")),
image_url_square: Some(format!("www.group{index}.com")),
description: Some(format!("group {index}")),
..Default::default()
},
)?;
group.add_members_by_inbox_id(client, ids.clone()).await?;
for msg_index in 0..num_msgs {
group
.send_message(format!("Alix message {msg_index}").as_bytes(), client)
.await?;
}
groups.push(group);
}
Ok(groups)
}

pub async fn create_messages<S: AsRef<str>>(
group: &MlsGroup,
client: &Client<TestClient>,
num_msgs: usize,
name: S,
) -> Result<usize, anyhow::Error> {
let mut messages = 0;
let name = name.as_ref();
for msg_index in 0..num_msgs {
group
.send_message(format!("{name} Message {msg_index}").as_bytes(), client)
.await?;
messages += 1;
}
Ok(messages)
}

#[async_trait::async_trait]
#[cfg(feature = "http-api")]
impl XmtpTestClient for XmtpHttpApiClient {
Expand Down
Loading