Skip to content

Commit

Permalink
Expose the timestamp through the matrix-sdk-ui + FFI layers
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Salinas committed Dec 9, 2024
1 parent 013da78 commit f06e529
Show file tree
Hide file tree
Showing 19 changed files with 156 additions and 84 deletions.
15 changes: 12 additions & 3 deletions bindings/matrix-sdk-ffi/src/timeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,10 @@ impl TimelineItem {
#[derive(Clone, uniffi::Enum)]
pub enum EventSendState {
/// The local event has not been sent yet.
NotSentYet,
NotSentYet {
/// When the send was first attempted.
created_at: Option<u64>,
},

/// The local event has been sent to the server, but unsuccessfully: The
/// sending has failed.
Expand All @@ -1019,6 +1022,9 @@ pub enum EventSendState {
/// while an unrecoverable error will be parked, until the user
/// decides to cancel sending it.
is_recoverable: bool,

/// When the send was first attempted.
created_at: Option<u64>,
},

/// The local event has been sent successfully to the server.
Expand All @@ -1030,12 +1036,15 @@ impl From<&matrix_sdk_ui::timeline::EventSendState> for EventSendState {
use matrix_sdk_ui::timeline::EventSendState::*;

match value {
NotSentYet => Self::NotSentYet,
SendingFailed { error, is_recoverable } => {
NotSentYet { created_at } => {
Self::NotSentYet { created_at: created_at.map(|ts| ts.as_secs().into()) }
}
SendingFailed { error, is_recoverable, created_at } => {
let as_queue_wedge_error: matrix_sdk::QueueWedgeError = (&**error).into();
Self::SendingFailed {
is_recoverable: *is_recoverable,
error: as_queue_wedge_error.into(),
created_at: created_at.map(|ts| ts.as_secs().into()),
}
}
Sent { event_id } => Self::Sent { event_id: event_id.to_string() },
Expand Down
14 changes: 8 additions & 6 deletions crates/matrix-sdk-base/src/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -808,17 +808,18 @@ impl StateStore for MemoryStore {
transaction_id: OwnedTransactionId,
kind: QueuedRequestKind,
priority: usize,
) -> Result<(), Self::Error> {
) -> Result<MilliSecondsSinceUnixEpoch, Self::Error> {
let created_at = MilliSecondsSinceUnixEpoch::now();
self.send_queue_events.write().unwrap().entry(room_id.to_owned()).or_default().push(
QueuedRequest {
kind,
transaction_id,
error: None,
priority,
created_at: Some(MilliSecondsSinceUnixEpoch::now()),
created_at: Some(created_at.clone()),
},
);
Ok(())
Ok(created_at)
}

async fn update_send_queue_request(
Expand Down Expand Up @@ -908,17 +909,18 @@ impl StateStore for MemoryStore {
parent_transaction_id: &TransactionId,
own_transaction_id: ChildTransactionId,
content: DependentQueuedRequestKind,
) -> Result<(), Self::Error> {
) -> Result<MilliSecondsSinceUnixEpoch, Self::Error> {
let created_at = MilliSecondsSinceUnixEpoch::now();
self.dependent_send_queue_events.write().unwrap().entry(room.to_owned()).or_default().push(
DependentQueuedRequest {
kind: content,
parent_transaction_id: parent_transaction_id.to_owned(),
own_transaction_id,
parent_key: None,
created_at: None,
created_at: Some(created_at.clone()),
},
);
Ok(())
Ok(created_at)
}

async fn mark_dependent_queued_requests_as_ready(
Expand Down
12 changes: 6 additions & 6 deletions crates/matrix-sdk-base/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use ruma::{
},
serde::Raw,
time::SystemTime,
EventId, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId,
TransactionId, UserId,
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, OwnedRoomId,
OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -361,7 +361,7 @@ pub trait StateStore: AsyncTraitDeps {
transaction_id: OwnedTransactionId,
request: QueuedRequestKind,
priority: usize,
) -> Result<(), Self::Error>;
) -> Result<MilliSecondsSinceUnixEpoch, Self::Error>;

/// Updates a send queue request with the given content, and resets its
/// error status.
Expand Down Expand Up @@ -422,7 +422,7 @@ pub trait StateStore: AsyncTraitDeps {
parent_txn_id: &TransactionId,
own_txn_id: ChildTransactionId,
content: DependentQueuedRequestKind,
) -> Result<(), Self::Error>;
) -> Result<MilliSecondsSinceUnixEpoch, Self::Error>;

/// Mark a set of dependent send queue requests as ready, using a key
/// identifying the homeserver's response.
Expand Down Expand Up @@ -659,7 +659,7 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
transaction_id: OwnedTransactionId,
content: QueuedRequestKind,
priority: usize,
) -> Result<(), Self::Error> {
) -> Result<MilliSecondsSinceUnixEpoch, Self::Error> {
self.0
.save_send_queue_request(room_id, transaction_id, content, priority)
.await
Expand Down Expand Up @@ -712,7 +712,7 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
parent_txn_id: &TransactionId,
own_txn_id: ChildTransactionId,
content: DependentQueuedRequestKind,
) -> Result<(), Self::Error> {
) -> Result<MilliSecondsSinceUnixEpoch, Self::Error> {
self.0
.save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, content)
.await
Expand Down
25 changes: 15 additions & 10 deletions crates/matrix-sdk-indexeddb/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ use ruma::{
events::{
presence::PresenceEvent,
receipt::{Receipt, ReceiptThread, ReceiptType},
room::member::{
MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent, SyncRoomMemberEvent,
room::{
create,
member::{
MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent,
SyncRoomMemberEvent,
},
},
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncStateEvent,
GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, SyncStateEvent,
Expand Down Expand Up @@ -442,7 +446,7 @@ struct PersistedQueuedRequest {
/// The time the original message was first attempted to be sent at.
#[serde(skip_serializing_if = "Option::is_none")]
created_at: Option<MilliSecondsSinceUnixEpoch>,

// Migrated fields: keep these private, they're not used anymore elsewhere in the code base.
/// Deprecated (from old format), now replaced with error field.
is_wedged: Option<bool>,
Expand Down Expand Up @@ -1370,7 +1374,7 @@ impl_state_store!({
transaction_id: OwnedTransactionId,
kind: QueuedRequestKind,
priority: usize,
) -> Result<()> {
) -> Result<MilliSecondsSinceUnixEpoch> {
let encoded_key = self.encode_key(keys::ROOM_SEND_QUEUE, room_id);

let tx = self
Expand All @@ -1389,7 +1393,7 @@ impl_state_store!({
|| Ok(Vec::new()),
|val| self.deserialize_value::<Vec<PersistedQueuedRequest>>(&val),
)?;

let created_at = MilliSecondsSinceUnixEpoch::now();
// Push the new request.
prev.push(PersistedQueuedRequest {
room_id: room_id.to_owned(),
Expand All @@ -1399,15 +1403,15 @@ impl_state_store!({
is_wedged: None,
event: None,
priority: Some(priority),
created_at: Some(MilliSecondsSinceUnixEpoch::now()),
created_at: Some(created_at.clone()),
});

// Save the new vector into db.
obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;

tx.await.into_result()?;

Ok(())
Ok(created_at)
}

async fn update_send_queue_request(
Expand Down Expand Up @@ -1570,7 +1574,7 @@ impl_state_store!({
parent_txn_id: &TransactionId,
own_txn_id: ChildTransactionId,
content: DependentQueuedRequestKind,
) -> Result<()> {
) -> Result<MilliSecondsSinceUnixEpoch> {
let encoded_key = self.encode_key(keys::DEPENDENT_SEND_QUEUE, room_id);

let tx = self.inner.transaction_on_one_with_mode(
Expand All @@ -1589,21 +1593,22 @@ impl_state_store!({
|val| self.deserialize_value::<Vec<DependentQueuedRequest>>(&val),
)?;

let created_at = MilliSecondsSinceUnixEpoch::now();
// Push the new request.
prev.push(DependentQueuedRequest {
kind: content,
parent_transaction_id: parent_txn_id.to_owned(),
own_transaction_id: own_txn_id,
parent_key: None,
created_at: None,
created_at: Some(created_at.clone()),
});

// Save the new vector into db.
obj.put_key_val(&encoded_key, &self.serialize_value(&prev)?)?;

tx.await.into_result()?;

Ok(())
Ok(created_at)
}

async fn update_dependent_queued_request(
Expand Down
26 changes: 18 additions & 8 deletions crates/matrix-sdk-sqlite/src/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1759,7 +1759,7 @@ impl StateStore for SqliteStateStore {
transaction_id: OwnedTransactionId,
content: QueuedRequestKind,
priority: usize,
) -> Result<(), Self::Error> {
) -> Result<MilliSecondsSinceUnixEpoch, Self::Error> {
let room_id_key = self.encode_key(keys::SEND_QUEUE, room_id);
let room_id_value = self.serialize_value(&room_id.to_owned())?;

Expand All @@ -1769,11 +1769,13 @@ impl StateStore for SqliteStateStore {
// it (with encode_key) or encrypt it (through serialize_value). After
// all, it carries no personal information, so this is considered fine.

let created_at = MilliSecondsSinceUnixEpoch::now();
let created_at_ts: u64 = created_at.0.into();
self.acquire()
.await?
.with_transaction(move |txn| {
txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority) VALUES (?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority))?;
Ok(())
txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, priority, created_at) VALUES (?, ?, ?, ?, ?, ?)")?.execute((room_id_key, room_id_value, transaction_id.to_string(), content, priority, created_at_ts))?;
Ok(created_at)
})
.await
}
Expand Down Expand Up @@ -1914,24 +1916,32 @@ impl StateStore for SqliteStateStore {
parent_txn_id: &TransactionId,
own_txn_id: ChildTransactionId,
content: DependentQueuedRequestKind,
) -> Result<()> {
) -> Result<MilliSecondsSinceUnixEpoch> {
let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);
let content = self.serialize_json(&content)?;

// See comment in `save_send_queue_event`.
let parent_txn_id = parent_txn_id.to_string();
let own_txn_id = own_txn_id.to_string();

let created_at = MilliSecondsSinceUnixEpoch::now();
let created_at_ts: u64 = created_at.0.into();
self.acquire()
.await?
.with_transaction(move |txn| {
txn.prepare_cached(
r#"INSERT INTO dependent_send_queue_events
(room_id, parent_transaction_id, own_transaction_id, content)
VALUES (?, ?, ?, ?)"#,
(room_id, parent_transaction_id, own_transaction_id, content, created_at)
VALUES (?, ?, ?, ?, ?)"#,
)?
.execute((room_id, parent_txn_id, own_txn_id, content))?;
Ok(())
.execute((
room_id,
parent_txn_id,
own_txn_id,
content,
created_at_ts,
))?;
Ok(created_at)
})
.await
}
Expand Down
23 changes: 18 additions & 5 deletions crates/matrix-sdk-ui/src/timeline/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,9 @@ impl<P: RoomDataProvider> TimelineController<P> {
warn!("We looked for a local item, but it transitioned as remote??");
return false;
};
prev_local_item.with_send_state(EventSendState::NotSentYet)
prev_local_item.with_send_state(EventSendState::NotSentYet {
created_at: prev_local_item.send_handle.clone().map(|h| h.created_at).flatten(),
})
};

// Replace the local-related state (kind) and the content state.
Expand Down Expand Up @@ -1260,6 +1262,7 @@ impl<P: RoomDataProvider> TimelineController<P> {
}
};

let created_at = send_handle.created_at.clone();
self.handle_local_event(
echo.transaction_id.clone(),
TimelineEventKind::Message { content, relations: Default::default() },
Expand All @@ -1273,6 +1276,7 @@ impl<P: RoomDataProvider> TimelineController<P> {
EventSendState::SendingFailed {
error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(send_error)),
is_recoverable: false,
created_at,
},
)
.await;
Expand Down Expand Up @@ -1356,16 +1360,25 @@ impl<P: RoomDataProvider> TimelineController<P> {
}
}

RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
RoomSendQueueUpdate::SendError {
transaction_id,
error,
is_recoverable,
created_at,
} => {
self.update_event_send_state(
&transaction_id,
EventSendState::SendingFailed { error, is_recoverable },
EventSendState::SendingFailed { error, is_recoverable, created_at },
)
.await;
}

RoomSendQueueUpdate::RetryEvent { transaction_id } => {
self.update_event_send_state(&transaction_id, EventSendState::NotSentYet).await;
RoomSendQueueUpdate::RetryEvent { transaction_id, created_at } => {
self.update_event_send_state(
&transaction_id,
EventSendState::NotSentYet { created_at },
)
.await;
}

RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
Expand Down
4 changes: 3 additions & 1 deletion crates/matrix-sdk-ui/src/timeline/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,9 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> {

let kind: EventTimelineItemKind = match &self.ctx.flow {
Flow::Local { txn_id, send_handle } => LocalEventTimelineItem {
send_state: EventSendState::NotSentYet,
send_state: EventSendState::NotSentYet {
created_at: send_handle.clone().map(|h| h.created_at).flatten(),
},
transaction_id: txn_id.to_owned(),
send_handle: send_handle.clone(),
}
Expand Down
10 changes: 8 additions & 2 deletions crates/matrix-sdk-ui/src/timeline/event_item/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;

use as_variant::as_variant;
use matrix_sdk::{send_queue::SendHandle, Error};
use ruma::{EventId, OwnedEventId, OwnedTransactionId};
use ruma::{EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId};

use super::TimelineEventItemId;

Expand Down Expand Up @@ -65,7 +65,10 @@ impl LocalEventTimelineItem {
#[derive(Clone, Debug)]
pub enum EventSendState {
/// The local event has not been sent yet.
NotSentYet,
NotSentYet {
/// When the send was first attempted.
created_at: Option<MilliSecondsSinceUnixEpoch>,
},
/// The local event has been sent to the server, but unsuccessfully: The
/// sending has failed.
SendingFailed {
Expand All @@ -77,6 +80,9 @@ pub enum EventSendState {
/// while an unrecoverable error will be parked, until the user
/// decides to cancel sending it.
is_recoverable: bool,

/// When the send was first attempted.
created_at: Option<MilliSecondsSinceUnixEpoch>,
},
/// The local event has been sent successfully to the server.
Sent {
Expand Down
Loading

0 comments on commit f06e529

Please sign in to comment.