Skip to content

Commit

Permalink
Merge pull request #207 from khonsulabs/tx-log-optimization
Browse files Browse the repository at this point in the history
Remove duplicate CollectionNames from the transaction log
  • Loading branch information
ecton authored Feb 21, 2022
2 parents 543777e + c90235e commit 99c9b61
Show file tree
Hide file tree
Showing 10 changed files with 458 additions and 97 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
in the PliantDb days. After thinking about the job scheduler more, this
initial implementation is better suited for the internal task management than
the higher-level jobs system. As such, it has been internalized.
- `bonsaidb::core::transaction::Changes::Documents` has been changed to store
the `CollectionName`s separately from the `ChangedDocument`s. This makes the
transaction log entries smaller, as collection names aren't copied for each
document.

The storage layer is fully backwards compatible and will automatically convert
existing transactions to the new format.

### Fixed

- Listing executed transactions that were written in `v0.1` was broken in
`v0.2`. Backwards compatibility is now automatically tested to help ensure
this sort of issue won't happen again in the future.

## v0.2.0

Expand Down
8 changes: 0 additions & 8 deletions crates/bonsaidb-core/src/document/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,14 +344,6 @@ impl<'de> Visitor<'de> for DocumentIdVisitor {
Err(E::invalid_length(v.len(), &"< 64 bytes"))
}
}

// Provided for backwards compatibility. No new data is written with this.
// Accepts a bare `u64` and converts it with `DocumentId::from_u64`. This
// path always returns `Ok`, so the error type `E` is never constructed here.
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(DocumentId::from_u64(v))
}
}

/// A unique id for a document, either serialized or deserialized.
Expand Down
22 changes: 12 additions & 10 deletions crates/bonsaidb-core/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,14 +689,16 @@ pub async fn store_retrieve_update_delete_tests<C: Connection>(db: &C) -> anyhow
assert_eq!(transactions.len(), 2);
assert!(transactions[0].id < transactions[1].id);
for transaction in &transactions {
let changed_documents = transaction
let changes = transaction
.changes
.documents()
.expect("incorrect transaction type");
assert_eq!(changed_documents.len(), 1);
assert_eq!(changed_documents[0].collection, Basic::collection_name());
assert_eq!(header.id, changed_documents[0].id.deserialize()?);
assert!(!changed_documents[0].deleted);
assert_eq!(changes.documents.len(), 1);
assert_eq!(changes.collections.len(), 1);
assert_eq!(changes.collections[0], Basic::collection_name());
assert_eq!(changes.documents[0].collection, 0);
assert_eq!(header.id, changes.documents[0].id.deserialize()?);
assert!(!changes.documents[0].deleted);
}

db.collection::<Basic>().delete(&doc).await?;
Expand All @@ -706,14 +708,14 @@ pub async fn store_retrieve_update_delete_tests<C: Connection>(db: &C) -> anyhow
.await?;
assert_eq!(transactions.len(), 1);
let transaction = transactions.first().unwrap();
let changed_documents = transaction
let changes = transaction
.changes
.documents()
.expect("incorrect transaction type");
assert_eq!(changed_documents.len(), 1);
assert_eq!(changed_documents[0].collection, Basic::collection_name());
assert_eq!(header.id, changed_documents[0].id.deserialize()?);
assert!(changed_documents[0].deleted);
assert_eq!(changes.documents.len(), 1);
assert_eq!(changes.collections[0], Basic::collection_name());
assert_eq!(header.id, changes.documents[0].id.deserialize()?);
assert!(changes.documents[0].deleted);

// Use the Collection interface
let mut doc = original_value.clone().push_into(db).await?;
Expand Down
169 changes: 163 additions & 6 deletions crates/bonsaidb-core/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ pub struct Executed {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum Changes {
/// A list of changed documents.
Documents(Vec<ChangedDocument>),
Documents(DocumentChanges),
/// A list of changed keys.
Keys(Vec<ChangedKey>),
}
Expand All @@ -278,9 +278,9 @@ impl Changes {
/// Returns the list of documents changed in this transaction, or None if
/// the transaction was not a document transaction.
#[must_use]
pub fn documents(&self) -> Option<&[ChangedDocument]> {
if let Self::Documents(docs) = self {
Some(docs)
pub const fn documents(&self) -> Option<&DocumentChanges> {
if let Self::Documents(changes) = self {
Some(changes)
} else {
None
}
Expand All @@ -298,11 +298,168 @@ impl Changes {
}
}

/// A list of changed documents.
///
/// Collection names are stored once in `collections`. Each entry in
/// `documents` refers to its collection by index into that list (see
/// [`DocumentChanges::get`]) rather than carrying its own copy of the name.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DocumentChanges {
/// All of the collections changed.
pub collections: Vec<CollectionName>,
/// The individual document changes.
pub documents: Vec<ChangedDocument>,
}

impl DocumentChanges {
    /// Looks up the change at `index` and pairs it with the name of the
    /// collection it belongs to.
    ///
    /// Returns `None` when `index` is past the end of `documents`, or when the
    /// document's `collection` index does not refer to an entry in
    /// `collections`.
    #[must_use]
    pub fn get(&self, index: usize) -> Option<(&CollectionName, &ChangedDocument)> {
        let document = self.documents.get(index)?;
        let name = self.collections.get(usize::from(document.collection))?;
        Some((name, document))
    }

    /// Returns how many document changes are recorded.
    #[must_use]
    pub fn len(&self) -> usize {
        self.documents.len()
    }

    /// Returns true when no document changes are recorded.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.documents.is_empty()
    }

    /// Returns an iterator over all of the changed documents, yielding each
    /// one together with its collection name.
    pub const fn iter(&self) -> DocumentChangesIter<'_> {
        // Iteration begins at the first recorded change.
        let index = Some(0);
        DocumentChangesIter {
            changes: self,
            index,
        }
    }
}

/// An iterator over [`DocumentChanges`].
///
/// Yields `(&CollectionName, &ChangedDocument)` pairs in the order the
/// documents are stored.
#[must_use]
pub struct DocumentChangesIter<'a> {
// The changes being iterated over.
changes: &'a DocumentChanges,
// Position of the next change to yield; becomes `None` only if advancing
// the position would overflow `usize`.
index: Option<usize>,
}

