Skip to content

Commit

Permalink
Added get_multiple[*]_by_sequence
Browse files Browse the repository at this point in the history
  • Loading branch information
ecton committed May 12, 2022
1 parent 493a70e commit 9507fe5
Show file tree
Hide file tree
Showing 8 changed files with 577 additions and 131 deletions.
26 changes: 26 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
you wish to remove the overhead of synchronizing, passing
`PersistenceMode::Flush` will only ensure all application-level caches are
flushed before confirming the write is successful.
- `BySequenceIndex` now has a generic parameter and a new field: the embedded
index. When a versioned tree is written, the current embedded index is copied
to the `BySequenceIndex`, allowing for historical retrieval of index values.

For example, BonsaiDb is using the embedded index to store the document's
hash. This change allows for historical lookups to retrieve the document hash
without fetching the value from disk.

When retrieving a `BySequenceIndex` that was stored prior to this change, the
embedded index will be `None`. For all indexes written after this change, the
embedded index will be present. If you are only working with files written
after this change, it is safe to unwrap the embedded index.

### Added

Expand All @@ -99,6 +111,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `Tree` and `TransactionTree` now both have `current_sequence_id()` and
`scan_sequences()` functions. These functions serve the same purpose as those
already existing on `TreeFile`.
- `TreeFile`, `Tree`, and `TransactionTree` now have additional methods that
allow fetching a versioned tree's information by `SequenceId`:

- `get_multiple_by_sequence` - retrieve one or more values by their sequence
id. This retrieves the value at the time of the sequence id.
- `get_multiple_indexes_by_sequence` - retrieve one or more indexes by their
sequence id.
- `get_multiple_with_indexes_by_sequence` - retrieve one or more values and
indexes by their sequence id.

When a sequence is found in any of these functions, a result is returned.
Because an index will still be present for deleted keys, all retrievals of
values via these methods will return an `Option`. This allows callers to
distinguish between a `SequenceId` not being found and the value being deleted.

## v0.5.3

Expand Down
80 changes: 73 additions & 7 deletions nebari/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::{convert::Infallible, fmt::Display};
use std::{
array::TryFromSliceError,
convert::Infallible,
fmt::{Debug, Display},
};

use backtrace::Backtrace;
use parking_lot::{Mutex, MutexGuard};
Expand All @@ -7,7 +11,6 @@ use thiserror::Error;
use crate::AbortError;

/// An error from Nebari as well as an associated backtrace.
#[derive(Debug)]
pub struct Error {
/// The error that occurred.
pub kind: ErrorKind,
Expand All @@ -29,6 +32,50 @@ impl Error {
backtrace.resolve();
backtrace
}

/// Resolves the captured backtrace and renders it as one string per frame.
///
/// Only frames that expose at least one symbol are included, and only the
/// first symbol of each such frame is rendered. Every entry starts with
/// `"{index}: "`, where the index counts the included frames. It is
/// followed by the symbol name (or, failing that, its address in hex) and,
/// when known, `at file[:line[:column]]`. A symbol with neither a name nor
/// an address is rendered using its `Debug` representation.
fn format_backtrace_frames(&self) -> Vec<String> {
    let mut guard = self.backtrace.lock();
    guard.resolve();

    let mut lines = Vec::new();
    let symbols = guard
        .frames()
        .iter()
        .filter_map(|frame| frame.symbols().first());

    for (index, symbol) in symbols.enumerate() {
        // Prefer the symbol name; only fall back to the address when there
        // is no name.
        let label = match symbol.name() {
            Some(name) => Some(name.to_string()),
            None => symbol.addr().map(|addr| format!("{:x}", addr as usize)),
        };

        let mut line = match label {
            Some(label) => format!("{index}: {label} "),
            None => {
                // Give up on formatting this one.
                lines.push(format!("{index}: {symbol:?}"));
                continue;
            }
        };

        if let Some(path) = symbol.filename() {
            line.push_str("at ");
            match path.to_str() {
                Some(text) => line.push_str(text),
                // Paths that are not valid UTF-8 use their `Debug` form.
                None => line.push_str(&format!("{path:?}")),
            }

            // A column is only ever appended after a line number.
            if let Some(lineno) = symbol.lineno() {
                line.push_str(&format!(":{lineno}"));
                if let Some(col) = symbol.colno() {
                    line.push_str(&format!(":{col}"));
                }
            }
        }

        lines.push(line);
    }

    lines
}
}

impl std::error::Error for Error {
Expand All @@ -39,23 +86,31 @@ impl std::error::Error for Error {

impl Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.kind.fmt(f)?;
Display::fmt(&self.kind, f)?;

#[cfg(debug_assertions)]
{
f.write_str("\nstack backtrace:")?;
let mut backtrace = self.backtrace.lock();
backtrace.resolve();

for (index, frame) in backtrace.frames().iter().enumerate() {
write!(f, "\n#{}: {:?}", index, frame)?;
for (index, frame) in self.format_backtrace_frames().into_iter().enumerate() {
write!(f, "{index}: {frame}")?;
}
}

Ok(())
}
}

impl Debug for Error {
    /// Formats the error as a struct named `Error` with two fields: `kind`
    /// and `backtrace`, the latter being the list of rendered frame lines.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let frames = self.format_backtrace_frames();

