Skip to content

Commit

Permalink
perf: stable row id prefilter (#2706)
Browse files Browse the repository at this point in the history
Cache the row id mask prefilter and make it faster to construct.
  • Loading branch information
wjones127 authored Aug 22, 2024
1 parent ef4632f commit eb87bfa
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 59 deletions.
16 changes: 15 additions & 1 deletion rust/lance-core/src/utils/deletion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ impl DeletionVector {
}
}

/// Iterate over the deleted offsets without consuming the deletion vector.
///
/// The iterator is boxed so that every variant can share one return type.
/// `NoDeletions` yields nothing. For `Set`, values come out in whatever
/// order the underlying set iterates them — NOTE(review): presumably
/// unordered; callers that need ascending order should confirm or sort.
pub fn iter(&self) -> Box<dyn Iterator<Item = u32> + Send + '_> {
    match self {
        Self::Bitmap(deleted) => Box::new(deleted.iter()),
        Self::Set(deleted) => Box::new(deleted.iter().copied()),
        Self::NoDeletions => Box::new(std::iter::empty()),
    }
}

pub fn into_sorted_iter(self) -> Box<dyn Iterator<Item = u32> + Send + 'static> {
match self {
Self::NoDeletions => Box::new(std::iter::empty()),
Expand Down Expand Up @@ -183,7 +191,13 @@ impl IntoIterator for DeletionVector {
/// Consume the deletion vector and iterate over the deleted offsets.
///
/// The `Set` variant is sorted before being yielded so that callers which
/// walk the offsets in step with other sorted data see them in ascending
/// order. `NoDeletions` yields nothing.
fn into_iter(self) -> Self::IntoIter {
    match self {
        Self::NoDeletions => Box::new(std::iter::empty()),
        Self::Set(set) => {
            // In many cases, it's much better if this is sorted. It's
            // guaranteed to be small, so the cost is low.
            // There must be exactly one `Set` arm: an earlier, non-sorting
            // arm would shadow this one and make it unreachable.
            let mut sorted = set.into_iter().collect::<Vec<_>>();
            // Stability is irrelevant for plain integers; the unstable
            // sort avoids the extra allocation.
            sorted.sort_unstable();
            Box::new(sorted.into_iter())
        }
        Self::Bitmap(bitmap) => Box::new(bitmap.into_iter()),
    }
}
Expand Down
14 changes: 12 additions & 2 deletions rust/lance-core/src/utils/mask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{collections::BTreeMap, io::Read};
use arrow_array::{Array, BinaryArray, GenericBinaryArray};
use arrow_buffer::{Buffer, NullBuffer, OffsetBuffer};
use byteorder::{ReadBytesExt, WriteBytesExt};
use deepsize::DeepSizeOf;
use roaring::RoaringBitmap;

use crate::Result;
Expand All @@ -23,7 +24,7 @@ use super::address::RowAddress;
///
/// If both the allow_list and the block_list are None (the default) then
/// all row ids are selected
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, DeepSizeOf)]
pub struct RowIdMask {
/// If Some then only these row ids are selected
pub allow_list: Option<RowIdTreeMap>,
Expand Down Expand Up @@ -273,7 +274,7 @@ impl std::ops::BitOr for RowIdMask {
///
/// This is similar to a [RoaringTreemap] but it is optimized for the case where
/// entire fragments are selected or deselected.
#[derive(Clone, Debug, Default, PartialEq)]
#[derive(Clone, Debug, Default, PartialEq, DeepSizeOf)]
pub struct RowIdTreeMap {
/// The contents of the set. If there is a pair (k, Full) then the entire
/// fragment k is selected. If there is a pair (k, Partial(v)) then the
Expand All @@ -287,6 +288,15 @@ enum RowIdSelection {
Partial(RoaringBitmap),
}

/// Memory accounting for a single fragment's selection.
impl DeepSizeOf for RowIdSelection {
    /// Report the size of the data owned by this selection beyond the enum
    /// value itself.
    ///
    /// `Full` owns nothing extra. For `Partial`, the bitmap's serialized
    /// size is reported — NOTE(review): this looks like a stand-in for the
    /// in-memory footprint, which may differ from the serialized size.
    /// The `deepsize` context is not consulted.
    fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
        match self {
            Self::Partial(selected) => selected.serialized_size(),
            Self::Full => 0,
        }
    }
}

impl RowIdTreeMap {
/// Create an empty set
pub fn new() -> Self {
Expand Down
111 changes: 94 additions & 17 deletions rust/lance-table/src/rowids.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,23 +153,33 @@ impl RowIdSequence {
}

/// Delete row ids by position.
pub fn mask(&mut self, positions: impl IntoIterator<Item = usize>) -> Result<()> {
let row_ids = positions
.into_iter()
.map(|pos| {
self.get(pos).ok_or_else(|| {
Error::invalid_input(
format!(
"position out of bounds: {} on sequence of length {}",
pos,
self.len()
),
location!(),
)
})
})
.collect::<Result<Vec<_>>>()?;
self.delete(row_ids);
pub fn mask(&mut self, positions: impl IntoIterator<Item = u32>) -> Result<()> {
let mut local_positions = Vec::new();
let mut positions_iter = positions.into_iter();
let mut curr_position = positions_iter.next();
let mut offset = 0;
let mut cutoff = 0;

for segment in &mut self.0 {
// Make vector of local positions
cutoff += segment.len() as u32;
while let Some(position) = curr_position {
if position >= cutoff {
break;
}
local_positions.push(position - offset);
curr_position = positions_iter.next();
}

if !local_positions.is_empty() {
segment.mask(&local_positions);
local_positions.clear();
}
offset = cutoff;
}

self.0.retain(|segment| segment.len() != 0);

Ok(())
}

Expand Down Expand Up @@ -753,4 +763,71 @@ mod test {
.collect::<RowIdTreeMap>();
assert_eq!(tree_map, expected);
}

/// Masks one value out of each kind of segment and checks the resulting
/// segments.
#[test]
fn test_row_id_mask() {
    // Values held by each segment, in order:
    //   0, 1, 2, 3, 4
    //   50, 51, 52, 55, 56, 57, 58, 59
    //   7, 9
    //   10, 12, 14
    //   35, 39
    let mut sequence = RowIdSequence(vec![
        U64Segment::Range(0..5),
        U64Segment::RangeWithHoles {
            range: 50..60,
            holes: vec![53, 54].into(),
        },
        U64Segment::SortedArray(vec![7, 9].into()),
        U64Segment::RangeWithBitmap {
            range: 10..15,
            bitmap: [true, false, true, false, true].as_slice().into(),
        },
        U64Segment::Array(vec![35, 39].into()),
    ]);

    // Masking one in each segment. `mask` takes positions rather than
    // values, so translate each value into its index in the sequence.
    let values_to_remove = [4, 55, 7, 12, 39];
    let positions_to_remove: Vec<u32> = sequence
        .iter()
        .enumerate()
        .filter(|(_, val)| values_to_remove.contains(val))
        .map(|(i, _)| i as u32)
        .collect();

    sequence.mask(positions_to_remove).unwrap();

    // Each segment is expected to keep its remaining values; the exact
    // variant asserted below is the encoding the segment settles on after
    // masking.
    let expected = RowIdSequence(vec![
        U64Segment::Range(0..4),
        U64Segment::RangeWithBitmap {
            range: 50..60,
            bitmap: [
                true, true, true, false, false, false, true, true, true, true,
            ]
            .as_slice()
            .into(),
        },
        U64Segment::Range(9..10),
        U64Segment::RangeWithBitmap {
            range: 10..15,
            bitmap: [true, false, false, false, true].as_slice().into(),
        },
        U64Segment::Array(vec![35].into()),
    ]);
    assert_eq!(sequence, expected);
}

/// Masking every position must leave a sequence with no segments at all.
#[test]
fn test_row_id_mask_everything() {
    let mut sequence = RowIdSequence(vec![
        U64Segment::Range(0..5),
        U64Segment::SortedArray(vec![7, 9].into()),
    ]);

    // Every valid position, in ascending order.
    let total = sequence.len() as u32;
    sequence.mask(0..total).unwrap();

    assert_eq!(sequence, RowIdSequence(vec![]));
}
}
53 changes: 53 additions & 0 deletions rust/lance-table/src/rowids/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,59 @@ impl U64Segment {
let stats = Self::compute_stats(make_new_iter());
Self::from_stats_and_sequence(stats, make_new_iter())
}

/// Remove the values at the given positions from this segment, in place.
///
/// `positions` are zero-based offsets into this segment. They are walked
/// in step with the segment's values, so they are expected to be sorted
/// ascending, free of duplicates and in bounds — NOTE(review): nothing in
/// this method checks that; confirm callers uphold it.
///
/// An empty `positions` slice leaves the segment untouched. Masking as
/// many positions as there are values replaces the segment with the empty
/// `Range(0..0)`. Otherwise the segment is rebuilt from the surviving
/// values via `from_stats_and_sequence`.
pub fn mask(&mut self, positions: &[u32]) {
    if positions.is_empty() {
        return;
    }
    let total = self.len();
    if positions.len() == total {
        *self = Self::Range(0..0);
        return;
    }

    // Only the plain `Array` encoding is treated as unsorted; removing
    // values cannot change that property either way.
    let sorted = match self {
        Self::Range(_)
        | Self::RangeWithHoles { .. }
        | Self::RangeWithBitmap { .. }
        | Self::SortedArray(_) => true,
        Self::Array(_) => false,
    };

    // First surviving index: the first place where the masked positions
    // stop being exactly 0, 1, 2, ... The `cycle` keeps the zip going once
    // the positions run out; since fewer positions than values are masked,
    // a mismatch is always found.
    let (first_kept, _) = (0..total)
        .zip(positions.iter().cycle())
        .find(|(index, masked)| **masked != *index as u32)
        .unwrap();
    // NOTE(review): for the unsorted `Array` case this is the first
    // surviving value, not necessarily the smallest one.
    let min = self.get(first_kept).unwrap();

    // Last surviving index: the same scan, run from the back.
    let (last_kept, _) = (0..total)
        .rev()
        .zip(positions.iter().rev().cycle())
        .find(|(index, masked)| **masked != *index as u32)
        .unwrap();
    let max = self.get(last_kept).unwrap();

    let stats = SegmentStats {
        min,
        max,
        count: (total - positions.len()) as u64,
        sorted,
    };

    // Stream the surviving values: a value is dropped exactly when its
    // index equals the next pending masked position, which is then
    // consumed.
    let mut pending = positions.iter().copied().peekable();
    let survivors = self
        .iter()
        .enumerate()
        .filter(move |(index, _)| pending.next_if_eq(&(*index as u32)).is_none())
        .map(|(_, value)| value);
    *self = Self::from_stats_and_sequence(stats, survivors);
}
}

#[cfg(test)]
Expand Down
14 changes: 13 additions & 1 deletion rust/lance/src/dataset/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -738,13 +738,25 @@ async fn rechunk_stable_row_ids(
let deletions = read_deletion_file(&dataset.base, frag, dataset.object_store()).await?;
if let Some(deletions) = deletions {
let mut new_seq = seq.as_ref().clone();
new_seq.mask(deletions.into_iter().map(|x| x as usize))?;
new_seq.mask(deletions.into_iter())?;
*seq = Arc::new(new_seq);
}
Ok::<(), crate::Error>(())
})
.await?;

debug_assert_eq!(
{ old_sequences.iter().map(|(_, seq)| seq.len()).sum::<u64>() },
{
new_fragments
.iter()
.map(|frag| frag.physical_rows.unwrap() as u64)
.sum::<u64>()
},
"{:?}",
old_sequences
);

let new_sequences = lance_table::rowids::rechunk_sequences(
old_sequences
.into_iter()
Expand Down
Loading

0 comments on commit eb87bfa

Please sign in to comment.