Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
rygine committed Aug 21, 2024
1 parent 49edac1 commit a8de672
Showing 1 changed file with 33 additions and 10 deletions.
43 changes: 33 additions & 10 deletions xmtp_mls/src/groups/message_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,16 +140,26 @@ where
None,
None,
)?;

log::info!("messages: {:?}", messages);
log::info!("installation_id: {:?}", installation_id);

// find the most recent history request that has not been replied to
let last_history_request: (Option<String>, Option<String>, Option<StoredGroupMessage>) =
messages
.into_iter()
.fold((None, None, None), |mut acc, msg| {
match bincode::deserialize(&msg.decrypted_message_bytes) {
let result =
bincode::deserialize::<MessageHistoryContent>(&msg.decrypted_message_bytes);

log::info!("result: {:?}", result);

match result {
Ok(MessageHistoryContent::Request(request))
// only include requests from the current installation
if msg.sender_installation_id.eq(&installation_id) =>
{
log::info!("request: {:?}", request);
if let Some(ref existing_msg) = acc.2 {
// use the most recent request
if msg.sent_at_ns > existing_msg.sent_at_ns {
Expand All @@ -167,6 +177,7 @@ where
// only include replies to the current installation
if !msg.sender_installation_id.eq(&installation_id) =>
{
log::info!("reply: {:?}", reply);
if let Some(ref existing_request_id) = acc.0 {
// reset the request if it's been replied to
if existing_request_id.eq(&reply.request_id) {
Expand All @@ -176,7 +187,13 @@ where
}
}
}
_ => {}
Ok(result) => {
log::info!("other message content: {:?}", result);
log::info!("sender_installation_id: {:?}", msg.sender_installation_id);
}
Err(err) => {
log::info!("error: {:?}", err);
}
}
acc
});
Expand All @@ -191,13 +208,11 @@ where
let pin_code = history_request.pin_code.clone();
let request_id = history_request.request_id.clone();

let content: MessageHistoryContent =
MessageHistoryContent::Request(MessageHistoryRequest {
request_id: request_id.clone(),
pin_code: pin_code.clone(),
})
.into();
let content_bytes = bincode::serialize(&content)
let content = MessageHistoryContent::Request(MessageHistoryRequest {
request_id: request_id.clone(),
pin_code: pin_code.clone(),
});
let content_bytes = bincode::serialize::<MessageHistoryContent>(&content)
.map_err(|e| MessageProcessingError::Generic(format!("{e}")))?;

let _message_id =
Expand Down Expand Up @@ -250,7 +265,7 @@ where
}

// the reply message
let content: MessageHistoryContent = MessageHistoryContent::Reply(contents.clone()).into();
let content = MessageHistoryContent::Reply(contents.clone());
let content_bytes = bincode::serialize(&content)
.map_err(|e| MessageProcessingError::Generic(format!("{e}")))?;

Expand Down Expand Up @@ -734,6 +749,14 @@ mod tests {
.expect("history request");
assert_eq!(request_id.len(), 32);
assert_eq!(pin_code.len(), 4);

// test that another request will return the same request_id and pin_code
let (request_id2, pin_code2) = client
.send_history_request()
.await
.expect("history request");
assert_eq!(request_id, request_id2);
assert_eq!(pin_code, pin_code2);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
Expand Down

0 comments on commit a8de672

Please sign in to comment.