From 6f34f37257314410740c0b43bcbdeae056513040 Mon Sep 17 00:00:00 2001 From: Jonathan Johnson Date: Sat, 19 Feb 2022 10:48:59 -0800 Subject: [PATCH 1/3] Refactored Executed transactions to be smaller Someone reported that they weren't able to insert as many records in a transaction as I had expected. After some consideration, I realized that one aspect could be optimized greatly. Each changed record had a copy of the collection name, and this new structure keeps a list of changed collections separate from the changed documents, and the collection on the changed document structure is now an index into that collection. There is still code to be written for backwards compatibility. However, it makes more sense to introduce a new unit test in main() that includes a v0.1 database that can be loaded by the current version and verify it's still working. --- crates/bonsaidb-core/src/test_util.rs | 10 +- crates/bonsaidb-core/src/transaction.rs | 21 ++-- crates/bonsaidb-local/src/database.rs | 100 ++++++++++++------- crates/bonsaidb-local/src/database/legacy.rs | 81 +++++++++++++++ crates/bonsaidb-local/src/error.rs | 4 + 5 files changed, 170 insertions(+), 46 deletions(-) create mode 100644 crates/bonsaidb-local/src/database/legacy.rs diff --git a/crates/bonsaidb-core/src/test_util.rs b/crates/bonsaidb-core/src/test_util.rs index 8984e9a6f9a..fb973e1241d 100644 --- a/crates/bonsaidb-core/src/test_util.rs +++ b/crates/bonsaidb-core/src/test_util.rs @@ -689,12 +689,14 @@ pub async fn store_retrieve_update_delete_tests(db: &C) -> anyhow assert_eq!(transactions.len(), 2); assert!(transactions[0].id < transactions[1].id); for transaction in &transactions { - let changed_documents = transaction + let (collections, changed_documents) = transaction .changes .documents() .expect("incorrect transaction type"); assert_eq!(changed_documents.len(), 1); - assert_eq!(changed_documents[0].collection, Basic::collection_name()); + assert_eq!(collections.len(), 1); + assert_eq!(collections[0], 
Basic::collection_name()); + assert_eq!(changed_documents[0].collection, 0); assert_eq!(header.id, changed_documents[0].id.deserialize()?); assert!(!changed_documents[0].deleted); } @@ -706,12 +708,12 @@ pub async fn store_retrieve_update_delete_tests(db: &C) -> anyhow .await?; assert_eq!(transactions.len(), 1); let transaction = transactions.first().unwrap(); - let changed_documents = transaction + let (collections, changed_documents) = transaction .changes .documents() .expect("incorrect transaction type"); assert_eq!(changed_documents.len(), 1); - assert_eq!(changed_documents[0].collection, Basic::collection_name()); + assert_eq!(collections[0], Basic::collection_name()); assert_eq!(header.id, changed_documents[0].id.deserialize()?); assert!(changed_documents[0].deleted); diff --git a/crates/bonsaidb-core/src/transaction.rs b/crates/bonsaidb-core/src/transaction.rs index a3d93546be7..4c526afd008 100644 --- a/crates/bonsaidb-core/src/transaction.rs +++ b/crates/bonsaidb-core/src/transaction.rs @@ -269,7 +269,12 @@ pub struct Executed { #[derive(Clone, Debug, Serialize, Deserialize)] pub enum Changes { /// A list of changed documents. - Documents(Vec), + Documents { + /// All of the collections changed. + collections: Vec, + /// The individual document changes. + changes: Vec, + }, /// A list of changed keys. Keys(Vec), } @@ -278,9 +283,13 @@ impl Changes { /// Returns the list of documents changed in this transaction, or None if /// the transaction was not a document transaction. #[must_use] - pub fn documents(&self) -> Option<&[ChangedDocument]> { - if let Self::Documents(docs) = self { - Some(docs) + pub fn documents(&self) -> Option<(&[CollectionName], &[ChangedDocument])> { + if let Self::Documents { + collections, + changes, + } = self + { + Some((collections, changes)) } else { None } @@ -301,8 +310,8 @@ impl Changes { /// A record of a changed document. 
#[derive(Debug, Clone, Serialize, Deserialize)] pub struct ChangedDocument { - /// The id of the `Collection` of the changed `Document`. - pub collection: CollectionName, + /// The index of the `CollectionName` within the `collections` field of [`Changes::Documents`]. + pub collection: u16, /// The id of the changed `Document`. pub id: DocumentId, diff --git a/crates/bonsaidb-local/src/database.rs b/crates/bonsaidb-local/src/database.rs index 6037dc23799..926d66d3406 100644 --- a/crates/bonsaidb-local/src/database.rs +++ b/crates/bonsaidb-local/src/database.rs @@ -64,6 +64,7 @@ use crate::{ pub mod keyvalue; +mod legacy; pub mod pubsub; /// A local, file-based database. @@ -679,6 +680,8 @@ impl Database { let mut results = Vec::new(); let mut changed_documents = Vec::new(); + let mut collection_indexes = HashMap::new(); + let mut collections = Vec::new(); for op in &transaction.operations { let result = self.execute_operation( op, @@ -686,57 +689,81 @@ impl Database { &open_trees.trees_index_by_name, )?; - match &result { + if let Some((collection, id, deleted)) = match &result { OperationResult::DocumentUpdated { header, collection } => { - changed_documents.push(ChangedDocument { - collection: collection.clone(), - id: header.id, - deleted: false, - }); + Some((collection, header.id, false)) } OperationResult::DocumentDeleted { id, collection } => { - changed_documents.push(ChangedDocument { - collection: collection.clone(), - id: *id, - deleted: true, - }); + Some((collection, *id, true)) } - OperationResult::Success => {} + OperationResult::Success => None, + } { + let collection = match collection_indexes.get(collection) { + Some(index) => *index, + None => { + if let Ok(id) = u16::try_from(collections.len()) { + collection_indexes.insert(collection.clone(), id); + collections.push(collection.clone()); + id + } else { + return Err(Error::TransactionTooLarge); + } + } + }; + changed_documents.push(ChangedDocument { + collection, + id, + deleted, + }); } 
results.push(result); } - // Insert invalidations for each record changed + self.invalidate_changed_documents( + &mut roots_transaction, + &open_trees, + &collections, + &changed_documents, + )?; + + roots_transaction + .entry_mut() + .set_data(pot::to_vec(&Changes::Documents { + collections, + changes: changed_documents, + })?)?; + + roots_transaction.commit()?; + + Ok(results) + } + + fn invalidate_changed_documents( + &self, + roots_transaction: &mut ExecutingTransaction, + open_trees: &OpenTrees, + collections: &[CollectionName], + changed_documents: &[ChangedDocument], + ) -> Result<(), Error> { for (collection, changed_documents) in &changed_documents .iter() - .group_by(|doc| doc.collection.clone()) + .group_by(|doc| &collections[usize::from(doc.collection)]) { - if let Some(views) = self.data.schema.views_in_collection(&collection) { + if let Some(views) = self.data.schema.views_in_collection(collection) { let changed_documents = changed_documents.collect::>(); - for view in views { - if !view.unique() { - let view_name = view.view_name(); - for changed_document in &changed_documents { - let invalidated_docs = roots_transaction - .tree::( - open_trees.trees_index_by_name - [&view_invalidated_docs_tree_name(&view_name)], - ) - .unwrap(); - invalidated_docs.set(changed_document.id.as_ref().to_vec(), b"")?; - } + for view in views.into_iter().filter(|view| !view.unique()) { + let view_name = view.view_name(); + let tree_name = view_invalidated_docs_tree_name(&view_name); + for changed_document in &changed_documents { + let invalidated_docs = roots_transaction + .tree::(open_trees.trees_index_by_name[&tree_name]) + .unwrap(); + invalidated_docs.set(changed_document.id.as_ref().to_vec(), b"")?; } } } } - - roots_transaction - .entry_mut() - .set_data(pot::to_vec(&Changes::Documents(changed_documents))?)?; - - roots_transaction.commit()?; - - Ok(results) + Ok(()) } fn execute_operation( @@ -1331,7 +1358,8 @@ impl Connection for Database { let changes = match 
pot::from_slice(data) { Ok(changes) => changes, Err(pot::Error::NotAPot) => { - Changes::Documents(bincode::deserialize(entry.data().unwrap())?) + todo!() + // Changes::Documents(bincode::deserialize(entry.data().unwrap())?) } other => other?, }; diff --git a/crates/bonsaidb-local/src/database/legacy.rs b/crates/bonsaidb-local/src/database/legacy.rs new file mode 100644 index 00000000000..7e9f430bf71 --- /dev/null +++ b/crates/bonsaidb-local/src/database/legacy.rs @@ -0,0 +1,81 @@ +use std::collections::HashMap; + +use bonsaidb_core::{ + document::DocumentId, + schema::CollectionName, + transaction::{ChangedDocument, ChangedKey, Changes, Executed}, +}; +use serde::{Deserialize, Serialize}; + +/// Details about an executed transaction. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ExecutedTransactionV0 { + /// The id of the transaction. + pub id: u64, + + /// A list of containing ids of `Documents` changed. + pub changes: ChangesV0, +} + +impl From for Executed { + fn from(legacy: ExecutedTransactionV0) -> Self { + Self { + id: legacy.id, + changes: legacy.changes.into(), + } + } +} + +/// A list of changes. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum ChangesV0 { + /// A list of changed documents. + Documents(Vec), + /// A list of changed keys. 
+ Keys(Vec), +} + +impl From for Changes { + fn from(legacy: ChangesV0) -> Self { + match legacy { + ChangesV0::Documents(legacy_documents) => { + let mut changed_documents = Vec::with_capacity(legacy_documents.len()); + let mut collections = Vec::new(); + let mut collection_indexes = HashMap::new(); + for changed in legacy_documents { + let collection = if let Some(id) = collection_indexes.get(&changed.collection) { + *id + } else { + let id = u16::try_from(collections.len()).unwrap(); + collection_indexes.insert(changed.collection.clone(), id); + collections.push(changed.collection); + id + }; + changed_documents.push(ChangedDocument { + collection, + id: changed.id, + deleted: changed.deleted, + }); + } + Self::Documents { + collections, + changes: changed_documents, + } + } + ChangesV0::Keys(changes) => Self::Keys(changes), + } + } +} + +/// A record of a changed document. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChangedDocumentV0 { + /// The id of the `Collection` of the changed `Document`. + pub collection: CollectionName, + + /// The id of the changed `Document`. + pub id: DocumentId, + + /// If the `Document` has been deleted, this will be `true`. + pub deleted: bool, +} diff --git a/crates/bonsaidb-local/src/error.rs b/crates/bonsaidb-local/src/error.rs index 929af7b6172..87cc4708713 100644 --- a/crates/bonsaidb-local/src/error.rs +++ b/crates/bonsaidb-local/src/error.rs @@ -26,6 +26,10 @@ pub enum Error { #[error("error while communicating internally")] InternalCommunication, + /// A transaction was too large to execute. 
+ #[error("transaction is too large")] + TransactionTooLarge, + /// An error occurred while executing a view #[error("error from view: {0}")] View(#[from] view::Error), From a12d6cd9cca1f7010da8e4c860e2f279a3767915 Mon Sep 17 00:00:00 2001 From: Jonathan Johnson Date: Sun, 20 Feb 2022 09:28:16 -0800 Subject: [PATCH 2/3] Implemented backwards compatibility for tx log This builds on the compatibility tests by actually testing the executed transaction loading from old databases. It turns out that my attempt at making DocumentId parse u64 or bytes was ill-fated, as it requires calling deserialize_any. Not wanting to prevent DocumentId from working in formats like bincode, I decided to just embrace the change as part of this upgrade flow. --- crates/bonsaidb-core/src/document/id.rs | 8 - crates/bonsaidb-local/Cargo.toml | 1 + crates/bonsaidb-local/src/database.rs | 21 +-- crates/bonsaidb-local/src/database/compat.rs | 151 ++++++++++++++++++ .../bonsaidb-local/src/database/keyvalue.rs | 5 +- crates/bonsaidb-local/src/database/legacy.rs | 81 ---------- crates/bonsaidb-local/src/error.rs | 16 +- .../bonsaidb-local/src/tests/compatibility.rs | 55 ++++--- 8 files changed, 206 insertions(+), 132 deletions(-) create mode 100644 crates/bonsaidb-local/src/database/compat.rs delete mode 100644 crates/bonsaidb-local/src/database/legacy.rs diff --git a/crates/bonsaidb-core/src/document/id.rs b/crates/bonsaidb-core/src/document/id.rs index a4507146df3..284ea115412 100644 --- a/crates/bonsaidb-core/src/document/id.rs +++ b/crates/bonsaidb-core/src/document/id.rs @@ -344,14 +344,6 @@ impl<'de> Visitor<'de> for DocumentIdVisitor { Err(E::invalid_length(v.len(), &"< 64 bytes")) } } - - // Provided for backwards compatibility. No new data is written with this. - fn visit_u64(self, v: u64) -> Result - where - E: serde::de::Error, - { - Ok(DocumentId::from_u64(v)) - } } /// A unique id for a document, either serialized or deserialized. 
diff --git a/crates/bonsaidb-local/Cargo.toml b/crates/bonsaidb-local/Cargo.toml index 9db89608fd7..adaebf88f78 100644 --- a/crates/bonsaidb-local/Cargo.toml +++ b/crates/bonsaidb-local/Cargo.toml @@ -57,6 +57,7 @@ thiserror = "1" tokio = { version = "=1.16.1", features = ["full"] } serde = { version = "1", features = ["derive"] } pot = "1.0.0" +transmog-versions = "0.1.0" bincode = "1.3" flume = "0.10" itertools = "0.10" diff --git a/crates/bonsaidb-local/src/database.rs b/crates/bonsaidb-local/src/database.rs index 926d66d3406..51c5592c415 100644 --- a/crates/bonsaidb-local/src/database.rs +++ b/crates/bonsaidb-local/src/database.rs @@ -64,7 +64,7 @@ use crate::{ pub mod keyvalue; -mod legacy; +pub(crate) mod compat; pub mod pubsub; /// A local, file-based database. @@ -728,10 +728,12 @@ impl Database { roots_transaction .entry_mut() - .set_data(pot::to_vec(&Changes::Documents { - collections, - changes: changed_documents, - })?)?; + .set_data(compat::serialize_executed_transaction_changes( + &Changes::Documents { + collections, + changes: changed_documents, + }, + )?)?; roots_transaction.commit()?; @@ -1355,14 +1357,7 @@ impl Connection for Database { .into_iter() .map(|entry| { if let Some(data) = entry.data() { - let changes = match pot::from_slice(data) { - Ok(changes) => changes, - Err(pot::Error::NotAPot) => { - todo!() - // Changes::Documents(bincode::deserialize(entry.data().unwrap())?) 
- } - other => other?, - }; + let changes = compat::deserialize_executed_transaction_changes(data)?; Ok(Some(transaction::Executed { id: entry.id, changes, diff --git a/crates/bonsaidb-local/src/database/compat.rs b/crates/bonsaidb-local/src/database/compat.rs new file mode 100644 index 00000000000..22464aee44e --- /dev/null +++ b/crates/bonsaidb-local/src/database/compat.rs @@ -0,0 +1,151 @@ +use std::{any::type_name, collections::HashMap, marker::PhantomData}; + +use bonsaidb_core::{ + arc_bytes::serde::Bytes, + document::DocumentId, + schema::CollectionName, + transaction::{ChangedDocument, ChangedKey, Changes}, +}; +use serde::{Deserialize, Serialize}; +use transmog_versions::Versioned; + +#[derive(thiserror::Error)] +pub struct UnknownVersion(PhantomData); + +impl Default for UnknownVersion { + fn default() -> Self { + Self(PhantomData) + } +} + +impl std::fmt::Debug for UnknownVersion { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("UnknownVersion") + .field(&type_name::()) + .finish() + } +} + +impl std::fmt::Display for UnknownVersion { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "incompatilbe version of {}", type_name::()) + } +} + +#[derive(Clone, Copy, Debug)] +enum ChangesVersions { + Legacy = 0, + V1 = 1, +} + +impl Versioned for ChangesVersions { + fn version(&self) -> u64 { + *self as u64 + } +} + +impl TryFrom for ChangesVersions { + type Error = UnknownVersion; + + fn try_from(value: u64) -> Result { + match value { + 0 => Ok(ChangesVersions::Legacy), + 1 => Ok(ChangesVersions::V1), + _ => Err(UnknownVersion::default()), + } + } +} + +pub fn deserialize_executed_transaction_changes(data: &[u8]) -> Result { + let (version, data) = transmog_versions::unwrap_version(data); + match ChangesVersions::try_from(version)? 
{ + ChangesVersions::Legacy => { + let legacy: ChangesV0 = match pot::from_slice(data) { + Ok(changes) => changes, + Err(pot::Error::NotAPot) => ChangesV0::Documents(bincode::deserialize(data)?), + other => other?, + }; + Changes::try_from(legacy).map_err(crate::Error::from) + } + ChangesVersions::V1 => pot::from_slice(data).map_err(crate::Error::from), + } +} + +pub fn serialize_executed_transaction_changes(changes: &Changes) -> Result, crate::Error> { + let mut serialized = Vec::new(); + transmog_versions::write_header(&ChangesVersions::V1, &mut serialized)?; + pot::to_writer(changes, &mut serialized)?; + Ok(serialized) +} + +/// A list of changes. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum ChangesV0 { + /// A list of changed documents. + Documents(Vec), + /// A list of changed keys. + Keys(Vec), +} + +impl TryFrom for Changes { + type Error = bonsaidb_core::Error; + fn try_from(legacy: ChangesV0) -> Result { + match legacy { + ChangesV0::Documents(legacy_documents) => { + let mut changed_documents = Vec::with_capacity(legacy_documents.len()); + let mut collections = Vec::new(); + let mut collection_indexes = HashMap::new(); + for changed in legacy_documents { + let collection = if let Some(id) = collection_indexes.get(&changed.collection) { + *id + } else { + let id = u16::try_from(collections.len()).unwrap(); + collection_indexes.insert(changed.collection.clone(), id); + collections.push(changed.collection); + id + }; + changed_documents.push(ChangedDocument { + collection, + id: changed.id.try_into()?, + deleted: changed.deleted, + }); + } + Ok(Self::Documents { + collections, + changes: changed_documents, + }) + } + ChangesV0::Keys(changes) => Ok(Self::Keys(changes)), + } + } +} + +/// A record of a changed document. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ChangedDocumentV0 { + /// The id of the `Collection` of the changed `Document`. + pub collection: CollectionName, + + /// The id of the changed `Document`. 
+ pub id: LegacyDocumentId, + + /// If the `Document` has been deleted, this will be `true`. + pub deleted: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(untagged)] +pub enum LegacyDocumentId { + U64(u64), + Document(Bytes), +} + +impl TryFrom for DocumentId { + type Error = bonsaidb_core::Error; + fn try_from(id: LegacyDocumentId) -> Result { + match id { + LegacyDocumentId::Document(id) => DocumentId::try_from(&id[..]), + LegacyDocumentId::U64(version) => Ok(DocumentId::from_u64(version)), + } + } +} diff --git a/crates/bonsaidb-local/src/database/keyvalue.rs b/crates/bonsaidb-local/src/database/keyvalue.rs index 383dee62dc4..9fb437189e5 100644 --- a/crates/bonsaidb-local/src/database/keyvalue.rs +++ b/crates/bonsaidb-local/src/database/keyvalue.rs @@ -28,6 +28,7 @@ use tokio::{ use crate::{ config::KeyValuePersistence, + database::compat, tasks::{Job, Keyed, Task}, Database, Error, }; @@ -709,7 +710,9 @@ impl KeyValueState { if !changed_keys.is_empty() { transaction .entry_mut() - .set_data(pot::to_vec(&Changes::Keys(changed_keys))?) + .set_data(compat::serialize_executed_transaction_changes( + &Changes::Keys(changed_keys), + )?) .map_err(Error::from)?; transaction.commit().map_err(Error::from)?; } diff --git a/crates/bonsaidb-local/src/database/legacy.rs b/crates/bonsaidb-local/src/database/legacy.rs deleted file mode 100644 index 7e9f430bf71..00000000000 --- a/crates/bonsaidb-local/src/database/legacy.rs +++ /dev/null @@ -1,81 +0,0 @@ -use std::collections::HashMap; - -use bonsaidb_core::{ - document::DocumentId, - schema::CollectionName, - transaction::{ChangedDocument, ChangedKey, Changes, Executed}, -}; -use serde::{Deserialize, Serialize}; - -/// Details about an executed transaction. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ExecutedTransactionV0 { - /// The id of the transaction. - pub id: u64, - - /// A list of containing ids of `Documents` changed. 
- pub changes: ChangesV0, -} - -impl From for Executed { - fn from(legacy: ExecutedTransactionV0) -> Self { - Self { - id: legacy.id, - changes: legacy.changes.into(), - } - } -} - -/// A list of changes. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum ChangesV0 { - /// A list of changed documents. - Documents(Vec), - /// A list of changed keys. - Keys(Vec), -} - -impl From for Changes { - fn from(legacy: ChangesV0) -> Self { - match legacy { - ChangesV0::Documents(legacy_documents) => { - let mut changed_documents = Vec::with_capacity(legacy_documents.len()); - let mut collections = Vec::new(); - let mut collection_indexes = HashMap::new(); - for changed in legacy_documents { - let collection = if let Some(id) = collection_indexes.get(&changed.collection) { - *id - } else { - let id = u16::try_from(collections.len()).unwrap(); - collection_indexes.insert(changed.collection.clone(), id); - collections.push(changed.collection); - id - }; - changed_documents.push(ChangedDocument { - collection, - id: changed.id, - deleted: changed.deleted, - }); - } - Self::Documents { - collections, - changes: changed_documents, - } - } - ChangesV0::Keys(changes) => Self::Keys(changes), - } - } -} - -/// A record of a changed document. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ChangedDocumentV0 { - /// The id of the `Collection` of the changed `Document`. - pub collection: CollectionName, - - /// The id of the changed `Document`. - pub id: DocumentId, - - /// If the `Document` has been deleted, this will be `true`. - pub deleted: bool, -} diff --git a/crates/bonsaidb-local/src/error.rs b/crates/bonsaidb-local/src/error.rs index 87cc4708713..480ec07b539 100644 --- a/crates/bonsaidb-local/src/error.rs +++ b/crates/bonsaidb-local/src/error.rs @@ -7,6 +7,8 @@ use bonsaidb_core::{ }; use nebari::AbortError; +use crate::database::compat::UnknownVersion; + /// Errors that can occur from interacting with storage. 
#[derive(thiserror::Error, Debug)] pub enum Error { @@ -16,7 +18,7 @@ pub enum Error { /// An error occurred serializing the underlying database structures. #[error("error while serializing internal structures: {0}")] - InternalSerialization(#[from] bincode::Error), + InternalSerialization(String), /// An error occurred serializing the contents of a `Document` or results of a `View`. #[error("error while serializing: {0}")] @@ -81,6 +83,18 @@ impl From for Error { } } +impl From for Error { + fn from(err: bincode::Error) -> Self { + Self::InternalSerialization(err.to_string()) + } +} + +impl From> for Error { + fn from(err: UnknownVersion) -> Self { + Self::InternalSerialization(err.to_string()) + } +} + #[cfg(feature = "password-hashing")] impl From for Error { fn from(err: argon2::Error) -> Self { diff --git a/crates/bonsaidb-local/src/tests/compatibility.rs b/crates/bonsaidb-local/src/tests/compatibility.rs index 5607b6c189a..4056e3f2dec 100644 --- a/crates/bonsaidb-local/src/tests/compatibility.rs +++ b/crates/bonsaidb-local/src/tests/compatibility.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashSet, path::{Path, PathBuf}, time::Duration, }; @@ -235,34 +236,32 @@ async fn test_basic(db: Database) { assert_eq!(mapping.value, expected_value); } - // This was written in the tx-log-optimization branch, but brought back - // temporarily to allow creating a v0.2 database for testing against. 
- // - // let transactions = db.list_executed_transactions(None, None).await.unwrap(); - // let kv_transactions = transactions - // .iter() - // .filter_map(|t| t.changes.keys()) - // .collect::>(); - // assert_eq!(kv_transactions.len(), 2); - // let keys = kv_transactions - // .iter() - // .flat_map(|changed_keys| { - // changed_keys - // .iter() - // .map(|changed_key| changed_key.key.as_str()) - // }) - // .collect::>(); - // assert_eq!(keys.len(), 2); - // assert!(keys.contains("string")); - // assert!(keys.contains("integer")); - // let basic_transactions = transactions.iter().filter_map(|t| { - // t.changes.documents().and_then(|(collections, documents)| { - // collections - // .contains(&Basic::collection_name()) - // .then(|| documents) - // }) - // }); - // assert_eq!(basic_transactions.count(), 5); + let transactions = db.list_executed_transactions(None, None).await.unwrap(); + let kv_transactions = transactions + .iter() + .filter_map(|t| t.changes.keys()) + .collect::>(); + assert_eq!(kv_transactions.len(), 2); + let keys = kv_transactions + .iter() + .flat_map(|changed_keys| { + changed_keys + .iter() + .map(|changed_key| changed_key.key.as_str()) + }) + .collect::>(); + assert_eq!(keys.len(), 2); + assert!(keys.contains("string")); + assert!(keys.contains("integer")); + + let basic_transactions = transactions.iter().filter_map(|t| { + t.changes.documents().and_then(|(collections, documents)| { + collections + .contains(&Basic::collection_name()) + .then(|| documents) + }) + }); + assert_eq!(basic_transactions.count(), 5); } async fn test_unique(db: Database) { From c90235eb1d52212ede45593baa171a148bfacc99 Mon Sep 17 00:00:00 2001 From: Jonathan Johnson Date: Mon, 21 Feb 2022 10:08:10 -0800 Subject: [PATCH 3/3] Refactored Changes::Documents into its own type This makes it easier to consume and iterate. 
--- CHANGELOG.md | 13 ++ crates/bonsaidb-core/src/test_util.rs | 24 +-- crates/bonsaidb-core/src/transaction.rs | 174 ++++++++++++++++-- crates/bonsaidb-local/src/database.rs | 9 +- crates/bonsaidb-local/src/database/compat.rs | 8 +- .../bonsaidb-local/src/tests/compatibility.rs | 7 +- 6 files changed, 199 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index aa7f7a11f38..df4e9756552 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 in the PliantDb days. After thinking about the job scheduler more, this initial implementation is better suited for the internal task management than the higher-level jobs system. As such, it has been internalized. +- `bonsaidb::core::transaction::Changes::Documents` has been changed to store + the `CollectionName`s separately from the `ChangedDocument`s. This makes the + transaction log entries smaller, as collection names aren't copied for each + document. + + The storage layer is fully backwards compatible and will automatically convert + existing transactions to the new format. + +### Fixed + +- Listing executed transactions that were written in `v0.1` was broken in + `v0.2`. Backwards compatibility is now automatically tested to help ensure + this sort of issue won't happen in the future again. 
## v0.2.0 diff --git a/crates/bonsaidb-core/src/test_util.rs b/crates/bonsaidb-core/src/test_util.rs index fb973e1241d..c99f3be28c9 100644 --- a/crates/bonsaidb-core/src/test_util.rs +++ b/crates/bonsaidb-core/src/test_util.rs @@ -689,16 +689,16 @@ pub async fn store_retrieve_update_delete_tests(db: &C) -> anyhow assert_eq!(transactions.len(), 2); assert!(transactions[0].id < transactions[1].id); for transaction in &transactions { - let (collections, changed_documents) = transaction + let changes = transaction .changes .documents() .expect("incorrect transaction type"); - assert_eq!(changed_documents.len(), 1); - assert_eq!(collections.len(), 1); - assert_eq!(collections[0], Basic::collection_name()); - assert_eq!(changed_documents[0].collection, 0); - assert_eq!(header.id, changed_documents[0].id.deserialize()?); - assert!(!changed_documents[0].deleted); + assert_eq!(changes.documents.len(), 1); + assert_eq!(changes.collections.len(), 1); + assert_eq!(changes.collections[0], Basic::collection_name()); + assert_eq!(changes.documents[0].collection, 0); + assert_eq!(header.id, changes.documents[0].id.deserialize()?); + assert!(!changes.documents[0].deleted); } db.collection::().delete(&doc).await?; @@ -708,14 +708,14 @@ pub async fn store_retrieve_update_delete_tests(db: &C) -> anyhow .await?; assert_eq!(transactions.len(), 1); let transaction = transactions.first().unwrap(); - let (collections, changed_documents) = transaction + let changes = transaction .changes .documents() .expect("incorrect transaction type"); - assert_eq!(changed_documents.len(), 1); - assert_eq!(collections[0], Basic::collection_name()); - assert_eq!(header.id, changed_documents[0].id.deserialize()?); - assert!(changed_documents[0].deleted); + assert_eq!(changes.documents.len(), 1); + assert_eq!(changes.collections[0], Basic::collection_name()); + assert_eq!(header.id, changes.documents[0].id.deserialize()?); + assert!(changes.documents[0].deleted); // Use the Collection interface let mut doc 
= original_value.clone().push_into(db).await?; diff --git a/crates/bonsaidb-core/src/transaction.rs b/crates/bonsaidb-core/src/transaction.rs index 4c526afd008..148782d75bb 100644 --- a/crates/bonsaidb-core/src/transaction.rs +++ b/crates/bonsaidb-core/src/transaction.rs @@ -269,12 +269,7 @@ pub struct Executed { #[derive(Clone, Debug, Serialize, Deserialize)] pub enum Changes { /// A list of changed documents. - Documents { - /// All of the collections changed. - collections: Vec, - /// The individual document changes. - changes: Vec, - }, + Documents(DocumentChanges), /// A list of changed keys. Keys(Vec), } @@ -283,13 +278,9 @@ impl Changes { /// Returns the list of documents changed in this transaction, or None if /// the transaction was not a document transaction. #[must_use] - pub fn documents(&self) -> Option<(&[CollectionName], &[ChangedDocument])> { - if let Self::Documents { - collections, - changes, - } = self - { - Some((collections, changes)) + pub const fn documents(&self) -> Option<&DocumentChanges> { + if let Self::Documents(changes) = self { + Some(changes) } else { None } @@ -307,6 +298,163 @@ impl Changes { } } +/// A list of changed documents. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct DocumentChanges { + /// All of the collections changed. + pub collections: Vec, + /// The individual document changes. + pub documents: Vec, +} + +impl DocumentChanges { + /// Returns the changed document and the name of the collection the change + /// happened to. + #[must_use] + pub fn get(&self, index: usize) -> Option<(&CollectionName, &ChangedDocument)> { + self.documents.get(index).and_then(|doc| { + self.collections + .get(usize::from(doc.collection)) + .map(|collection| (collection, doc)) + }) + } + + /// Returns the number of changes in this collection. + #[must_use] + pub fn len(&self) -> usize { + self.documents.len() + } + + /// Returns true if there are no changes. 
+ #[must_use] + pub fn is_empty(&self) -> bool { + self.documents.is_empty() + } + + /// Returns an iterator over all of the changed documents. + pub const fn iter(&self) -> DocumentChangesIter<'_> { + DocumentChangesIter { + changes: self, + index: Some(0), + } + } +} + +/// An iterator over [`DocumentChanges`]. +#[must_use] +pub struct DocumentChangesIter<'a> { + changes: &'a DocumentChanges, + index: Option, +} + +impl<'a> Iterator for DocumentChangesIter<'a> { + type Item = (&'a CollectionName, &'a ChangedDocument); + + fn next(&mut self) -> Option { + self.index.and_then(|index| { + let result = self.changes.get(index); + if result.is_some() { + self.index = index.checked_add(1); + } + result + }) + } +} + +/// A draining iterator over [`ChangedDocument`]s. +#[must_use] +pub struct DocumentChangesIntoIter { + collections: Vec, + documents: std::vec::IntoIter, +} + +impl Iterator for DocumentChangesIntoIter { + type Item = (CollectionName, ChangedDocument); + + fn next(&mut self) -> Option { + self.documents.next().and_then(|doc| { + self.collections + .get(usize::from(doc.collection)) + .map(|collection| (collection.clone(), doc)) + }) + } +} + +impl IntoIterator for DocumentChanges { + type Item = (CollectionName, ChangedDocument); + + type IntoIter = DocumentChangesIntoIter; + + fn into_iter(self) -> Self::IntoIter { + DocumentChangesIntoIter { + collections: self.collections, + documents: self.documents.into_iter(), + } + } +} + +#[test] +fn document_changes_iter() { + let changes = DocumentChanges { + collections: vec![CollectionName::private("a"), CollectionName::private("b")], + documents: vec![ + ChangedDocument { + collection: 0, + id: DocumentId::from_u64(0), + deleted: false, + }, + ChangedDocument { + collection: 0, + id: DocumentId::from_u64(1), + deleted: false, + }, + ChangedDocument { + collection: 1, + id: DocumentId::from_u64(2), + deleted: false, + }, + ChangedDocument { + collection: 2, + id: DocumentId::from_u64(3), + deleted: false, + }, 
+ ], + }; + + assert_eq!(changes.len(), 4); + assert!(!changes.is_empty()); + + let mut a_changes = 0; + let mut b_changes = 0; + let mut ids = Vec::new(); + for (collection, document) in changes.iter() { + assert!(!ids.contains(&document.id)); + ids.push(document.id); + match collection.name.as_ref() { + "a" => a_changes += 1, + "b" => b_changes += 1, + _ => unreachable!("invalid collection name {collection}"), + } + } + assert_eq!(a_changes, 2); + assert_eq!(b_changes, 1); + + let mut a_changes = 0; + let mut b_changes = 0; + let mut ids = Vec::new(); + for (collection, document) in changes { + assert!(!ids.contains(&document.id)); + ids.push(document.id); + match collection.name.as_ref() { + "a" => a_changes += 1, + "b" => b_changes += 1, + _ => unreachable!("invalid collection name {collection}"), + } + } + assert_eq!(a_changes, 2); + assert_eq!(b_changes, 1); +} + /// A record of a changed document. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ChangedDocument { diff --git a/crates/bonsaidb-local/src/database.rs b/crates/bonsaidb-local/src/database.rs index 51c5592c415..a38a5a14ae3 100644 --- a/crates/bonsaidb-local/src/database.rs +++ b/crates/bonsaidb-local/src/database.rs @@ -31,7 +31,8 @@ use bonsaidb_core::{ Collection, CollectionName, Map, MappedValue, Schema, Schematic, ViewName, }, transaction::{ - self, ChangedDocument, Changes, Command, Operation, OperationResult, Transaction, + self, ChangedDocument, Changes, Command, DocumentChanges, Operation, OperationResult, + Transaction, }, }; use bonsaidb_utils::fast_async_lock; @@ -729,10 +730,10 @@ impl Database { roots_transaction .entry_mut() .set_data(compat::serialize_executed_transaction_changes( - &Changes::Documents { + &Changes::Documents(DocumentChanges { collections, - changes: changed_documents, - }, + documents: changed_documents, + }), )?)?; roots_transaction.commit()?; diff --git a/crates/bonsaidb-local/src/database/compat.rs b/crates/bonsaidb-local/src/database/compat.rs index 
22464aee44e..0ace25aadb2 100644 --- a/crates/bonsaidb-local/src/database/compat.rs +++ b/crates/bonsaidb-local/src/database/compat.rs @@ -4,7 +4,7 @@ use bonsaidb_core::{ arc_bytes::serde::Bytes, document::DocumentId, schema::CollectionName, - transaction::{ChangedDocument, ChangedKey, Changes}, + transaction::{ChangedDocument, ChangedKey, Changes, DocumentChanges}, }; use serde::{Deserialize, Serialize}; use transmog_versions::Versioned; @@ -110,10 +110,10 @@ impl TryFrom for Changes { deleted: changed.deleted, }); } - Ok(Self::Documents { + Ok(Self::Documents(DocumentChanges { collections, - changes: changed_documents, - }) + documents: changed_documents, + })) } ChangesV0::Keys(changes) => Ok(Self::Keys(changes)), } diff --git a/crates/bonsaidb-local/src/tests/compatibility.rs b/crates/bonsaidb-local/src/tests/compatibility.rs index 4056e3f2dec..2465c6ae647 100644 --- a/crates/bonsaidb-local/src/tests/compatibility.rs +++ b/crates/bonsaidb-local/src/tests/compatibility.rs @@ -255,10 +255,11 @@ async fn test_basic(db: Database) { assert!(keys.contains("integer")); let basic_transactions = transactions.iter().filter_map(|t| { - t.changes.documents().and_then(|(collections, documents)| { - collections + t.changes.documents().and_then(|changes| { + changes + .collections .contains(&Basic::collection_name()) - .then(|| documents) + .then(|| &changes.documents) }) }); assert_eq!(basic_transactions.count(), 5);