Skip to content

Commit

Permalink
advancement
Browse files Browse the repository at this point in the history
  • Loading branch information
mkysel committed Sep 13, 2024
1 parent e8c869a commit 4dc6e88
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 36 deletions.
4 changes: 2 additions & 2 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pub async fn create_client(
host,
is_secure
);
let api_client = TonicApiClient::create(host.clone(), is_secure).await?;
let api_client = TonicApiClient::create(host.clone(), is_secure, false).await?;

log::info!(
"Creating message store with path: {:?} and encryption key: {} of length {:?}",
Expand Down Expand Up @@ -159,7 +159,7 @@ pub async fn get_inbox_id_for_address(
account_address: String,
) -> Result<Option<String>, GenericError> {
let api_client = ApiClientWrapper::new(
TonicApiClient::create(host.clone(), is_secure).await?,
TonicApiClient::create(host.clone(), is_secure, false).await?,
Retry::default(),
);

Expand Down
2 changes: 1 addition & 1 deletion bindings_ffi/src/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub async fn create_v2_client(
host: String,
is_secure: bool,
) -> Result<Arc<FfiV2ApiClient>, GenericError> {
let client = GrpcClient::create(host, is_secure).await?;
let client = GrpcClient::create(host, is_secure, false).await?;

let client = FfiV2ApiClient {
inner_client: Arc::new(client),
Expand Down
4 changes: 2 additions & 2 deletions examples/cli/cli-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ async fn create_client(cli: &Cli, account: IdentityStrategy) -> Result<Client, C
info!("Using local network");
builder = builder
.api_client(
ApiClient::create("http://localhost:5556".into(), false)
ApiClient::create("http://localhost:5556".into(), false, false)
.await
.unwrap(),
)
Expand All @@ -425,7 +425,7 @@ async fn create_client(cli: &Cli, account: IdentityStrategy) -> Result<Client, C
info!("Using dev network");
builder = builder
.api_client(
ApiClient::create("https://grpc.dev.xmtp.network:443".into(), true)
ApiClient::create("https://grpc.dev.xmtp.network:443".into(), true, false)
.await
.unwrap(),
)
Expand Down
116 changes: 96 additions & 20 deletions xmtp_api_grpc/src/grpc_api_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::time::Duration;

use futures::stream::{AbortHandle, Abortable};
use futures::{SinkExt, Stream, StreamExt, TryStreamExt};
use prost::Message;
use tokio::sync::oneshot;
use tonic::transport::ClientTlsConfig;
use tonic::{metadata::MetadataValue, transport::Channel, Request, Streaming};
Expand All @@ -28,7 +29,8 @@ use xmtp_proto::{
UploadKeyPackageRequest,
},
};
use xmtp_proto::xmtp::xmtpv4::{BatchSubscribeEnvelopesRequest, BatchSubscribeEnvelopesResponse, PublishEnvelopeRequest, PublishEnvelopeResponse, QueryEnvelopesRequest, QueryEnvelopesResponse};
use xmtp_proto::xmtp::xmtpv4::{BatchSubscribeEnvelopesRequest, BatchSubscribeEnvelopesResponse, ClientEnvelope, PayerEnvelope, PublishEnvelopeRequest, PublishEnvelopeResponse, QueryEnvelopesRequest, QueryEnvelopesResponse};
use xmtp_proto::xmtp::xmtpv4::client_envelope::Payload;
use xmtp_proto::xmtp::xmtpv4::replication_api_client::ReplicationApiClient;

async fn create_tls_channel(address: String) -> Result<Channel, Error> {
Expand Down Expand Up @@ -75,10 +77,11 @@ pub struct Client {
pub(crate) app_version: MetadataValue<tonic::metadata::Ascii>,
pub(crate) libxmtp_version: MetadataValue<tonic::metadata::Ascii>,
pub(crate) replication_client: ReplicationApiClient<Channel>,
use_replication_v4: bool,
}

impl Client {
pub async fn create(host: String, is_secure: bool) -> Result<Self, Error> {
pub async fn create(host: String, is_secure: bool, use_replication_v4: bool) -> Result<Self, Error> {
let host = host.to_string();
let app_version = MetadataValue::try_from(&String::from("0.0.0"))
.map_err(|e| Error::new(ErrorKind::MetadataError).with(e))?;
Expand All @@ -105,7 +108,8 @@ impl Client {
app_version,
libxmtp_version,
identity_client,
replication_client
replication_client,
use_replication_v4
})
}

Expand Down Expand Up @@ -340,12 +344,22 @@ impl MutableApiSubscription for GrpcMutableSubscription {
impl XmtpMlsClient for Client {
#[tracing::instrument(level = "trace", skip_all)]
async fn upload_key_package(&self, req: UploadKeyPackageRequest) -> Result<(), Error> {
let client = &mut self.mls_client.clone();
if self.use_replication_v4 {
let client = &mut self.replication_client.clone();
let payload = wrap_client_envelope(req.to_client_envelope());
let res = client.publish_envelope(payload).await;
match res {
Ok(_) => Ok(()),
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)),
}
} else {
let client = &mut self.mls_client.clone();

let res = client.upload_key_package(self.build_request(req)).await;
match res {
Ok(_) => Ok(()),
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)),
let res = client.upload_key_package(self.build_request(req)).await;
match res {
Ok(_) => Ok(()),
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)),
}
}
}

Expand All @@ -363,23 +377,41 @@ impl XmtpMlsClient for Client {

#[tracing::instrument(level = "trace", skip_all)]
async fn send_group_messages(&self, req: SendGroupMessagesRequest) -> Result<(), Error> {
let client = &mut self.mls_client.clone();
let res = client.send_group_messages(self.build_request(req)).await;

match res {
Ok(_) => Ok(()),
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)),
if self.use_replication_v4 {
let client = &mut self.replication_client.clone();
let payload = wrap_client_envelope(req.to_client_envelope());
let res = client.publish_envelope(payload).await;
match res {
Ok(_) => Ok(()),
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)),
}
} else {
let client = &mut self.mls_client.clone();
let res = client.send_group_messages(self.build_request(req)).await;
match res {
Ok(_) => Ok(()),
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)),
}
}
}

