Skip to content

Commit

Permalink
Add an enqueue time to the send queue system
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Salinas authored and Daniel Salinas committed Dec 7, 2024
1 parent bf6fa4c commit b1ef4ca
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 24 deletions.
20 changes: 12 additions & 8 deletions crates/matrix-sdk-base/src/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use ruma::{
},
serde::Raw,
time::Instant,
CanonicalJsonObject, EventId, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedTransactionId,
OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId,
CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri,
OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId,
};
use tracing::{debug, instrument, trace, warn};

Expand Down Expand Up @@ -809,12 +809,15 @@ impl StateStore for MemoryStore {
kind: QueuedRequestKind,
priority: usize,
) -> Result<(), Self::Error> {
self.send_queue_events
.write()
.unwrap()
.entry(room_id.to_owned())
.or_default()
.push(QueuedRequest { kind, transaction_id, error: None, priority });
self.send_queue_events.write().unwrap().entry(room_id.to_owned()).or_default().push(
QueuedRequest {
kind,
transaction_id,
error: None,
priority,
enqueue_time: Some(MilliSecondsSinceUnixEpoch::now()),
},
);
Ok(())
}

Expand Down Expand Up @@ -912,6 +915,7 @@ impl StateStore for MemoryStore {
parent_transaction_id: parent_transaction_id.to_owned(),
own_transaction_id,
parent_key: None,
enqueue_time: None,
},
);
Ok(())
Expand Down
9 changes: 8 additions & 1 deletion crates/matrix-sdk-base/src/store/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use ruma::{
AnyMessageLikeEventContent, EventContent as _, RawExt as _,
},
serde::Raw,
OwnedDeviceId, OwnedEventId, OwnedTransactionId, OwnedUserId, TransactionId, UInt,
MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedEventId, OwnedTransactionId, OwnedUserId,
TransactionId, UInt,
};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -131,6 +132,9 @@ pub struct QueuedRequest {
/// The bigger the value, the higher the priority at which this request
/// should be handled.
pub priority: usize,

/// The time that the request was original attempted.
pub enqueue_time: Option<MilliSecondsSinceUnixEpoch>,
}

