Skip to content

Commit

Permalink
feat: packed struct encoding (#3186)
Browse files Browse the repository at this point in the history
This PR tries to add packed struct encoding.

During encoding, it packs a struct with fixed-width fields into a
row-oriented `FixedWidthDataBlock`, then uses a `ValueCompressor` to
compress it into a `MiniBlock` layout.

During decoding, it first uses a `ValueDecompressor` to recover the
row-oriented `FixedWidthDataBlock`, then constructs a `StructDataBlock`
for output.

#3173 #2601
  • Loading branch information
broccoliSpicy authored Dec 10, 2024
1 parent ef9d0c2 commit c4cb87a
Show file tree
Hide file tree
Showing 15 changed files with 447 additions and 52 deletions.
6 changes: 6 additions & 0 deletions protos/encodings.proto
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,11 @@ message PackedStruct {
Buffer buffer = 2;
}

// Encoding for a packed struct whose fields are all fixed-width.
// The struct's rows are packed into a single row-major fixed-width stream;
// `bits_per_values` records each field's width so rows can be split back
// into per-field columns on decode.
message PackedStructFixedWidthMiniBlock {
  // Encoding of the packed, row-major values.
  // NOTE(review): `Flat` is not snake_case per proto style; renaming would
  // change generated accessors, so confirm before touching it.
  ArrayEncoding Flat = 1;
  // Bit width of each struct field, in field order.
  repeated uint32 bits_per_values = 2;
}

message FixedSizeBinary {
ArrayEncoding bytes = 1;
uint32 byte_width = 2;
Expand All @@ -283,6 +288,7 @@ message ArrayEncoding {
BinaryMiniBlock binary_mini_block = 15;
FsstMiniBlock fsst_mini_block = 16;
BinaryBlock binary_block = 17;
PackedStructFixedWidthMiniBlock packed_struct_fixed_width_mini_block = 18;
}
}

Expand Down
31 changes: 19 additions & 12 deletions python/python/benchmarks/test_packed_struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@

NUM_ROWS = 10_000_000
RANDOM_ACCESS = "indices"
NUM_INDICES = 100
NUM_INDICES = 1000
NUM_ROUNDS = 10
BATCH_SIZE = 16 * 1024

# This file compares benchmarks for reading and writing a StructArray column using
# (i) parquet
Expand All @@ -31,15 +32,12 @@ def test_data(tmp_path_factory):
{
"struct_col": pa.StructArray.from_arrays(
[
pc.random(NUM_ROWS).cast(pa.float32()),
pa.array(range(NUM_ROWS), type=pa.int32()),
pa.FixedSizeListArray.from_arrays(
pc.random(NUM_ROWS * 5).cast(pa.float32()), 5
),
pa.array(range(NUM_ROWS), type=pa.int32()),
pa.array(range(NUM_ROWS), type=pa.int32()),
pc.random(NUM_ROWS).cast(pa.float32()), # f1
pc.random(NUM_ROWS).cast(pa.float32()), # f2
pc.random(NUM_ROWS).cast(pa.float32()), # f3
pc.random(NUM_ROWS).cast(pa.float32()), # f4
],
["f", "i", "fsl", "i2", "i3"],
["f1", "f2", "f3", "f4"],
)
}
)
Expand All @@ -51,6 +49,7 @@ def test_data(tmp_path_factory):
@pytest.fixture(scope="module")
def random_indices():
    # Random row indices used by the random-access benchmarks.
    # random.randint is inclusive on BOTH ends, so the upper bound must be
    # NUM_ROWS - 1; using NUM_ROWS could produce an out-of-range index for a
    # table with NUM_ROWS rows.
    random_indices = [random.randint(0, NUM_ROWS - 1) for _ in range(NUM_INDICES)]
    # Sorted so take() visits rows in ascending order — presumably to make the
    # reads more sequential; confirm against the readers' take() behavior.
    random_indices.sort()
    return random_indices


Expand All @@ -59,12 +58,18 @@ def test_parquet_read(tmp_path: Path, benchmark, test_data, random_indices):
parquet_path = tmp_path / "data.parquet"
pq.write_table(test_data, parquet_path)

def read_parquet():
parquet_file = pq.ParquetFile(parquet_path)
batches = parquet_file.iter_batches(batch_size=BATCH_SIZE)
tab_parquet = pa.Table.from_batches(batches)
return tab_parquet