#[tracing::instrument(level = "trace", skip_all)]
async fn send_welcome_messages(&self, req: SendWelcomeMessagesRequest) -> Result<(), Error> {
let client = &mut self.mls_client.clone();
let res = client.send_welcome_messages(self.build_request(req)).await;

match res {
Ok(_) => Ok(()),
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)),
if self.use_replication_v4 {
let client = &mut self.replication_client.clone();
let payload = wrap_client_envelope(req.to_client_envelope());
let res = client.publish_envelope(payload).await;
match res {
Ok(_) => Ok(()),
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)),
}
} else {
let client = &mut self.mls_client.clone();
let res = client.send_welcome_messages(self.build_request(req)).await;
match res {
Ok(_) => Ok(()),
Err(e) => Err(Error::new(ErrorKind::MlsError).with(e)),
}
}
}

Expand Down Expand Up @@ -547,3 +579,47 @@ impl XmtpReplicationClient for Client {
Ok(stream.into())
}
}


trait ClientEnvelopeConversion {
fn to_client_envelope(self) -> ClientEnvelope;
}

impl ClientEnvelopeConversion for SendGroupMessagesRequest {
fn to_client_envelope(self) -> ClientEnvelope {
ClientEnvelope {
aad: None,
payload: Some(Payload::GroupMessage(self.messages.first().unwrap().clone()))
}
}
}

impl ClientEnvelopeConversion for SendWelcomeMessagesRequest {
fn to_client_envelope(self) -> ClientEnvelope {
ClientEnvelope {
aad: None,
payload: Some(Payload::WelcomeMessage(self.messages.first().unwrap().clone()))
}
}
}

impl ClientEnvelopeConversion for UploadKeyPackageRequest {
fn to_client_envelope(self) -> ClientEnvelope {
ClientEnvelope {
aad: None,
payload: Some(Payload::UploadKeyPackage(self))
}
}
}