        let mut output = f.debug_struct("Error");
        output.field("kind", &self.kind);
        output.field("backtrace", &frames);
        output.finish()
    }
}

impl From<ErrorKind> for Error {
fn from(kind: ErrorKind) -> Self {
Self {
Expand Down Expand Up @@ -116,6 +171,15 @@ impl From<String> for Error {
}
}

impl From<TryFromSliceError> for Error {
fn from(_: TryFromSliceError) -> Self {
Self {
kind: ErrorKind::Internal(InternalError::IncorrectByteLength),
backtrace: Mutex::new(Backtrace::new_unresolved()),
}
}
}

/// An error from Nebari.
#[derive(Debug, Error)]
#[error(transparent)]
Expand Down Expand Up @@ -219,4 +283,6 @@ pub enum InternalError {
TransactionManagerStopped,
#[error("an error on an internal channel has occurred")]
InternalCommunication,
#[error("an unexpected byte length was encountered")]
IncorrectByteLength,
}
129 changes: 122 additions & 7 deletions nebari/src/roots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use crate::{
root::{AnyReducer, AnyTreeRoot},
state::AnyTreeState,
EmbeddedIndex, KeySequence, Modification, ModificationResult, Operation, PersistenceMode,
ScanEvaluation, SequenceId, State, TransactableCompaction, TreeFile, TreeRoot,
VersionedTreeRoot,
ScanEvaluation, SequenceEntry, SequenceId, SequenceIndex, State, TransactableCompaction,
TreeFile, TreeRoot, VersionedTreeRoot,
},
vault::AnyVault,
ArcBytes, ChunkCache, ErrorKind,
Expand Down Expand Up @@ -446,13 +446,56 @@ where
) -> Result<(), AbortError<CallerError>>
where
Range: RangeBounds<SequenceId> + Debug + 'static,
KeyEvaluator: FnMut(KeySequence) -> ScanEvaluation,
DataCallback: FnMut(KeySequence, ArcBytes<'static>) -> Result<(), AbortError<CallerError>>,
KeyEvaluator: FnMut(KeySequence<Index>) -> ScanEvaluation,
DataCallback:
FnMut(KeySequence<Index>, ArcBytes<'static>) -> Result<(), AbortError<CallerError>>,
CallerError: Display + Debug,
{
self.tree
.scan_sequences(range, forwards, true, key_evaluator, data_callback)
}

/// Retrieves the keys and values associated with one or more `sequences`.
///
/// The value retrieved is the value of the key at the given [`SequenceId`].
/// If a sequence is not found, it will not appear in the result map. If
/// the value was removed, `None` is returned for the value.
///
/// # Errors
///
/// Returns any [`Error`] reported by the underlying tree lookup.
pub fn get_multiple_by_sequence<Sequences>(
&mut self,
sequences: Sequences,
) -> Result<HashMap<SequenceId, (ArcBytes<'static>, Option<ArcBytes<'static>>)>, Error>
where
Sequences: Iterator<Item = SequenceId>,
{
// NOTE(review): the trailing `true` presumably marks this call as being
// made from within a transaction — confirm against the callee.
self.tree.get_multiple_by_sequence(sequences, true)
}

/// Retrieves the keys and indexes associated with one or more `sequences`.
///
/// The result holds [`SequenceIndex`] entries rather than values. If a
/// sequence is not found, it will not appear in the result list.
///
/// # Errors
///
/// Returns any [`Error`] reported by the underlying tree lookup.
pub fn get_multiple_indexes_by_sequence<Sequences>(
&mut self,
sequences: Sequences,
) -> Result<Vec<SequenceIndex<Index>>, Error>
where
Sequences: Iterator<Item = SequenceId>,
{
// NOTE(review): the trailing `true` presumably marks this call as being
// made from within a transaction — confirm against the callee.
self.tree.get_multiple_indexes_by_sequence(sequences, true)
}

/// Retrieves the keys, values, and indexes associated with one or more
/// `sequences`.
///
/// The value retrieved is the value of the key at the given
/// [`SequenceId`]. If a sequence is not found, it will not appear in the
/// result map.
///
/// # Errors
///
/// Returns any [`Error`] reported by the underlying tree lookup.
pub fn get_multiple_with_indexes_by_sequence<Sequences>(
&mut self,
sequences: Sequences,
) -> Result<HashMap<SequenceId, SequenceEntry<Index>>, Error>
where
Sequences: Iterator<Item = SequenceId>,
{
// NOTE(review): the trailing `true` presumably marks this call as being
// made from within a transaction — confirm against the callee.
self.tree
.get_multiple_with_indexes_by_sequence(sequences, true)
}
}

impl<Root: tree::Root, File: ManagedFile> TransactionTree<Root, File> {
Expand Down Expand Up @@ -1367,16 +1410,17 @@ where
/// invoked with the key and values. The callback may not be invoked in the
/// same order as the keys are scanned.
pub fn scan_sequences<CallerError, Range, KeyEvaluator, DataCallback>(
&mut self,
&self,
range: Range,
forwards: bool,
key_evaluator: &mut KeyEvaluator,
data_callback: &mut DataCallback,
) -> Result<(), AbortError<CallerError>>
where
Range: Clone + RangeBounds<SequenceId> + Debug + 'static,
KeyEvaluator: FnMut(KeySequence) -> ScanEvaluation,
DataCallback: FnMut(KeySequence, ArcBytes<'static>) -> Result<(), AbortError<CallerError>>,
KeyEvaluator: FnMut(KeySequence<Index>) -> ScanEvaluation,
DataCallback:
FnMut(KeySequence<Index>, ArcBytes<'static>) -> Result<(), AbortError<CallerError>>,
CallerError: Display + Debug,
{
catch_compaction_and_retry_abortable(|| {
Expand All @@ -1390,6 +1434,77 @@ where
tree.scan_sequences(range.clone(), forwards, false, key_evaluator, data_callback)
})
}

/// Looks up the keys and values recorded for one or more `sequences`.
///
/// Each value is the value of the key as of the given [`SequenceId`].
/// Sequences that are not found are absent from the returned map. When the
/// value was removed, the value half of the entry is `None`.
///
/// # Errors
///
/// Returns any [`Error`] produced while opening the tree file or performing
/// the lookup.
// `sequences` is only ever cloned inside the closure below, which is what
// trips `needless_pass_by_value`; taking it by value keeps the signature
// parallel to the other sequence lookups.
#[allow(clippy::needless_pass_by_value)]
pub fn get_multiple_by_sequence<Sequences>(
    &mut self,
    sequences: Sequences,
) -> Result<HashMap<SequenceId, (ArcBytes<'static>, Option<ArcBytes<'static>>)>, Error>
where
    Sequences: Iterator<Item = SequenceId> + Clone,
{
    catch_compaction_and_retry(|| {
        let mut file = TreeFile::<VersionedTreeRoot<Index>, File>::read(
            self.path(),
            self.state.clone(),
            self.roots.context(),
            Some(self.roots.transactions()),
        )?;

        // A fresh copy of the iterator is handed out per invocation, since
        // the retry helper presumably may run this closure more than once.
        let requested = sequences.clone();
        file.get_multiple_by_sequence(requested, false)
    })
}

/// Looks up the keys and indexes recorded for one or more `sequences`.
///
/// The result holds [`SequenceIndex`] entries rather than values. Sequences
/// that are not found are absent from the returned list.
///
/// # Errors
///
/// Returns any [`Error`] produced while opening the tree file or performing
/// the lookup.
// `sequences` is only ever cloned inside the closure below, which is what
// trips `needless_pass_by_value`; taking it by value keeps the signature
// parallel to the other sequence lookups.
#[allow(clippy::needless_pass_by_value)]
pub fn get_multiple_indexes_by_sequence<Sequences>(
    &mut self,
    sequences: Sequences,
) -> Result<Vec<SequenceIndex<Index>>, Error>
where
    Sequences: Iterator<Item = SequenceId> + Clone,
{
    catch_compaction_and_retry(|| {
        let mut file = TreeFile::<VersionedTreeRoot<Index>, File>::read(
            self.path(),
            self.state.clone(),
            self.roots.context(),
            Some(self.roots.transactions()),
        )?;

        // A fresh copy of the iterator is handed out per invocation, since
        // the retry helper presumably may run this closure more than once.
        let requested = sequences.clone();
        file.get_multiple_indexes_by_sequence(requested, false)
    })
}

/// Looks up the keys, values, and indexes recorded for one or more
/// `sequences`.
///
/// Each value is the value of the key as of the given [`SequenceId`].
/// Sequences that are not found are absent from the returned map.
///
/// # Errors
///
/// Returns any [`Error`] produced while opening the tree file or performing
/// the lookup.
// `sequences` is only ever cloned inside the closure below, which is what
// trips `needless_pass_by_value`; taking it by value keeps the signature
// parallel to the other sequence lookups.
#[allow(clippy::needless_pass_by_value)]
pub fn get_multiple_with_indexes_by_sequence<Sequences>(
    &mut self,
    sequences: Sequences,
) -> Result<HashMap<SequenceId, SequenceEntry<Index>>, Error>
where
    Sequences: Iterator<Item = SequenceId> + Clone,
{
    catch_compaction_and_retry(|| {
        let mut file = TreeFile::<VersionedTreeRoot<Index>, File>::read(
            self.path(),
            self.state.clone(),
            self.roots.context(),
            Some(self.roots.transactions()),
        )?;

        // A fresh copy of the iterator is handed out per invocation, since
        // the retry helper presumably may run this closure more than once.
        let requested = sequences.clone();
        file.get_multiple_with_indexes_by_sequence(requested, false)
    })
}
}

/// An error that could come from user code or Nebari.
Expand Down
Loading

0 comments on commit 9507fe5

Please sign in to comment.