if RANDOM_ACCESS == "indices":
benchmark.pedantic(
lambda: pq.read_table(parquet_path).take(random_indices), rounds=5
)
elif RANDOM_ACCESS == "full":
benchmark.pedantic(lambda: pq.read_table(parquet_path), rounds=5)
benchmark.pedantic(lambda: read_parquet(), rounds=5)


def read_lance_file_random(lance_path, random_indices):
Expand All @@ -75,7 +80,9 @@ def read_lance_file_random(lance_path, random_indices):


def read_lance_file_full(lance_path):
    """Scan the entire Lance file in BATCH_SIZE batches, discarding each one."""
    reader = LanceFileReader(lance_path)
    batches = reader.read_all(batch_size=BATCH_SIZE).to_batches()
    for _batch in batches:
        pass


Expand Down Expand Up @@ -127,7 +134,7 @@ def test_parquet_write(tmp_path: Path, benchmark, test_data):


def write_lance_file(lance_path, test_data):
    """Write test_data to a Lance file (file format version 2.1), one batch at a time."""
    writer = LanceFileWriter(lance_path, test_data.schema, version="2.1")
    with writer:
        for record_batch in test_data.to_batches():
            writer.write_batch(record_batch)

Expand Down
11 changes: 11 additions & 0 deletions rust/lance-arrow/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ pub trait FieldExt {
///
/// This is intended for display purposes and not for serialization
fn to_compact_string(&self, indent: Indentation) -> String;

fn is_packed_struct(&self) -> bool;
}

impl FieldExt for Field {
Expand Down Expand Up @@ -79,6 +81,15 @@ impl FieldExt for Field {
}
result
}

// Check if field has metadata `packed` set to true; the check is
// case-insensitive. Uses `eq_ignore_ascii_case` instead of allocating a
// lowercased copy of the value on every call.
fn is_packed_struct(&self) -> bool {
    self.metadata()
        .get("packed")
        .is_some_and(|v| v.eq_ignore_ascii_case("true"))
}
}

/// Extends the functionality of [arrow_schema::Schema].
Expand Down
9 changes: 9 additions & 0 deletions rust/lance-core/src/datatypes/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,15 @@ impl Field {
}
None
}

/// Check if field has metadata `packed` set to true; the check is
/// case-insensitive.
///
/// `eq_ignore_ascii_case` avoids allocating a lowercased copy of the
/// metadata value on every call.
pub fn is_packed_struct(&self) -> bool {
    self.metadata
        .get("packed")
        .is_some_and(|v| v.eq_ignore_ascii_case("true"))
}
}

impl fmt::Display for Field {
Expand Down
75 changes: 74 additions & 1 deletion rust/lance-encoding/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,53 @@ impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder {
}
}

#[derive(Debug)]
struct StructDataBlockBuilder {
    // One fixed-width builder per struct field, in field order
    // (populated by `new` from `bits_per_values`).
    children: Vec<Box<dyn DataBlockBuilderImpl>>,
}

impl StructDataBlockBuilder {
    // Currently only Struct with fixed-width fields are supported.
    // And the assumption that all fields have `bits_per_value % 8 == 0` is made here.
    //
    // Each child's share of `estimated_size_bytes` is apportioned by its
    // fraction of the total row width.
    fn new(bits_per_values: Vec<u32>, estimated_size_bytes: u64) -> Self {
        debug_assert!(bits_per_values.iter().all(|bpv| bpv % 8 == 0));

        let bytes_per_row = u64::from(bits_per_values.iter().sum::<u32>()) / 8;

        let children = bits_per_values
            .iter()
            .map(|&bits_per_value| {
                let child_estimate =
                    estimated_size_bytes / bytes_per_row * u64::from(bits_per_value) / 8;
                Box::new(FixedWidthDataBlockBuilder::new(
                    u64::from(bits_per_value),
                    child_estimate,
                )) as Box<dyn DataBlockBuilderImpl>
            })
            .collect();
        Self { children }
    }
}

