diff --git a/CHANGELOG.md b/CHANGELOG.md index 6586a401b9..1474c9ef90 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -84,6 +84,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 you wish to remove the overhead of synchronizing, passing `PersistenceMode::Flush` will only ensure all application-level caches are flushed before confirming the write is successful. +- `BySequenceIndex` now has a generic parameter and a new field: the embedded + index. When a versioned tree is written, the current embedded index is copied + to the `BySequenceIndex`, allowing for historical retrieval of index values. + + For example, BonsaiDb is using the embedded index to store the document's + hash. This change allows for historical lookups to retrieve the document hash + without fetching the value from disk. + + When retrieving a `BySequenceIndex` that was stored prior to this change, the + embedded index will be None. For all indexes written after this change, the + embedded index will be present. If you are only working with files written + after this change, it is safe to unwrap the embedded index. ### Added @@ -99,6 +111,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `Tree` and `TransactionTree` now both have `current_sequence_id()` and `scan_sequences()` functions. These functions serve the same purpose as those already existing on `TreeFile`. +- `TreeFile`, `Tree`, and `TransactionTree` now have additional methods that + allow fetching versioned tree's information by `SequenceId`: + + - `get_multiple_by_sequence` - retrieve one or more values by their sequence + id. This retrieves the value at the time of the sequence id. + - `get_multiple_indexes_by_sequence` - retrieve one or more indexes by their + sequence id. + - `get_multiple_with_indexes_by_sequence` - retrieve one or more values and + indexes by their sequence id. + + When a sequence is found in any of these functions, a result is returned. 
+ Because an index will still be present for deleted keys, all retrievals of + values via this method will return an Option. This allows callers to + distinguish between `SequenceId` being not found and the value being deleted. ## v0.5.3 diff --git a/nebari/src/error.rs b/nebari/src/error.rs index 05659436db..3a7e141916 100644 --- a/nebari/src/error.rs +++ b/nebari/src/error.rs @@ -1,4 +1,8 @@ -use std::{convert::Infallible, fmt::Display}; +use std::{ + array::TryFromSliceError, + convert::Infallible, + fmt::{Debug, Display}, +}; use backtrace::Backtrace; use parking_lot::{Mutex, MutexGuard}; @@ -7,7 +11,6 @@ use thiserror::Error; use crate::AbortError; /// An error from Nebari as well as an associated backtrace. -#[derive(Debug)] pub struct Error { /// The error that occurred. pub kind: ErrorKind, @@ -29,6 +32,50 @@ impl Error { backtrace.resolve(); backtrace } + + fn format_backtrace_frames(&self) -> Vec { + let mut backtrace = self.backtrace.lock(); + backtrace.resolve(); + backtrace + .frames() + .iter() + .filter_map(|frame| frame.symbols().first()) + .enumerate() + .map(|(index, symbol)| { + let mut line = format!("{index}: "); + if let Some(name) = symbol.name() { + line.push_str(&name.to_string()); + line.push(' '); + } else if let Some(addr) = symbol.addr() { + line.push_str(&format!("{:x}", addr as usize)); + line.push(' '); + } else { + // Give up on formatting this one. 
+ line.push_str(&format!("{symbol:?}")); + return line; + } + + if let Some(file) = symbol.filename() { + if let Some(file) = file.to_str() { + line.push_str("at "); + line.push_str(file); + } else { + line.push_str(&format!("at {file:?}")); + } + + if let Some(lineno) = symbol.lineno() { + line.push(':'); + line.push_str(&lineno.to_string()); + if let Some(col) = symbol.colno() { + line.push(':'); + line.push_str(&col.to_string()); + } + } + } + line + }) + .collect() + } } impl std::error::Error for Error { @@ -39,16 +86,14 @@ impl std::error::Error for Error { impl Display for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.kind.fmt(f)?; + Display::fmt(&self.kind, f)?; #[cfg(debug_assertions)] { f.write_str("\nstack backtrace:")?; - let mut backtrace = self.backtrace.lock(); - backtrace.resolve(); - for (index, frame) in backtrace.frames().iter().enumerate() { - write!(f, "\n#{}: {:?}", index, frame)?; + for (index, frame) in self.format_backtrace_frames().into_iter().enumerate() { + write!(f, "{index}: {frame}")?; } } @@ -56,6 +101,16 @@ impl Display for Error { } } +impl Debug for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let frames = self.format_backtrace_frames(); + f.debug_struct("Error") + .field("kind", &self.kind) + .field("backtrace", &&frames[..]) + .finish() + } +} + impl From for Error { fn from(kind: ErrorKind) -> Self { Self { @@ -116,6 +171,15 @@ impl From for Error { } } +impl From for Error { + fn from(_: TryFromSliceError) -> Self { + Self { + kind: ErrorKind::Internal(InternalError::IncorrectByteLength), + backtrace: Mutex::new(Backtrace::new_unresolved()), + } + } +} + /// An error from Nebari. 
#[derive(Debug, Error)] #[error(transparent)] @@ -219,4 +283,6 @@ pub enum InternalError { TransactionManagerStopped, #[error("an error on an internal channel has occurred")] InternalCommunication, + #[error("an unexpected byte length was encountered")] + IncorrectByteLength, } diff --git a/nebari/src/roots.rs b/nebari/src/roots.rs index 028a16dba8..9bb4df4ae0 100644 --- a/nebari/src/roots.rs +++ b/nebari/src/roots.rs @@ -27,8 +27,8 @@ use crate::{ root::{AnyReducer, AnyTreeRoot}, state::AnyTreeState, EmbeddedIndex, KeySequence, Modification, ModificationResult, Operation, PersistenceMode, - ScanEvaluation, SequenceId, State, TransactableCompaction, TreeFile, TreeRoot, - VersionedTreeRoot, + ScanEvaluation, SequenceEntry, SequenceId, SequenceIndex, State, TransactableCompaction, + TreeFile, TreeRoot, VersionedTreeRoot, }, vault::AnyVault, ArcBytes, ChunkCache, ErrorKind, @@ -446,13 +446,56 @@ where ) -> Result<(), AbortError> where Range: RangeBounds + Debug + 'static, - KeyEvaluator: FnMut(KeySequence) -> ScanEvaluation, - DataCallback: FnMut(KeySequence, ArcBytes<'static>) -> Result<(), AbortError>, + KeyEvaluator: FnMut(KeySequence) -> ScanEvaluation, + DataCallback: + FnMut(KeySequence, ArcBytes<'static>) -> Result<(), AbortError>, CallerError: Display + Debug, { self.tree .scan_sequences(range, forwards, true, key_evaluator, data_callback) } + + /// Retrieves the keys and values associated with one or more `sequences`. + /// The value retrieved is the value of the key at the given [`SequenceId`]. + /// If a sequence is not found, it will not appear in the result map. If + /// the value was removed, None is returned for the value. + pub fn get_multiple_by_sequence( + &mut self, + sequences: Sequences, + ) -> Result, Option>)>, Error> + where + Sequences: Iterator, + { + self.tree.get_multiple_by_sequence(sequences, true) + } + + /// Retrieves the keys and indexes associated with one or more `sequences`. 
+ /// The value retrieved is the value of the key at the given [`SequenceId`]. + /// If a sequence is not found, it will not appear in the result list. + pub fn get_multiple_indexes_by_sequence( + &mut self, + sequences: Sequences, + ) -> Result>, Error> + where + Sequences: Iterator, + { + self.tree.get_multiple_indexes_by_sequence(sequences, true) + } + + /// Retrieves the keys, values, and indexes associated with one or more + /// `sequences`. The value retrieved is the value of the key at the given + /// [`SequenceId`]. If a sequence is not found, it will not appear in the + /// result list. + pub fn get_multiple_with_indexes_by_sequence( + &mut self, + sequences: Sequences, + ) -> Result>, Error> + where + Sequences: Iterator, + { + self.tree + .get_multiple_with_indexes_by_sequence(sequences, true) + } } impl TransactionTree { @@ -1367,7 +1410,7 @@ where /// invoked with the key and values. The callback may not be invoked in the /// same order as the keys are scanned. pub fn scan_sequences( - &mut self, + &self, range: Range, forwards: bool, key_evaluator: &mut KeyEvaluator, @@ -1375,8 +1418,9 @@ where ) -> Result<(), AbortError> where Range: Clone + RangeBounds + Debug + 'static, - KeyEvaluator: FnMut(KeySequence) -> ScanEvaluation, - DataCallback: FnMut(KeySequence, ArcBytes<'static>) -> Result<(), AbortError>, + KeyEvaluator: FnMut(KeySequence) -> ScanEvaluation, + DataCallback: + FnMut(KeySequence, ArcBytes<'static>) -> Result<(), AbortError>, CallerError: Display + Debug, { catch_compaction_and_retry_abortable(|| { @@ -1390,6 +1434,77 @@ where tree.scan_sequences(range.clone(), forwards, false, key_evaluator, data_callback) }) } + + /// Retrieves the keys and values associated with one or more `sequences`. + /// The value retrieved is the value of the key at the given [`SequenceId`]. + /// If a sequence is not found, it will not appear in the result map. If + /// the value was removed, None is returned for the value. 
+ #[allow(clippy::needless_pass_by_value)] + pub fn get_multiple_by_sequence( + &mut self, + sequences: Sequences, + ) -> Result, Option>)>, Error> + where + Sequences: Iterator + Clone, + { + catch_compaction_and_retry(|| { + let mut tree = TreeFile::, File>::read( + self.path(), + self.state.clone(), + self.roots.context(), + Some(self.roots.transactions()), + )?; + + tree.get_multiple_by_sequence(sequences.clone(), false) + }) + } + + /// Retrieves the keys and indexes associated with one or more `sequences`. + /// The value retrieved is the value of the key at the given [`SequenceId`]. + /// If a sequence is not found, it will not appear in the result list. + #[allow(clippy::needless_pass_by_value)] + pub fn get_multiple_indexes_by_sequence( + &mut self, + sequences: Sequences, + ) -> Result>, Error> + where + Sequences: Iterator + Clone, + { + catch_compaction_and_retry(|| { + let mut tree = TreeFile::, File>::read( + self.path(), + self.state.clone(), + self.roots.context(), + Some(self.roots.transactions()), + )?; + + tree.get_multiple_indexes_by_sequence(sequences.clone(), false) + }) + } + + /// Retrieves the keys, values, and indexes associated with one or more + /// `sequences`. The value retrieved is the value of the key at the given + /// [`SequenceId`]. If a sequence is not found, it will not appear in the + /// result list. + #[allow(clippy::needless_pass_by_value)] + pub fn get_multiple_with_indexes_by_sequence( + &mut self, + sequences: Sequences, + ) -> Result>, Error> + where + Sequences: Iterator + Clone, + { + catch_compaction_and_retry(|| { + let mut tree = TreeFile::, File>::read( + self.path(), + self.state.clone(), + self.roots.context(), + Some(self.roots.transactions()), + )?; + + tree.get_multiple_with_indexes_by_sequence(sequences.clone(), false) + }) + } } /// An error that could come from user code or Nebari. 
diff --git a/nebari/src/tree/btree_entry.rs b/nebari/src/tree/btree_entry.rs index 90f7e6b821..673ea78188 100644 --- a/nebari/src/tree/btree_entry.rs +++ b/nebari/src/tree/btree_entry.rs @@ -1205,13 +1205,62 @@ where Ok(true) } + pub(crate) fn get_multiple( + &self, + keys: &mut Keys, + key_evaluator: &mut KeyEvaluator, + key_reader: &mut KeyReader, + file: &mut dyn File, + vault: Option<&dyn AnyVault>, + cache: Option<&ChunkCache>, + ) -> Result<(), Error> + where + KeyEvaluator: for<'k> FnMut(&'k ArcBytes<'static>, &'k Index) -> ScanEvaluation, + KeyReader: FnMut(ArcBytes<'static>, ArcBytes<'static>, Index) -> Result<(), Error>, + Keys: Iterator, + Bytes: AsRef<[u8]>, + { + let mut positions_to_read = Vec::new(); + self.get( + &mut KeyRange::new(keys), + &mut |key, index| key_evaluator(key, index), + &mut |key, index| { + // Deleted keys are stored with a 0 position. + if index.position() > 0 { + positions_to_read.push((key, index.clone())); + } + Ok(()) + }, + file, + vault, + cache, + )?; + + // Sort by position on disk + positions_to_read.sort_by(|a, b| a.1.position().cmp(&b.1.position())); + + for (key, index) in positions_to_read { + if index.position() > 0 { + match read_chunk(index.position(), false, file, vault, cache)? 
{ + CacheEntry::ArcBytes(contents) => { + key_reader(key, contents, index)?; + } + CacheEntry::Decoded(_) => unreachable!(), + }; + } else { + key_reader(key, ArcBytes::default(), index)?; + } + } + Ok(()) + } + #[cfg_attr( feature = "tracing", tracing::instrument(skip(self, key_evaluator, keys, key_reader, file, vault, cache)) )] - pub(crate) fn get<'keys, KeyEvaluator, KeyReader, Keys>( + pub(crate) fn get( &self, - keys: &mut KeyRange<'keys, Keys>, + keys: &mut KeyRange, key_evaluator: &mut KeyEvaluator, key_reader: &mut KeyReader, file: &mut dyn File, @@ -1221,7 +1270,8 @@ where KeyEvaluator: FnMut(&ArcBytes<'static>, &Index) -> ScanEvaluation, KeyReader: FnMut(ArcBytes<'static>, &Index) -> Result<(), AbortError>, - Keys: Iterator, + Keys: Iterator, + Bytes: AsRef<[u8]>, { match &self.node { BTreeNode::Leaf(children) => { diff --git a/nebari/src/tree/by_sequence.rs b/nebari/src/tree/by_sequence.rs index c67a2ec04a..2425cabdbb 100644 --- a/nebari/src/tree/by_sequence.rs +++ b/nebari/src/tree/by_sequence.rs @@ -44,7 +44,7 @@ impl Display for SequenceId { /// The index stored within [`VersionedTreeRoot::by_sequence_root`](crate::tree::VersionedTreeRoot::by_sequence_root). #[derive(Clone, Debug)] -pub struct BySequenceIndex { +pub struct BySequenceIndex { /// The key associated with this sequence id. pub key: ArcBytes<'static>, /// The previous sequence of this key. @@ -53,9 +53,17 @@ pub struct BySequenceIndex { pub value_length: u32, /// The position of the value on disk. pub position: u64, + /// The embedded index at the time of the sequence being written. This value + /// is always present on data written from v0.6.0 onwards. If the tree being + /// used was created after v0.6.0 or has had compaction run on v0.6.0, it is + /// safe to unwrap this value.
+ pub embedded: Option, } -impl BinarySerialization for BySequenceIndex { +impl BinarySerialization for BySequenceIndex +where + Embedded: super::EmbeddedIndex, +{ fn serialize_to( &mut self, writer: &mut Vec, @@ -74,6 +82,11 @@ impl BinarySerialization for BySequenceIndex { bytes_written += 2; writer.extend_from_slice(&self.key); bytes_written += key_length as usize; + + if let Some(embedded) = &self.embedded { + bytes_written += embedded.serialize_to(writer)?; + } + Ok(bytes_written) } @@ -94,6 +107,10 @@ impl BinarySerialization for BySequenceIndex { } let key = reader.read_bytes(key_length)?.into_owned(); + let embedded = (!reader.is_empty()) + .then(|| Embedded::deserialize_from(reader)) + .transpose()?; + Ok(Self { key, last_sequence: if last_sequence.valid() { @@ -103,11 +120,12 @@ impl BinarySerialization for BySequenceIndex { }, value_length, position, + embedded, }) } } -impl ValueIndex for BySequenceIndex { +impl ValueIndex for BySequenceIndex { fn position(&self) -> u64 { self.position } @@ -144,13 +162,13 @@ impl BinarySerialization for BySequenceStats { #[derive(Clone, Default, Debug)] pub struct BySequenceReducer; -impl Reducer for BySequenceReducer { +impl Reducer, BySequenceStats> for BySequenceReducer { fn reduce<'a, Indexes, IndexesIter>(&self, indexes: Indexes) -> BySequenceStats where - BySequenceIndex: 'a, - Indexes: - IntoIterator + ExactSizeIterator, - IndexesIter: Iterator + ExactSizeIterator + Clone, + BySequenceIndex: 'a, + Indexes: IntoIterator, IntoIter = IndexesIter> + + ExactSizeIterator, + IndexesIter: Iterator> + ExactSizeIterator + Clone, { BySequenceStats { total_sequences: indexes.len() as u64, diff --git a/nebari/src/tree/mod.rs b/nebari/src/tree/mod.rs index 4606cf88e3..069e77ca39 100644 --- a/nebari/src/tree/mod.rs +++ b/nebari/src/tree/mod.rs @@ -99,7 +99,7 @@ pub use self::{ root::{AnyTreeRoot, Root, TreeRoot}, state::{ActiveState, State}, unversioned::{Unversioned, UnversionedTreeRoot}, - versioned::{KeySequence, Versioned, 
VersionedTreeRoot}, + versioned::{KeySequence, SequenceEntry, SequenceIndex, Versioned, VersionedTreeRoot}, }; /// The number of bytes in each page on-disk. @@ -1057,8 +1057,9 @@ where ) -> Result<(), AbortError> where Range: RangeBounds + Debug + 'static, - KeyEvaluator: FnMut(KeySequence) -> ScanEvaluation, - DataCallback: FnMut(KeySequence, ArcBytes<'static>) -> Result<(), AbortError>, + KeyEvaluator: FnMut(KeySequence) -> ScanEvaluation, + DataCallback: + FnMut(KeySequence, ArcBytes<'static>) -> Result<(), AbortError>, CallerError: Display + Debug, { self.file.execute(TreeSequenceScanner { @@ -1068,18 +1069,127 @@ where vault: self.vault.as_deref(), cache: self.cache.as_ref(), range: &U64Range::new(range).borrow_as_bytes(), - key_evaluator: &mut move |key: &ArcBytes<'_>, index: &BySequenceIndex| { + key_evaluator: &mut move |key: &ArcBytes<'_>, index: &BySequenceIndex| { let id = SequenceId(BigEndian::read_u64(key)); key_evaluator(KeySequence { key: index.key.clone(), sequence: id, last_sequence: index.last_sequence, + embedded: index.embedded.clone(), }) }, data_callback, })?; Ok(()) } + + /// Retrieves the keys and values associated with one or more `sequences`. + /// The value retrieved is the value of the key at the given [`SequenceId`]. + /// If a sequence is not found, it will not appear in the result map. If + /// the value was removed, None is returned for the value. 
+ pub fn get_multiple_by_sequence( + &mut self, + sequences: Sequences, + in_transaction: bool, + ) -> Result, Option>)>, Error> + where + Sequences: Iterator, + { + let results = RefCell::new(HashMap::new()); + self.file.execute(TreeSequenceGetter { + keys: sequences, + from_transaction: in_transaction, + state: &self.state, + vault: self.vault.as_deref(), + cache: self.cache.as_ref(), + key_evaluator: |sequence, index| { + results + .borrow_mut() + .insert(sequence, (index.key.clone(), None)); + ScanEvaluation::ReadData + }, + key_reader: |sequence, _index, value| { + results + .borrow_mut() + .get_mut(&sequence) + .expect("reader can't be invoked without evaluator") + .1 = Some(value); + Ok(()) + }, + })?; + Ok(results.into_inner()) + } + + /// Retrieves the keys and indexes associated with one or more `sequences`. + /// The value retrieved is the value of the key at the given [`SequenceId`]. + /// If a sequence is not found, it will not appear in the result list. + pub fn get_multiple_indexes_by_sequence( + &mut self, + sequences: Sequences, + in_transaction: bool, + ) -> Result>, Error> + where + Sequences: Iterator, + { + let mut results = Vec::new(); + self.file.execute(TreeSequenceGetter { + keys: sequences, + from_transaction: in_transaction, + state: &self.state, + vault: self.vault.as_deref(), + cache: self.cache.as_ref(), + key_evaluator: |sequence, index| { + results.push(SequenceIndex { + sequence, + index: index.clone(), + }); + ScanEvaluation::Skip + }, + key_reader: |_, _, _| unreachable!(), + })?; + Ok(results) + } + + /// Retrieves the keys, values, and indexes associated with one or more + /// `sequences`. The value retrieved is the value of the key at the given + /// [`SequenceId`]. If a sequence is not found, it will not appear in the + /// result list. 
+ pub fn get_multiple_with_indexes_by_sequence( + &mut self, + sequences: Sequences, + in_transaction: bool, + ) -> Result>, Error> + where + Sequences: Iterator, + { + let results = RefCell::new(HashMap::new()); + self.file.execute(TreeSequenceGetter { + keys: sequences, + from_transaction: in_transaction, + state: &self.state, + vault: self.vault.as_deref(), + cache: self.cache.as_ref(), + key_evaluator: |sequence, index| { + results.borrow_mut().insert( + sequence, + SequenceEntry { + index: index.clone(), + value: None, + }, + ); + ScanEvaluation::ReadData + }, + key_reader: |sequence, _index, value| { + results + .borrow_mut() + .get_mut(&sequence) + .expect("reader can't be invoked without evaluator") + .value = Some(value); + Ok(()) + }, + })?; + Ok(results.into_inner()) + } } /// A compaction process that runs in concert with a transaction manager. @@ -1322,30 +1432,32 @@ fn save_tree( /// One or more keys. #[derive(Debug)] -pub struct KeyRange<'a, I: Iterator> { +pub struct KeyRange, Bytes: AsRef<[u8]>> { remaining_keys: I, - current_key: Option<&'a [u8]>, + current_key: Option, + _bytes: PhantomData, } -impl<'a, I: Iterator> KeyRange<'a, I> { +impl, Bytes: AsRef<[u8]>> KeyRange { fn new(mut keys: I) -> Self { Self { current_key: keys.next(), remaining_keys: keys, + _bytes: PhantomData, } } - fn current_key(&self) -> Option<&'a [u8]> { - self.current_key + fn current_key(&self) -> Option<&[u8]> { + self.current_key.as_ref().map(Bytes::as_ref) } } -impl<'a, I: Iterator> Iterator for KeyRange<'a, I> { - type Item = &'a [u8]; - fn next(&mut self) -> Option<&'a [u8]> { - let key_to_return = self.current_key; - self.current_key = self.remaining_keys.next(); - key_to_return +impl, Bytes: AsRef<[u8]>> Iterator for KeyRange { + type Item = Bytes; + fn next(&mut self) -> Option { + let mut key = self.remaining_keys.next(); + std::mem::swap(&mut key, &mut self.current_key); + key } } @@ -1521,6 +1633,78 @@ where } } } + +struct TreeSequenceGetter< + 'a, + Index: 
EmbeddedIndex + Clone + Debug + 'static, + KeyEvaluator: for<'k> FnMut(SequenceId, &'k BySequenceIndex) -> ScanEvaluation, + KeyReader: FnMut(SequenceId, BySequenceIndex, ArcBytes<'static>) -> Result<(), Error>, + Keys: Iterator, +> { + from_transaction: bool, + state: &'a State>, + vault: Option<&'a dyn AnyVault>, + cache: Option<&'a ChunkCache>, + keys: Keys, + key_evaluator: KeyEvaluator, + key_reader: KeyReader, +} + +impl<'a, KeyEvaluator, KeyReader, Index, Keys> FileOp> + for TreeSequenceGetter<'a, Index, KeyEvaluator, KeyReader, Keys> +where + KeyEvaluator: for<'k> FnMut(SequenceId, &'k BySequenceIndex) -> ScanEvaluation, + KeyReader: FnMut(SequenceId, BySequenceIndex, ArcBytes<'static>) -> Result<(), Error>, + Keys: Iterator, + Index: EmbeddedIndex + Clone + Debug + 'static, +{ + fn execute(mut self, file: &mut dyn File) -> Result<(), Error> { + if self.from_transaction { + let state = self.state.lock(); + if state.file_id != file.id() { + return Err(Error::from(ErrorKind::TreeCompacted)); + } + + state.root.by_sequence_root.get_multiple( + &mut self + .keys + .into_iter() + .map(|sequence| sequence.0.to_be_bytes()), + &mut |key, index| { + (self.key_evaluator)(SequenceId::try_from(key.as_slice()).unwrap(), index) + }, + &mut |key, value, index| { + (self.key_reader)(SequenceId::try_from(key.as_slice()).unwrap(), index, value) + }, + file, + self.vault, + self.cache, + ) + } else { + let state = self.state.read(); + if state.file_id != file.id() { + return Err(Error::from(ErrorKind::TreeCompacted)); + } + + state.root.by_sequence_root.get_multiple( + &mut self + .keys + .into_iter() + .map(|sequence| sequence.0.to_be_bytes()), + &mut |key, index| { + (self.key_evaluator)(SequenceId::try_from(key.as_slice()).unwrap(), index) + }, + &mut |key, value, index| { + (self.key_reader)(SequenceId::try_from(key.as_slice()).unwrap(), index, value) + }, + file, + self.vault, + self.cache, + ) + } + } +} + struct TreeSequenceScanner< 'a, 'keys, @@ -1530,9 +1714,10 @@ 
struct TreeSequenceScanner< CallerError, Index, > where - KeyEvaluator: FnMut(&ArcBytes<'static>, &BySequenceIndex) -> ScanEvaluation, + KeyEvaluator: FnMut(&ArcBytes<'static>, &BySequenceIndex) -> ScanEvaluation, KeyRangeBounds: RangeBounds<&'keys [u8]> + ?Sized, - DataCallback: FnMut(KeySequence, ArcBytes<'static>) -> Result<(), AbortError>, + DataCallback: + FnMut(KeySequence, ArcBytes<'static>) -> Result<(), AbortError>, CallerError: Display + Debug, Index: EmbeddedIndex + Clone + Debug + 'static, { @@ -1558,9 +1743,10 @@ impl<'a, 'keys, KeyEvaluator, KeyRangeBounds, DataCallback, CallerError, Index> Index, > where - KeyEvaluator: FnMut(&ArcBytes<'static>, &BySequenceIndex) -> ScanEvaluation, + KeyEvaluator: FnMut(&ArcBytes<'static>, &BySequenceIndex) -> ScanEvaluation, KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized, - DataCallback: FnMut(KeySequence, ArcBytes<'static>) -> Result<(), AbortError>, + DataCallback: + FnMut(KeySequence, ArcBytes<'static>) -> Result<(), AbortError>, CallerError: Display + Debug, Index: EmbeddedIndex + Clone + Debug + 'static, { @@ -1577,13 +1763,14 @@ where .. 
} = self; let mapped_data_callback = - |key: ArcBytes<'static>, index: &BySequenceIndex, data: ArcBytes<'static>| { + |key: ArcBytes<'static>, index: &BySequenceIndex, data: ArcBytes<'static>| { let sequence = SequenceId(BigEndian::read_u64(&key)); (data_callback)( KeySequence { key: index.key.clone(), sequence, last_sequence: index.last_sequence, + embedded: index.embedded.clone(), }, data, ) diff --git a/nebari/src/tree/unversioned.rs b/nebari/src/tree/unversioned.rs index ba05442f96..ececddf077 100644 --- a/nebari/src/tree/unversioned.rs +++ b/nebari/src/tree/unversioned.rs @@ -11,9 +11,8 @@ use super::{ btree_entry::BTreeEntry, by_id::{ByIdStats, UnversionedByIdIndex}, modify::Modification, - read_chunk, serialization::BinarySerialization, - KeyRange, PagedWriter, ScanEvaluation, + PagedWriter, ScanEvaluation, }; use crate::{ chunk_cache::CacheEntry, @@ -266,38 +265,8 @@ where KeyReader: FnMut(ArcBytes<'static>, ArcBytes<'static>, Self::Index) -> Result<(), Error>, Keys: Iterator, { - let mut positions_to_read = Vec::new(); - self.by_id_root.get( - &mut KeyRange::new(keys), - &mut |key, index| key_evaluator(key, index), - &mut |key, index| { - // Deleted keys are stored with a 0 position. - if index.position > 0 { - positions_to_read.push((key, index.clone())); - } - Ok(()) - }, - file, - vault, - cache, - )?; - - // Sort by position on disk - positions_to_read.sort_by(|a, b| a.1.position.cmp(&b.1.position)); - - for (key, index) in positions_to_read { - if index.position > 0 { - match read_chunk(index.position, false, file, vault, cache)? 
{ - CacheEntry::ArcBytes(contents) => { - key_reader(key, contents, index)?; - } - CacheEntry::Decoded(_) => unreachable!(), - }; - } else { - key_reader(key, ArcBytes::default(), index)?; - } - } - Ok(()) + self.by_id_root + .get_multiple(keys, key_evaluator, key_reader, file, vault, cache) } fn scan< diff --git a/nebari/src/tree/versioned.rs b/nebari/src/tree/versioned.rs index eae9a9abe1..5f12302b25 100644 --- a/nebari/src/tree/versioned.rs +++ b/nebari/src/tree/versioned.rs @@ -1,4 +1,5 @@ use std::{ + array::TryFromSliceError, collections::HashMap, fmt::{Debug, Display}, marker::PhantomData, @@ -12,9 +13,8 @@ use super::{ by_id::{ByIdStats, VersionedByIdIndex}, by_sequence::{BySequenceIndex, BySequenceStats}, modify::Modification, - read_chunk, serialization::BinarySerialization, - KeyRange, PagedWriter, ScanEvaluation, PAGE_SIZE, + PagedWriter, ScanEvaluation, PAGE_SIZE, }; use crate::{ chunk_cache::CacheEntry, @@ -52,7 +52,7 @@ where /// The last sequence ID inside of this root. pub sequence: SequenceId, /// The by-sequence B-Tree. - pub by_sequence_root: BTreeEntry, + pub by_sequence_root: BTreeEntry, BySequenceStats>, /// The by-id B-Tree. 
pub by_id_root: BTreeEntry, ByIdStats>, @@ -97,7 +97,11 @@ where { fn modify_sequence_root( &mut self, - mut modification: Modification<'_, BySequenceIndex, BySequenceIndex>, + mut modification: Modification< + '_, + BySequenceIndex, + BySequenceIndex, + >, writer: &mut PagedWriter<'_>, max_order: Option, ) -> Result<(), Error> { @@ -123,13 +127,14 @@ where minimum_children: by_sequence_minimum_children, indexer: &mut |_key: &ArcBytes<'_>, - value: Option<&BySequenceIndex>, - _existing_index: Option<&BySequenceIndex>, - _changes: &mut EntryChanges, + value: Option<&BySequenceIndex>, + _existing_index: Option<&BySequenceIndex>, + _changes: &mut EntryChanges, _writer: &mut PagedWriter<'_>| { Ok(KeyOperation::Set(value.unwrap().clone())) }, - loader: |_index: &BySequenceIndex, _writer: &mut PagedWriter<'_>| Ok(None), + loader: |_index: &BySequenceIndex, + _writer: &mut PagedWriter<'_>| Ok(None), reducer: BySequenceReducer, _phantom: PhantomData, }, @@ -152,7 +157,7 @@ where fn modify_id_root( &mut self, mut modification: Modification<'_, ArcBytes<'static>, VersionedByIdIndex>, - changes: &mut EntryChanges, + changes: &mut EntryChanges, writer: &mut PagedWriter<'_>, max_order: Option, ) -> Result>>, Error> { @@ -178,7 +183,7 @@ where indexer: &mut |key: &ArcBytes<'_>, value: Option<&ArcBytes<'static>>, existing_index: Option<&VersionedByIdIndex>, - changes: &mut EntryChanges, + changes: &mut EntryChanges, writer: &mut PagedWriter<'_>| { let (position, value_size) = if let Some(value) = value { let new_position = writer.write_chunk(value)?; @@ -200,6 +205,7 @@ where key: key.clone(), sequence: changes.current_sequence, last_sequence: existing_index.map(|idx| idx.sequence_id), + embedded: Some(embedded.clone()), }, value_position: position, value_size, @@ -376,6 +382,7 @@ where last_sequence: change.key_sequence.last_sequence, value_length: change.value_size, position: change.value_position, + embedded: change.key_sequence.embedded, }); 
ArcBytes::from(change.key_sequence.sequence.0.to_be_bytes()) }) @@ -410,38 +417,8 @@ where KeyReader: FnMut(ArcBytes<'static>, ArcBytes<'static>, Self::Index) -> Result<(), Error>, Keys: Iterator, { - let mut positions_to_read = Vec::new(); - self.by_id_root.get( - &mut KeyRange::new(keys), - &mut |key, index| key_evaluator(key, index), - &mut |key, index| { - // Deleted keys are stored with a 0 position. - if index.position > 0 { - positions_to_read.push((key, index.clone())); - } - Ok(()) - }, - file, - vault, - cache, - )?; - - // Sort by position on disk - positions_to_read.sort_by(|a, b| a.1.position.cmp(&b.1.position)); - - for (key, index) in positions_to_read { - if index.position > 0 { - match read_chunk(index.position, false, file, vault, cache)? { - CacheEntry::ArcBytes(contents) => { - key_reader(key, contents, index)?; - } - CacheEntry::Decoded(_) => unreachable!(), - }; - } else { - key_reader(key, ArcBytes::default(), index)?; - } - } - Ok(()) + self.by_id_root + .get_multiple(keys, key_evaluator, key_reader, file, vault, cache) } fn scan< @@ -519,6 +496,7 @@ where last_sequence: None, value_length: index.value_length, position: new_position, + embedded: Some(index.embedded.clone()), }, )); @@ -555,13 +533,13 @@ where current_order: by_sequence_order, minimum_children, indexer: &mut |_key: &ArcBytes<'_>, - value: Option<&BySequenceIndex>, - _existing_index: Option<&BySequenceIndex>, - _changes: &mut EntryChanges, + value: Option<&BySequenceIndex>, + _existing_index: Option<&BySequenceIndex>, + _changes: &mut EntryChanges, _writer: &mut PagedWriter<'_>| { Ok(KeyOperation::Set(value.unwrap().clone())) }, - loader: |_index: &BySequenceIndex, _writer: &mut PagedWriter<'_>| unreachable!(), + loader: |_index: &BySequenceIndex, _writer: &mut PagedWriter<'_>| unreachable!(), reducer: BySequenceReducer, _phantom: PhantomData, }, @@ -574,24 +552,61 @@ where } } -#[derive(Default)] -pub struct EntryChanges { +pub struct EntryChanges { pub current_sequence: 
SequenceId, - pub changes: Vec, + pub changes: Vec>, +} + +impl Default for EntryChanges { + fn default() -> Self { + Self { + current_sequence: SequenceId::default(), + changes: Vec::default(), + } + } } -pub struct EntryChange { - pub key_sequence: KeySequence, + +pub struct EntryChange { + pub key_sequence: KeySequence, pub value_position: u64, pub value_size: u32, } /// A stored revision of a key. #[derive(Debug)] -pub struct KeySequence { +pub struct KeySequence { /// The key that this entry was written for. pub key: ArcBytes<'static>, /// The unique sequence id. pub sequence: SequenceId, /// The previous sequence id for this key, if any. pub last_sequence: Option, + /// The embedded index stored for this sequence. + pub embedded: Option, +} + +impl<'a> TryFrom<&'a [u8]> for SequenceId { + type Error = TryFromSliceError; + + fn try_from(value: &'a [u8]) -> Result { + value.try_into().map(u64::from_be_bytes).map(Self) + } +} + +/// A stored entry in a versioned tree. +#[derive(Debug)] +pub struct SequenceEntry { + /// The stored index for this sequence id. + pub index: BySequenceIndex, + /// The value stored for this sequence id, if still present. + pub value: Option>, +} + +/// A stored index in a versioned tree. +#[derive(Debug)] +pub struct SequenceIndex { + /// The unique sequence id. + pub sequence: SequenceId, + /// The stored index for this sequence id. + pub index: BySequenceIndex, }