From 480ffc87d51b3fbbedc1c7415e5ec8ee8e4f4659 Mon Sep 17 00:00:00 2001 From: Jonathan Johnson Date: Sun, 15 May 2022 15:28:20 -0700 Subject: [PATCH] Exposing more B+Tree functionality This largely enables external Roots to be written. As part of khonsulabs/bonsaidb#250, I am moving the document map into the view entries tree, which requires using two roots like the Versioned root. The biggest limiting factor was a lot of unexported functionality and types. This set of changes also removes some of the &mut requirements for some of the closures. The last major change is adding the Value associated type to Root, which allows a tree to use something other than ArcBytes on its public API surface. The two built-in trees will continue to use ArcBytes, but the ViewEntries tree in BonsaiDb will be using a custom type to prevent extra deserialization. --- benchmarks/benches/nebari-bench.rs | 7 +- fuzz/fuzz_targets/compare_swap.rs | 4 +- nebari/Cargo.toml | 2 +- nebari/examples/embedded-indexes.rs | 9 +- nebari/examples/low-level.rs | 12 +- nebari/src/chunk_cache.rs | 4 + nebari/src/lib.rs | 4 +- nebari/src/roots.rs | 123 ++++---- nebari/src/tree/{btree_entry.rs => btree.rs} | 238 +++++++++------ nebari/src/tree/by_id.rs | 130 +++++--- nebari/src/tree/by_sequence.rs | 6 +- nebari/src/tree/interior.rs | 6 +- nebari/src/tree/key_entry.rs | 4 +- nebari/src/tree/mod.rs | 298 ++++++++++++------- nebari/src/tree/modify.rs | 9 +- nebari/src/tree/root.rs | 11 +- nebari/src/tree/unversioned.rs | 116 ++++---- nebari/src/tree/versioned.rs | 195 ++++++------ nebari/src/vault.rs | 1 + 19 files changed, 701 insertions(+), 478 deletions(-) rename nebari/src/tree/{btree_entry.rs => btree.rs} (88%) diff --git a/benchmarks/benches/nebari-bench.rs b/benchmarks/benches/nebari-bench.rs index e91fa0dc09..6ef94a7488 100644 --- a/benchmarks/benches/nebari-bench.rs +++ b/benchmarks/benches/nebari-bench.rs @@ -98,7 +98,7 @@ pub trait BenchConfig: Display { pub trait NebariBenchmark { const BACKEND: 
&'static str; - type Root: Root + Default; + type Root: Root> + Default; } pub struct VersionedBenchmark; @@ -120,7 +120,10 @@ use criterion::{ criterion_group, criterion_main, measurement::WallTime, BenchmarkGroup, BenchmarkId, Criterion, Throughput, }; -use nebari::tree::{Root, Unversioned, Versioned}; +use nebari::{ + tree::{Root, Unversioned, Versioned}, + ArcBytes, +}; fn all_benches(c: &mut Criterion) { blobs::benches(c); diff --git a/fuzz/fuzz_targets/compare_swap.rs b/fuzz/fuzz_targets/compare_swap.rs index 1ddd19b1c0..7ad7a0868e 100644 --- a/fuzz/fuzz_targets/compare_swap.rs +++ b/fuzz/fuzz_targets/compare_swap.rs @@ -5,8 +5,8 @@ use libfuzzer_sys::fuzz_target; use nebari::{ io::{fs::StdFile, FileManager}, tree::{ - CompareSwap, KeyOperation, Modification, Operation, PersistenceMode, State, TreeFile, - Unversioned, + btree::KeyOperation, CompareSwap, Modification, Operation, PersistenceMode, State, + TreeFile, Unversioned, }, ArcBytes, Context, }; diff --git a/nebari/Cargo.toml b/nebari/Cargo.toml index 1e7204b5a4..cd3b9ec4ea 100644 --- a/nebari/Cargo.toml +++ b/nebari/Cargo.toml @@ -29,7 +29,7 @@ parking_lot = "0.12.0" tracing = { version = "0.1.30", optional = true } num_cpus = "1.13.1" backtrace = "0.3.64" -arc-bytes = "0.3.2" +arc-bytes = "0.3.5" [dev-dependencies] nanorand = "0.7.0" diff --git a/nebari/examples/embedded-indexes.rs b/nebari/examples/embedded-indexes.rs index 01d723c2ae..3e349024b9 100644 --- a/nebari/examples/embedded-indexes.rs +++ b/nebari/examples/embedded-indexes.rs @@ -4,9 +4,10 @@ use byteorder::BigEndian; use nanorand::{Pcg64, Rng}; use nebari::{ tree::{ - EmbeddedIndex, Indexer, Reducer, Root, ScanEvaluation, Serializable, VersionedTreeRoot, + btree::{Indexer, Reducer}, + EmbeddedIndex, Root, ScanEvaluation, Serializable, VersionedTreeRoot, }, - Error, + ArcBytes, Error, }; fn main() -> Result<(), Error> { @@ -55,7 +56,7 @@ fn main() -> Result<(), Error> { #[derive(Clone, Debug)] pub struct Zeroes(pub u32); -impl EmbeddedIndex 
for Zeroes { +impl EmbeddedIndex> for Zeroes { type Reduced = Self; type Indexer = ZeroesIndexer; } @@ -63,7 +64,7 @@ impl EmbeddedIndex for Zeroes { #[derive(Default, Clone, Debug)] pub struct ZeroesIndexer; -impl Indexer for ZeroesIndexer { +impl Indexer, Zeroes> for ZeroesIndexer { fn index( &self, _key: &nebari::ArcBytes<'_>, diff --git a/nebari/examples/low-level.rs b/nebari/examples/low-level.rs index 531b37618f..9e66c83639 100644 --- a/nebari/examples/low-level.rs +++ b/nebari/examples/low-level.rs @@ -48,8 +48,8 @@ fn main() -> Result<(), Error> { .., true, false, - &mut |_key| ScanEvaluation::ReadData, - &mut |key, data| { + |_key| ScanEvaluation::ReadData, + |key, data| { println!( "Key {:?} contained {:?} at sequence {:?}. Previous sequence: {:?}", key.key, data, key.sequence, key.last_sequence @@ -61,7 +61,7 @@ fn main() -> Result<(), Error> { Ok(()) } -fn tree_basics( +fn tree_basics>, File: ManagedFile>( tree: &mut TreeFile, ) -> Result<(), Error> { // Insert a few key-value pairs. @@ -76,9 +76,9 @@ fn tree_basics( &(..), true, false, - &mut |_, _, _| ScanEvaluation::ReadData, - &mut |_key, _index| ScanEvaluation::ReadData, - &mut |key, _index, value| { + |_, _, _| ScanEvaluation::ReadData, + |_key, _index| ScanEvaluation::ReadData, + |key, _index, value| { println!("Found key {:?} with value {:?}", key, value); Ok(()) }, diff --git a/nebari/src/chunk_cache.rs b/nebari/src/chunk_cache.rs index e6393240c6..8401b8b98a 100644 --- a/nebari/src/chunk_cache.rs +++ b/nebari/src/chunk_cache.rs @@ -88,8 +88,12 @@ impl ChunkCache { } } +/// A cached chunk of data that has possibly been decoded already. #[derive(Clone)] pub enum CacheEntry { + /// A buffer of bytes that has been cached. ArcBytes(ArcBytes<'static>), + /// A previously decoded value that was stored using + /// [`ChunkCache::replace_with_decoded()`]. 
Decoded(Arc), } diff --git a/nebari/src/lib.rs b/nebari/src/lib.rs index 58bde8fc39..289099267f 100644 --- a/nebari/src/lib.rs +++ b/nebari/src/lib.rs @@ -32,12 +32,12 @@ mod test_util; pub use arc_bytes::ArcBytes; pub use self::{ - chunk_cache::ChunkCache, + chunk_cache::{CacheEntry, ChunkCache}, context::Context, error::{Error, ErrorKind}, roots::{ AbortError, CompareAndSwapError, Config, ExecutingTransaction, LockedTransactionTree, Roots, ThreadPool, TransactionTree, Tree, UnlockedTransactionTree, }, - vault::Vault, + vault::{AnyVault, Vault}, }; diff --git a/nebari/src/roots.rs b/nebari/src/roots.rs index 9bb4df4ae0..8a07937e90 100644 --- a/nebari/src/roots.rs +++ b/nebari/src/roots.rs @@ -28,7 +28,7 @@ use crate::{ state::AnyTreeState, EmbeddedIndex, KeySequence, Modification, ModificationResult, Operation, PersistenceMode, ScanEvaluation, SequenceEntry, SequenceId, SequenceIndex, State, TransactableCompaction, - TreeFile, TreeRoot, VersionedTreeRoot, + TreeEntry, TreeFile, TreeRoot, TreeValueIndex, VersionedTreeRoot, }, vault::AnyVault, ArcBytes, ChunkCache, ErrorKind, @@ -421,7 +421,7 @@ impl AnyTransactionTree for Transacti impl TransactionTree, File> where - Index: Clone + EmbeddedIndex + Debug + 'static, + Index: Clone + EmbeddedIndex> + Debug + 'static, { /// Returns the latest sequence id. 
#[must_use] @@ -503,7 +503,7 @@ impl TransactionTree { pub fn set( &mut self, key: impl Into>, - value: impl Into>, + value: impl Into, ) -> Result { self.tree.set( PersistenceMode::Transactional(self.transaction_id), @@ -516,7 +516,7 @@ impl TransactionTree { pub fn modify<'a>( &mut self, keys: Vec>, - operation: Operation<'a, ArcBytes<'static>, Root::Index>, + operation: Operation<'a, Root::Value, Root::Index>, ) -> Result>, Error> { self.tree.modify(Modification { keys, @@ -532,14 +532,14 @@ impl TransactionTree { pub fn replace( &mut self, key: impl Into>, - value: impl Into>, - ) -> Result<(Option>, Root::Index), Error> { + value: impl Into, + ) -> Result<(Option, Root::Index), Error> { self.tree.replace(key, value, self.transaction_id) } /// Returns the current value of `key`. This will return updated information /// if it has been previously updated within this transaction. - pub fn get(&mut self, key: &[u8]) -> Result>, Error> { + pub fn get(&mut self, key: &[u8]) -> Result, Error> { self.tree.get(key, true) } @@ -551,30 +551,28 @@ impl TransactionTree { /// Returns the current value and index of `key`. This will return updated /// information if it has been previously updated within this transaction. - pub fn get_with_index( - &mut self, - key: &[u8], - ) -> Result, Root::Index)>, Error> { + pub fn get_with_index(&mut self, key: &[u8]) -> Result>, Error> { self.tree.get_with_index(key, true) } /// Removes `key` and returns the existing value amd index, if present. - pub fn remove( - &mut self, - key: &[u8], - ) -> Result, Root::Index)>, Error> { + pub fn remove(&mut self, key: &[u8]) -> Result>, Error> { self.tree.remove(key, self.transaction_id) } /// Compares the value of `key` against `old`. If the values match, key will /// be set to the new value if `new` is `Some` or removed if `new` is /// `None`. 
- pub fn compare_and_swap( + pub fn compare_and_swap( &mut self, key: &[u8], - old: Option<&[u8]>, - new: Option>, - ) -> Result<(), CompareAndSwapError> { + old: Option<&Old>, + new: Option, + ) -> Result<(), CompareAndSwapError> + where + Old: PartialEq, + Root::Value: AsRef + Clone, + { self.tree .compare_and_swap(key, old, new, self.transaction_id) } @@ -584,7 +582,7 @@ impl TransactionTree { pub fn get_multiple<'keys, KeysIntoIter, KeysIter>( &mut self, keys: KeysIntoIter, - ) -> Result, ArcBytes<'static>)>, Error> + ) -> Result, Root::Value)>, Error> where KeysIntoIter: IntoIterator, KeysIter: Iterator + ExactSizeIterator, @@ -611,7 +609,7 @@ impl TransactionTree { pub fn get_multiple_with_indexes<'keys, KeysIntoIter, KeysIter>( &mut self, keys: KeysIntoIter, - ) -> Result, ArcBytes<'static>, Root::Index)>, Error> + ) -> Result>, Error> where KeysIntoIter: IntoIterator, KeysIter: Iterator + ExactSizeIterator, @@ -623,7 +621,7 @@ impl TransactionTree { pub fn get_range<'keys, KeyRangeBounds>( &mut self, range: &'keys KeyRangeBounds, - ) -> Result, ArcBytes<'static>)>, Error> + ) -> Result, Root::Value)>, Error> where KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized, { @@ -645,7 +643,7 @@ impl TransactionTree { pub fn get_range_with_indexes<'keys, KeyRangeBounds>( &mut self, range: &'keys KeyRangeBounds, - ) -> Result, ArcBytes<'static>, Root::Index)>, Error> + ) -> Result>, Error> where KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized, { @@ -692,7 +690,7 @@ impl TransactionTree { DataCallback: FnMut( ArcBytes<'static>, &Root::Index, - ArcBytes<'static>, + Root::Value, ) -> Result<(), AbortError>, CallerError: Display + Debug, { @@ -735,7 +733,7 @@ impl TransactionTree { /// Returns the first key and value of the tree. 
#[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))] - pub fn first(&mut self) -> Result, ArcBytes<'static>)>, Error> { + pub fn first(&mut self) -> Result, Root::Value)>, Error> { self.tree.first(true) } @@ -747,17 +745,17 @@ impl TransactionTree { /// Returns the last key and value of the tree. #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))] - pub fn last(&mut self) -> Result, ArcBytes<'static>)>, Error> { + pub fn last(&mut self) -> Result, Root::Value)>, Error> { self.tree.last(true) } } /// An error returned from `compare_and_swap()`. #[derive(Debug, thiserror::Error)] -pub enum CompareAndSwapError { +pub enum CompareAndSwapError { /// The stored value did not match the conditional value. #[error("value did not match. existing value: {0:?}")] - Conflict(Option>), + Conflict(Option), /// Another error occurred while executing the operation. #[error("error during compare_and_swap: {0}")] Error(#[from] Error), @@ -907,7 +905,7 @@ impl Tree { pub fn set( &self, key: impl Into>, - value: impl Into>, + value: impl Into, ) -> Result<(), Error> { let transaction = self.begin_transaction()?; transaction.tree::(0).unwrap().set(key, value)?; @@ -945,7 +943,7 @@ impl Tree { /// Retrieves the current value of `key`, if present. Does not reflect any /// changes in pending transactions. - pub fn get(&self, key: &[u8]) -> Result>, Error> { + pub fn get(&self, key: &[u8]) -> Result, Error> { catch_compaction_and_retry(|| { let mut tree = match self.open_for_read() { Ok(tree) => tree, @@ -973,10 +971,7 @@ impl Tree { /// Retrieves the current value and index of `key`, if present. Does not reflect any /// changes in pending transactions. 
- pub fn get_with_index( - &self, - key: &[u8], - ) -> Result, Root::Index)>, Error> { + pub fn get_with_index(&self, key: &[u8]) -> Result>, Error> { catch_compaction_and_retry(|| { let mut tree = match self.open_for_read() { Ok(tree) => tree, @@ -996,8 +991,8 @@ impl Tree { pub fn replace( &mut self, key: impl Into>, - value: impl Into>, - ) -> Result<(Option>, Root::Index), Error> { + value: impl Into, + ) -> Result<(Option, Root::Index), Error> { let transaction = self.begin_transaction()?; let existing_value = transaction.tree::(0).unwrap().replace(key, value)?; transaction.commit()?; @@ -1009,7 +1004,7 @@ impl Tree { pub fn modify<'a>( &mut self, keys: Vec>, - operation: Operation<'a, ArcBytes<'static>, Root::Index>, + operation: Operation<'a, Root::Value, Root::Index>, ) -> Result>, Error> { let transaction = self.begin_transaction()?; let results = transaction @@ -1023,7 +1018,7 @@ impl Tree { /// Removes `key` and returns the existing value and index, if present. This /// is executed within its own transaction. #[allow(clippy::missing_panics_doc)] - pub fn remove(&self, key: &[u8]) -> Result, Root::Index)>, Error> { + pub fn remove(&self, key: &[u8]) -> Result>, Error> { let transaction = self.begin_transaction()?; let existing_value = transaction.tree::(0).unwrap().remove(key)?; transaction.commit()?; @@ -1034,12 +1029,16 @@ impl Tree { /// be set to the new value if `new` is `Some` or removed if `new` is /// `None`. This is executed within its own transaction. 
#[allow(clippy::missing_panics_doc)] - pub fn compare_and_swap( + pub fn compare_and_swap( &self, key: &[u8], - old: Option<&[u8]>, - new: Option>, - ) -> Result<(), CompareAndSwapError> { + old: Option<&Old>, + new: Option, + ) -> Result<(), CompareAndSwapError> + where + Old: PartialEq, + Root::Value: AsRef + Clone, + { let transaction = self.begin_transaction()?; transaction .tree::(0) @@ -1055,7 +1054,7 @@ impl Tree { pub fn get_multiple<'keys, Keys>( &self, keys: Keys, - ) -> Result, ArcBytes<'static>)>, Error> + ) -> Result, Root::Value)>, Error> where Keys: Iterator + ExactSizeIterator + Clone, { @@ -1099,7 +1098,7 @@ impl Tree { pub fn get_multiple_with_indexes<'keys, KeysIntoIter, KeysIter>( &self, keys: KeysIntoIter, - ) -> Result, ArcBytes<'static>, Root::Index)>, Error> + ) -> Result>, Error> where KeysIntoIter: IntoIterator + Clone, KeysIter: Iterator + ExactSizeIterator, @@ -1119,7 +1118,7 @@ impl Tree { pub fn get_range<'keys, KeyRangeBounds>( &self, range: &'keys KeyRangeBounds, - ) -> Result, ArcBytes<'static>)>, Error> + ) -> Result, Root::Value)>, Error> where KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + Clone + ?Sized, { @@ -1157,7 +1156,7 @@ impl Tree { pub fn get_range_with_indexes<'keys, KeyRangeBounds>( &self, range: &'keys KeyRangeBounds, - ) -> Result, ArcBytes<'static>, Root::Index)>, Error> + ) -> Result>, Error> where KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized, { @@ -1211,7 +1210,7 @@ impl Tree { DataCallback: FnMut( ArcBytes<'static>, &Root::Index, - ArcBytes<'static>, + Root::Value, ) -> Result<(), AbortError>, CallerError: Display + Debug, { @@ -1278,7 +1277,7 @@ impl Tree { /// Returns the first key and value of the tree. 
#[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))] - pub fn first(&self) -> Result, ArcBytes<'static>)>, Error> { + pub fn first(&self) -> Result, Root::Value)>, Error> { catch_compaction_and_retry(|| { let mut tree = match self.open_for_read() { Ok(tree) => tree, @@ -1306,7 +1305,7 @@ impl Tree { /// Returns the last key and value of the tree. #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))] - pub fn last(&self) -> Result, ArcBytes<'static>)>, Error> { + pub fn last(&self) -> Result, Root::Value)>, Error> { catch_compaction_and_retry(|| { let mut tree = match self.open_for_read() { Ok(tree) => tree, @@ -1393,7 +1392,7 @@ impl AnyTreeRoot for Tree impl Tree, File> where - Index: EmbeddedIndex + Clone + Debug + 'static, + Index: EmbeddedIndex> + Clone + Debug + 'static, { /// Returns the latest sequence id. #[must_use] @@ -1413,14 +1412,14 @@ where &self, range: Range, forwards: bool, - key_evaluator: &mut KeyEvaluator, - data_callback: &mut DataCallback, + key_evaluator: KeyEvaluator, + data_callback: DataCallback, ) -> Result<(), AbortError> where Range: Clone + RangeBounds + Debug + 'static, - KeyEvaluator: FnMut(KeySequence) -> ScanEvaluation, - DataCallback: - FnMut(KeySequence, ArcBytes<'static>) -> Result<(), AbortError>, + KeyEvaluator: FnMut(KeySequence) -> ScanEvaluation + Clone, + DataCallback: FnMut(KeySequence, ArcBytes<'static>) -> Result<(), AbortError> + + Clone, CallerError: Display + Debug, { catch_compaction_and_retry_abortable(|| { @@ -1431,7 +1430,13 @@ where Some(self.roots.transactions()), )?; - tree.scan_sequences(range.clone(), forwards, false, key_evaluator, data_callback) + tree.scan_sequences( + range.clone(), + forwards, + false, + key_evaluator.clone(), + data_callback.clone(), + ) }) } @@ -1700,7 +1705,7 @@ mod tests { use crate::{ io::{any::AnyFileManager, fs::StdFileManager, memory::MemoryFileManager}, test_util::RotatorVault, - tree::{Root, Unversioned, Versioned}, + tree::{Root, Unversioned, 
ValueIndex, Versioned}, }; fn basic_get_set(file_manager: M) { @@ -1839,7 +1844,7 @@ mod tests { compact_test::(AnyFileManager::memory()); } - fn compact_test(file_manager: M) + fn compact_test>, M: FileManager>(file_manager: M) where R::Reducer: Default, { @@ -1863,7 +1868,7 @@ mod tests { let absolute_id = (worker * OPERATION_COUNT + relative_id) as u64; tree.set(absolute_id.to_be_bytes(), absolute_id.to_be_bytes()) .unwrap(); - let (value, _) = tree + let ValueIndex { value, .. } = tree .remove(&absolute_id.to_be_bytes()) .unwrap() .ok_or_else(|| panic!("value not found: {:?}", absolute_id)) diff --git a/nebari/src/tree/btree_entry.rs b/nebari/src/tree/btree.rs similarity index 88% rename from nebari/src/tree/btree_entry.rs rename to nebari/src/tree/btree.rs index 673ea78188..54a3372973 100644 --- a/nebari/src/tree/btree_entry.rs +++ b/nebari/src/tree/btree.rs @@ -13,14 +13,13 @@ use super::{ key_entry::KeyEntry, modify::{Modification, Operation}, serialization::BinarySerialization, - versioned::ChangeResult, - KeyRange, PagedWriter, + ChangeResult, KeyRange, PagedWriter, }; use crate::{ chunk_cache::CacheEntry, error::Error, io::File, - tree::{key_entry::ValueIndex, read_chunk, versioned::Children, ScanEvaluation}, + tree::{key_entry::PositionIndex, read_chunk, versioned::Children, ScanEvaluation}, vault::AnyVault, AbortError, ArcBytes, ChunkCache, ErrorKind, }; @@ -90,9 +89,9 @@ pub trait Reducer: Debug + Clone + Send + Sync { } /// Creates an `Index` from a key and value. -pub trait Indexer: Debug + Send + Sync { +pub trait Indexer: Debug + Send + Sync { /// Index the key and value. 
- fn index(&self, key: &ArcBytes<'_>, value: Option<&ArcBytes<'static>>) -> Index; + fn index(&self, key: &ArcBytes<'_>, value: Option<&Value>) -> Index; } impl Reducer for () { @@ -114,31 +113,73 @@ impl Reducer for () { } } -pub struct ModificationContext< - IndexedType, - Index, - ReducedIndex, - Context, - Indexer, - Loader, - IndexReducer, -> where +/// A context for a modification of a [`BTreeNode`]. +pub struct ModificationContext +where Indexer: FnMut( &ArcBytes<'_>, - Option<&IndexedType>, + Option<&Value>, Option<&Index>, - &mut Context, &mut PagedWriter<'_>, ) -> Result, Error>, IndexReducer: Reducer, - Loader: FnMut(&Index, &mut PagedWriter<'_>) -> Result, Error>, + Loader: FnMut(&Index, &mut PagedWriter<'_>) -> Result, Error>, { + /// The maximum number of children allowed per node. pub current_order: usize, + /// The minimum number of children allowed per node. pub minimum_children: usize, + /// The indexing function that produces `Index`es for a given `Value`. + /// + /// The parameters to this function are: + /// + /// - The entry's key + /// - The entry's value, if the value is present. + /// - The existing `Index`, if the value was previously present. + /// - The writer used to read and write chunks to the file. pub indexer: Indexer, + /// A function that is called to load a `Value` from an `Index`. + /// + /// The parameters to this function are: + /// + /// - The index to load a value for. + /// - The paged writer that can be used to read chunks from the file. pub loader: Loader, + /// A [`Reducer`] that reduces many `Index`es or many `ReducedIndex`es into + /// a single `ReducedIndex` value. 
pub reducer: IndexReducer, - pub _phantom: PhantomData<(IndexedType, Index, ReducedIndex, Context)>, + _phantom: PhantomData<(Value, Index, ReducedIndex)>, +} + +impl + ModificationContext +where + Indexer: FnMut( + &ArcBytes<'_>, + Option<&Value>, + Option<&Index>, + &mut PagedWriter<'_>, + ) -> Result, Error>, + IndexReducer: Reducer, + Loader: FnMut(&Index, &mut PagedWriter<'_>) -> Result, Error>, +{ + /// Returns a new context. + pub fn new( + current_order: usize, + minimum_children: usize, + indexer: Indexer, + loader: Loader, + reducer: IndexReducer, + ) -> Self { + Self { + current_order, + minimum_children, + indexer, + loader, + reducer, + _phantom: PhantomData, + } + } } #[cfg(any(debug_assertions, feature = "paranoid"))] @@ -159,23 +200,29 @@ macro_rules! assert_children_order { impl BTreeEntry where - Index: ValueIndex + Clone + BinarySerialization + Debug + 'static, + Index: PositionIndex + Clone + BinarySerialization + Debug + 'static, ReducedIndex: Clone + BinarySerialization + Debug + 'static, { - pub(crate) fn modify( + /// Modifies this entry. This function may not apply all of `modification`, + /// as the entry may enter a state in which the parent of this entry needs + /// to act upon. For example, if this node grows too large, the modification + /// operation will return [`ChangeResult::Split`]. + /// + /// When each operation is performed, it is removed from `modification`. To + /// ensure all modifications are executed, call this functino repeatedly + /// until `modification` has no keys remaining. 
+ pub fn modify( &mut self, modification: &mut Modification<'_, IndexedType, Index>, context: &mut ModificationContext< IndexedType, Index, ReducedIndex, - Context, Indexer, Loader, IndexReducer, >, max_key: Option<&ArcBytes<'_>>, - changes: &mut Context, writer: &mut PagedWriter<'_>, ) -> Result where @@ -183,7 +230,6 @@ where &ArcBytes<'_>, Option<&IndexedType>, Option<&Index>, - &mut Context, &mut PagedWriter<'_>, ) -> Result, Error>, IndexReducer: Reducer, @@ -191,7 +237,7 @@ where { match &mut self.node { BTreeNode::Leaf(children) => { - if Self::modify_leaf(children, modification, context, max_key, changes, writer)? { + if Self::modify_leaf(children, modification, context, max_key, writer)? { self.dirty = true; Ok(Self::clean_up_leaf( children, @@ -203,14 +249,7 @@ where } } BTreeNode::Interior(children) => { - match Self::modify_interior( - children, - modification, - context, - max_key, - changes, - writer, - )? { + match Self::modify_interior(children, modification, context, max_key, writer)? 
{ ChangeResult::Changed => { self.dirty = true; Ok(Self::clean_up_interior( @@ -265,20 +304,18 @@ where } #[allow(clippy::too_many_lines)] // TODO refactor, too many lines - fn modify_leaf( + fn modify_leaf( children: &mut Vec>, modification: &mut Modification<'_, IndexedType, Index>, context: &mut ModificationContext< IndexedType, Index, ReducedIndex, - Context, Indexer, Loader, IndexReducer, >, max_key: Option<&ArcBytes<'_>>, - changes: &mut Context, writer: &mut PagedWriter<'_>, ) -> Result where @@ -286,7 +323,6 @@ where &ArcBytes<'_>, Option<&IndexedType>, Option<&Index>, - &mut Context, &mut PagedWriter<'_>, ) -> Result, Error>, IndexReducer: Reducer, @@ -311,7 +347,6 @@ where &key, Some(value), Some(&children[last_index].index), - changes, writer, )?, Operation::SetEach(values) => (context.indexer)( @@ -320,14 +355,12 @@ where ErrorKind::message("need the same number of keys as values") })?), Some(&children[last_index].index), - changes, writer, )?, Operation::Remove => (context.indexer)( &key, None, Some(&children[last_index].index), - changes, writer, )?, Operation::CompareSwap(callback) => { @@ -339,16 +372,11 @@ where &key, Some(&new_value), Some(current_index), - changes, - writer, - )?, - KeyOperation::Remove => (context.indexer)( - &key, - None, - Some(current_index), - changes, writer, )?, + KeyOperation::Remove => { + (context.indexer)(&key, None, Some(current_index), writer)? + } } } }; @@ -373,7 +401,7 @@ where let key = modification.keys.pop().unwrap(); let operation = match &mut modification.operation { Operation::Set(new_value) => { - (context.indexer)(&key, Some(new_value), None, changes, writer)? + (context.indexer)(&key, Some(new_value), None, writer)? 
} Operation::SetEach(new_values) => (context.indexer)( &key, @@ -381,7 +409,6 @@ where ErrorKind::message("need the same number of keys as values") })?), None, - changes, writer, )?, Operation::Remove => { @@ -391,11 +418,9 @@ where Operation::CompareSwap(callback) => match callback(&key, None, None) { KeyOperation::Skip => KeyOperation::Skip, KeyOperation::Set(new_value) => { - (context.indexer)(&key, Some(&new_value), None, changes, writer)? - } - KeyOperation::Remove => { - (context.indexer)(&key, None, None, changes, writer)? + (context.indexer)(&key, Some(&new_value), None, writer)? } + KeyOperation::Remove => (context.indexer)(&key, None, None, writer)?, }, }; match operation { @@ -424,20 +449,18 @@ where Ok(any_changes) } - fn modify_interior( + fn modify_interior( children: &mut Vec>, modification: &mut Modification<'_, IndexedType, Index>, context: &mut ModificationContext< IndexedType, Index, ReducedIndex, - Context, Indexer, Loader, IndexReducer, >, max_key: Option<&ArcBytes<'_>>, - changes: &mut Context, writer: &mut PagedWriter<'_>, ) -> Result where @@ -445,7 +468,6 @@ where &ArcBytes<'_>, Option<&IndexedType>, Option<&Index>, - &mut Context, &mut PagedWriter<'_>, ) -> Result, Error>, IndexReducer: Reducer, @@ -498,7 +520,6 @@ where modification, context, Some(&max_key_for_modification), - changes, writer, )?, last_index, @@ -529,7 +550,7 @@ where }) } - fn process_interior_change_result( + fn process_interior_change_result( result: ChangeResult, child_index: usize, children: &mut Vec>, @@ -537,7 +558,6 @@ where IndexedType, Index, ReducedIndex, - Context, Indexer, Loader, IndexReducer, @@ -549,7 +569,6 @@ where &ArcBytes<'_>, Option<&IndexedType>, Option<&Index>, - &mut Context, &mut PagedWriter<'_>, ) -> Result, Error>, IndexReducer: Reducer, @@ -579,14 +598,13 @@ where } } - fn process_absorb( + fn process_absorb( child_index: usize, children: &mut Vec>, context: &ModificationContext< IndexedType, Index, ReducedIndex, - Context, Indexer, Loader, 
IndexReducer, @@ -598,7 +616,6 @@ where &ArcBytes<'_>, Option<&IndexedType>, Option<&Index>, - &mut Context, &mut PagedWriter<'_>, ) -> Result, Error>, IndexReducer: Reducer, @@ -653,14 +670,13 @@ where } } - fn process_interior_split( + fn process_interior_split( child_index: usize, children: &mut Vec>, context: &ModificationContext< IndexedType, Index, ReducedIndex, - Context, Indexer, Loader, IndexReducer, @@ -672,7 +688,6 @@ where &ArcBytes<'_>, Option<&IndexedType>, Option<&Index>, - &mut Context, &mut PagedWriter<'_>, ) -> Result, Error>, IndexReducer: Reducer, @@ -732,14 +747,13 @@ where Ok((ChangeResult::Changed, should_backup)) } - fn steal_children_from_start( + fn steal_children_from_start( child_index: usize, children: &mut [Interior], context: &ModificationContext< IndexedType, Index, ReducedIndex, - Context, Indexer, Loader, IndexReducer, @@ -751,7 +765,6 @@ where &ArcBytes<'_>, Option<&IndexedType>, Option<&Index>, - &mut Context, &mut PagedWriter<'_>, ) -> Result, Error>, IndexReducer: Reducer, @@ -834,14 +847,13 @@ where Ok((ChangeResult::Unchanged, should_backup)) } - fn steal_children_from_end( + fn steal_children_from_end( child_index: usize, children: &mut [Interior], context: &ModificationContext< IndexedType, Index, ReducedIndex, - Context, Indexer, Loader, IndexReducer, @@ -853,7 +865,6 @@ where &ArcBytes<'_>, Option<&IndexedType>, Option<&Index>, - &mut Context, &mut PagedWriter<'_>, ) -> Result, Error>, IndexReducer: Reducer, @@ -1041,7 +1052,11 @@ where (Interior::new(lower, reducer), Interior::new(upper, reducer)) } - pub(crate) fn split_root(&mut self, reducer: &R) + /// Replaces this node with a [`BTreeNode::Interior`] containing two + /// children nodes that were created by splitting the children of this node. + /// + /// This function should only be called on a root node. + pub fn split_root(&mut self, reducer: &R) where R: Reducer, { @@ -1070,11 +1085,23 @@ where } } + /// Scans this entry looking for keys in `range`. 
+ /// + /// For each interior node that may have keys in `range`, + /// [`ScanArgs::node_evaluator`] is invoked. This allows fine-grained + /// control over skipping entire branches of a tree or aborting a scan + /// early. If [`ScanEvaluation::ReadData`] is returned, the interior node's + /// children will be scanned. + /// + /// Once a leaf node is encountered, each key is checked against `range` and + /// [`ScanArgs::key_evaluator`] is invoked. If [`ScanEvaluation::ReadData`] + /// is returned, the stored value for the entry will be loaded and + /// [`ScanArgs::data_callback`] will be invoked. #[cfg_attr( feature = "tracing", tracing::instrument(skip(self, args, file, vault, cache)) )] - pub(crate) fn scan< + pub fn scan< 'k, 'keys, CallerError: Display + Debug, @@ -1086,6 +1113,7 @@ where &self, range: &'keys KeyRangeBounds, args: &mut ScanArgs< + ArcBytes<'static>, Index, ReducedIndex, CallerError, @@ -1205,11 +1233,19 @@ where Ok(true) } - pub(crate) fn get_multiple( + /// Read the values stored for one or more keys. + /// + /// `key_evaluator` is invoked once for each key found. If + /// [`ScanEvaluation::ReadData`] is returned, the key's value will be loaded + /// and `key_reader` will be invoked with the result. + /// + /// The `key_reader` function is not invoked immediately in an effort to + /// optimize the order of reads from the disk. + pub fn get_multiple( &self, keys: &mut Keys, - key_evaluator: &mut KeyEvaluator, - key_reader: &mut KeyReader, + mut key_evaluator: KeyEvaluator, + mut key_reader: KeyReader, file: &mut dyn File, vault: Option<&dyn AnyVault>, cache: Option<&ChunkCache>, @@ -1254,11 +1290,19 @@ where Ok(()) } + /// Read the values stored within a range of keys. + /// + /// `key_evaluator` is invoked once for each key found. If + /// [`ScanEvaluation::ReadData`] is returned, the key's value will be loaded + /// and `key_reader` will be invoked with the result. 
+ /// + /// The `key_reader` function is not invoked immediately in an effort to + /// optimize the order of reads from the disk. #[cfg_attr( feature = "tracing", tracing::instrument(skip(self, key_evaluator, keys, key_reader, file, vault, cache)) )] - pub(crate) fn get( + fn get( &self, keys: &mut KeyRange, key_evaluator: &mut KeyEvaluator, @@ -1345,8 +1389,11 @@ where Ok(true) } + /// Recursively copy all stored data from `file` to `writer`. + /// + /// Returns true if any data was copied. #[allow(clippy::too_many_arguments)] - pub(crate) fn copy_data_to( + pub fn copy_data_to( &mut self, include_nodes: NodeInclusion, file: &mut dyn File, @@ -1397,22 +1444,26 @@ where } } +/// Determines whether a data copy should include nodes or just data. #[derive(Clone, Debug, Copy, Eq, PartialEq)] pub enum NodeInclusion { + /// Excludes nodes from this copy. Exclude, + /// Excludes the current node, but includes all subsequent nodes. IncludeNext, + /// Includes all nodes. Include, } impl NodeInclusion { - pub const fn next(self) -> Self { + pub(crate) const fn next(self) -> Self { match self { Self::Exclude => Self::Exclude, Self::IncludeNext | Self::Include => Self::Include, } } - pub const fn should_include(self) -> bool { + pub(crate) const fn should_include(self) -> bool { matches!(self, Self::Include) } } @@ -1536,7 +1587,9 @@ impl<'a, I> Iterator for DirectionalSliceIterator<'a, I> { } } +/// Arguments for a tree scan operation. pub struct ScanArgs< + Value, Index, ReducedIndex, CallerError: Display + Debug, @@ -1546,30 +1599,39 @@ pub struct ScanArgs< > where NodeEvaluator: FnMut(&ArcBytes<'static>, &ReducedIndex, usize) -> ScanEvaluation, KeyEvaluator: FnMut(&ArcBytes<'static>, &Index) -> ScanEvaluation, - DataCallback: - FnMut(ArcBytes<'static>, &Index, ArcBytes<'static>) -> Result<(), AbortError>, + DataCallback: FnMut(ArcBytes<'static>, &Index, Value) -> Result<(), AbortError>, { + /// Controls the order of the scan. 
If true, the scan starts at the lowest + /// ordered key and scans forward. If false, the scan starts at the highest + /// ordered key and scans in reverse. pub forwards: bool, + /// A callback that is invoked for each interior node being considered in + /// the scan operation. pub node_evaluator: NodeEvaluator, + /// A callback that is invoked for each matching key found during the scan + /// operation. pub key_evaluator: KeyEvaluator, + /// A callback that is invoked once data has been loaded for a key during + /// the scan operation. pub data_callback: DataCallback, - _phantom: PhantomData<(Index, ReducedIndex, CallerError)>, + _phantom: PhantomData<(Value, Index, ReducedIndex, CallerError)>, } impl< + Value, Index, ReducedIndex, CallerError: Display + Debug, NodeEvaluator, KeyEvaluator, DataCallback, - > ScanArgs + > ScanArgs where NodeEvaluator: FnMut(&ArcBytes<'static>, &ReducedIndex, usize) -> ScanEvaluation, KeyEvaluator: FnMut(&ArcBytes<'static>, &Index) -> ScanEvaluation, - DataCallback: - FnMut(ArcBytes<'static>, &Index, ArcBytes<'static>) -> Result<(), AbortError>, + DataCallback: FnMut(ArcBytes<'static>, &Index, Value) -> Result<(), AbortError>, { + /// Returns a new instance of the scan arguments. pub fn new( forwards: bool, node_evaluator: NodeEvaluator, diff --git a/nebari/src/tree/by_id.rs b/nebari/src/tree/by_id.rs index 9f2cfb8a63..02d70a9ca9 100644 --- a/nebari/src/tree/by_id.rs +++ b/nebari/src/tree/by_id.rs @@ -1,15 +1,17 @@ +use std::marker::PhantomData; + use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; -use super::{btree_entry::Reducer, BinarySerialization, PagedWriter}; +use super::{btree::Reducer, BinarySerialization, PagedWriter}; use crate::{ error::Error, - tree::{by_sequence::SequenceId, key_entry::ValueIndex}, + tree::{by_sequence::SequenceId, key_entry::PositionIndex}, ArcBytes, }; /// The index stored within [`VersionedTreeRoot::by_id_root`](crate::tree::VersionedTreeRoot::by_id_root). 
 #[derive(Clone, Debug)]
-pub struct VersionedByIdIndex {
+pub struct VersionedByIdIndex, Value> {
     /// The unique sequence id generated when writing the value to the file.
     pub sequence_id: SequenceId,
     /// The size of the value stored on disk.
@@ -18,11 +20,35 @@ pub struct VersionedByIdIndex {
     pub position: u64,
     /// The embedded index.
     pub embedded: EmbeddedIndex,
+
+    _value: PhantomData,
 }

-impl BinarySerialization for VersionedByIdIndex
+impl VersionedByIdIndex
 where
-    EmbeddedIndex: super::EmbeddedIndex,
+    EmbeddedIndex: super::EmbeddedIndex,
+{
+    /// Returns a new index instance.
+    pub fn new(
+        sequence_id: SequenceId,
+        value_length: u32,
+        position: u64,
+        embedded: EmbeddedIndex,
+    ) -> Self {
+        Self {
+            sequence_id,
+            value_length,
+            position,
+            embedded,
+            _value: PhantomData,
+        }
+    }
+}
+
+impl BinarySerialization for VersionedByIdIndex
+where
+    EmbeddedIndex: super::EmbeddedIndex,
+    Value: Send + Sync,
 {
     fn serialize_to(
         &mut self,
@@ -42,16 +68,19 @@ where
         let sequence_id = SequenceId(reader.read_u64::()?);
         let value_length = reader.read_u32::()?;
         let position = reader.read_u64::()?;
-        Ok(Self {
+        Ok(Self::new(
             sequence_id,
             value_length,
             position,
-            embedded: EmbeddedIndex::deserialize_from(reader)?,
-        })
+            EmbeddedIndex::deserialize_from(reader)?,
+        ))
     }
 }

-impl ValueIndex for VersionedByIdIndex {
+impl PositionIndex for VersionedByIdIndex
+where
+    EmbeddedIndex: super::EmbeddedIndex,
+{
     fn position(&self) -> u64 {
         self.position
     }
@@ -59,18 +88,36 @@ impl ValueIndex for VersionedByIdIndex {
+pub struct UnversionedByIdIndex, Value> {
     /// The size of the value stored on disk.
     pub value_length: u32,
     /// The position of the value on disk.
     pub position: u64,
     /// The embedded index.
     pub embedded: EmbeddedIndex,
+
+    _value: PhantomData,
 }

-impl BinarySerialization for UnversionedByIdIndex
+impl UnversionedByIdIndex
 where
-    EmbeddedIndex: super::EmbeddedIndex,
+    EmbeddedIndex: super::EmbeddedIndex,
+{
+    /// Returns a new index instance.
+ pub fn new(value_length: u32, position: u64, embedded: EmbeddedIndex) -> Self { + Self { + value_length, + position, + embedded, + _value: PhantomData, + } + } +} + +impl BinarySerialization for UnversionedByIdIndex +where + EmbeddedIndex: super::EmbeddedIndex, + Value: Send + Sync, { fn serialize_to( &mut self, @@ -88,15 +135,18 @@ where ) -> Result { let value_length = reader.read_u32::()?; let position = reader.read_u64::()?; - Ok(Self { + Ok(Self::new( value_length, position, - embedded: EmbeddedIndex::deserialize_from(reader)?, - }) + EmbeddedIndex::deserialize_from(reader)?, + )) } } -impl ValueIndex for UnversionedByIdIndex { +impl PositionIndex for UnversionedByIdIndex +where + EmbeddedIndex: super::EmbeddedIndex, +{ fn position(&self) -> u64 { self.position } @@ -159,20 +209,24 @@ where #[derive(Clone, Default, Debug)] pub struct ByIdIndexer(pub EmbeddedIndexer); -impl - Reducer, ByIdStats> +impl + Reducer, ByIdStats> for ByIdIndexer where EmbeddedIndexer: Reducer, - EmbeddedIndex: super::EmbeddedIndex, + EmbeddedIndex: super::EmbeddedIndex, + Value: Send + Sync + 'static, { fn reduce<'a, Indexes, IndexesIter>(&self, indexes: Indexes) -> ByIdStats where EmbeddedIndex: 'a, - Indexes: IntoIterator, IntoIter = IndexesIter> - + ExactSizeIterator, - IndexesIter: - Iterator> + ExactSizeIterator + Clone, + Indexes: IntoIterator< + Item = &'a VersionedByIdIndex, + IntoIter = IndexesIter, + > + ExactSizeIterator, + IndexesIter: Iterator> + + ExactSizeIterator + + Clone, { self.reduce(indexes) } @@ -194,20 +248,24 @@ where } } -impl - Reducer, ByIdStats> +impl + Reducer, ByIdStats> for ByIdIndexer where EmbeddedIndexer: Reducer, - EmbeddedIndex: super::EmbeddedIndex, + EmbeddedIndex: super::EmbeddedIndex, + Value: 'static, { fn reduce<'a, Indexes, IndexesIter>(&self, indexes: Indexes) -> ByIdStats where EmbeddedIndex: 'a, - Indexes: IntoIterator, IntoIter = IndexesIter> - + ExactSizeIterator, - IndexesIter: - Iterator> + ExactSizeIterator + Clone, + Indexes: 
IntoIterator< + Item = &'a UnversionedByIdIndex, + IntoIter = IndexesIter, + > + ExactSizeIterator, + IndexesIter: Iterator> + + ExactSizeIterator + + Clone, { self.reduce(indexes) } @@ -229,14 +287,14 @@ where } impl ByIdIndexer { - fn reduce<'a, EmbeddedIndex, EmbeddedStats, Id, Indexes, IndexesIter>( + fn reduce<'a, EmbeddedIndex, EmbeddedStats, Id, Indexes, IndexesIter, Value>( &self, values: Indexes, ) -> ByIdStats where Id: IdIndex + 'a, EmbeddedIndex: - super::EmbeddedIndex + 'a, + super::EmbeddedIndex + 'a, Indexes: IntoIterator + ExactSizeIterator, IndexesIter: Iterator + ExactSizeIterator + Clone, EmbeddedIndexer: Reducer, @@ -300,9 +358,9 @@ pub trait IdIndex { fn embedded(&self) -> &EmbeddedIndex; } -impl IdIndex for UnversionedByIdIndex +impl IdIndex for UnversionedByIdIndex where - EmbeddedIndex: super::EmbeddedIndex, + EmbeddedIndex: super::EmbeddedIndex, { fn value_size(&self) -> u32 { self.value_length @@ -317,9 +375,9 @@ where } } -impl IdIndex for VersionedByIdIndex +impl IdIndex for VersionedByIdIndex where - EmbeddedIndex: super::EmbeddedIndex, + EmbeddedIndex: super::EmbeddedIndex, { fn value_size(&self) -> u32 { self.value_length diff --git a/nebari/src/tree/by_sequence.rs b/nebari/src/tree/by_sequence.rs index 2425cabdbb..207e89082c 100644 --- a/nebari/src/tree/by_sequence.rs +++ b/nebari/src/tree/by_sequence.rs @@ -4,7 +4,7 @@ use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; use crate::{ error::Error, - tree::{btree_entry::Reducer, key_entry::ValueIndex, BinarySerialization, PagedWriter}, + tree::{btree::Reducer, key_entry::PositionIndex, BinarySerialization, PagedWriter}, ArcBytes, ErrorKind, }; @@ -62,7 +62,7 @@ pub struct BySequenceIndex { impl BinarySerialization for BySequenceIndex where - Embedded: super::EmbeddedIndex, + Embedded: super::EmbeddedIndex>, { fn serialize_to( &mut self, @@ -125,7 +125,7 @@ where } } -impl ValueIndex for BySequenceIndex { +impl PositionIndex for BySequenceIndex { fn position(&self) -> u64 { 
self.position } diff --git a/nebari/src/tree/interior.rs b/nebari/src/tree/interior.rs index 849371bd85..e16dbc3766 100644 --- a/nebari/src/tree/interior.rs +++ b/nebari/src/tree/interior.rs @@ -5,12 +5,12 @@ use std::{ use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; -use super::{btree_entry::BTreeEntry, read_chunk, BinarySerialization, PagedWriter}; +use super::{btree::BTreeEntry, read_chunk, BinarySerialization, PagedWriter}; use crate::{ chunk_cache::CacheEntry, error::Error, io::File, - tree::{btree_entry::NodeInclusion, key_entry::ValueIndex}, + tree::{btree::NodeInclusion, key_entry::PositionIndex}, vault::AnyVault, AbortError, ArcBytes, ChunkCache, ErrorKind, }; @@ -155,7 +155,7 @@ impl< } impl< - Index: Clone + ValueIndex + BinarySerialization + Debug + 'static, + Index: Clone + PositionIndex + BinarySerialization + Debug + 'static, ReducedIndex: Clone + BinarySerialization + Debug + 'static, > Interior { diff --git a/nebari/src/tree/key_entry.rs b/nebari/src/tree/key_entry.rs index adac240a0c..2d119ff5ab 100644 --- a/nebari/src/tree/key_entry.rs +++ b/nebari/src/tree/key_entry.rs @@ -15,12 +15,12 @@ pub struct KeyEntry { } /// An index that serializes a value to the file. -pub trait ValueIndex { +pub trait PositionIndex { /// The position on-disk of the stored value. 
fn position(&self) -> u64; } -impl KeyEntry { +impl KeyEntry { pub(crate) fn copy_data_to( &mut self, file: &mut dyn File, diff --git a/nebari/src/tree/mod.rs b/nebari/src/tree/mod.rs index 069e77ca39..606fbb4bab 100644 --- a/nebari/src/tree/mod.rs +++ b/nebari/src/tree/mod.rs @@ -70,12 +70,16 @@ use crate::{ io::{File, FileManager, FileOp, ManagedFile, ManagedFileOpener, OpenableFile, OperableFile}, roots::AbortError, transaction::{ManagedTransaction, TransactionManager}, - tree::{btree_entry::ScanArgs, serialization::BinarySerialization}, + tree::{ + btree::{BTreeNode, Indexer, KeyOperation, Reducer, ScanArgs}, + serialization::BinarySerialization, + }, vault::AnyVault, ArcBytes, ChunkCache, CompareAndSwapError, Context, ErrorKind, }; -mod btree_entry; +/// B+Tree types +pub mod btree; mod by_id; mod by_sequence; mod interior; @@ -90,11 +94,10 @@ mod versioned; pub(crate) const DEFAULT_MAX_ORDER: usize = 1000; pub use self::{ - btree_entry::{BTreeEntry, BTreeNode, Indexer, KeyOperation, Reducer}, by_id::{ByIdIndexer, ByIdStats, UnversionedByIdIndex, VersionedByIdIndex}, by_sequence::{BySequenceIndex, BySequenceStats, SequenceId}, interior::{Interior, Pointer}, - key_entry::{KeyEntry, ValueIndex}, + key_entry::{KeyEntry, PositionIndex}, modify::{CompareSwap, CompareSwapFn, Modification, Operation, PersistenceMode}, root::{AnyTreeRoot, Root, TreeRoot}, state::{ActiveState, State}, @@ -318,7 +321,7 @@ impl TreeFile { &mut self, persistence_mode: impl Into, key: impl Into>, - value: impl Into>, + value: impl Into, ) -> Result { Ok(self .file @@ -344,7 +347,7 @@ impl TreeFile { /// updated indexes, if the keys are still present. pub fn modify( &mut self, - modification: Modification<'_, ArcBytes<'static>, Root::Index>, + modification: Modification<'_, Root::Value, Root::Index>, ) -> Result>, Error> { self.file.execute(TreeModifier { state: &self.state, @@ -358,13 +361,17 @@ impl TreeFile { /// Compares the value of `key` against `old`. 
If the values match, key will /// be set to the new value if `new` is `Some` or removed if `new` is /// `None`. - pub fn compare_and_swap( + pub fn compare_and_swap( &mut self, key: &[u8], - old: Option<&[u8]>, - mut new: Option>, + old: Option<&Old>, + mut new: Option, persistence_mode: impl Into, - ) -> Result<(), CompareAndSwapError> { + ) -> Result<(), CompareAndSwapError> + where + Old: PartialEq + ?Sized, + Root::Value: AsRef + Clone, + { let mut result = Ok(()); self.modify(Modification { persistence_mode: persistence_mode.into(), @@ -372,17 +379,15 @@ impl TreeFile { operation: Operation::CompareSwap(CompareSwap::new(&mut |_key, _index, value: Option< - ArcBytes<'_>, + Root::Value, >| { - if value.as_deref() == old { + if old == value.as_ref().map(AsRef::as_ref) { match new.take() { - Some(new) => KeyOperation::Set(new.into_owned()), + Some(new) => KeyOperation::Set(new), None => KeyOperation::Remove, } } else { - result = Err(CompareAndSwapError::Conflict( - value.map(ArcBytes::into_owned), - )); + result = Err(CompareAndSwapError::Conflict(value)); KeyOperation::Skip } })), @@ -395,7 +400,7 @@ impl TreeFile { &mut self, key: &[u8], persistence_mode: impl Into, - ) -> Result, Root::Index)>, Error> { + ) -> Result>, Error> { let mut existing_value = None; self.modify(Modification { persistence_mode: persistence_mode.into(), @@ -403,7 +408,10 @@ impl TreeFile { operation: Operation::CompareSwap(CompareSwap::new( &mut |_key, index: Option<&Root::Index>, value| { existing_value = if let (Some(index), Some(value)) = (index, value) { - Some((value, index.clone())) + Some(ValueIndex { + value, + index: index.clone(), + }) } else { None }; @@ -422,9 +430,9 @@ impl TreeFile { pub fn replace( &mut self, key: impl Into>, - value: impl Into>, + value: impl Into, persistence_mode: impl Into, - ) -> Result<(Option>, Root::Index), Error> { + ) -> Result<(Option, Root::Index), Error> { let mut existing_value = None; let mut value = Some(value.into()); let result = self @@ 
-447,11 +455,7 @@ impl TreeFile { /// Gets the value stored for `key`. #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))] - pub fn get( - &mut self, - key: &[u8], - in_transaction: bool, - ) -> Result>, Error> { + pub fn get(&mut self, key: &[u8], in_transaction: bool) -> Result, Error> { let mut buffer = None; self.file.execute(TreeGetter { from_transaction: in_transaction, @@ -497,7 +501,7 @@ impl TreeFile { &mut self, key: &[u8], in_transaction: bool, - ) -> Result, Root::Index)>, Error> { + ) -> Result>, Error> { let mut buffer = None; let mut found_index = None; self.file.execute(TreeGetter { @@ -513,8 +517,8 @@ impl TreeFile { }, key_evaluator: |_, _| ScanEvaluation::ReadData, })?; - if let (Some(buffer), Some(index)) = (buffer, found_index) { - Ok(Some((buffer, index))) + if let (Some(value), Some(index)) = (buffer, found_index) { + Ok(Some(ValueIndex { value, index })) } else { Ok(None) } @@ -528,7 +532,7 @@ impl TreeFile { &mut self, keys: KeysIntoIter, in_transaction: bool, - ) -> Result, ArcBytes<'static>)>, Error> + ) -> Result, Root::Value)>, Error> where KeysIntoIter: IntoIterator, KeysIter: Iterator + ExactSizeIterator, @@ -588,7 +592,7 @@ impl TreeFile { &mut self, keys: KeysIntoIter, in_transaction: bool, - ) -> Result, ArcBytes<'static>, Root::Index)>, Error> + ) -> Result>, Error> where KeysIntoIter: IntoIterator, KeysIter: Iterator + ExactSizeIterator, @@ -602,7 +606,7 @@ impl TreeFile { cache: self.cache.as_ref(), keys: KeyRange::new(keys), key_reader: |key, value, index| { - buffers.push((key, value, index)); + buffers.push(Entry { key, value, index }); Ok(()) }, key_evaluator: |_, _| ScanEvaluation::ReadData, @@ -616,7 +620,7 @@ impl TreeFile { &mut self, range: &'keys KeyRangeBounds, in_transaction: bool, - ) -> Result, ArcBytes<'static>)>, Error> + ) -> Result, Root::Value)>, Error> where KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized, { @@ -625,9 +629,9 @@ impl TreeFile { range, true, in_transaction, - &mut |_, _, 
_| ScanEvaluation::ReadData, - &mut |_, _| ScanEvaluation::ReadData, - &mut |key, _index, value| { + |_, _, _| ScanEvaluation::ReadData, + |_, _| ScanEvaluation::ReadData, + |key, _index, value| { results.push((key, value)); Ok(()) }, @@ -650,12 +654,12 @@ impl TreeFile { range, true, in_transaction, - &mut |_, _, _| ScanEvaluation::ReadData, - &mut |key, index| { + |_, _, _| ScanEvaluation::ReadData, + |key, index| { results.push((key.clone(), index.clone())); ScanEvaluation::Skip }, - &mut |_key, _index, _value| unreachable!(), + |_key, _index, _value| unreachable!(), )?; Ok(results) } @@ -667,7 +671,7 @@ impl TreeFile { &mut self, range: &'keys KeyRangeBounds, in_transaction: bool, - ) -> Result, ArcBytes<'static>, Root::Index)>, Error> + ) -> Result>, Error> where KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized, { @@ -676,10 +680,14 @@ impl TreeFile { range, true, in_transaction, - &mut |_, _, _| ScanEvaluation::ReadData, - &mut |_, _| ScanEvaluation::ReadData, - &mut |key, index, value| { - results.push((key, value, index.clone())); + |_, _, _| ScanEvaluation::ReadData, + |_, _| ScanEvaluation::ReadData, + |key, index, value| { + results.push(Entry { + key, + value, + index: index.clone(), + }); Ok(()) }, )?; @@ -715,9 +723,9 @@ impl TreeFile { range: &'keys KeyRangeBounds, forwards: bool, in_transaction: bool, - node_evaluator: &mut NodeEvaluator, - key_evaluator: &mut KeyEvaluator, - key_reader: &mut DataCallback, + node_evaluator: NodeEvaluator, + key_evaluator: KeyEvaluator, + key_reader: DataCallback, ) -> Result<(), AbortError> where KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized, @@ -726,7 +734,7 @@ impl TreeFile { DataCallback: FnMut( ArcBytes<'static>, &Root::Index, - ArcBytes<'static>, + Root::Value, ) -> Result<(), AbortError>, CallerError: Display + Debug, { @@ -827,12 +835,12 @@ impl TreeFile { &(..), true, in_transaction, - &mut |_, _, _| ScanEvaluation::ReadData, - &mut |key, _index| { + |_, _, _| ScanEvaluation::ReadData, + 
|key, _index| { result = Some(key.clone()); ScanEvaluation::Stop }, - &mut |_key, _index, _value| Ok(()), + |_key, _index, _value| Ok(()), )?; Ok(result) @@ -842,15 +850,15 @@ impl TreeFile { pub fn first( &mut self, in_transaction: bool, - ) -> Result, ArcBytes<'static>)>, Error> { + ) -> Result, Root::Value)>, Error> { let mut result = None; let mut key_requested = false; self.scan( &(..), true, in_transaction, - &mut |_, _, _| ScanEvaluation::ReadData, - &mut |_, _| { + |_, _, _| ScanEvaluation::ReadData, + |_, _| { if key_requested { ScanEvaluation::Stop } else { @@ -858,7 +866,7 @@ impl TreeFile { ScanEvaluation::ReadData } }, - &mut |key, _index, value| { + |key, _index, value| { result = Some((key, value)); Ok(()) }, @@ -874,12 +882,12 @@ impl TreeFile { &(..), false, in_transaction, - &mut |_, _, _| ScanEvaluation::ReadData, - &mut |key, _index| { + |_, _, _| ScanEvaluation::ReadData, + |key, _index| { result = Some(key.clone()); ScanEvaluation::Stop }, - &mut |_key, _index, _value| Ok(()), + |_key, _index, _value| Ok(()), )?; Ok(result) @@ -889,15 +897,15 @@ impl TreeFile { pub fn last( &mut self, in_transaction: bool, - ) -> Result, ArcBytes<'static>)>, Error> { + ) -> Result, Root::Value)>, Error> { let mut result = None; let mut key_requested = false; self.scan( &(..), false, in_transaction, - &mut |_, _, _| ScanEvaluation::ReadData, - &mut |_, _| { + |_, _, _| ScanEvaluation::ReadData, + |_, _| { if key_requested { ScanEvaluation::Stop } else { @@ -905,7 +913,7 @@ impl TreeFile { ScanEvaluation::ReadData } }, - &mut |key, _index, value| { + |key, _index, value| { result = Some((key, value)); Ok(()) }, @@ -1034,7 +1042,7 @@ where impl TreeFile, File> where - Index: EmbeddedIndex + Clone + Debug + 'static, + Index: EmbeddedIndex> + Clone + Debug + 'static, { /// Scans the tree for keys that are contained within `range`. If `forwards` /// is true, scanning starts at the lowest sort-order key and scans forward. 
@@ -1052,8 +1060,8 @@ where range: Range, forwards: bool, in_transaction: bool, - key_evaluator: &mut KeyEvaluator, - data_callback: &mut DataCallback, + mut key_evaluator: KeyEvaluator, + data_callback: DataCallback, ) -> Result<(), AbortError> where Range: RangeBounds + Debug + 'static, @@ -1338,7 +1346,7 @@ struct TreeModifier<'a, 'm, Root: root::Root> { state: &'a State, vault: Option<&'a dyn AnyVault>, cache: Option<&'a ChunkCache>, - modification: Option, Root::Index>>, + modification: Option>, scratch: &'a mut Vec, } @@ -1477,7 +1485,7 @@ struct TreeGetter< 'keys, Root: root::Root, KeyEvaluator: FnMut(&ArcBytes<'static>, &Root::Index) -> ScanEvaluation, - KeyReader: FnMut(ArcBytes<'static>, ArcBytes<'static>, Root::Index) -> Result<(), Error>, + KeyReader: FnMut(ArcBytes<'static>, Root::Value, Root::Index) -> Result<(), Error>, Keys: Iterator, > { from_transaction: bool, @@ -1493,7 +1501,7 @@ impl<'a, 'keys, KeyEvaluator, KeyReader, Root, Keys> FileOp> for TreeGetter<'a, 'keys, Root, KeyEvaluator, KeyReader, Keys> where KeyEvaluator: FnMut(&ArcBytes<'static>, &Root::Index) -> ScanEvaluation, - KeyReader: FnMut(ArcBytes<'static>, ArcBytes<'static>, Root::Index) -> Result<(), Error>, + KeyReader: FnMut(ArcBytes<'static>, Root::Value, Root::Index) -> Result<(), Error>, Keys: Iterator, Root: root::Root, { @@ -1542,11 +1550,8 @@ struct TreeScanner< > where NodeEvaluator: FnMut(&ArcBytes<'static>, &Root::ReducedIndex, usize) -> ScanEvaluation, KeyEvaluator: FnMut(&ArcBytes<'static>, &Root::Index) -> ScanEvaluation, - KeyReader: FnMut( - ArcBytes<'static>, - &Root::Index, - ArcBytes<'static>, - ) -> Result<(), AbortError>, + KeyReader: + FnMut(ArcBytes<'static>, &Root::Index, Root::Value) -> Result<(), AbortError>, KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized, CallerError: Display + Debug, { @@ -1585,11 +1590,8 @@ impl< where NodeEvaluator: FnMut(&ArcBytes<'static>, &Root::ReducedIndex, usize) -> ScanEvaluation, KeyEvaluator: FnMut(&ArcBytes<'static>, 
&Root::Index) -> ScanEvaluation, - KeyReader: FnMut( - ArcBytes<'static>, - &Root::Index, - ArcBytes<'static>, - ) -> Result<(), AbortError>, + KeyReader: + FnMut(ArcBytes<'static>, &Root::Index, Root::Value) -> Result<(), AbortError>, KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized, CallerError: Display + Debug, { @@ -1636,7 +1638,7 @@ where struct TreeSequenceGetter< 'a, - Index: EmbeddedIndex + Clone + Debug + 'static, + Index: EmbeddedIndex> + Clone + Debug + 'static, KeyEvaluator: for<'k> FnMut(SequenceId, &'k BySequenceIndex) -> ScanEvaluation, KeyReader: FnMut(SequenceId, BySequenceIndex, ArcBytes<'static>) -> Result<(), Error>, Keys: Iterator, @@ -1656,7 +1658,7 @@ where KeyEvaluator: for<'k> FnMut(SequenceId, &'k BySequenceIndex) -> ScanEvaluation, KeyReader: FnMut(SequenceId, BySequenceIndex, ArcBytes<'static>) -> Result<(), Error>, Keys: Iterator, - Index: EmbeddedIndex + Clone + Debug + 'static, + Index: EmbeddedIndex> + Clone + Debug + 'static, { fn execute(mut self, file: &mut dyn File) -> Result<(), Error> { if self.from_transaction { @@ -1670,10 +1672,10 @@ where .keys .into_iter() .map(|sequence| sequence.0.to_be_bytes()), - &mut |key, index| { + |key, index| { (self.key_evaluator)(SequenceId::try_from(key.as_slice()).unwrap(), index) }, - &mut |key, value, index| { + |key, value, index| { (self.key_reader)(SequenceId::try_from(key.as_slice()).unwrap(), index, value) }, file, @@ -1691,10 +1693,10 @@ where .keys .into_iter() .map(|sequence| sequence.0.to_be_bytes()), - &mut |key, index| { + |key, index| { (self.key_evaluator)(SequenceId::try_from(key.as_slice()).unwrap(), index) }, - &mut |key, value, index| { + |key, value, index| { (self.key_reader)(SequenceId::try_from(key.as_slice()).unwrap(), index, value) }, file, @@ -1719,7 +1721,7 @@ struct TreeSequenceScanner< DataCallback: FnMut(KeySequence, ArcBytes<'static>) -> Result<(), AbortError>, CallerError: Display + Debug, - Index: EmbeddedIndex + Clone + Debug + 'static, + Index: 
EmbeddedIndex> + Clone + Debug + 'static,
 {
     forwards: bool,
     from_transaction: bool,
@@ -1748,7 +1750,7 @@ where
     DataCallback:
         FnMut(KeySequence, ArcBytes<'static>) -> Result<(), AbortError>,
     CallerError: Display + Debug,
-    Index: EmbeddedIndex + Clone + Debug + 'static,
+    Index: EmbeddedIndex> + Clone + Debug + 'static,
 {
     fn execute(self, file: &mut dyn File) -> Result<(), AbortError> {
         let Self {
@@ -1931,7 +1933,7 @@ impl<'a> PagedWriter<'a> {
     /// Writes a chunk of data to the file, after possibly encrypting it.
     /// Returns the position that this chunk can be read from in the file.
     #[allow(clippy::cast_possible_truncation)]
-    fn write_chunk(&mut self, contents: &[u8]) -> Result {
+    pub fn write_chunk(&mut self, contents: &[u8]) -> Result {
         let possibly_encrypted = self.vault.as_ref().map_or_else(
             || Ok(Cow::Borrowed(contents)),
             |vault| vault.encrypt(contents).map(Cow::Owned),
@@ -1948,7 +1950,9 @@ impl<'a> PagedWriter<'a> {
         Ok(position)
     }

-    fn read_chunk(&mut self, position: u64) -> Result {
+    /// Reads a "chunk" of data located at `position`. `position` should be a
+    /// location previously returned by [`Self::write_chunk()`].
+    pub fn read_chunk(&mut self, position: u64) -> Result {
         read_chunk(position, false, self.file, self.vault, self.cache)
     }

@@ -2048,7 +2052,8 @@ pub(crate) fn copy_chunk(
     clippy::cast_possible_truncation,
     clippy::cast_sign_loss
 )]
-fn dynamic_order(number_of_records: u64, max_order: Option) -> usize {
+#[must_use]
+pub fn dynamic_order(number_of_records: u64, max_order: Option) -> usize {
     // Current approximation is the 3rd root.
     let max_order = max_order.unwrap_or(DEFAULT_MAX_ORDER);
     if number_of_records > max_order.pow(3) as u64 {
@@ -2174,16 +2179,63 @@ impl U64Range {
     }
 }

+/// The key and value of an entry.
+#[derive(Eq, PartialEq, Clone, Debug, Default)]
+pub struct KeyValue {
+    /// The key of this entry.
+    pub key: Key,
+    /// The value of this entry.
+    pub value: Value,
+}
+
+/// The key and index of an entry.
+#[derive(Eq, PartialEq, Clone, Debug, Default)] +pub struct KeyIndex { + /// The key of this entry. + pub key: ArcBytes<'static>, + /// The index of this entry. + pub index: Index, +} + +/// A key and index of an entry from a tree with [`Root`] `R`. +pub type TreeKeyIndex = KeyIndex<::Index>; + +/// The value and index of an entry. +#[derive(Eq, PartialEq, Clone, Debug, Default)] +pub struct ValueIndex { + /// The value of this entry. + pub value: Value, + /// The index of this entry. + pub index: Index, +} + +/// A value and index of an entry from a tree with [`Root`] `R`. +pub type TreeValueIndex = ValueIndex<::Value, ::Index>; + +/// A complete entry in a tree. +#[derive(Eq, PartialEq, Clone, Debug, Default)] +pub struct Entry { + /// The key of this entry. + pub key: ArcBytes<'static>, + /// The value of this entry. + pub value: Value, + /// The index of this entry. + pub index: Index, +} + +/// An entry from a tree with [`Root`] `R`. +pub type TreeEntry = Entry<::Value, ::Index>; + /// An index that is embeddable within a tree. /// /// An index is a computed value that is stored directly within the B-Tree /// structure. Because these are encoded directly onto the nodes, they should be /// kept shorter for better performance. -pub trait EmbeddedIndex: Serializable + Clone + Debug + Send + Sync + 'static { +pub trait EmbeddedIndex: Serializable + Clone + Debug + Send + Sync + 'static { /// The reduced representation of this index. type Reduced: Serializable + Clone + Debug + Send + Sync + 'static; /// The reducer that reduces arrays of `Self` or `Self::Reduced` into `Self::Reduced`. - type Indexer: Indexer + Reducer; + type Indexer: Indexer + Reducer; } /// A type that can be serialized and deserialized. 
@@ -2196,13 +2248,13 @@ pub trait Serializable: Send + Sync + Sized + 'static {
     fn deserialize_from(reader: &mut R) -> Result;
 }

-impl EmbeddedIndex for () {
+impl EmbeddedIndex for () {
     type Reduced = Self;
     type Indexer = Self;
 }

-impl Indexer<()> for () {
-    fn index(&self, _key: &ArcBytes<'_>, _value: Option<&ArcBytes<'static>>) -> Self {}
+impl Indexer for () {
+    fn index(&self, _key: &ArcBytes<'_>, _value: Option<&Value>) -> Self {}
 }

 impl Serializable for () {
@@ -2223,6 +2275,23 @@ pub struct ModificationResult {
     pub index: Option,
 }

+/// The result of a change to a [`BTreeNode`].
+#[derive(Debug, Eq, PartialEq, Clone, Copy)]
+pub enum ChangeResult {
+    /// No changes were made.
+    Unchanged,
+    /// The node modified is now empty and should be removed.
+    Remove,
+    /// The node modified now has fewer entries than the tree should have,
+    /// and its children should be absorbed into neighbors.
+    Absorb,
+    /// The node was changed.
+    Changed,
+    /// The node modified now has more entries than the tree should have, and
+    /// it should be split.
+ Split, +} + #[cfg(test)] mod tests { use std::{ @@ -2292,7 +2361,7 @@ mod tests { } } - fn insert_one_record( + fn insert_one_record> + Default, F: ManagedFile>( context: &Context, file_path: &Path, ids: &mut HashSet, @@ -2336,7 +2405,7 @@ mod tests { } } - fn remove_one_record( + fn remove_one_record> + Default, F: ManagedFile>( context: &Context, file_path: &Path, id: u64, @@ -2426,7 +2495,7 @@ mod tests { } } - fn remove(label: &str) { + fn remove> + Default>(label: &str) { const ORDER: usize = 4; // We've seen a couple of failures in CI, but have never been able to @@ -2492,7 +2561,7 @@ mod tests { spam_insert::("std-unversioned"); } - fn spam_insert(name: &str) { + fn spam_insert> + Default, F: ManagedFile>(name: &str) { const RECORDS: usize = 1_000; let mut rng = Pcg64::new_seed(1); let ids = (0..RECORDS).map(|_| rng.generate::()); @@ -2545,7 +2614,10 @@ mod tests { bulk_insert::("any-unversioned", AnyFileManager::std()); } - fn bulk_insert(name: &str, file_manager: M) { + fn bulk_insert> + Default, M: FileManager>( + name: &str, + file_manager: M, + ) { const RECORDS_PER_BATCH: usize = 10; const BATCHES: usize = 1000; let mut rng = Pcg64::new_seed(1); @@ -2640,7 +2712,10 @@ mod tests { assert_eq!(&all_records, &all_through_scan); } - fn compact(label: &str, file_manager: M) { + fn compact> + Default, M: FileManager>( + label: &str, + file_manager: M, + ) { const ORDER: usize = 4; let mut rng = Pcg64::new_seed(1); let context = Context { @@ -2733,8 +2808,8 @@ mod tests { .., true, false, - &mut |_| ScanEvaluation::ReadData, - &mut |sequence, value| { + |_| ScanEvaluation::ReadData, + |sequence, value| { sequences.push((sequence, value)); Ok(()) }, @@ -2833,7 +2908,10 @@ mod tests { assert_eq!(tree.get(b"test", false).unwrap().unwrap(), b"hello world"); } - fn edit_keys(label: &str, file_manager: M) { + fn edit_keys> + Default, M: FileManager>( + label: &str, + file_manager: M, + ) { let context = Context { file_manager, vault: None, @@ -2846,20 +2924,20 @@ 
mod tests { let mut tree = TreeFile::::write(&file_path, State::default(), &context, None).unwrap(); assert!(matches!( - tree.compare_and_swap(b"test", Some(b"won't match"), None, None) + tree.compare_and_swap(b"test", Some(&b"won't match"[..]), None, None) .unwrap_err(), CompareAndSwapError::Conflict(_) )); tree.compare_and_swap(b"test", None, Some(ArcBytes::from(b"first")), None) .unwrap(); assert!(matches!( - tree.compare_and_swap(b"test", Some(b"won't match"), None, None) + tree.compare_and_swap(b"test", Some(&b"won't match"[..]), None, None) .unwrap_err(), CompareAndSwapError::Conflict(_) )); tree.compare_and_swap( b"test", - Some(b"first"), + Some(&b"first"[..]), Some(ArcBytes::from(b"second")), None, ) @@ -2977,7 +3055,10 @@ mod tests { } } - fn first_last(label: &str, file_manager: M) { + fn first_last> + Default, M: FileManager>( + label: &str, + file_manager: M, + ) { let context = Context { file_manager, vault: None, @@ -3037,7 +3118,10 @@ mod tests { first_last::("any-unversioned", AnyFileManager::memory()); } - fn bulk_compare_swaps(label: &str, file_manager: M) { + fn bulk_compare_swaps> + Default, M: FileManager>( + label: &str, + file_manager: M, + ) { const BATCH: usize = 10_000; let context = Context { file_manager, diff --git a/nebari/src/tree/modify.rs b/nebari/src/tree/modify.rs index 91c8264406..088d98b693 100644 --- a/nebari/src/tree/modify.rs +++ b/nebari/src/tree/modify.rs @@ -3,7 +3,7 @@ use std::{ ops::{Deref, DerefMut}, }; -use super::btree_entry::KeyOperation; +use super::btree::KeyOperation; use crate::{error::Error, transaction::TransactionId, ArcBytes, ErrorKind}; /// A tree modification. @@ -18,7 +18,12 @@ pub struct Modification<'a, T, Index> { } impl<'a, T, Index> Modification<'a, T, Index> { - pub(crate) fn reverse(&mut self) -> Result<(), Error> { + /// Prepares this modification for efficient operation, and ensures that the + /// keys are properly ordered. 
+ /// + /// After calling this function, the keys and values (if applicable) are + /// reversed so that keys and values can be removed by calling [`Vec::pop`]. + pub fn prepare(&mut self) -> Result<(), Error> { if self.keys.windows(2).all(|w| w[0] < w[1]) { self.keys.reverse(); if let Operation::SetEach(values) = &mut self.operation { diff --git a/nebari/src/tree/root.rs b/nebari/src/tree/root.rs index 8c3188bca6..83d0a928ed 100644 --- a/nebari/src/tree/root.rs +++ b/nebari/src/tree/root.rs @@ -15,7 +15,7 @@ use crate::{ roots::AnyTransactionTree, transaction::{TransactionId, TransactionManager}, tree::{ - btree_entry::ScanArgs, state::AnyTreeState, Modification, ModificationResult, PageHeader, + btree::ScanArgs, state::AnyTreeState, Modification, ModificationResult, PageHeader, PagedWriter, Reducer, ScanEvaluation, State, TreeFile, }, vault::AnyVault, @@ -33,6 +33,8 @@ pub trait Root: Debug + Send + Sync + Clone + 'static { type ReducedIndex: Clone + Debug + 'static; /// The reducer that reduces `Index`es and re-reduces `ReducedIndex`es. type Reducer: Reducer + 'static; + /// The value type stored by this root. + type Value: Debug + 'static; /// Returns a new instance with the provided reducer. fn default_with(reducer: Self::Reducer) -> Self; @@ -98,7 +100,7 @@ pub trait Root: Debug + Send + Sync + Clone + 'static { /// indexes, if the keys are still present. 
fn modify<'a, 'w>( &'a mut self, - modification: Modification<'_, ArcBytes<'static>, Self::Index>, + modification: Modification<'_, Self::Value, Self::Index>, writer: &'a mut PagedWriter<'w>, max_order: Option, ) -> Result>, Error>; @@ -119,7 +121,7 @@ pub trait Root: Debug + Send + Sync + Clone + 'static { ) -> Result<(), Error> where KeyEvaluator: FnMut(&ArcBytes<'static>, &Self::Index) -> ScanEvaluation, - KeyReader: FnMut(ArcBytes<'static>, ArcBytes<'static>, Self::Index) -> Result<(), Error>, + KeyReader: FnMut(ArcBytes<'static>, Self::Value, Self::Index) -> Result<(), Error>, Keys: Iterator; /// Scans the tree over `range`. `args.key_evaluator` is invoked for each key as @@ -137,6 +139,7 @@ pub trait Root: Debug + Send + Sync + Clone + 'static { &self, range: &'keys KeyRangeBounds, args: &mut ScanArgs< + Self::Value, Self::Index, Self::ReducedIndex, CallerError, @@ -155,7 +158,7 @@ pub trait Root: Debug + Send + Sync + Clone + 'static { ScanDataCallback: FnMut( ArcBytes<'static>, &Self::Index, - ArcBytes<'static>, + Self::Value, ) -> Result<(), AbortError>; /// Copies all data from `file` into `writer`, updating `self` with the new diff --git a/nebari/src/tree/unversioned.rs b/nebari/src/tree/unversioned.rs index ececddf077..25953b6396 100644 --- a/nebari/src/tree/unversioned.rs +++ b/nebari/src/tree/unversioned.rs @@ -1,14 +1,13 @@ use std::{ collections::HashMap, fmt::{Debug, Display}, - marker::PhantomData, ops::RangeBounds, }; use byteorder::{BigEndian, ByteOrder, ReadBytesExt, WriteBytesExt}; use super::{ - btree_entry::BTreeEntry, + btree::BTreeEntry, by_id::{ByIdStats, UnversionedByIdIndex}, modify::Modification, serialization::BinarySerialization, @@ -21,11 +20,9 @@ use crate::{ roots::AbortError, transaction::TransactionId, tree::{ - btree_entry::{Indexer, KeyOperation, ModificationContext, NodeInclusion, ScanArgs}, + btree::{Indexer, KeyOperation, ModificationContext, NodeInclusion, ScanArgs}, by_id::ByIdIndexer, - copy_chunk, dynamic_order, - 
versioned::ChangeResult, - BTreeNode, ModificationResult, PageHeader, Root, + copy_chunk, dynamic_order, BTreeNode, ChangeResult, ModificationResult, PageHeader, Root, }, vault::AnyVault, ArcBytes, ChunkCache, ErrorKind, @@ -40,20 +37,21 @@ pub type Unversioned = UnversionedTreeRoot<()>; #[derive(Clone, Debug)] pub struct UnversionedTreeRoot where - Index: Clone + super::EmbeddedIndex + Debug + 'static, + Index: Clone + super::EmbeddedIndex> + Debug + 'static, { /// The transaction ID of the tree root. If this transaction ID isn't /// present in the transaction log, this root should not be trusted. pub transaction_id: Option, /// The by-id B-Tree. - pub by_id_root: BTreeEntry, ByIdStats>, + pub by_id_root: + BTreeEntry>, ByIdStats>, reducer: ::Reducer, } impl Default for UnversionedTreeRoot where - Index: Clone + super::EmbeddedIndex + Debug + 'static, + Index: Clone + super::EmbeddedIndex> + Debug + 'static, ::Reducer: Default, { fn default() -> Self { @@ -67,15 +65,20 @@ where impl UnversionedTreeRoot where - Index: Clone + super::EmbeddedIndex + Debug + 'static, + Index: Clone + super::EmbeddedIndex> + Debug + 'static, { fn modify_id_root<'a, 'w>( &'a mut self, - mut modification: Modification<'_, ArcBytes<'static>, UnversionedByIdIndex>, + mut modification: Modification< + '_, + ArcBytes<'static>, + UnversionedByIdIndex>, + >, writer: &'a mut PagedWriter<'w>, max_order: Option, - ) -> Result>>, Error> { - modification.reverse()?; + ) -> Result>>>, Error> + { + modification.prepare()?; let total_keys = self.by_id_root.stats(self.reducer()).total_keys() + modification.keys.len() as u64; @@ -91,24 +94,23 @@ where while !modification.keys.is_empty() { match self.by_id_root.modify( &mut modification, - &mut ModificationContext { - current_order: by_id_order, + &mut ModificationContext::new( + by_id_order, minimum_children, - indexer: |key: &ArcBytes<'_>, - value: Option<&ArcBytes<'static>>, - _existing_index, - _changes, - writer: &mut PagedWriter<'_>| { + |key: 
&ArcBytes<'_>, + value: Option<&ArcBytes<'static>>, + _existing_index, + writer: &mut PagedWriter<'_>| { if let Some(value) = value { let position = writer.write_chunk(value)?; // write_chunk errors if it can't fit within a u32 #[allow(clippy::cast_possible_truncation)] let value_length = value.len() as u32; - let new_index = UnversionedByIdIndex { + let new_index = UnversionedByIdIndex::new( value_length, position, - embedded: reducer.0.index(key, Some(value)), - }; + reducer.0.index(key, Some(value)), + ); results.push(ModificationResult { key: key.to_owned(), index: Some(new_index.clone()), @@ -122,15 +124,13 @@ where Ok(KeyOperation::Remove) } }, - loader: |index, writer| match writer.read_chunk(index.position)? { + |index, writer| match writer.read_chunk(index.position)? { CacheEntry::ArcBytes(buffer) => Ok(Some(buffer.clone())), CacheEntry::Decoded(_) => unreachable!(), }, - reducer: self.reducer().clone(), - _phantom: PhantomData, - }, + self.reducer().clone(), + ), None, - &mut (), writer, )? 
{ ChangeResult::Absorb | ChangeResult::Changed | ChangeResult::Unchanged => {} @@ -150,10 +150,11 @@ where impl Root for UnversionedTreeRoot where - EmbeddedIndex: Clone + super::EmbeddedIndex + 'static, + EmbeddedIndex: Clone + super::EmbeddedIndex> + 'static, { const HEADER: PageHeader = PageHeader::UnversionedHeader; - type Index = UnversionedByIdIndex; + type Value = ArcBytes<'static>; + type Index = UnversionedByIdIndex; type ReducedIndex = ByIdStats; type Reducer = ByIdIndexer; @@ -173,38 +174,16 @@ where self.by_id_root.stats(self.reducer()).alive_keys } - fn initialized(&self) -> bool { - self.transaction_id.is_some() - } - fn dirty(&self) -> bool { self.by_id_root.dirty } - fn initialize_default(&mut self) { - self.transaction_id = Some(TransactionId(0)); + fn initialized(&self) -> bool { + self.transaction_id.is_some() } - fn deserialize(mut bytes: ArcBytes<'_>, reducer: Self::Reducer) -> Result { - let transaction_id = Some(TransactionId(bytes.read_u64::()?)); - let by_id_size = bytes.read_u32::()? as usize; - if by_id_size != bytes.len() { - return Err(Error::data_integrity(format!( - "Header reported index size {}, but data has {} remaining", - by_id_size, - bytes.len() - ))); - }; - - let mut by_id_bytes = bytes.read_bytes(by_id_size)?.to_owned(); - - let by_id_root = BTreeEntry::deserialize_from(&mut by_id_bytes, None)?; - - Ok(Self { - transaction_id, - by_id_root, - reducer, - }) + fn initialize_default(&mut self) { + self.transaction_id = Some(TransactionId(0)); } fn serialize( @@ -229,6 +208,28 @@ where Ok(()) } + fn deserialize(mut bytes: ArcBytes<'_>, reducer: Self::Reducer) -> Result { + let transaction_id = Some(TransactionId(bytes.read_u64::()?)); + let by_id_size = bytes.read_u32::()? 
as usize; + if by_id_size != bytes.len() { + return Err(Error::data_integrity(format!( + "Header reported index size {}, but data has {} remaining", + by_id_size, + bytes.len() + ))); + }; + + let mut by_id_bytes = bytes.read_bytes(by_id_size)?.to_owned(); + + let by_id_root = BTreeEntry::deserialize_from(&mut by_id_bytes, None)?; + + Ok(Self { + transaction_id, + by_id_root, + reducer, + }) + } + fn transaction_id(&self) -> TransactionId { self.transaction_id.unwrap_or_default() } @@ -280,6 +281,7 @@ where &self, range: &'keys KeyRangeBounds, args: &mut ScanArgs< + Self::Value, Self::Index, Self::ReducedIndex, CallerError, @@ -325,7 +327,7 @@ where vault, &mut scratch, &mut |_key, - index: &mut UnversionedByIdIndex, + index: &mut UnversionedByIdIndex>, from_file, copied_chunks, to_file, diff --git a/nebari/src/tree/versioned.rs b/nebari/src/tree/versioned.rs index 5f12302b25..02d1b62fbd 100644 --- a/nebari/src/tree/versioned.rs +++ b/nebari/src/tree/versioned.rs @@ -2,19 +2,18 @@ use std::{ array::TryFromSliceError, collections::HashMap, fmt::{Debug, Display}, - marker::PhantomData, ops::RangeBounds, }; use byteorder::{BigEndian, ByteOrder, ReadBytesExt, WriteBytesExt}; use super::{ - btree_entry::BTreeEntry, + btree::BTreeEntry, by_id::{ByIdStats, VersionedByIdIndex}, by_sequence::{BySequenceIndex, BySequenceStats}, modify::Modification, serialization::BinarySerialization, - PagedWriter, ScanEvaluation, PAGE_SIZE, + ChangeResult, PagedWriter, ScanEvaluation, PAGE_SIZE, }; use crate::{ chunk_cache::CacheEntry, @@ -23,7 +22,7 @@ use crate::{ roots::AbortError, transaction::TransactionId, tree::{ - btree_entry::{Indexer, KeyOperation, ModificationContext, NodeInclusion, ScanArgs}, + btree::{Indexer, KeyOperation, ModificationContext, NodeInclusion, ScanArgs}, by_id::ByIdIndexer, by_sequence::{BySequenceReducer, SequenceId}, copy_chunk, dynamic_order, @@ -44,7 +43,7 @@ pub type Versioned = VersionedTreeRoot<()>; #[derive(Clone, Debug)] pub struct VersionedTreeRoot 
where - EmbeddedIndex: super::EmbeddedIndex, + EmbeddedIndex: super::EmbeddedIndex>, { /// The transaction ID of the tree root. If this transaction ID isn't /// present in the transaction log, this root should not be trusted. @@ -54,14 +53,16 @@ where /// The by-sequence B-Tree. pub by_sequence_root: BTreeEntry, BySequenceStats>, /// The by-id B-Tree. - pub by_id_root: - BTreeEntry, ByIdStats>, + pub by_id_root: BTreeEntry< + VersionedByIdIndex>, + ByIdStats, + >, reducer: ByIdIndexer, } impl Default for VersionedTreeRoot where - EmbeddedIndex: super::EmbeddedIndex + Clone + Debug + 'static, + EmbeddedIndex: super::EmbeddedIndex> + Clone + Debug + 'static, EmbeddedIndex::Indexer: Default, { fn default() -> Self { @@ -74,14 +75,6 @@ where } } } -#[derive(Debug, Eq, PartialEq, Clone, Copy)] -pub enum ChangeResult { - Unchanged, - Remove, - Absorb, - Changed, - Split, -} #[derive(Debug)] pub enum Children { @@ -91,9 +84,11 @@ pub enum Children { impl VersionedTreeRoot where - EmbeddedIndex: super::EmbeddedIndex + Clone + Debug + 'static, - ByIdIndexer: - Reducer, ByIdStats>, + EmbeddedIndex: super::EmbeddedIndex> + Clone + Debug + 'static, + ByIdIndexer: Reducer< + VersionedByIdIndex>, + ByIdStats, + >, { fn modify_sequence_root( &mut self, @@ -106,7 +101,7 @@ where max_order: Option, ) -> Result<(), Error> { // Reverse so that pop is efficient. 
- modification.reverse()?; + modification.prepare()?; let total_sequence_records = self .by_sequence_root @@ -122,24 +117,21 @@ where while !modification.keys.is_empty() { match self.by_sequence_root.modify( &mut modification, - &mut ModificationContext { - current_order: by_sequence_order, - minimum_children: by_sequence_minimum_children, - indexer: - &mut |_key: &ArcBytes<'_>, - value: Option<&BySequenceIndex>, - _existing_index: Option<&BySequenceIndex>, - _changes: &mut EntryChanges, - _writer: &mut PagedWriter<'_>| { - Ok(KeyOperation::Set(value.unwrap().clone())) - }, - loader: |_index: &BySequenceIndex, - _writer: &mut PagedWriter<'_>| Ok(None), - reducer: BySequenceReducer, - _phantom: PhantomData, - }, + &mut ModificationContext::new( + by_sequence_order, + by_sequence_minimum_children, + |_key: &ArcBytes<'_>, + value: Option<&BySequenceIndex>, + _existing_index: Option<&BySequenceIndex>, + _writer: &mut PagedWriter<'_>| { + Ok(KeyOperation::Set(value.unwrap().clone())) + }, + |_index: &BySequenceIndex, _writer: &mut PagedWriter<'_>| { + Ok(None) + }, + BySequenceReducer, + ), None, - &mut EntryChanges::default(), writer, )? 
{ ChangeResult::Absorb @@ -156,12 +148,17 @@ where fn modify_id_root( &mut self, - mut modification: Modification<'_, ArcBytes<'static>, VersionedByIdIndex>, + mut modification: Modification< + '_, + ArcBytes<'static>, + VersionedByIdIndex>, + >, changes: &mut EntryChanges, writer: &mut PagedWriter<'_>, max_order: Option, - ) -> Result>>, Error> { - modification.reverse()?; + ) -> Result>>>, Error> + { + modification.prepare()?; let total_id_records = self.by_id_root.stats(self.reducer()).total_keys() + modification.keys.len() as u64; @@ -177,14 +174,15 @@ where let reducer = self.reducer.clone(); match self.by_id_root.modify( &mut modification, - &mut ModificationContext { - current_order: by_id_order, - minimum_children: by_id_minimum_children, - indexer: &mut |key: &ArcBytes<'_>, - value: Option<&ArcBytes<'static>>, - existing_index: Option<&VersionedByIdIndex>, - changes: &mut EntryChanges, - writer: &mut PagedWriter<'_>| { + &mut ModificationContext::new( + by_id_order, + by_id_minimum_children, + |key: &ArcBytes<'_>, + value: Option<&ArcBytes<'static>>, + existing_index: Option< + &VersionedByIdIndex>, + >, + writer: &mut PagedWriter<'_>| { let (position, value_size) = if let Some(value) = value { let new_position = writer.write_chunk(value)?; // write_chunk errors if it can't fit within a u32 @@ -210,19 +208,19 @@ where value_position: position, value_size, }); - let new_index = VersionedByIdIndex { - sequence_id: changes.current_sequence, + let new_index = VersionedByIdIndex::new( + changes.current_sequence, + value_size, position, - value_length: value_size, embedded, - }; + ); results.push(ModificationResult { key, index: Some(new_index.clone()), }); Ok(KeyOperation::Set(new_index)) }, - loader: |index, writer| { + |index, writer| { if index.position > 0 { match writer.read_chunk(index.position) { Ok(CacheEntry::ArcBytes(buffer)) => Ok(Some(buffer)), @@ -233,11 +231,9 @@ where Ok(None) } }, - reducer: self.reducer().clone(), - _phantom: PhantomData, - }, 
+ self.reducer().clone(), + ), None, - changes, writer, )? { ChangeResult::Absorb | ChangeResult::Changed | ChangeResult::Unchanged => {} @@ -259,10 +255,11 @@ where impl Root for VersionedTreeRoot where - EmbeddedIndex: super::EmbeddedIndex + Clone + Debug + 'static, + EmbeddedIndex: super::EmbeddedIndex> + Clone + Debug + 'static, { const HEADER: PageHeader = PageHeader::VersionedHeader; - type Index = VersionedByIdIndex; + type Value = ArcBytes<'static>; + type Index = VersionedByIdIndex; type ReducedIndex = ByIdStats; type Reducer = ByIdIndexer; @@ -280,20 +277,47 @@ where &self.reducer } - fn initialized(&self) -> bool { - self.sequence.valid() + fn count(&self) -> u64 { + self.by_id_root.stats(self.reducer()).alive_keys } fn dirty(&self) -> bool { self.by_id_root.dirty || self.by_sequence_root.dirty } + fn initialized(&self) -> bool { + self.sequence.valid() + } + fn initialize_default(&mut self) { self.sequence = SequenceId(1); } - fn count(&self) -> u64 { - self.by_id_root.stats(self.reducer()).alive_keys + fn serialize( + &mut self, + paged_writer: &mut PagedWriter<'_>, + output: &mut Vec, + ) -> Result<(), Error> { + output.reserve(PAGE_SIZE); + output.write_u64::(self.transaction_id.0)?; + output.write_u64::(self.sequence.0)?; + // Reserve space for by_sequence and by_id sizes (2xu16). 
+ output.write_u64::(0)?; + + let by_sequence_size = self.by_sequence_root.serialize_to(output, paged_writer)?; + + let by_id_size = self.by_id_root.serialize_to(output, paged_writer)?; + + let by_sequence_size = u32::try_from(by_sequence_size) + .ok() + .ok_or(ErrorKind::Internal(InternalError::HeaderTooLarge))?; + BigEndian::write_u32(&mut output[16..20], by_sequence_size); + let by_id_size = u32::try_from(by_id_size) + .ok() + .ok_or(ErrorKind::Internal(InternalError::HeaderTooLarge))?; + BigEndian::write_u32(&mut output[20..24], by_id_size); + + Ok(()) } fn deserialize(mut bytes: ArcBytes<'_>, reducer: Self::Reducer) -> Result { @@ -325,33 +349,6 @@ where }) } - fn serialize( - &mut self, - paged_writer: &mut PagedWriter<'_>, - output: &mut Vec, - ) -> Result<(), Error> { - output.reserve(PAGE_SIZE); - output.write_u64::(self.transaction_id.0)?; - output.write_u64::(self.sequence.0)?; - // Reserve space for by_sequence and by_id sizes (2xu16). - output.write_u64::(0)?; - - let by_sequence_size = self.by_sequence_root.serialize_to(output, paged_writer)?; - - let by_id_size = self.by_id_root.serialize_to(output, paged_writer)?; - - let by_sequence_size = u32::try_from(by_sequence_size) - .ok() - .ok_or(ErrorKind::Internal(InternalError::HeaderTooLarge))?; - BigEndian::write_u32(&mut output[16..20], by_sequence_size); - let by_id_size = u32::try_from(by_id_size) - .ok() - .ok_or(ErrorKind::Internal(InternalError::HeaderTooLarge))?; - BigEndian::write_u32(&mut output[20..24], by_id_size); - - Ok(()) - } - fn transaction_id(&self) -> TransactionId { self.transaction_id } @@ -432,6 +429,7 @@ where &self, range: &'keys KeyRangeBounds, args: &mut ScanArgs< + Self::Value, Self::Index, Self::ReducedIndex, CallerError, @@ -481,7 +479,7 @@ where vault, &mut scratch, &mut |key, - index: &mut VersionedByIdIndex, + index: &mut VersionedByIdIndex>, from_file, copied_chunks, to_file, @@ -529,22 +527,19 @@ where // This modification copies the `sequence_indexes` into the 
sequence root. self.by_sequence_root.modify( &mut modification, - &mut ModificationContext { - current_order: by_sequence_order, + &mut ModificationContext::new( + by_sequence_order, minimum_children, - indexer: &mut |_key: &ArcBytes<'_>, + |_key: &ArcBytes<'_>, value: Option<&BySequenceIndex>, _existing_index: Option<&BySequenceIndex>, - _changes: &mut EntryChanges, _writer: &mut PagedWriter<'_>| { Ok(KeyOperation::Set(value.unwrap().clone())) }, - loader: |_index: &BySequenceIndex, _writer: &mut PagedWriter<'_>| unreachable!(), - reducer: BySequenceReducer, - _phantom: PhantomData, - }, + |_index: &BySequenceIndex, _writer: &mut PagedWriter<'_>| unreachable!(), + BySequenceReducer, + ), None, - &mut EntryChanges::default(), writer, )?; diff --git a/nebari/src/vault.rs b/nebari/src/vault.rs index 7925bb1fa1..d2a27c9317 100644 --- a/nebari/src/vault.rs +++ b/nebari/src/vault.rs @@ -14,6 +14,7 @@ pub trait Vault: std::fmt::Debug + Send + Sync + 'static { fn decrypt(&self, payload: &[u8]) -> Result, Self::Error>; } +/// A [`Vault`] that can be boxed. pub trait AnyVault: std::fmt::Debug + Send + Sync + 'static { /// Encrypts `payload`, returning a new buffer that contains all information /// necessary to decrypt it in the future.