impl DataBlockBuilderImpl for StructDataBlockBuilder {
    // Appends the selected range of each child of `data_block` to the
    // matching child builder. Panics (index out of bounds) if `data_block`
    // has fewer children than this builder, matching the original behavior.
    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
        let struct_block = data_block.as_struct_ref().unwrap();
        for (idx, builder) in self.children.iter_mut().enumerate() {
            builder.append(&struct_block.children[idx], selection.clone());
        }
    }

    // Finishes every child builder and wraps the results in a StructDataBlock.
    fn finish(self: Box<Self>) -> DataBlock {
        let children = self
            .children
            .into_iter()
            .map(|child| child.finish())
            .collect();
        DataBlock::Struct(StructDataBlock {
            children,
            block_info: BlockInfo::new(),
        })
    }
}
/// A data block to represent a fixed size list
#[derive(Debug)]
pub struct FixedSizeListBlock {
Expand Down Expand Up @@ -586,6 +633,7 @@ impl VariableWidthBlock {
pub struct StructDataBlock {
    /// The child arrays
    pub children: Vec<DataBlock>,
    /// Per-block metadata; created empty via `BlockInfo::new()` when the
    /// block is built. NOTE(review): exact `BlockInfo` semantics are defined
    /// elsewhere — confirm before documenting further.
    pub block_info: BlockInfo,
}

impl StructDataBlock {
Expand Down Expand Up @@ -619,6 +667,7 @@ impl StructDataBlock {
.into_iter()
.map(|c| c.remove_validity())
.collect(),
block_info: self.block_info,
}
}

Expand All @@ -636,6 +685,7 @@ impl StructDataBlock {
.iter_mut()
.map(|c| c.borrow_and_clone())
.collect(),
block_info: self.block_info.clone(),
}
}

Expand All @@ -646,8 +696,16 @@ impl StructDataBlock {
.iter()
.map(|c| c.try_clone())
.collect::<Result<_>>()?,
block_info: self.block_info.clone(),
})
}

/// Sum of `data_size` over all child blocks.
pub fn data_size(&self) -> u64 {
    let mut total: u64 = 0;
    for child in &self.children {
        total += child.data_size();
    }
    total
}
}

/// A data block for dictionary encoded data
Expand Down Expand Up @@ -900,6 +958,18 @@ impl DataBlock {
inner.dimension,
))
}
Self::Struct(struct_data_block) => {
let mut bits_per_values = vec![];
for child in struct_data_block.children.iter() {
let child = child.as_fixed_width_ref().
expect("Currently StructDataBlockBuilder is only used in packed-struct encoding, and currently in packed-struct encoding, only fixed-width fields are supported.");
bits_per_values.push(child.bits_per_value as u32);
}
Box::new(StructDataBlockBuilder::new(
bits_per_values,
estimated_size_bytes,
))
}
_ => todo!(),
}
}
Expand Down Expand Up @@ -1359,7 +1429,10 @@ impl DataBlock {
.collect::<Vec<_>>();
children.push(Self::from_arrays(&child_vec, num_values));
}
Self::Struct(StructDataBlock { children })
Self::Struct(StructDataBlock {
children,
block_info: BlockInfo::default(),
})
}
DataType::FixedSizeList(_, dim) => {
let children = arrays
Expand Down
21 changes: 21 additions & 0 deletions rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ use crate::encodings::physical::binary::{BinaryBlockDecompressor, BinaryMiniBloc
use crate::encodings::physical::bitpack_fastlanes::BitpackMiniBlockDecompressor;
use crate::encodings::physical::fixed_size_list::FslPerValueDecompressor;
use crate::encodings::physical::fsst::FsstMiniBlockDecompressor;
use crate::encodings::physical::struct_encoding::PackedStructFixedWidthMiniBlockDecompressor;
use crate::encodings::physical::value::{ConstantDecompressor, ValueDecompressor};
use crate::encodings::physical::{ColumnBuffers, FileBuffers};
use crate::format::pb::{self, column_encoding};
Expand Down Expand Up @@ -512,6 +513,11 @@ impl DecompressorStrategy for CoreDecompressorStrategy {
pb::array_encoding::ArrayEncoding::FsstMiniBlock(description) => {
Ok(Box::new(FsstMiniBlockDecompressor::new(description)))
}
pb::array_encoding::ArrayEncoding::PackedStructFixedWidthMiniBlock(description) => {
Ok(Box::new(PackedStructFixedWidthMiniBlockDecompressor::new(
description,
)))
}
_ => todo!(),
}
}
Expand Down Expand Up @@ -752,11 +758,26 @@ impl CoreFieldDecoderStrategy {
column_info.as_ref(),
self.decompressor_strategy.as_ref(),
)?);

// advance to the next top level column
column_infos.next_top_level();