fn wrap_client_envelope(req: ClientEnvelope) -> PublishEnvelopeRequest {
let mut buf = vec![];
req.encode(&mut buf).unwrap();

PublishEnvelopeRequest {
payer_envelope: Some(PayerEnvelope {
unsigned_client_envelope: buf,
payer_signature: None,
}),
}
}
18 changes: 9 additions & 9 deletions xmtp_api_grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ mod tests {

#[tokio::test]
async fn grpc_query_test() {
let mut client = Client::create(LOCALHOST_ADDRESS.to_string(), false)
let mut client = Client::create(LOCALHOST_ADDRESS.to_string(), false, false)
.await
.unwrap();

Expand All @@ -70,7 +70,7 @@ mod tests {

#[tokio::test]
async fn grpc_batch_query_test() {
let client = Client::create(LOCALHOST_ADDRESS.to_string(), false)
let client = Client::create(LOCALHOST_ADDRESS.to_string(), false, false)
.await
.unwrap();
let req = BatchQueryRequest {
Expand All @@ -85,7 +85,7 @@ mod tests {

#[tokio::test]
async fn publish_test() {
let client = Client::create(LOCALHOST_ADDRESS.to_string(), false)
let client = Client::create(LOCALHOST_ADDRESS.to_string(), false, false)
.await
.unwrap();

Expand Down Expand Up @@ -115,7 +115,7 @@ mod tests {
#[tokio::test]
async fn subscribe_test() {
tokio::time::timeout(std::time::Duration::from_secs(5), async move {
let client = Client::create(LOCALHOST_ADDRESS.to_string(), false)
let client = Client::create(LOCALHOST_ADDRESS.to_string(), false, false)
.await
.unwrap();

Expand Down Expand Up @@ -162,7 +162,7 @@ mod tests {

#[tokio::test]
async fn tls_test() {
let client = Client::create(DEV_ADDRESS.to_string(), true).await.unwrap();
let client = Client::create(DEV_ADDRESS.to_string(), true, false).await.unwrap();

let result = client
.query(QueryRequest {
Expand All @@ -177,7 +177,7 @@ mod tests {

#[tokio::test]
async fn bidrectional_streaming_test() {
let client = Client::create(LOCALHOST_ADDRESS.to_string(), false)
let client = Client::create(LOCALHOST_ADDRESS.to_string(), false, false)
.await
.unwrap();

Expand Down Expand Up @@ -234,7 +234,7 @@ mod tests {
#[tokio::test]
async fn test_dev_publish() {
let auth_token = get_auth_token();
let dev_client = Client::create(DEV_ADDRESS.to_string(), true).await.unwrap();
let dev_client = Client::create(DEV_ADDRESS.to_string(), true, false).await.unwrap();
dev_client
.publish(
auth_token,
Expand All @@ -254,7 +254,7 @@ mod tests {
async fn long_lived_subscribe_test() {
let auth_token = get_auth_token();
tokio::time::timeout(std::time::Duration::from_secs(100), async move {
let client = Client::create(DEV_ADDRESS.to_string(), true).await.unwrap();
let client = Client::create(DEV_ADDRESS.to_string(), true, false).await.unwrap();

let topic = uuid::Uuid::new_v4();
let mut subscription = client
Expand Down Expand Up @@ -307,7 +307,7 @@ mod tests {

#[tokio::test]
async fn metadata_test() {
let mut client = Client::create(DEV_ADDRESS.to_string(), true).await.unwrap();
let mut client = Client::create(DEV_ADDRESS.to_string(), true, false).await.unwrap();
let app_version = "test/1.0.0".to_string();
let libxmtp_version = "0.0.1".to_string();

Expand Down
4 changes: 2 additions & 2 deletions xmtp_mls/src/utils/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ impl XmtpTestClient for XmtpHttpApiClient {

impl XmtpTestClient for GrpcClient {
async fn create_local() -> Self {
GrpcClient::create("http://localhost:5556".into(), false)
GrpcClient::create("http://localhost:5556".into(), false, false)
.await
.unwrap()
}

async fn create_dev() -> Self {
GrpcClient::create("https://grpc.dev.xmtp.network:443".into(), false)
GrpcClient::create("https://grpc.dev.xmtp.network:443".into(), false, false)
.await
.unwrap()
}
Expand Down

0 comments on commit 4dc6e88

Please sign in to comment.