impl QueuedRequest {
Expand Down Expand Up @@ -365,6 +369,9 @@ pub struct DependentQueuedRequest {
/// If the parent request has been sent, the parent's request identifier
/// returned by the server once the local echo has been sent out.
pub parent_key: Option<SentRequestKey>,

/// The time that the request was original attempted.
pub enqueue_time: Option<MilliSecondsSinceUnixEpoch>,
}

impl DependentQueuedRequest {
Expand Down
16 changes: 13 additions & 3 deletions crates/matrix-sdk-indexeddb/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ use ruma::{
GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType, SyncStateEvent,
},
serde::Raw,
CanonicalJsonObject, EventId, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedTransactionId,
OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId,
CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri,
OwnedRoomId, OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UserId,
};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tracing::{debug, warn};
Expand Down Expand Up @@ -444,6 +444,8 @@ struct PersistedQueuedRequest {
is_wedged: Option<bool>,

event: Option<SerializableEventContent>,

enqueue_time: Option<MilliSecondsSinceUnixEpoch>,
}

impl PersistedQueuedRequest {
Expand All @@ -464,7 +466,13 @@ impl PersistedQueuedRequest {
// By default, events without a priority have a priority of 0.
let priority = self.priority.unwrap_or(0);

Some(QueuedRequest { kind, transaction_id: self.transaction_id, error, priority })
Some(QueuedRequest {
kind,
transaction_id: self.transaction_id,
error,
priority,
enqueue_time: self.enqueue_time,
})
}
}

Expand Down Expand Up @@ -1389,6 +1397,7 @@ impl_state_store!({
is_wedged: None,
event: None,
priority: Some(priority),
enqueue_time: Some(MilliSecondsSinceUnixEpoch::now()),
});

// Save the new vector into db.
Expand Down Expand Up @@ -1584,6 +1593,7 @@ impl_state_store!({
parent_transaction_id: parent_txn_id.to_owned(),
own_transaction_id: own_txn_id,
parent_key: None,
enqueue_time: None,
});

// Save the new vector into db.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Migration script to add the enqueue_time column to the send_queue_events table
ALTER TABLE "send_queue_events"
ADD COLUMN "enqueue_time" INTEGER NOT NULL DEFAULT (strftime('%s', 'now'));

ALTER TABLE "dependent_send_queue_events"
ADD COLUMN "enqueue_time" INTEGER NOT NULL DEFAULT (strftime('%s', 'now'));
36 changes: 24 additions & 12 deletions crates/matrix-sdk-sqlite/src/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ use ruma::{
GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
},
serde::Raw,
CanonicalJsonObject, EventId, OwnedEventId, OwnedRoomId, OwnedTransactionId, OwnedUserId,
RoomId, RoomVersionId, TransactionId, UserId,
CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
OwnedTransactionId, OwnedUserId, RoomId, RoomVersionId, TransactionId, UInt, UserId,
};
use rusqlite::{OptionalExtension, Transaction};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
Expand Down Expand Up @@ -69,7 +69,7 @@ mod keys {
/// This is used to figure whether the sqlite database requires a migration.
/// Every new SQL migration should imply a bump of this number, and changes in
/// the [`SqliteStateStore::run_migrations`] function..
const DATABASE_VERSION: u8 = 10;
const DATABASE_VERSION: u8 = 11;

/// A sqlite based cryptostore.
#[derive(Clone)]
Expand Down Expand Up @@ -318,6 +318,17 @@ impl SqliteStateStore {
.await?;
}

if from < 11 && to >= 11 {
conn.with_transaction(move |txn| {
// Run the migration.
txn.execute_batch(include_str!(
"../migrations/state_store/010_send_queue_enqueue_time.sql"
))?;
txn.set_db_version(11)
})
.await?;
}

Ok(())
}

Expand Down Expand Up @@ -1753,7 +1764,6 @@ impl StateStore for SqliteStateStore {
let room_id_value = self.serialize_value(&room_id.to_owned())?;

let content = self.serialize_json(&content)?;

// The transaction id is used both as a key (in remove/update) and a value (as
// it's useful for the callers), so we keep it as is, and neither hash
// it (with encode_key) or encrypt it (through serialize_value). After
Expand Down Expand Up @@ -1824,26 +1834,28 @@ impl StateStore for SqliteStateStore {
// Note: ROWID is always present and is an auto-incremented integer counter. We
// want to maintain the insertion order, so we can sort using it.
// Note 2: transaction_id is not encoded, see why in `save_send_queue_event`.
let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize)> = self
let res: Vec<(String, Vec<u8>, Option<Vec<u8>>, usize, u64)> = self
.acquire()
.await?
.prepare(
"SELECT transaction_id, content, wedge_reason, priority FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
"SELECT transaction_id, content, wedge_reason, priority, enqueue_time FROM send_queue_events WHERE room_id = ? ORDER BY priority DESC, ROWID",
|mut stmt| {
stmt.query((room_id,))?
.mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)))
.mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
.collect()
},
)
.await?;

let mut requests = Vec::with_capacity(res.len());
for entry in res {
let enqueue_time = MilliSecondsSinceUnixEpoch(UInt::new(entry.4).unwrap());
requests.push(QueuedRequest {
transaction_id: entry.0.into(),
kind: self.deserialize_json(&entry.1)?,
error: entry.2.map(|v| self.deserialize_value(&v)).transpose()?,
priority: entry.3,
enqueue_time: Some(enqueue_time),
});
}