return Ok(scheduler);
}
match &data_type {
DataType::Struct(fields) => {
if field.is_packed_struct() {
let column_info = column_infos.expect_next()?;
let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
column_info.as_ref(),
self.decompressor_strategy.as_ref(),
)?);

// advance to the next top level column
column_infos.next_top_level();

return Ok(scheduler);
}
let mut child_schedulers = Vec::with_capacity(field.children.len());
for field in field.children.iter() {
let field_scheduler =
Expand Down
20 changes: 14 additions & 6 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::encodings::physical::dictionary::AlreadyDictionaryEncoder;
use crate::encodings::physical::fixed_size_list::FslPerValueCompressor;
use crate::encodings::physical::fsst::{FsstArrayEncoder, FsstMiniBlockEncoder};
use crate::encodings::physical::packed_struct::PackedStructEncoder;
use crate::encodings::physical::struct_encoding::PackedStructFixedWidthMiniBlockEncoder;
use crate::format::ProtobufUtils;
use crate::repdef::RepDefBuilder;
use crate::statistics::{GetStat, Stat};
Expand Down Expand Up @@ -832,6 +833,18 @@ impl CompressionStrategy for CoreArrayEncodingStrategy {
return Ok(Box::new(BinaryMiniBlockEncoder::default()));
}
}
if let DataBlock::Struct(ref struct_data_block) = data {
// this condition is actually checked at `PrimitiveStructuralEncoder::do_flush`,
// just being cautious here.
if struct_data_block
.children
.iter()
.any(|child| !matches!(child, DataBlock::FixedWidth(_)))
{
panic!("packed struct encoding currently only supports fixed-width fields.")
}
return Ok(Box::new(PackedStructFixedWidthMiniBlockEncoder::default()));
}
Ok(Box::new(ValueEncoder::default()))
}

Expand Down Expand Up @@ -1225,12 +1238,7 @@ impl FieldEncodingStrategy for StructuralEncodingStrategy {
Ok(Box::new(ListStructuralEncoder::new(child_encoder)))
}
DataType::Struct(_) => {
let field_metadata = &field.metadata;
if field_metadata
.get("packed")
.map(|v| v == "true")
.unwrap_or(false)
{
if field.is_packed_struct() {
Ok(Box::new(PrimitiveStructuralEncoder::try_new(
options,
self.compression_strategy.clone(),
Expand Down
12 changes: 12 additions & 0 deletions rust/lance-encoding/src/encodings/logical/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2599,6 +2599,18 @@ impl PrimitiveStructuralEncoder {
Self::encode_simple_all_null(column_idx, num_values, row_number)
} else {
let data_block = DataBlock::from_arrays(&arrays, num_values);

// if the `data_block` is a `StructDataBlock`, then this is a struct with packed struct encoding.
if let DataBlock::Struct(ref struct_data_block) = data_block {
if struct_data_block
.children
.iter()
.any(|child| !matches!(child, DataBlock::FixedWidth(_)))
{
panic!("packed struct encoding currently only supports fixed-width fields.")
}
}

const DICTIONARY_ENCODING_THRESHOLD: u64 = 100;
let cardinality =
if let Some(cardinality_array) = data_block.get_stat(Stat::Cardinality) {
Expand Down
11 changes: 10 additions & 1 deletion rust/lance-encoding/src/encodings/logical/struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use futures::{
FutureExt, StreamExt, TryStreamExt,
};
use itertools::Itertools;
use lance_arrow::FieldExt;
use log::trace;
use snafu::{location, Location};

Expand Down Expand Up @@ -607,7 +608,15 @@ impl StructuralStructDecoder {
should_validate: bool,
) -> Box<dyn StructuralFieldDecoder> {
match field.data_type() {
DataType::Struct(fields) => Box::new(Self::new(fields.clone(), should_validate, false)),
DataType::Struct(fields) => {
if field.is_packed_struct() {
let decoder =
StructuralPrimitiveFieldDecoder::new(&field.clone(), should_validate);
Box::new(decoder)
} else {
Box::new(Self::new(fields.clone(), should_validate, false))
}
}
DataType::List(child_field) | DataType::LargeList(child_field) => {
let child_decoder = Self::field_to_decoder(child_field, should_validate);
Box::new(StructuralListDecoder::new(
Expand Down
Loading

0 comments on commit c4cb87a

Please sign in to comment.