From 4f24295223f233b9c79bff7b38c4c2df8410b6ab Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Wed, 7 Feb 2024 21:13:13 +0800 Subject: [PATCH] feat: single data part iter (#15) * wip: read data parts * wip: change iter method signature * wip: change iter return value to Iter * wip: iter frozen parts * feat: impl single data part iterator * fix: change encoded data to Bytes * fix: some cr comments --- src/mito2/src/error.rs | 9 +- src/mito2/src/memtable/merge_tree/data.rs | 280 ++++++++++++++++++++-- 2 files changed, 268 insertions(+), 21 deletions(-) diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index b732c2d8729d..b833603eaa83 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -556,6 +556,13 @@ pub enum Error { error: parquet::errors::ParquetError, location: Location, }, + + #[snafu(display("Failed to iter data part"))] + ReadDataPart { + #[snafu(source)] + error: parquet::errors::ParquetError, + location: Location, + }, } pub type Result = std::result::Result; @@ -657,7 +664,7 @@ impl ErrorExt for Error { StaleLogEntry { .. } => StatusCode::Unexpected, FilterRecordBatch { source, .. } => source.status_code(), Upload { .. } => StatusCode::StorageUnavailable, - EncodeMemtable { .. } => StatusCode::Internal, + ReadDataPart { .. } | EncodeMemtable { .. 
} => StatusCode::Internal, } } diff --git a/src/mito2/src/memtable/merge_tree/data.rs b/src/mito2/src/memtable/merge_tree/data.rs index dc6e7d3bdab9..a6ce8339f766 100644 --- a/src/mito2/src/memtable/merge_tree/data.rs +++ b/src/mito2/src/memtable/merge_tree/data.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use bytes::Bytes; use datatypes::arrow; -use datatypes::arrow::array::{RecordBatch, UInt32Array}; +use datatypes::arrow::array::{RecordBatch, UInt16Array, UInt32Array}; use datatypes::arrow::datatypes::{Field, Schema, SchemaRef}; use datatypes::data_type::DataType; use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, Vector, VectorRef}; @@ -31,7 +31,9 @@ use datatypes::vectors::{ TimestampNanosecondVector, TimestampSecondVector, UInt16Vector, UInt16VectorBuilder, UInt64Vector, UInt64VectorBuilder, UInt8VectorBuilder, }; +use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder}; use parquet::arrow::ArrowWriter; +use parquet::file::properties::WriterProperties; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME}; @@ -44,6 +46,8 @@ use crate::memtable::merge_tree::{PkId, PkIndex, ShardId}; pub const PK_INDEX_COLUMN_NAME: &str = "pk_index"; /// Data part batches returns by `DataParts::read`. + +#[derive(Debug)] pub struct DataBatch { /// Primary key index of this batch. pk_index: PkIndex, @@ -53,16 +57,22 @@ pub struct DataBatch { range: Range, } +impl DataBatch { + pub(crate) fn as_record_batch(&self) -> RecordBatch { + self.rb.slice(self.range.start, self.range.len()) + } +} + /// Data parts including an active writing part and several frozen parts. pub struct DataParts { active: DataBuffer, frozen: Vec, // todo(hl): merge all frozen parts into one parquet-encoded bytes. 
} +pub struct HeapNode {} + /// Iterator for iterating data in `DataParts` -pub struct Iter { - // todo -} +pub struct Iter {} impl Iterator for Iter { type Item = Result; @@ -82,8 +92,7 @@ impl DataParts { /// The returned iterator yields a record batch of one primary key at a time. /// The order of yielding primary keys is determined by provided weights. pub fn iter(&mut self, _pk_weights: &[u16]) -> Result { - let iter = todo!(); - Ok(iter) + todo!() } } @@ -159,16 +168,20 @@ impl DataBuffer { /// Freezes `DataBuffer` to bytes. Use `pk_weights` to convert pk_id to pk sort order. /// `freeze` clears the buffers of builders. pub fn freeze(&mut self, pk_weights: &[u16]) -> Result { - let encoder = DataPartEncoder::new(&self.metadata, pk_weights); + let encoder = DataPartEncoder::new(&self.metadata, pk_weights, None); let encoded = encoder.write(self)?; Ok(DataPart::Parquet(encoded)) } /// Reads batches from data buffer without resetting builder's buffers. - pub fn read(&mut self, pk_weights: &[u16]) -> Result { - let batches = + pub fn iter(&mut self, pk_weights: &[u16]) -> Result { + let batch = data_buffer_to_record_batches(self.data_part_schema.clone(), self, pk_weights, true)?; - todo!(); + Ok(DataBufferIter { + batch, + offset: 0, + current_pk_index: 0, + }) } /// Returns num of rows in data buffer. 
@@ -182,26 +195,65 @@ impl DataBuffer { } } +pub(crate) struct DataBufferIter { + batch: RecordBatch, + offset: usize, + current_pk_index: PkIndex, +} + +impl Iterator for DataBufferIter { + type Item = Result; + + fn next(&mut self) -> Option { + let pk_index_array = pk_index_array(&self.batch); + search_next_pk_range(pk_index_array, self.offset).map(|(next_pk, range)| { + self.current_pk_index = next_pk; + self.offset = range.end; + Ok(DataBatch { + pk_index: next_pk, + rb: self.batch.clone(), + range, + }) + }) + } +} + struct DataPartEncoder<'a> { schema: SchemaRef, pk_weights: &'a [u16], + row_group_size: Option, } impl<'a> DataPartEncoder<'a> { - pub fn new(metadata: &RegionMetadataRef, pk_weights: &'a [u16]) -> DataPartEncoder<'a> { + pub fn new( + metadata: &RegionMetadataRef, + pk_weights: &'a [u16], + row_group_size: Option, + ) -> DataPartEncoder<'a> { let schema = memtable_schema_to_encoded_schema(metadata); - Self { schema, pk_weights } + Self { + schema, + pk_weights, + row_group_size, + } } - pub fn write(&self, source: &mut DataBuffer) -> Result> { + fn writer_props(&self) -> Option { + self.row_group_size.map(|size| { + WriterProperties::builder() + .set_max_row_group_size(size) + .build() + }) + } + pub fn write(&self, source: &mut DataBuffer) -> Result { let mut bytes = Vec::with_capacity(1024); - let mut writer = ArrowWriter::try_new(&mut bytes, self.schema.clone(), None) + let mut writer = ArrowWriter::try_new(&mut bytes, self.schema.clone(), self.writer_props()) .context(error::EncodeMemtableSnafu)?; let rb = data_buffer_to_record_batches(self.schema.clone(), source, self.pk_weights, false)?; writer.write(&rb).context(error::EncodeMemtableSnafu)?; let _file_meta = writer.close().context(error::EncodeMemtableSnafu)?; - Ok(bytes) + Ok(Bytes::from(bytes)) } } @@ -393,15 +445,112 @@ fn build_rows_to_sort( /// Format of immutable data part. 
pub enum DataPart { - Parquet(Vec), + Parquet(Bytes), +} + +pub struct DataPartIter { + inner: ParquetRecordBatchReader, + current_offset: usize, + current_pk_index: Option, + current_batch: Option, +} + +impl DataPartIter { + pub fn new(data: Bytes, batch_size: Option) -> Result { + let mut builder = + ParquetRecordBatchReaderBuilder::try_new(data).context(error::ReadDataPartSnafu)?; + if let Some(batch_size) = batch_size { + builder = builder.with_batch_size(batch_size); + } + let mut reader = builder.build().context(error::ReadDataPartSnafu)?; + let batch = reader + .next() + .transpose() + .context(error::ComputeArrowSnafu)?; + let pk_index = batch.as_ref().map(|b| pk_index_array(b).value(0)); + Ok(Self { + inner: reader, + current_pk_index: pk_index, + current_offset: 0, + current_batch: batch, + }) + } + + /// Searches next primary key along with its offset range inside record batch. + fn search_next_pk_range(&self) -> Option<(PkIndex, Range)> { + self.current_batch.as_ref().and_then(|b| { + // safety: PK_INDEX_COLUMN_NAME must be present in record batch yielded by data part. + let pk_array = pk_index_array(b); + search_next_pk_range(pk_array, self.current_offset) + }) + } +} + +impl Iterator for DataPartIter { + type Item = Result; + + fn next(&mut self) -> Option { + if let Some((next_pk, range)) = self.search_next_pk_range() { + self.current_pk_index = Some(next_pk); + self.current_offset = range.end; + return Some(Ok(DataBatch { + pk_index: next_pk, + rb: self.current_batch.as_ref().unwrap().clone(), // safety: current batch won't be none.
+ range, + })); + } else if let Some(res) = self.inner.next() { + let batch = match res { + Ok(b) => b, + Err(e) => { + return Some(Err(e).context(error::ComputeArrowSnafu)); + } + }; + self.current_batch = Some(batch); + self.current_offset = 0; + self.next() + } else { + return None; + } + } } impl DataPart { - pub fn read(&self, _pk_weights: &[u16]) -> Result { - todo!() + /// Iterates frozen data parts and yields record batches. + /// Returned record batches are ga + pub fn iter(&self, _pk_weights: &[u16]) -> Result { + match self { + DataPart::Parquet(data_bytes) => DataPartIter::new(data_bytes.clone(), None), + } } } +/// Searches for next pk index and it's offset range in a sorted `UInt16Array`. +fn search_next_pk_range(array: &UInt16Array, start: usize) -> Option<(PkIndex, Range)> { + let num_rows = array.len(); + if start >= num_rows { + return None; + } + + let next_pk = array.value(start); + for idx in start..num_rows { + if array.value(idx) != next_pk { + return Some((next_pk, start..idx)); + } + } + Some((next_pk, start..num_rows)) +} + +/// Gets `pk_index` array from record batch. +/// # Panics +/// If pk index column is not the first column or the type is not `UInt16Array`. 
+fn pk_index_array(batch: &RecordBatch) -> &UInt16Array { + batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap() +} + #[cfg(test)] mod tests { use datafusion::arrow::array::Float64Array; @@ -523,15 +672,106 @@ mod tests { assert_eq!(4, buffer.num_rows()); - let mut encoder = DataPartEncoder::new(&meta, &[0, 1, 2]); + let mut encoder = DataPartEncoder::new(&meta, &[0, 1, 2], None); let encoded = encoder.write(&mut buffer).unwrap(); let s = String::from_utf8_lossy(encoded.as_bytes()); assert!(s.starts_with("PAR1")); assert!(s.ends_with("PAR1")); - let mut builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(encoded)).unwrap(); + let mut builder = ParquetRecordBatchReaderBuilder::try_new(encoded).unwrap(); let mut reader = builder.build().unwrap(); let batch = reader.next().unwrap().unwrap(); assert_eq!(3, batch.num_rows()); } + + fn check_values_equal( + mut iter: impl Iterator>, + expected_values: &[Vec], + ) { + let mut output = Vec::with_capacity(expected_values.len()); + for res in iter.by_ref() { + let batch = res.unwrap().as_record_batch(); + let values = batch + .column_by_name("v1") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .map(|v| v.unwrap()) + .collect::>(); + output.push(values) + } + assert_eq!(expected_values, output); + } + + #[test] + fn test_iter_data_part() { + let meta = metadata_for_test(); + let mut buffer = DataBuffer::with_capacity(meta.clone(), 10); + + // write rows with null values. + write_rows_to_buffer( + &mut buffer, + &meta, + 2, + vec![0, 1, 2], + vec![Some(1.0), Some(2.0), Some(3.0)], + 2, + ); + + // write rows with null values. 
+ write_rows_to_buffer( + &mut buffer, + &meta, + 3, + vec![1, 2, 3], + vec![Some(1.1), Some(2.1), Some(3.1)], + 3, + ); + + let mut encoder = DataPartEncoder::new(&meta, &[0, 1, 2, 3], Some(4)); + let encoded = encoder.write(&mut buffer).unwrap(); + + let mut iter = DataPartIter::new(encoded, Some(4)).unwrap(); + + check_values_equal(&mut iter, &[vec![1.0, 2.0, 3.0], vec![1.1], vec![2.1, 3.1]]); + } + #[test] + fn test_search_next_pk_range() { + let a = UInt16Array::from_iter_values([1, 1, 3, 3, 4, 6]); + assert_eq!((1, 0..2), search_next_pk_range(&a, 0).unwrap()); + assert_eq!((3, 2..4), search_next_pk_range(&a, 2).unwrap()); + assert_eq!((4, 4..5), search_next_pk_range(&a, 4).unwrap()); + assert_eq!((6, 5..6), search_next_pk_range(&a, 5).unwrap()); + + assert_eq!(None, search_next_pk_range(&a, 6)); + } + + #[test] + fn test_iter_data_buffer() { + let meta = metadata_for_test(); + let mut buffer = DataBuffer::with_capacity(meta.clone(), 10); + + write_rows_to_buffer( + &mut buffer, + &meta, + 3, + vec![1, 2, 3], + vec![Some(1.1), Some(2.1), Some(3.1)], + 3, + ); + + write_rows_to_buffer( + &mut buffer, + &meta, + 2, + vec![0, 1, 2], + vec![Some(1.0), Some(2.0), Some(3.0)], + 2, + ); + + let mut iter = buffer.iter(&[0, 1, 3, 2]).unwrap(); + check_values_equal(&mut iter, &[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0]]); + } }