From 8059b95e37fd2792890b4d4b467549fc011ee8e4 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Sun, 25 Feb 2024 15:42:16 +0800 Subject: [PATCH] feat: Implement iter for the new memtable (#3373) * chore: read shard builder * chore: reuse pk weights * chore: prune key * chore: shard reader wip * refactor: shard builder DataBatch * feat: merge shard readers * feat: return shard id in shard readers * feat: impl partition reader * chore: impl partition read * feat: impl iter tree * chore: save last yield pk id * style: fix clippy * refactor: rename ShardReaderImpl to ShardReader * chore: address CR comment --- src/mito2/src/memtable/merge_tree.rs | 106 ++++++- src/mito2/src/memtable/merge_tree/dict.rs | 13 +- .../src/memtable/merge_tree/partition.rs | 271 +++++++++++++++--- src/mito2/src/memtable/merge_tree/shard.rs | 201 ++++++++++++- .../src/memtable/merge_tree/shard_builder.rs | 107 +++++-- src/mito2/src/memtable/merge_tree/tree.rs | 57 +++- 6 files changed, 666 insertions(+), 89 deletions(-) diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs index d075a9deb380..8244862a4256 100644 --- a/src/mito2/src/memtable/merge_tree.rs +++ b/src/mito2/src/memtable/merge_tree.rs @@ -110,10 +110,10 @@ impl Memtable for MergeTreeMemtable { fn iter( &self, - _projection: Option<&[ColumnId]>, - _predicate: Option, + projection: Option<&[ColumnId]>, + predicate: Option, ) -> Result { - todo!() + self.tree.read(projection, predicate) } fn is_empty(&self) -> bool { @@ -275,18 +275,22 @@ impl MemtableBuilder for MergeTreeMemtableBuilder { #[cfg(test)] mod tests { + use std::collections::BTreeSet; + use common_time::Timestamp; + use datatypes::scalars::ScalarVector; + use datatypes::vectors::{Int64Vector, TimestampMillisecondVector}; use super::*; use crate::test_util::memtable_util; #[test] fn test_memtable_sorted_input() { - write_sorted_input(true); - write_sorted_input(false); + write_iter_sorted_input(true); + write_iter_sorted_input(false); } - fn write_sorted_input(has_pk: bool) { + fn write_iter_sorted_input(has_pk: bool) { let metadata = if has_pk { memtable_util::metadata_with_primary_key(vec![1, 0], true) } else { @@ -298,7 +302,27 @@ mod tests { let memtable = MergeTreeMemtable::new(1, metadata, None, &MergeTreeConfig::default()); memtable.write(&kvs).unwrap(); - // TODO(yingwen): Test iter. + let expected_ts = kvs + .iter() + .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value()) + .collect::>(); + + let iter = memtable.iter(None, None).unwrap(); + let read = iter + .flat_map(|batch| { + batch + .unwrap() + .timestamps() + .as_any() + .downcast_ref::() + .unwrap() + .iter_data() + .collect::>() + .into_iter() + }) + .map(|v| v.unwrap().0.value()) + .collect::>(); + assert_eq!(expected_ts, read); let stats = memtable.stats(); assert!(stats.bytes_allocated() > 0); @@ -344,7 +368,36 @@ mod tests { ); memtable.write(&kvs).unwrap(); - // TODO(yingwen): Test iter. 
+ let iter = memtable.iter(None, None).unwrap(); + let read = iter + .flat_map(|batch| { + batch + .unwrap() + .timestamps() + .as_any() + .downcast_ref::() + .unwrap() + .iter_data() + .collect::>() + .into_iter() + }) + .map(|v| v.unwrap().0.value()) + .collect::>(); + assert_eq!(vec![0, 1, 2, 3, 4, 5, 6, 7], read); + + let iter = memtable.iter(None, None).unwrap(); + let read = iter + .flat_map(|batch| { + batch + .unwrap() + .sequences() + .iter_data() + .collect::>() + .into_iter() + }) + .map(|v| v.unwrap()) + .collect::>(); + assert_eq!(vec![8, 0, 6, 1, 7, 5, 4, 9], read); let stats = memtable.stats(); assert!(stats.bytes_allocated() > 0); @@ -353,4 +406,41 @@ mod tests { stats.time_range() ); } + + #[test] + fn test_memtable_projection() { + write_iter_projection(true); + write_iter_projection(false); + } + + fn write_iter_projection(has_pk: bool) { + let metadata = if has_pk { + memtable_util::metadata_with_primary_key(vec![1, 0], true) + } else { + memtable_util::metadata_with_primary_key(vec![], false) + }; + // Try to build a memtable via the builder. + let memtable = MergeTreeMemtableBuilder::new(None).build(1, &metadata); + + let expect = (0..100).collect::>(); + let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1); + memtable.write(&kvs).unwrap(); + let iter = memtable.iter(Some(&[3]), None).unwrap(); + + let mut v0_all = vec![]; + for res in iter { + let batch = res.unwrap(); + assert_eq!(1, batch.fields().len()); + let v0 = batch + .fields() + .first() + .unwrap() + .data + .as_any() + .downcast_ref::() + .unwrap(); + v0_all.extend(v0.iter_data().map(|v| v.unwrap())); + } + assert_eq!(expect, v0_all); + } } diff --git a/src/mito2/src/memtable/merge_tree/dict.rs b/src/mito2/src/memtable/merge_tree/dict.rs index 5c1c3c3a57f6..c2f5d170dc1e 100644 --- a/src/mito2/src/memtable/merge_tree/dict.rs +++ b/src/mito2/src/memtable/merge_tree/dict.rs @@ -188,8 +188,8 @@ impl DictBuilderReader { } /// Returns pk weights to sort a data part and replaces pk indices. - pub(crate) fn pk_weights_to_sort_data(&self) -> Vec { - compute_pk_weights(&self.sorted_pk_indices) + pub(crate) fn pk_weights_to_sort_data(&self, pk_weights: &mut Vec) { + compute_pk_weights(&self.sorted_pk_indices, pk_weights) } /// Returns pk indices sorted by keys. @@ -199,12 +199,11 @@ impl DictBuilderReader { } /// Returns pk weights to sort a data part and replaces pk indices. -fn compute_pk_weights(sorted_pk_indices: &[PkIndex]) -> Vec { - let mut pk_weights = vec![0; sorted_pk_indices.len()]; +fn compute_pk_weights(sorted_pk_indices: &[PkIndex], pk_weights: &mut Vec) { + pk_weights.resize(sorted_pk_indices.len(), 0); for (weight, pk_index) in sorted_pk_indices.iter().enumerate() { pk_weights[*pk_index as usize] = weight as u16; } - pk_weights } /// A key dictionary. @@ -240,7 +239,9 @@ impl KeyDict { /// Returns pk weights to sort a data part and replaces pk indices. pub(crate) fn pk_weights_to_sort_data(&self) -> Vec { - compute_pk_weights(&self.key_positions) + let mut pk_weights = Vec::with_capacity(self.key_positions.len()); + compute_pk_weights(&self.key_positions, &mut pk_weights); + pk_weights } /// Returns the shared memory size. 
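An illustrative sketch (not part of the patch): the dict.rs change above replaces the allocating `pk_weights_to_sort_data() -> Vec<u16>` with a variant that fills a caller-provided buffer, so a single `Vec<u16>` can be reused across readers. The code below only mirrors that weight computation and the reuse pattern; the function names are hypothetical and `u16` stands in for `PkIndex`.

// Sketch only: mirrors the buffer-reuse pattern introduced above.
fn compute_pk_weights_sketch(sorted_pk_indices: &[u16], pk_weights: &mut Vec<u16>) {
    // Resize instead of reallocating; every slot is overwritten below because each
    // pk index appears exactly once in the sorted list.
    pk_weights.resize(sorted_pk_indices.len(), 0);
    for (weight, pk_index) in sorted_pk_indices.iter().enumerate() {
        // The key at sort position `weight` gets that position as its weight.
        pk_weights[*pk_index as usize] = weight as u16;
    }
}

fn main() {
    // The same buffer serves a second read instead of allocating again, which is
    // what keeping `pk_weights` in a shared read context enables.
    let mut pk_weights = Vec::new();
    compute_pk_weights_sketch(&[2, 0, 1], &mut pk_weights);
    assert_eq!(pk_weights, vec![1, 2, 0]);
    compute_pk_weights_sketch(&[0, 1], &mut pk_weights);
    assert_eq!(pk_weights, vec![0, 1]);
}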
diff --git a/src/mito2/src/memtable/merge_tree/partition.rs b/src/mito2/src/memtable/merge_tree/partition.rs index 89302906b27e..428efa53e53f 100644 --- a/src/mito2/src/memtable/merge_tree/partition.rs +++ b/src/mito2/src/memtable/merge_tree/partition.rs @@ -19,6 +19,7 @@ use std::collections::HashSet; use std::sync::{Arc, RwLock}; +use api::v1::SemanticType; use common_recordbatch::filter::SimpleFilterEvaluator; use store_api::metadata::RegionMetadataRef; use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME; @@ -26,11 +27,13 @@ use store_api::storage::ColumnId; use crate::error::Result; use crate::memtable::key_values::KeyValue; -use crate::memtable::merge_tree::data::{DataParts, DATA_INIT_CAP}; +use crate::memtable::merge_tree::data::{DataBatch, DataParts, DATA_INIT_CAP}; use crate::memtable::merge_tree::metrics::WriteMetrics; -use crate::memtable::merge_tree::shard::Shard; +use crate::memtable::merge_tree::shard::{Shard, ShardMerger, ShardNode, ShardSource}; use crate::memtable::merge_tree::shard_builder::ShardBuilder; -use crate::memtable::merge_tree::{MergeTreeConfig, PkId, ShardId}; +use crate::memtable::merge_tree::{MergeTreeConfig, PkId}; +use crate::read::{Batch, BatchBuilder}; +use crate::row_converter::{McmpRowCodec, RowCodec}; /// Key of a partition. pub type PartitionKey = u32; @@ -40,13 +43,13 @@ pub struct Partition { inner: RwLock, } +pub type PartitionRef = Arc; + impl Partition { /// Creates a new partition. pub fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> Self { - let shard_builder = ShardBuilder::new(metadata.clone(), config); - Partition { - inner: RwLock::new(Inner::new(metadata, shard_builder, config.dedup)), + inner: RwLock::new(Inner::new(metadata, config)), } } @@ -83,7 +86,7 @@ impl Partition { let mut inner = self.inner.write().unwrap(); // If no primary key, always write to the first shard. debug_assert!(!inner.shards.is_empty()); - debug_assert_eq!(1, inner.active_shard_id); + debug_assert_eq!(1, inner.shard_builder.current_shard_id()); // A dummy pk id. let pk_id = PkId { @@ -95,12 +98,31 @@ impl Partition { } /// Scans data in the partition. - pub fn scan( - &self, - _projection: HashSet, - _filters: Vec, - ) -> Result { - unimplemented!() + pub fn read(&self, mut context: ReadPartitionContext) -> Result { + // TODO(yingwen): Change to acquire read lock if `read()` takes `&self`. + let nodes = { + let mut inner = self.inner.write().unwrap(); + let mut nodes = Vec::with_capacity(inner.shards.len() + 1); + let bulder_reader = inner.shard_builder.read(&mut context.pk_weights)?; + nodes.push(ShardNode::new(ShardSource::Builder(bulder_reader))); + for shard in &mut inner.shards { + let shard_reader = shard.read()?; + nodes.push(ShardNode::new(ShardSource::Shard(shard_reader))); + } + nodes + }; + + // Creating a shard merger will invoke next so we do it outside of the lock. + let shard_merger = ShardMerger::try_new(nodes)?; + Ok(PartitionReader { + metadata: context.metadata, + row_codec: context.row_codec, + projection: context.projection, + filters: context.filters, + pk_weights: context.pk_weights, + shard_merger, + last_yield_pk_id: None, + }) } /// Freezes the partition. @@ -111,10 +133,17 @@ impl Partition { } /// Forks the partition. + /// + /// Must freeze the partition before fork. pub fn fork(&self, metadata: &RegionMetadataRef, config: &MergeTreeConfig) -> Partition { let inner = self.inner.read().unwrap(); + debug_assert!(inner.shard_builder.is_empty()); // TODO(yingwen): TTL or evict shards. 
- let shard_builder = ShardBuilder::new(metadata.clone(), config); + let shard_builder = ShardBuilder::new( + metadata.clone(), + config, + inner.shard_builder.current_shard_id(), + ); let shards = inner .shards .iter() @@ -125,7 +154,6 @@ impl Partition { inner: RwLock::new(Inner { metadata: metadata.clone(), shard_builder, - active_shard_id: inner.active_shard_id, shards, num_rows: 0, dedup: config.dedup, @@ -180,9 +208,187 @@ impl Partition { /// Reader to scan rows in a partition. /// /// It can merge rows from multiple shards. -pub struct PartitionReader {} +pub struct PartitionReader { + metadata: RegionMetadataRef, + row_codec: Arc, + projection: HashSet, + filters: Vec, + pk_weights: Vec, + shard_merger: ShardMerger, + last_yield_pk_id: Option, +} -pub type PartitionRef = Arc; +impl PartitionReader { + pub fn is_valid(&self) -> bool { + self.shard_merger.is_valid() + } + + pub fn next(&mut self) -> Result<()> { + self.shard_merger.next()?; + + if self.metadata.primary_key.is_empty() { + // Nothing to prune. + return Ok(()); + } + + while self.shard_merger.is_valid() { + let pk_id = self.shard_merger.current_pk_id(); + if let Some(yield_pk_id) = self.last_yield_pk_id { + if pk_id == yield_pk_id { + // If this batch has the same key as last returned batch. + // We can return it without evaluating filters. + break; + } + } + let key = self.shard_merger.current_key().unwrap(); + // Prune batch by primary key. + if prune_primary_key(&self.metadata, &self.filters, &self.row_codec, key) { + // We need this key. + self.last_yield_pk_id = Some(pk_id); + break; + } + self.shard_merger.next()?; + } + + Ok(()) + } + + pub fn convert_current_batch(&self) -> Result { + let data_batch = self.shard_merger.current_data_batch(); + data_batch_to_batch( + &self.metadata, + &self.projection, + self.shard_merger.current_key(), + data_batch, + ) + } + + pub(crate) fn into_context(self) -> ReadPartitionContext { + ReadPartitionContext { + metadata: self.metadata, + row_codec: self.row_codec, + projection: self.projection, + filters: self.filters, + pk_weights: self.pk_weights, + } + } +} + +// TODO(yingwen): Improve performance of key prunning. Now we need to find index and +// then decode and convert each value. +/// Returns true if the `pk` is still needed. +fn prune_primary_key( + metadata: &RegionMetadataRef, + filters: &[SimpleFilterEvaluator], + codec: &McmpRowCodec, + pk: &[u8], +) -> bool { + if filters.is_empty() { + return true; + } + + // no primary key, we simply return true. + if metadata.primary_key.is_empty() { + return true; + } + + let pk_values = match codec.decode(pk) { + Ok(values) => values, + Err(e) => { + common_telemetry::error!(e; "Failed to decode primary key"); + return true; + } + }; + + // evaluate filters against primary key values + let mut result = true; + for filter in filters { + let Some(column) = metadata.column_by_name(filter.column_name()) else { + continue; + }; + // ignore filters that are not referencing primary key columns + if column.semantic_type != SemanticType::Tag { + continue; + } + // index of the column in primary keys. + // Safety: A tag column is always in primary key. + let index = metadata.primary_key_index(column.column_id).unwrap(); + // Safety: arrow schema and datatypes are constructed from the same source. 
+ let scalar_value = pk_values[index] + .try_to_scalar_value(&column.column_schema.data_type) + .unwrap(); + result &= filter.evaluate_scalar(&scalar_value).unwrap_or(true); + } + + result +} + +/// Structs to reuse across readers to avoid allocating for each reader. +pub(crate) struct ReadPartitionContext { + metadata: RegionMetadataRef, + row_codec: Arc, + projection: HashSet, + filters: Vec, + /// Buffer to store pk weights. + pk_weights: Vec, +} + +impl ReadPartitionContext { + pub(crate) fn new( + metadata: RegionMetadataRef, + row_codec: Arc, + projection: HashSet, + filters: Vec, + ) -> ReadPartitionContext { + ReadPartitionContext { + metadata, + row_codec, + projection, + filters, + pk_weights: Vec::new(), + } + } +} + +// TODO(yingwen): Pushdown projection to shard readers. +/// Converts a [DataBatch] to a [Batch]. +fn data_batch_to_batch( + metadata: &RegionMetadataRef, + projection: &HashSet, + key: Option<&[u8]>, + data_batch: DataBatch, +) -> Result { + let record_batch = data_batch.slice_record_batch(); + let primary_key = key.map(|k| k.to_vec()).unwrap_or_default(); + let mut builder = BatchBuilder::new(primary_key); + builder + .timestamps_array(record_batch.column(1).clone())? + .sequences_array(record_batch.column(2).clone())? + .op_types_array(record_batch.column(3).clone())?; + + if record_batch.num_columns() <= 4 { + // No fields. + return builder.build(); + } + + // Iterate all field columns. + for (array, field) in record_batch + .columns() + .iter() + .zip(record_batch.schema().fields().iter()) + .skip(4) + { + // TODO(yingwen): Avoid finding column by name. We know the schema of a DataBatch. + // Safety: metadata should contain all fields. + let column_id = metadata.column_by_name(field.name()).unwrap().column_id; + if !projection.contains(&column_id) { + continue; + } + builder.push_field_array(column_id, array.clone())?; + } + + builder.build() +} /// Inner struct of the partition. /// @@ -191,7 +397,6 @@ struct Inner { metadata: RegionMetadataRef, /// Shard whose dictionary is active. shard_builder: ShardBuilder, - active_shard_id: ShardId, /// Shards with frozen dictionary. shards: Vec, num_rows: usize, @@ -199,23 +404,21 @@ struct Inner { } impl Inner { - fn new(metadata: RegionMetadataRef, shard_builder: ShardBuilder, dedup: bool) -> Self { - let mut inner = Self { + fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> Self { + let (shards, current_shard_id) = if metadata.primary_key.is_empty() { + let data_parts = DataParts::new(metadata.clone(), DATA_INIT_CAP, config.dedup); + (vec![Shard::new(0, None, data_parts, config.dedup)], 1) + } else { + (Vec::new(), 0) + }; + let shard_builder = ShardBuilder::new(metadata.clone(), config, current_shard_id); + Self { metadata, shard_builder, - active_shard_id: 0, - shards: Vec::new(), + shards, num_rows: 0, - dedup, - }; - - if inner.metadata.primary_key.is_empty() { - let data_parts = DataParts::new(inner.metadata.clone(), DATA_INIT_CAP, dedup); - inner.shards.push(Shard::new(0, None, data_parts, dedup)); - inner.active_shard_id = 1; + dedup: config.dedup, } - - inner } fn find_key_in_shards(&self, primary_key: &[u8]) -> Option { @@ -239,11 +442,7 @@ impl Inner { } fn freeze_active_shard(&mut self) -> Result<()> { - if let Some(shard) = self - .shard_builder - .finish(self.active_shard_id, self.metadata.clone())? - { - self.active_shard_id += 1; + if let Some(shard) = self.shard_builder.finish(self.metadata.clone())? 
{ self.shards.push(shard); } Ok(()) diff --git a/src/mito2/src/memtable/merge_tree/shard.rs b/src/mito2/src/memtable/merge_tree/shard.rs index a9ad6e30b822..81ce4cb408dd 100644 --- a/src/mito2/src/memtable/merge_tree/shard.rs +++ b/src/mito2/src/memtable/merge_tree/shard.rs @@ -14,11 +14,16 @@ //! Shard in a partition. +use std::cmp::Ordering; + use store_api::metadata::RegionMetadataRef; +use crate::error::Result; use crate::memtable::key_values::KeyValue; -use crate::memtable::merge_tree::data::{DataParts, DATA_INIT_CAP}; +use crate::memtable::merge_tree::data::{DataBatch, DataParts, DataPartsReader, DATA_INIT_CAP}; use crate::memtable::merge_tree::dict::KeyDictRef; +use crate::memtable::merge_tree::merger::{Merger, Node}; +use crate::memtable::merge_tree::shard_builder::ShardBuilderReader; use crate::memtable::merge_tree::{PkId, ShardId}; /// Shard stores data related to the same key dictionary. @@ -67,8 +72,14 @@ impl Shard { /// Scans the shard. // TODO(yingwen): Push down projection to data parts. - pub fn scan(&self) -> ShardReader { - unimplemented!() + pub fn read(&mut self) -> Result { + let parts_reader = self.data_parts.read()?; + + Ok(ShardReader { + shard_id: self.shard_id, + key_dict: self.key_dict.clone(), + parts_reader, + }) } /// Returns the memory size of the shard part. @@ -91,7 +102,189 @@ impl Shard { } /// Reader to read rows in a shard. -pub struct ShardReader {} +pub struct ShardReader { + shard_id: ShardId, + key_dict: Option, + parts_reader: DataPartsReader, +} + +impl ShardReader { + fn shard_id(&self) -> ShardId { + self.shard_id + } + + fn is_valid(&self) -> bool { + self.parts_reader.is_valid() + } + + fn next(&mut self) -> Result<()> { + self.parts_reader.next() + } + + fn current_key(&self) -> Option<&[u8]> { + let pk_index = self.parts_reader.current_data_batch().pk_index(); + self.key_dict + .as_ref() + .map(|dict| dict.key_by_pk_index(pk_index)) + } + + fn current_pk_id(&self) -> PkId { + let pk_index = self.parts_reader.current_data_batch().pk_index(); + PkId { + shard_id: self.shard_id, + pk_index, + } + } + + fn current_data_batch(&self) -> DataBatch { + self.parts_reader.current_data_batch() + } +} + +pub(crate) struct ShardMerger { + merger: Merger, +} + +impl ShardMerger { + pub(crate) fn try_new(nodes: Vec) -> Result { + let merger = Merger::try_new(nodes)?; + Ok(ShardMerger { merger }) + } + + pub(crate) fn is_valid(&self) -> bool { + self.merger.is_valid() + } + + pub(crate) fn next(&mut self) -> Result<()> { + self.merger.next() + } + + pub(crate) fn current_pk_id(&self) -> PkId { + self.merger.current_node().current_pk_id() + } + + pub(crate) fn current_key(&self) -> Option<&[u8]> { + self.merger.current_node().current_key() + } + + pub(crate) fn current_data_batch(&self) -> DataBatch { + let batch = self.merger.current_node().current_data_batch(); + batch.slice(0, self.merger.current_rows()) + } +} + +pub(crate) enum ShardSource { + Builder(ShardBuilderReader), + Shard(ShardReader), +} + +impl ShardSource { + fn is_valid(&self) -> bool { + match self { + ShardSource::Builder(r) => r.is_valid(), + ShardSource::Shard(r) => r.is_valid(), + } + } + + fn next(&mut self) -> Result<()> { + match self { + ShardSource::Builder(r) => r.next(), + ShardSource::Shard(r) => r.next(), + } + } + + fn current_pk_id(&self) -> PkId { + match self { + ShardSource::Builder(r) => r.current_pk_id(), + ShardSource::Shard(r) => r.current_pk_id(), + } + } + + fn current_key(&self) -> Option<&[u8]> { + match self { + ShardSource::Builder(r) => r.current_key(), + 
ShardSource::Shard(r) => r.current_key(), + } + } + + fn current_data_batch(&self) -> DataBatch { + match self { + ShardSource::Builder(r) => r.current_data_batch(), + ShardSource::Shard(r) => r.current_data_batch(), + } + } +} + +/// Node for the merger to get items. +pub(crate) struct ShardNode { + source: ShardSource, +} + +impl ShardNode { + pub(crate) fn new(source: ShardSource) -> Self { + Self { source } + } + + fn current_pk_id(&self) -> PkId { + self.source.current_pk_id() + } + + fn current_key(&self) -> Option<&[u8]> { + self.source.current_key() + } + + fn current_data_batch(&self) -> DataBatch { + self.source.current_data_batch() + } +} + +impl PartialEq for ShardNode { + fn eq(&self, other: &Self) -> bool { + self.source.current_key() == other.source.current_key() + } +} + +impl Eq for ShardNode {} + +impl Ord for ShardNode { + fn cmp(&self, other: &Self) -> Ordering { + self.source + .current_key() + .cmp(&other.source.current_key()) + .reverse() + } +} + +impl PartialOrd for ShardNode { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Node for ShardNode { + fn is_valid(&self) -> bool { + self.source.is_valid() + } + + fn is_behind(&self, other: &Self) -> bool { + // We expect a key only belongs to one shard. + debug_assert_ne!(self.source.current_key(), other.source.current_key()); + self.source.current_key() < other.source.current_key() + } + + fn advance(&mut self, len: usize) -> Result<()> { + debug_assert_eq!(self.source.current_data_batch().num_rows(), len); + self.source.next() + } + + fn current_item_len(&self) -> usize { + self.current_data_batch().num_rows() + } + + fn search_key_in_current_item(&self, _other: &Self) -> Result { + Err(self.source.current_data_batch().num_rows()) + } +} #[cfg(test)] mod tests { diff --git a/src/mito2/src/memtable/merge_tree/shard_builder.rs b/src/mito2/src/memtable/merge_tree/shard_builder.rs index 68ebac37a2f5..d48310409b40 100644 --- a/src/mito2/src/memtable/merge_tree/shard_builder.rs +++ b/src/mito2/src/memtable/merge_tree/shard_builder.rs @@ -14,24 +14,25 @@ //! Builder of a shard. -use std::collections::HashSet; use std::sync::Arc; -use common_recordbatch::filter::SimpleFilterEvaluator; use store_api::metadata::RegionMetadataRef; -use store_api::storage::ColumnId; use crate::error::Result; use crate::memtable::key_values::KeyValue; -use crate::memtable::merge_tree::data::{DataBuffer, DataParts, DATA_INIT_CAP}; -use crate::memtable::merge_tree::dict::KeyDictBuilder; +use crate::memtable::merge_tree::data::{ + DataBatch, DataBuffer, DataBufferReader, DataParts, DATA_INIT_CAP, +}; +use crate::memtable::merge_tree::dict::{DictBuilderReader, KeyDictBuilder}; use crate::memtable::merge_tree::metrics::WriteMetrics; use crate::memtable::merge_tree::shard::Shard; -use crate::memtable::merge_tree::{MergeTreeConfig, ShardId}; +use crate::memtable::merge_tree::{MergeTreeConfig, PkId, ShardId}; /// Builder to write keys and data to a shard that the key dictionary /// is still active. pub struct ShardBuilder { + /// Id of the current shard to build. + current_shard_id: ShardId, /// Builder for the key dictionary. dict_builder: KeyDictBuilder, /// Buffer to store data. @@ -43,13 +44,17 @@ pub struct ShardBuilder { impl ShardBuilder { /// Returns a new builder. 
- pub fn new(metadata: RegionMetadataRef, config: &MergeTreeConfig) -> ShardBuilder { - let dedup = config.dedup; + pub fn new( + metadata: RegionMetadataRef, + config: &MergeTreeConfig, + shard_id: ShardId, + ) -> ShardBuilder { ShardBuilder { + current_shard_id: shard_id, dict_builder: KeyDictBuilder::new(config.index_max_keys_per_shard), - data_buffer: DataBuffer::with_capacity(metadata, DATA_INIT_CAP, dedup), + data_buffer: DataBuffer::with_capacity(metadata, DATA_INIT_CAP, config.dedup), data_freeze_threshold: config.data_freeze_threshold, - dedup, + dedup: config.dedup, } } @@ -65,15 +70,16 @@ impl ShardBuilder { self.dict_builder.is_full() || self.data_buffer.num_rows() == self.data_freeze_threshold } + /// Returns the current shard id of the builder. + pub fn current_shard_id(&self) -> ShardId { + self.current_shard_id + } + /// Builds a new shard and resets the builder. /// /// Returns `None` if the builder is empty. - pub fn finish( - &mut self, - shard_id: ShardId, - metadata: RegionMetadataRef, - ) -> Result> { - if self.data_buffer.is_empty() { + pub fn finish(&mut self, metadata: RegionMetadataRef) -> Result> { + if self.is_empty() { return Ok(None); } @@ -93,24 +99,68 @@ impl ShardBuilder { let data_parts = DataParts::new(metadata, DATA_INIT_CAP, self.dedup).with_frozen(vec![data_part]); let key_dict = key_dict.map(Arc::new); + let shard_id = self.current_shard_id; + self.current_shard_id += 1; Ok(Some(Shard::new(shard_id, key_dict, data_parts, self.dedup))) } /// Scans the shard builder. - pub fn scan( - &mut self, - _projection: &HashSet, - _filters: &[SimpleFilterEvaluator], - ) -> Result { - unimplemented!() + pub fn read(&mut self, pk_weights_buffer: &mut Vec) -> Result { + let dict_reader = self.dict_builder.read(); + dict_reader.pk_weights_to_sort_data(pk_weights_buffer); + let data_reader = self.data_buffer.read(Some(pk_weights_buffer))?; + + Ok(ShardBuilderReader { + shard_id: self.current_shard_id, + dict_reader, + data_reader, + }) + } + + /// Returns true if the builder is empty. + pub fn is_empty(&self) -> bool { + self.data_buffer.is_empty() } } /// Reader to scan a shard builder. -pub struct ShardBuilderReader {} +pub struct ShardBuilderReader { + shard_id: ShardId, + dict_reader: DictBuilderReader, + data_reader: DataBufferReader, +} + +impl ShardBuilderReader { + pub fn shard_id(&self) -> ShardId { + self.shard_id + } -// TODO(yingwen): Can we use generic for data reader? 
+ pub fn is_valid(&self) -> bool { + self.data_reader.is_valid() + } + + pub fn next(&mut self) -> Result<()> { + self.data_reader.next() + } + + pub fn current_key(&self) -> Option<&[u8]> { + let pk_index = self.data_reader.current_data_batch().pk_index(); + Some(self.dict_reader.key_by_pk_index(pk_index)) + } + + pub fn current_pk_id(&self) -> PkId { + let pk_index = self.data_reader.current_data_batch().pk_index(); + PkId { + shard_id: self.shard_id, + pk_index, + } + } + + pub fn current_data_batch(&self) -> DataBatch { + self.data_reader.current_data_batch() + } +} #[cfg(test)] mod tests { @@ -179,9 +229,10 @@ mod tests { let metadata = metadata_for_test(); let input = input_with_key(&metadata); let config = MergeTreeConfig::default(); - let mut shard_builder = ShardBuilder::new(metadata.clone(), &config); + let mut shard_builder = ShardBuilder::new(metadata.clone(), &config, 1); let mut metrics = WriteMetrics::default(); - assert!(shard_builder.finish(1, metadata.clone()).unwrap().is_none()); + assert!(shard_builder.finish(metadata.clone()).unwrap().is_none()); + assert_eq!(1, shard_builder.current_shard_id); for key_values in &input { for kv in key_values.iter() { @@ -189,6 +240,8 @@ mod tests { shard_builder.write_with_key(&key, kv, &mut metrics); } } - shard_builder.finish(1, metadata).unwrap().unwrap(); + let shard = shard_builder.finish(metadata).unwrap().unwrap(); + assert_eq!(1, shard.shard_id); + assert_eq!(2, shard_builder.current_shard_id); } } diff --git a/src/mito2/src/memtable/merge_tree/tree.rs b/src/mito2/src/memtable/merge_tree/tree.rs index 4ae7d197b2e7..afa79463e591 100644 --- a/src/mito2/src/memtable/merge_tree/tree.rs +++ b/src/mito2/src/memtable/merge_tree/tree.rs @@ -32,7 +32,7 @@ use crate::error::{PrimaryKeyLengthMismatchSnafu, Result}; use crate::memtable::key_values::KeyValue; use crate::memtable::merge_tree::metrics::WriteMetrics; use crate::memtable::merge_tree::partition::{ - Partition, PartitionKey, PartitionReader, PartitionRef, + Partition, PartitionKey, PartitionReader, PartitionRef, ReadPartitionContext, }; use crate::memtable::merge_tree::MergeTreeConfig; use crate::memtable::time_series::primary_key_schema; @@ -122,7 +122,7 @@ impl MergeTree { } /// Scans the tree. - pub fn scan( + pub fn read( &self, projection: Option<&[ColumnId]>, predicate: Option, @@ -151,16 +151,21 @@ impl MergeTree { .map(|pk| pk.column_schema.data_type.clone()) .collect(); - let iter = TreeIter { + let mut iter = TreeIter { metadata: self.metadata.clone(), pk_schema, pk_datatypes, - projection, - filters, row_codec: self.row_codec.clone(), partitions, current_reader: None, }; + let context = ReadPartitionContext::new( + self.metadata.clone(), + self.row_codec.clone(), + projection, + filters, + ); + iter.fetch_next_partition(context)?; Ok(Box::new(iter)) } @@ -281,8 +286,6 @@ struct TreeIter { metadata: RegionMetadataRef, pk_schema: arrow::datatypes::SchemaRef, pk_datatypes: Vec, - projection: HashSet, - filters: Vec, row_codec: Arc, partitions: VecDeque, current_reader: Option, @@ -292,6 +295,44 @@ impl Iterator for TreeIter { type Item = Result; fn next(&mut self) -> Option { - unimplemented!() + self.next_batch().transpose() + } +} + +impl TreeIter { + /// Fetch next partition. 
+    fn fetch_next_partition(&mut self, mut context: ReadPartitionContext) -> Result<()> {
+        while let Some(partition) = self.partitions.pop_front() {
+            let part_reader = partition.read(context)?;
+            if !part_reader.is_valid() {
+                context = part_reader.into_context();
+                continue;
+            }
+            self.current_reader = Some(part_reader);
+            break;
+        }
+
+        Ok(())
+    }
+
+    /// Fetches next batch.
+    fn next_batch(&mut self) -> Result<Option<Batch>> {
+        let Some(part_reader) = &mut self.current_reader else {
+            return Ok(None);
+        };
+
+        debug_assert!(part_reader.is_valid());
+        let batch = part_reader.convert_current_batch()?;
+        part_reader.next()?;
+        if part_reader.is_valid() {
+            return Ok(Some(batch));
+        }
+
+        // Safety: current reader is Some.
+        let part_reader = self.current_reader.take().unwrap();
+        let context = part_reader.into_context();
+        self.fetch_next_partition(context)?;
+
+        Ok(Some(batch))
     }
 }
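An illustrative usage sketch (not part of the patch): with `Memtable::iter` now delegating to `MergeTree::read`, a caller drains the memtable much like the new tests do. It assumes a `MergeTreeMemtable` that has already been written to and a field column with id 3, as in `memtable_util::metadata_with_primary_key`; `drain_field_column` is a hypothetical helper.

use crate::error::Result;
use crate::memtable::merge_tree::MergeTreeMemtable;
use crate::memtable::Memtable;

// Sketch only: counts the rows of one projected field column.
fn drain_field_column(memtable: &MergeTreeMemtable) -> Result<usize> {
    // Project a single field column; pass `None` to read every column.
    let iter = memtable.iter(Some(&[3]), None)?;
    let mut rows = 0;
    for batch in iter {
        // Each item is a `Result<Batch>` carrying the primary key, timestamps,
        // sequences, op types and the projected field columns.
        let batch = batch?;
        rows += batch.num_rows();
    }
    Ok(rows)
}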