Expand Down Expand Up @@ -1909,7 +1921,6 @@ impl StateStore for SqliteStateStore {
// See comment in `save_send_queue_event`.
let parent_txn_id = parent_txn_id.to_string();
let own_txn_id = own_txn_id.to_string();

self.acquire()
.await?
.with_transaction(move |txn| {
Expand Down Expand Up @@ -2011,26 +2022,28 @@ impl StateStore for SqliteStateStore {
let room_id = self.encode_key(keys::DEPENDENTS_SEND_QUEUE, room_id);

// Note: transaction_id is not encoded, see why in `save_send_queue_event`.
let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>)> = self
let res: Vec<(String, String, Option<Vec<u8>>, Vec<u8>, u64)> = self
.acquire()
.await?
.prepare(
"SELECT own_transaction_id, parent_transaction_id, parent_key, content FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
"SELECT own_transaction_id, parent_transaction_id, parent_key, content, enqueue_time FROM dependent_send_queue_events WHERE room_id = ? ORDER BY ROWID",
|mut stmt| {
stmt.query((room_id,))?
.mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)))
.mapped(|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?)))
.collect()
},
)
.await?;

let mut dependent_events = Vec::with_capacity(res.len());
for entry in res {
let enqueue_time = MilliSecondsSinceUnixEpoch(UInt::new(entry.4).unwrap());
dependent_events.push(DependentQueuedRequest {
own_transaction_id: entry.0.into(),
parent_transaction_id: entry.1.into(),
parent_key: entry.2.map(|bytes| self.deserialize_value(&bytes)).transpose()?,
kind: self.deserialize_json(&entry.3)?,
enqueue_time: Some(enqueue_time),
});
}

Expand Down Expand Up @@ -2427,7 +2440,6 @@ mod migration_tests {
let room_id_value = this.serialize_value(&room_id.to_owned())?;

let content = this.serialize_json(&content)?;

txn.prepare_cached("INSERT INTO send_queue_events (room_id, room_id_val, transaction_id, content, wedged) VALUES (?, ?, ?, ?, ?)")?
.execute((room_id_key, room_id_value, transaction_id.to_string(), content, is_wedged))?;

Expand Down
8 changes: 8 additions & 0 deletions crates/matrix-sdk/src/send_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2275,6 +2275,7 @@ mod tests {
.unwrap(),
},
parent_key: None,
enqueue_time: None,
};
let res = canonicalize_dependent_requests(&[edit]);

Expand All @@ -2295,6 +2296,7 @@ mod tests {
parent_transaction_id: txn.clone(),
kind: DependentQueuedRequestKind::RedactEvent,
parent_key: None,
enqueue_time: None,
};

let edit = DependentQueuedRequest {
Expand All @@ -2307,6 +2309,7 @@ mod tests {
.unwrap(),
},
parent_key: None,
enqueue_time: None,
};

inputs.push({
Expand Down Expand Up @@ -2346,6 +2349,7 @@ mod tests {
.unwrap(),
},
parent_key: None,
enqueue_time: None,
})
.collect::<Vec<_>>();

Expand Down Expand Up @@ -2377,6 +2381,7 @@ mod tests {
kind: DependentQueuedRequestKind::RedactEvent,
parent_transaction_id: txn1.clone(),
parent_key: None,
enqueue_time: None,
},
// This one pertains to txn2.
DependentQueuedRequest {
Expand All @@ -2389,6 +2394,7 @@ mod tests {
},
parent_transaction_id: txn2.clone(),
parent_key: None,
enqueue_time: None,
},
];

Expand Down Expand Up @@ -2419,6 +2425,7 @@ mod tests {
kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() },
parent_transaction_id: txn.clone(),
parent_key: None,
enqueue_time: None,
};

let edit_id = ChildTransactionId::new();
Expand All @@ -2432,6 +2439,7 @@ mod tests {
},
parent_transaction_id: txn,
parent_key: None,
enqueue_time: None,
};

let res = canonicalize_dependent_requests(&[react, edit]);
Expand Down

0 comments on commit b1ef4ca

Please sign in to comment.