impl<'a> Iterator for DocumentChangesIter<'a> {
    type Item = (&'a CollectionName, &'a ChangedDocument);

    /// Yields the next change with its collection name.
    ///
    /// Iteration stops at the end of the document list, or at the first
    /// document whose collection index cannot be resolved.
    fn next(&mut self) -> Option<Self::Item> {
        let current = self.index?;
        // The position is only advanced after a successful lookup, so a failed
        // lookup keeps returning `None` on later calls.
        let item = self.changes.get(current)?;
        self.index = current.checked_add(1);
        Some(item)
    }
}

/// A draining iterator over [`ChangedDocument`]s.
///
/// Yields owned `(CollectionName, ChangedDocument)` pairs.
#[must_use]
pub struct DocumentChangesIntoIter {
// Collection names that the documents' `collection` indexes refer to.
collections: Vec<CollectionName>,
// The remaining documents, consumed front to back.
documents: std::vec::IntoIter<ChangedDocument>,
}

impl Iterator for DocumentChangesIntoIter {
    type Item = (CollectionName, ChangedDocument);

    /// Takes the next document and pairs it with a clone of its collection
    /// name.
    ///
    /// Returns `None` when no documents remain, or when the document just
    /// taken has a collection index that does not resolve to a name.
    fn next(&mut self) -> Option<Self::Item> {
        let document = self.documents.next()?;
        let name = self.collections.get(usize::from(document.collection))?;
        Some((name.clone(), document))
    }
}

impl IntoIterator for DocumentChanges {
    type Item = (CollectionName, ChangedDocument);

    type IntoIter = DocumentChangesIntoIter;

    /// Consumes the changes, producing an iterator of owned
    /// `(CollectionName, ChangedDocument)` pairs.
    fn into_iter(self) -> Self::IntoIter {
        let Self {
            collections,
            documents,
        } = self;
        DocumentChangesIntoIter {
            collections,
            documents: documents.into_iter(),
        }
    }
}

#[test]
fn document_changes_iter() {
    // Builds a non-deleted change for the given collection index and id.
    let changed = |collection: u16, id: u64| ChangedDocument {
        collection,
        id: DocumentId::from_u64(id),
        deleted: false,
    };

    let changes = DocumentChanges {
        collections: vec![CollectionName::private("a"), CollectionName::private("b")],
        // The final entry points at collection index 2, which does not exist.
        // Both iterators are expected to stop there without yielding it.
        documents: vec![changed(0, 0), changed(0, 1), changed(1, 2), changed(2, 3)],
    };

    // `len` counts stored documents, including the unresolvable one.
    assert_eq!(changes.len(), 4);
    assert!(!changes.is_empty());

    // Borrowing iteration.
    let mut seen = Vec::new();
    let mut in_a = 0;
    let mut in_b = 0;
    for (collection, document) in changes.iter() {
        assert!(!seen.contains(&document.id));
        seen.push(document.id);
        match collection.name.as_ref() {
            "a" => in_a += 1,
            "b" => in_b += 1,
            _ => unreachable!("invalid collection name {collection}"),
        }
    }
    assert_eq!(in_a, 2);
    assert_eq!(in_b, 1);

    // Consuming iteration must observe the same changes.
    let mut seen = Vec::new();
    let mut in_a = 0;
    let mut in_b = 0;
    for (collection, document) in changes {
        assert!(!seen.contains(&document.id));
        seen.push(document.id);
        match collection.name.as_ref() {
            "a" => in_a += 1,
            "b" => in_b += 1,
            _ => unreachable!("invalid collection name {collection}"),
        }
    }
    assert_eq!(in_a, 2);
    assert_eq!(in_b, 1);
}

/// A record of a changed document.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangedDocument {
/// The id of the `Collection` of the changed `Document`.
pub collection: CollectionName,
/// The index of the `CollectionName` within the `collections` field of [`Changes::Documents`].
pub collection: u16,

/// The id of the changed `Document`.
pub id: DocumentId,
Expand Down
1 change: 1 addition & 0 deletions crates/bonsaidb-local/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ thiserror = "1"
tokio = { version = "=1.16.1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
pot = "1.0.0"
transmog-versions = "0.1.0"
bincode = "1.3"
flume = "0.10"
itertools = "0.10"
Expand Down
Loading

0 comments on commit 99c9b61

Please sign in to comment.