Skip to content

Commit

Permalink
Remove unpublished message, add method for publishing all messages to…
Browse files Browse the repository at this point in the history
… `Group` (#896)

* remove unpublished message directly return ID

* fix bindings

* fix node bindings
  • Loading branch information
insipx authored Jul 9, 2024
1 parent d4d8134 commit ec1da4e
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 141 deletions.
44 changes: 14 additions & 30 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use xmtp_mls::groups::group_permissions::PermissionsPolicies;
use xmtp_mls::groups::intents::PermissionPolicyOption;
use xmtp_mls::groups::intents::PermissionUpdateType;
use xmtp_mls::groups::GroupMetadataOptions;
use xmtp_mls::groups::UnpublishedMessage;
use xmtp_mls::{
api::ApiClientWrapper,
builder::ClientBuilder,
Expand Down Expand Up @@ -646,28 +645,6 @@ pub struct FfiCreateGroupOptions {
pub group_pinned_frame_url: Option<String>,
}

#[derive(uniffi::Object)]
pub struct FfiUnpublishedMessage {
message: UnpublishedMessage<TonicApiClient>,
}

#[uniffi::export(async_runtime = "tokio")]
impl FfiUnpublishedMessage {
pub fn id(&self) -> Vec<u8> {
self.message.id().to_vec()
}

pub async fn publish(&self) -> Result<(), GenericError> {
self.message.publish().await.map_err(Into::into)
}
}

impl From<UnpublishedMessage<TonicApiClient>> for FfiUnpublishedMessage {
fn from(message: UnpublishedMessage<TonicApiClient>) -> FfiUnpublishedMessage {
Self { message }
}
}

impl FfiCreateGroupOptions {
pub fn into_group_metadata_options(self) -> GroupMetadataOptions {
GroupMetadataOptions {
Expand Down Expand Up @@ -695,20 +672,27 @@ impl FfiGroup {
}

/// send a message without immediately publishing to the delivery service.
pub fn send_optimistic(
&self,
content_bytes: Vec<u8>,
) -> Result<FfiUnpublishedMessage, GenericError> {
pub fn send_optimistic(&self, content_bytes: Vec<u8>) -> Result<Vec<u8>, GenericError> {
let group = MlsGroup::new(
self.inner_client.context().clone(),
self.group_id.clone(),
self.created_at_ns,
);

let message =
group.send_message_optimistic(content_bytes.as_slice(), &self.inner_client)?;
let id = group.send_message_optimistic(content_bytes.as_slice())?;

Ok(message.into())
Ok(id)
}

/// Publish all unpublished messages
pub async fn publish_messages(&self) -> Result<(), GenericError> {
let group = MlsGroup::new(
self.inner_client.context().clone(),
self.group_id.clone(),
self.created_at_ns,
);
group.publish_messages(&self.inner_client).await?;
Ok(())
}

pub async fn sync(&self) -> Result<(), GenericError> {
Expand Down
60 changes: 22 additions & 38 deletions bindings_node/src/groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ use xmtp_mls::groups::{
group_metadata::{ConversationType, GroupMetadata},
group_permissions::GroupMutablePermissions,
members::PermissionLevel,
MlsGroup, PreconfiguredPolicies, UnpublishedMessage, UpdateAdminListType,
MlsGroup, PreconfiguredPolicies, UpdateAdminListType,
};
use xmtp_proto::xmtp::mls::message_contents::EncodedContent;

use crate::{
encoded_content::NapiEncodedContent,
messages::{NapiListMessagesOptions, NapiMessage},
mls_client::{RustXmtpClient, TonicApiClient},
mls_client::RustXmtpClient,
streams::NapiStreamCloser,
};

Expand Down Expand Up @@ -105,32 +105,6 @@ impl NapiGroupPermissions {
}
}

#[napi]
pub struct NapiUnpublishedMessage {
message: UnpublishedMessage<TonicApiClient>,
}

#[napi]
impl NapiUnpublishedMessage {
pub fn id(&self) -> Vec<u8> {
self.message.id().to_vec()
}

pub async fn publish(&self) -> Result<()> {
self
.message
.publish()
.await
.map_err(|e| Error::from_reason(format!("{}", e)))
}
}

impl From<UnpublishedMessage<TonicApiClient>> for NapiUnpublishedMessage {
fn from(message: UnpublishedMessage<TonicApiClient>) -> NapiUnpublishedMessage {
Self { message }
}
}

#[derive(Debug)]
#[napi]
pub struct NapiGroup {
Expand Down Expand Up @@ -173,26 +147,36 @@ impl NapiGroup {
Ok(hex::encode(message_id.clone()))
}

/// send a message without immediately publishing to the delivery service.
#[napi]
pub fn send_optimistic(
&self,
encoded_content: NapiEncodedContent,
) -> Result<NapiUnpublishedMessage> {
pub fn send_optimistic(&self, encoded_content: NapiEncodedContent) -> Result<Vec<u8>> {
let encoded_content: EncodedContent = encoded_content.into();
let group = MlsGroup::new(
self.inner_client.context().clone(),
self.group_id.clone(),
self.created_at_ns,
);

let message = group
.send_message_optimistic(
encoded_content.encode_to_vec().as_slice(),
&self.inner_client,
)
let id = group
.send_message_optimistic(encoded_content.encode_to_vec().as_slice())
.map_err(|e| Error::from_reason(format!("{}", e)))?;

Ok(message.into())
Ok(id)
}

/// Publish all unpublished messages
#[napi]
pub async fn publish_messages(&self) -> Result<()> {
let group = MlsGroup::new(
self.inner_client.context().clone(),
self.group_id.clone(),
self.created_at_ns,
);
group
.publish_messages(&self.inner_client)
.await
.map_err(|e| Error::from_reason(format!("{}", e)))?;
Ok(())
}

#[napi]
Expand Down
151 changes: 78 additions & 73 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use self::{
message_history::MessageHistoryError,
validated_commit::CommitValidationError,
};
use std::{collections::HashSet, future::Future, pin::Pin, sync::Arc};
use std::{collections::HashSet, sync::Arc};
use xmtp_cryptography::signature::{sanitize_evm_addresses, AddressValidationError};
use xmtp_id::InboxId;
use xmtp_proto::xmtp::mls::{
Expand Down Expand Up @@ -234,46 +234,6 @@ pub enum UpdateAdminListType {
RemoveSuper,
}

pub type MessagePublishFuture = Pin<Box<dyn Future<Output = Result<(), GroupError>> + Send>>;

/// An Unpublished message with an ID that can be `awaited` to publish all messages.
/// This message can be safely dropped, and [`MlsGroup::sync`] called manually instead.
pub struct UnpublishedMessage<ApiClient> {
message_id: Vec<u8>,
client: Arc<Client<ApiClient>>,
group: MlsGroup,
}

impl<ApiClient> UnpublishedMessage<ApiClient>
where
ApiClient: XmtpApi,
{
fn new(message_id: Vec<u8>, client: Arc<Client<ApiClient>>, group: MlsGroup) -> Self {
Self {
message_id,
client,
group,
}
}

pub fn id(&self) -> &[u8] {
&self.message_id
}

/// Publish messages to the delivery service
pub async fn publish(&self) -> Result<(), GroupError> {
let conn = self.group.context.store.conn()?;
let update_interval = Some(5_000_000);
self.group
.maybe_update_installations(conn.clone(), update_interval, self.client.as_ref())
.await?;
self.group
.publish_intents(conn, self.client.as_ref())
.await?;
Ok(())
}
}

impl MlsGroup {
// Creates a new group instance. Does not validate that the group exists in the DB
pub fn new(context: Arc<XmtpMlsLocalContext>, group_id: Vec<u8>, created_at_ns: i64) -> Self {
Expand Down Expand Up @@ -488,23 +448,28 @@ impl MlsGroup {
message_id
}

/// Send a message, optimistically retrieving ID before the result of a message send.
pub fn send_message_optimistic<ApiClient>(
/// Publish all unpublished messages
pub async fn publish_messages<ApiClient>(
&self,
message: &[u8],
client: &Arc<Client<ApiClient>>,
) -> Result<UnpublishedMessage<ApiClient>, GroupError>
client: &Client<ApiClient>,
) -> Result<(), GroupError>
where
ApiClient: XmtpApi,
{
let conn = self.context.store.conn()?;
let update_interval = Some(5_000_000);
self.maybe_update_installations(conn.clone(), update_interval, client)
.await?;
self.publish_intents(conn, client).await?;
Ok(())
}

/// Send a message, optimistically returning the ID of the message before the result of a message publish.
pub fn send_message_optimistic(&self, message: &[u8]) -> Result<Vec<u8>, GroupError> {
let conn = self.context.store.conn()?;
let message_id = self.prepare_message(message, &conn)?;

Ok(UnpublishedMessage::new(
message_id,
client.clone(),
self.clone(),
))
Ok(message_id)
}

/// Prepare a message (intent & id) on this users XMTP [`Client`].
Expand Down Expand Up @@ -1249,7 +1214,7 @@ mod tests {
group_mutable_metadata::MetadataField,
intents::{PermissionPolicyOption, PermissionUpdateType},
members::{GroupMember, PermissionLevel},
GroupMetadataOptions, PreconfiguredPolicies, UpdateAdminListType,
DeliveryStatus, GroupMetadataOptions, PreconfiguredPolicies, UpdateAdminListType,
},
storage::{
group_intent::IntentState,
Expand Down Expand Up @@ -2653,35 +2618,75 @@ mod tests {
.unwrap();
let bola_group = receive_group_invite(&bola).await;

amal_group
.send_message_optimistic(b"test one", &amal)
.unwrap();
amal_group
.send_message_optimistic(b"test two", &amal)
.unwrap();
amal_group
.send_message_optimistic(b"test three", &amal)
.unwrap();
let four = amal_group
.send_message_optimistic(b"test four", &amal)
.unwrap();
let ids = vec![
amal_group.send_message_optimistic(b"test one").unwrap(),
amal_group.send_message_optimistic(b"test two").unwrap(),
amal_group.send_message_optimistic(b"test three").unwrap(),
amal_group.send_message_optimistic(b"test four").unwrap(),
];

four.publish().await.unwrap();
let messages = amal_group
.find_messages(Some(GroupMessageKind::Application), None, None, None, None)
.unwrap()
.into_iter()
.collect::<Vec<StoredGroupMessage>>();

let text = messages
.iter()
.cloned()
.map(|m| String::from_utf8_lossy(&m.decrypted_message_bytes).to_string())
.collect::<Vec<String>>();
assert_eq!(
ids,
messages
.iter()
.cloned()
.map(|m| m.id)
.collect::<Vec<Vec<u8>>>()
);
assert_eq!(
text,
vec![
"test one".to_string(),
"test two".to_string(),
"test three".to_string(),
"test four".to_string(),
]
);

let delivery = messages
.iter()
.cloned()
.map(|m| m.delivery_status)
.collect::<Vec<DeliveryStatus>>();
assert_eq!(
delivery,
vec![
DeliveryStatus::Unpublished,
DeliveryStatus::Unpublished,
DeliveryStatus::Unpublished,
DeliveryStatus::Unpublished,
]
);

amal_group.publish_messages(&amal).await.unwrap();
bola_group.sync(&bola).await.unwrap();

let messages = bola_group
.find_messages(None, None, None, None, None)
.unwrap();
let delivery = messages
.iter()
.cloned()
.map(|m| m.delivery_status)
.collect::<Vec<DeliveryStatus>>();
assert_eq!(
messages
.into_iter()
.map(|m| m.decrypted_message_bytes)
.collect::<Vec<Vec<u8>>>(),
delivery,
vec![
b"test one".to_vec(),
b"test two".to_vec(),
b"test three".to_vec(),
b"test four".to_vec(),
DeliveryStatus::Published,
DeliveryStatus::Published,
DeliveryStatus::Published,
DeliveryStatus::Published,
]
);
}
Expand Down

0 comments on commit ec1da4e

Please sign in to comment.