Skip to content

Commit

Permalink
feat: Implement iter for the new memtable (GreptimeTeam#3373)
Browse files Browse the repository at this point in the history
* chore: read shard builder

* chore: reuse pk weights

* chore: prune key

* chore: shard reader wip

* refactor: shard builder DataBatch

* feat: merge shard readers

* feat: return shard id in shard readers

* feat: impl partition reader

* chore: impl partition read

* feat: impl iter tree

* chore: save last yield pk id

* style: fix clippy

* refactor: rename ShardReaderImpl to ShardReader

* chore: address CR comment
  • Loading branch information
evenyag authored Feb 25, 2024
1 parent afe4633 commit 8059b95
Show file tree
Hide file tree
Showing 6 changed files with 666 additions and 89 deletions.
106 changes: 98 additions & 8 deletions src/mito2/src/memtable/merge_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ impl Memtable for MergeTreeMemtable {

fn iter(
&self,
_projection: Option<&[ColumnId]>,
_predicate: Option<Predicate>,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Result<BoxedBatchIterator> {
todo!()
self.tree.read(projection, predicate)
}

fn is_empty(&self) -> bool {
Expand Down Expand Up @@ -275,18 +275,22 @@ impl MemtableBuilder for MergeTreeMemtableBuilder {

#[cfg(test)]
mod tests {
use std::collections::BTreeSet;

use common_time::Timestamp;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{Int64Vector, TimestampMillisecondVector};

use super::*;
use crate::test_util::memtable_util;

#[test]
fn test_memtable_sorted_input() {
write_sorted_input(true);
write_sorted_input(false);
write_iter_sorted_input(true);
write_iter_sorted_input(false);
}

fn write_sorted_input(has_pk: bool) {
fn write_iter_sorted_input(has_pk: bool) {
let metadata = if has_pk {
memtable_util::metadata_with_primary_key(vec![1, 0], true)
} else {
Expand All @@ -298,7 +302,27 @@ mod tests {
let memtable = MergeTreeMemtable::new(1, metadata, None, &MergeTreeConfig::default());
memtable.write(&kvs).unwrap();

// TODO(yingwen): Test iter.
let expected_ts = kvs
.iter()
.map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
.collect::<BTreeSet<_>>();

let iter = memtable.iter(None, None).unwrap();
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<BTreeSet<_>>();
assert_eq!(expected_ts, read);

let stats = memtable.stats();
assert!(stats.bytes_allocated() > 0);
Expand Down Expand Up @@ -344,7 +368,36 @@ mod tests {
);
memtable.write(&kvs).unwrap();

// TODO(yingwen): Test iter.
let iter = memtable.iter(None, None).unwrap();
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.timestamps()
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap().0.value())
.collect::<Vec<_>>();
assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read);

let iter = memtable.iter(None, None).unwrap();
let read = iter
.flat_map(|batch| {
batch
.unwrap()
.sequences()
.iter_data()
.collect::<Vec<_>>()
.into_iter()
})
.map(|v| v.unwrap())
.collect::<Vec<_>>();
assert_eq!(vec![8, 0, 6, 1, 7, 5, 4, 9], read);

let stats = memtable.stats();
assert!(stats.bytes_allocated() > 0);
Expand All @@ -353,4 +406,41 @@ mod tests {
stats.time_range()
);
}

#[test]
fn test_memtable_projection() {
write_iter_projection(true);
write_iter_projection(false);
}

fn write_iter_projection(has_pk: bool) {
let metadata = if has_pk {
memtable_util::metadata_with_primary_key(vec![1, 0], true)
} else {
memtable_util::metadata_with_primary_key(vec![], false)
};
// Try to build a memtable via the builder.
let memtable = MergeTreeMemtableBuilder::new(None).build(1, &metadata);

let expect = (0..100).collect::<Vec<_>>();
let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
memtable.write(&kvs).unwrap();
let iter = memtable.iter(Some(&[3]), None).unwrap();

let mut v0_all = vec![];
for res in iter {
let batch = res.unwrap();
assert_eq!(1, batch.fields().len());
let v0 = batch
.fields()
.first()
.unwrap()
.data
.as_any()
.downcast_ref::<Int64Vector>()
.unwrap();
v0_all.extend(v0.iter_data().map(|v| v.unwrap()));
}
assert_eq!(expect, v0_all);
}
}
13 changes: 7 additions & 6 deletions src/mito2/src/memtable/merge_tree/dict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ impl DictBuilderReader {
}

/// Returns pk weights to sort a data part and replaces pk indices.
pub(crate) fn pk_weights_to_sort_data(&self) -> Vec<u16> {
compute_pk_weights(&self.sorted_pk_indices)
pub(crate) fn pk_weights_to_sort_data(&self, pk_weights: &mut Vec<u16>) {
compute_pk_weights(&self.sorted_pk_indices, pk_weights)
}

/// Returns pk indices sorted by keys.
Expand All @@ -199,12 +199,11 @@ impl DictBuilderReader {
}

/// Returns pk weights to sort a data part and replaces pk indices.
fn compute_pk_weights(sorted_pk_indices: &[PkIndex]) -> Vec<u16> {
let mut pk_weights = vec![0; sorted_pk_indices.len()];
fn compute_pk_weights(sorted_pk_indices: &[PkIndex], pk_weights: &mut Vec<u16>) {
pk_weights.resize(sorted_pk_indices.len(), 0);
for (weight, pk_index) in sorted_pk_indices.iter().enumerate() {
pk_weights[*pk_index as usize] = weight as u16;
}
pk_weights
}

/// A key dictionary.
Expand Down Expand Up @@ -240,7 +239,9 @@ impl KeyDict {

/// Returns pk weights to sort a data part and replaces pk indices.
pub(crate) fn pk_weights_to_sort_data(&self) -> Vec<u16> {
compute_pk_weights(&self.key_positions)
let mut pk_weights = Vec::with_capacity(self.key_positions.len());
compute_pk_weights(&self.key_positions, &mut pk_weights);
pk_weights
}

/// Returns the shared memory size.
Expand Down
Loading

0 comments on commit 8059b95

Please sign in to comment.