feat: merge tree dedup reader (GreptimeTeam#3375)
* feat: add dedup option to merge tree component

* feat: impl dedup reader for shard reader

* refactor: DedupReader::new to DedupReader::try_new

* refactor: remove DedupReader::current_key field

* fix: some cr comments

* fix: fmt

* fix: remove shard_id method from DedupSource
v0y4g3r authored Feb 24, 2024
1 parent abbfd23 commit afe4633
Showing 8 changed files with 425 additions and 87 deletions.
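Before the per-file diffs, here is a minimal, self-contained sketch of the technique this commit wires in. Per the commit message and the new `check_data_buffer_dedup` test, rows reach the reader sorted by (pk_index, ts) ascending and by sequence descending, so deduplication only has to keep the first row of each (pk_index, ts) pair, i.e. the one with the largest sequence. `Row` and `dedup_rows` below are illustrative stand-ins, not the PR's actual `DedupReader` API.

/// Illustrative row shape; the real readers work over `DataBatch`es.
#[derive(Debug, Clone, PartialEq)]
struct Row {
    pk_index: u16,
    ts: i64,
    sequence: u64,
}

/// Keeps the first (highest-sequence) row per (pk_index, ts), assuming the
/// input is sorted by (pk_index, ts) ascending and sequence descending.
fn dedup_rows(rows: Vec<Row>) -> Vec<Row> {
    let mut out: Vec<Row> = Vec::with_capacity(rows.len());
    for row in rows {
        if let Some(last) = out.last() {
            if (last.pk_index, last.ts) == (row.pk_index, row.ts) {
                continue; // same key as previous row: drop the lower-sequence duplicate
            }
        }
        out.push(row);
    }
    out
}

This mirrors the expectation in the new test below: ts=2 is written at sequence 0 and again at sequence 3, and with dedup enabled only (ts=2, seq=3) survives.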
6 changes: 5 additions & 1 deletion src/mito2/src/memtable/merge_tree.rs
@@ -14,7 +14,8 @@

//! Memtable implementation based on a merge tree.
- mod data;
+ pub(crate) mod data;
+ mod dedup;
mod dict;
mod merger;
mod metrics;
@@ -59,13 +60,16 @@ pub struct MergeTreeConfig {
pub index_max_keys_per_shard: usize,
/// Number of rows to freeze a data part.
pub data_freeze_threshold: usize,
+ /// Whether to delete duplicate rows.
+ pub dedup: bool,
}

impl Default for MergeTreeConfig {
fn default() -> Self {
Self {
index_max_keys_per_shard: 8192,
data_freeze_threshold: 102400,
+ dedup: true,
}
}
}
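Since `dedup` is a public field and the struct implements `Default` with `dedup: true`, callers can opt out with struct-update syntax. A small usage sketch; the import path is an assumption derived from the file path above, not something this diff shows.

// Path assumed from src/mito2/src/memtable/merge_tree.rs.
use mito2::memtable::merge_tree::MergeTreeConfig;

fn main() {
    // Keep the defaults shown above (index_max_keys_per_shard: 8192,
    // data_freeze_threshold: 102400) but disable deduplication.
    let config = MergeTreeConfig {
        dedup: false,
        ..Default::default()
    };
    assert!(!config.dedup);
}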
156 changes: 99 additions & 57 deletions src/mito2/src/memtable/merge_tree/data.rs
@@ -63,11 +63,11 @@ pub(crate) struct DataBatchRange {

impl DataBatchRange {
pub(crate) fn len(&self) -> usize {
- (self.start..self.end).len()
+ self.end - self.start
}

pub(crate) fn is_empty(&self) -> bool {
- (self.start..self.end).is_empty()
+ self.len() == 0
}
}

@@ -163,6 +163,10 @@ impl<'a> DataBatch<'a> {
},
}
}

+ pub(crate) fn num_rows(&self) -> usize {
+ self.range.len()
+ }
}

/// Buffer for the value part (pk_index, ts, sequence, op_type, field columns) in a shard.
@@ -180,11 +184,13 @@ pub struct DataBuffer {
op_type_builder: UInt8VectorBuilder,
/// Builders for field columns.
field_builders: Vec<LazyMutableVectorBuilder>,

+ dedup: bool,
}

impl DataBuffer {
/// Creates a `DataBuffer` instance with given schema and capacity.
- pub fn with_capacity(metadata: RegionMetadataRef, init_capacity: usize) -> Self {
+ pub fn with_capacity(metadata: RegionMetadataRef, init_capacity: usize, dedup: bool) -> Self {
let ts_builder = metadata
.time_index_column()
.column_schema
@@ -209,6 +215,7 @@ impl DataBuffer {
sequence_builder,
op_type_builder,
field_builders,
+ dedup,
}
}

@@ -237,7 +244,13 @@ impl DataBuffer {
pk_weights: Option<&[u16]>,
replace_pk_index: bool,
) -> Result<DataPart> {
- let encoder = DataPartEncoder::new(&self.metadata, pk_weights, None, replace_pk_index);
+ let encoder = DataPartEncoder::new(
+ &self.metadata,
+ pk_weights,
+ None,
+ replace_pk_index,
+ self.dedup,
+ );
let parts = encoder.write(self)?;
Ok(parts)
}
@@ -246,13 +259,12 @@
/// If `pk_weights` is present, yielded rows are sorted according to those weights;
/// otherwise rows are sorted by their `pk_index` values, which serve as the weights.
pub fn read(&mut self, pk_weights: Option<&[u16]>) -> Result<DataBufferReader> {
- // todo(hl): control whether to dedup while invoking `read`.
let batch = data_buffer_to_record_batches(
self.data_part_schema.clone(),
self,
pk_weights,
true,
- true,
+ self.dedup,
// replace_pk_index is always set to false since:
// - for DataBuffer in ShardBuilder, the pk dict is not frozen
// - for DataBuffer in Shard, values in the pk_index column have already been replaced during `freeze`.
@@ -629,6 +641,7 @@ struct DataPartEncoder<'a> {
pk_weights: Option<&'a [u16]>,
row_group_size: Option<usize>,
replace_pk_index: bool,
+ dedup: bool,
}

impl<'a> DataPartEncoder<'a> {
@@ -637,13 +650,15 @@ impl<'a> DataPartEncoder<'a> {
pk_weights: Option<&'a [u16]>,
row_group_size: Option<usize>,
replace_pk_index: bool,
+ dedup: bool,
) -> DataPartEncoder<'a> {
let schema = memtable_schema_to_encoded_schema(metadata);
Self {
schema,
pk_weights,
row_group_size,
replace_pk_index,
+ dedup,
}
}

@@ -663,7 +678,7 @@ impl<'a> DataPartEncoder<'a> {
source,
self.pk_weights,
false,
- true,
+ self.dedup,
self.replace_pk_index,
)?;
writer.write(&rb).context(error::EncodeMemtableSnafu)?;
@@ -803,9 +818,9 @@ pub struct DataParts {
}

impl DataParts {
- pub(crate) fn new(metadata: RegionMetadataRef, capacity: usize) -> Self {
+ pub(crate) fn new(metadata: RegionMetadataRef, capacity: usize, dedup: bool) -> Self {
Self {
- active: DataBuffer::with_capacity(metadata, capacity),
+ active: DataBuffer::with_capacity(metadata, capacity, dedup),
frozen: Vec::new(),
}
}
@@ -868,6 +883,29 @@ impl DataPartsReader {
}
}

+ #[cfg(test)]
+ pub(crate) fn write_rows_to_buffer(
+ buffer: &mut DataBuffer,
+ schema: &RegionMetadataRef,
+ pk_index: u16,
+ ts: Vec<i64>,
+ v0: Vec<Option<f64>>,
+ sequence: u64,
+ ) {
+ let kvs = crate::test_util::memtable_util::build_key_values_with_ts_seq_values(
+ schema,
+ "whatever".to_string(),
+ 1,
+ ts.into_iter(),
+ v0.into_iter(),
+ sequence,
+ );
+
+ for kv in kvs.iter() {
+ buffer.write_row(pk_index, kv);
+ }
+ }

#[cfg(test)]
mod tests {
use datafusion::arrow::array::Float64Array;
@@ -876,7 +914,7 @@ mod tests {
use parquet::data_type::AsBytes;

use super::*;
- use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
+ use crate::test_util::memtable_util::{extract_data_batch, metadata_for_test};

#[test]
fn test_lazy_mutable_vector_builder() {
@@ -900,7 +938,7 @@

fn check_test_data_buffer_to_record_batches(keep_data: bool) {
let meta = metadata_for_test();
- let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
+ let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

write_rows_to_buffer(&mut buffer, &meta, 0, vec![1, 2], vec![Some(0.1), None], 1);
write_rows_to_buffer(&mut buffer, &meta, 1, vec![1, 2], vec![Some(1.1), None], 2);
@@ -968,10 +1006,50 @@ mod tests {
check_test_data_buffer_to_record_batches(false);
}

+ fn check_data_buffer_dedup(dedup: bool) {
+ let metadata = metadata_for_test();
+ let mut buffer = DataBuffer::with_capacity(metadata.clone(), 10, dedup);
+ write_rows_to_buffer(
+ &mut buffer,
+ &metadata,
+ 0,
+ vec![2, 3],
+ vec![Some(1.0), Some(2.0)],
+ 0,
+ );
+ write_rows_to_buffer(
+ &mut buffer,
+ &metadata,
+ 0,
+ vec![1, 2],
+ vec![Some(1.1), Some(2.1)],
+ 2,
+ );
+
+ let mut reader = buffer.read(Some(&[0])).unwrap();
+ let mut res = vec![];
+ while reader.is_valid() {
+ let batch = reader.current_data_batch();
+ res.push(extract_data_batch(&batch));
+ reader.next().unwrap();
+ }
+ if dedup {
+ assert_eq!(vec![(0, vec![(1, 2), (2, 3), (3, 1)])], res);
+ } else {
+ assert_eq!(vec![(0, vec![(1, 2), (2, 3), (2, 0), (3, 1)])], res);
+ }
+ }
+
+ #[test]
+ fn test_data_buffer_dedup() {
+ check_data_buffer_dedup(true);
+ check_data_buffer_dedup(false);
+ }

#[test]
fn test_data_buffer_to_record_batches_with_dedup() {
let meta = metadata_for_test();
- let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
+ let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

write_rows_to_buffer(&mut buffer, &meta, 0, vec![1, 2], vec![Some(0.1), None], 1);
write_rows_to_buffer(&mut buffer, &meta, 1, vec![2], vec![Some(1.1)], 2);
@@ -1026,7 +1104,7 @@ mod tests {
#[test]
fn test_data_buffer_to_record_batches_without_dedup() {
let meta = metadata_for_test();
- let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
+ let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

write_rows_to_buffer(&mut buffer, &meta, 0, vec![1, 2], vec![Some(0.1), None], 1);
write_rows_to_buffer(&mut buffer, &meta, 1, vec![1, 2], vec![Some(1.1), None], 2);
@@ -1064,35 +1142,13 @@ mod tests {
);
}

- fn write_rows_to_buffer(
- buffer: &mut DataBuffer,
- schema: &RegionMetadataRef,
- pk_index: u16,
- ts: Vec<i64>,
- v0: Vec<Option<f64>>,
- sequence: u64,
- ) {
- let kvs = build_key_values_with_ts_seq_values(
- schema,
- "whatever".to_string(),
- 1,
- ts.into_iter(),
- v0.into_iter(),
- sequence,
- );
-
- for kv in kvs.iter() {
- buffer.write_row(pk_index, kv);
- }
- }

fn check_data_buffer_freeze(
pk_weights: Option<&[u16]>,
replace_pk_weights: bool,
expected: &[(u16, Vec<(i64, u64)>)],
) {
let meta = metadata_for_test();
- let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
+ let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

// write rows with null values.
write_rows_to_buffer(
@@ -1113,21 +1169,7 @@
.unwrap();
while reader.is_valid() {
let batch = reader.current_data_batch();
- let rb = batch.slice_record_batch();
- let ts = timestamp_array_to_i64_slice(rb.column(1));
- let sequence = rb
- .column(2)
- .as_any()
- .downcast_ref::<UInt64Array>()
- .unwrap()
- .values();
- let ts_and_seq = ts
- .iter()
- .zip(sequence.iter())
- .map(|(ts, seq)| (*ts, *seq))
- .collect::<Vec<_>>();
- res.push((batch.pk_index(), ts_and_seq));
-
+ res.push(extract_data_batch(&batch));
reader.next().unwrap();
}
assert_eq!(expected, res);
@@ -1163,7 +1205,7 @@ mod tests {
#[test]
fn test_encode_data_buffer() {
let meta = metadata_for_test();
- let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
+ let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

// write rows with null values.
write_rows_to_buffer(
@@ -1181,7 +1223,7 @@

assert_eq!(4, buffer.num_rows());

- let encoder = DataPartEncoder::new(&meta, Some(&[0, 1, 2]), None, true);
+ let encoder = DataPartEncoder::new(&meta, Some(&[0, 1, 2]), None, true, true);
let encoded = match encoder.write(&mut buffer).unwrap() {
DataPart::Parquet(data) => data.data,
};
@@ -1228,7 +1270,7 @@

fn check_iter_data_buffer(pk_weights: Option<&[u16]>, expected: &[Vec<f64>]) {
let meta = metadata_for_test();
- let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
+ let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

write_rows_to_buffer(
&mut buffer,
@@ -1268,7 +1310,7 @@
#[test]
fn test_iter_empty_data_buffer() {
let meta = metadata_for_test();
- let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
+ let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
let mut iter = buffer.read(Some(&[0, 1, 3, 2])).unwrap();
check_buffer_values_equal(&mut iter, &[]);
}
@@ -1294,7 +1336,7 @@

fn check_iter_data_part(weights: &[u16], expected_values: &[Vec<f64>]) {
let meta = metadata_for_test();
- let mut buffer = DataBuffer::with_capacity(meta.clone(), 10);
+ let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

write_rows_to_buffer(
&mut buffer,
@@ -1323,7 +1365,7 @@
4,
);

- let encoder = DataPartEncoder::new(&meta, Some(weights), Some(4), true);
+ let encoder = DataPartEncoder::new(&meta, Some(weights), Some(4), true, true);
let encoded = encoder.write(&mut buffer).unwrap();

let mut iter = encoded.read().unwrap();
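To recap the API change running through this file, the fragment below shows the new call shapes of the three constructors that gained a trailing `dedup: bool`. It is a sketch, not the PR's code; `metadata` is assumed to be a `RegionMetadataRef`, e.g. the value returned by the `metadata_for_test()` helper used in the tests above.

// Each constructor now threads `dedup` down to the record-batch writer.
let buffer = DataBuffer::with_capacity(metadata.clone(), 1024, true);
let parts = DataParts::new(metadata.clone(), 1024, true);
// Arguments: (metadata, pk_weights, row_group_size, replace_pk_index, dedup).
let encoder = DataPartEncoder::new(&metadata, None, None, false, true);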
(Diffs for the remaining 6 changed files were not loaded on this page.)
