Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
wgtmac committed Jul 10, 2024
1 parent cc1e950 commit 90caf32
Show file tree
Hide file tree
Showing 5 changed files with 373 additions and 20 deletions.
3 changes: 2 additions & 1 deletion cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1250,7 +1250,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
properties->data_page_version() == ParquetDataPageVersion::V2 ||
properties->page_index_enabled(descr_->path());

if (properties->size_statistics_level() != SizeStatisticsLevel::NONE) {
if (properties->size_statistics_level() == SizeStatisticsLevel::CHUNK ||
properties->size_statistics_level() == SizeStatisticsLevel::PAGE) {
page_size_stats_builder_ = SizeStatisticsBuilder::Make(descr_);
chunk_size_stats_ = page_size_stats_builder_->Build();
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/parquet/page_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ class ColumnIndexBuilderImpl final : public ColumnIndexBuilder {
column_index_.null_counts.clear();
}

if (size_stats) {
if (size_stats != nullptr) {
const auto& page_ref_level_hist = size_stats->repetition_level_histogram();
const auto& page_def_level_hist = size_stats->definition_level_histogram();
column_index_.repetition_level_histograms.insert(
Expand Down Expand Up @@ -696,7 +696,7 @@ class OffsetIndexBuilderImpl final : public OffsetIndexBuilder {
if (offset_index_.page_locations.size() ==
offset_index_.unencoded_byte_array_data_bytes.size()) {
offset_index_.__isset.unencoded_byte_array_data_bytes = true;
} else {
} else if (!offset_index_.unencoded_byte_array_data_bytes.empty()) {
std::stringstream ss;
ss << "Invalid count of unencoded BYTE_ARRAY data bytes: "
<< offset_index_.unencoded_byte_array_data_bytes.size()
Expand Down
111 changes: 102 additions & 9 deletions cpp/src/parquet/page_index_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -419,15 +419,20 @@ TEST(PageIndex, DeterminePageIndexRangesInRowGroupWithMissingPageIndex) {
-1);
}

TEST(PageIndex, WriteOffsetIndex) {
void TestWriteOffsetIndex(bool write_size_stats) {
/// Create offset index via the OffsetIndexBuilder interface.
auto builder = OffsetIndexBuilder::Make();
const size_t num_pages = 5;
const std::vector<int64_t> offsets = {100, 200, 300, 400, 500};
const std::vector<int32_t> page_sizes = {1024, 2048, 3072, 4096, 8192};
const std::vector<int64_t> first_row_indices = {0, 10000, 20000, 30000, 40000};
const std::vector<int64_t> unencoded_byte_array_lengths = {1111, 2222, 0, 3333, 4444};
for (size_t i = 0; i < num_pages; ++i) {
builder->AddPage(offsets[i], page_sizes[i], first_row_indices[i]);
auto unencoded_byte_array_length =
write_size_stats ? std::make_optional(unencoded_byte_array_lengths[i])
: std::nullopt;
builder->AddPage(offsets[i], page_sizes[i], first_row_indices[i],
unencoded_byte_array_length);
}
const int64_t final_position = 4096;
builder->Finish(final_position);
Expand All @@ -446,23 +451,76 @@ TEST(PageIndex, WriteOffsetIndex) {
/// Verify the data of the offset index.
for (const auto& offset_index : offset_indexes) {
ASSERT_EQ(num_pages, offset_index->page_locations().size());
if (write_size_stats) {
ASSERT_EQ(num_pages, offset_index->unencoded_byte_array_data_bytes().size());
} else {
ASSERT_TRUE(offset_index->unencoded_byte_array_data_bytes().empty());
}
for (size_t i = 0; i < num_pages; ++i) {
const auto& page_location = offset_index->page_locations().at(i);
ASSERT_EQ(offsets[i] + final_position, page_location.offset);
ASSERT_EQ(page_sizes[i], page_location.compressed_page_size);
ASSERT_EQ(first_row_indices[i], page_location.first_row_index);
if (write_size_stats) {
ASSERT_EQ(unencoded_byte_array_lengths[i],
offset_index->unencoded_byte_array_data_bytes()[i]);
}
}
}
}

/// Offset index round trip where no unencoded BYTE_ARRAY byte counts are passed
/// to the builder; the shared driver then expects that list to be empty.
TEST(PageIndex, WriteOffsetIndexWithoutSizeStats) {
  constexpr bool kWriteSizeStats = false;
  TestWriteOffsetIndex(kWriteSizeStats);
}

/// Offset index round trip where every page also carries an unencoded
/// BYTE_ARRAY byte count; the shared driver verifies one entry per page.
TEST(PageIndex, WriteOffsetIndexWithSizeStats) {
  constexpr bool kWriteSizeStats = true;
  TestWriteOffsetIndex(kWriteSizeStats);
}

/// Hand-written level histogram data for a single page, used by the tests to
/// build fake size statistics and to verify what the column index stores.
///
/// Entry `i` of each vector is the value associated with level `i`, so a vector
/// is expected to hold (max level + 1) entries for the column under test.
/// Members are initialised positionally (`{rep, def}`), so their order matters.
struct PageLevelHistogram {
// Values for repetition levels 0..max_repetition_level, indexed by level.
// NOTE(review): stored as int16_t although the column index histograms they
// are compared against are int64_t; fine for small test values.
std::vector<int16_t> rep_levels;
// Values for definition levels 0..max_definition_level, indexed by level.
std::vector<int16_t> def_levels;
};

/// \brief Build a SizeStatistics object from hand-written per-level values.
///
/// For every repetition level in [0, descr->max_repetition_level()] and every
/// definition level in [0, descr->max_definition_level()], the entry of
/// `page_level_histogram` at that index is forwarded to the builder together
/// with the level itself.
///
/// \param descr column descriptor that supplies the maximum rep/def levels
/// \param page_level_histogram per-level values, indexed by level
/// \return the object produced by SizeStatisticsBuilder::Build()
/// \throws std::out_of_range if either vector has fewer entries than the
///         descriptor has levels
std::unique_ptr<SizeStatistics> ConstructFakeSizeStatistics(
    const ColumnDescriptor* descr, const PageLevelHistogram& page_level_histogram) {
  auto builder = SizeStatisticsBuilder::Make(descr);
  // Use at() rather than operator[]: nothing upstream checks that the
  // hand-written vectors are long enough for this descriptor, and a short
  // vector should fail the test loudly instead of reading out of bounds.
  for (int16_t level = 0; level <= descr->max_repetition_level(); ++level) {
    builder->AddRepetitionLevel(page_level_histogram.rep_levels.at(level), level);
  }
  for (int16_t level = 0; level <= descr->max_definition_level(); ++level) {
    builder->AddDefinitionLevel(page_level_histogram.def_levels.at(level), level);
  }
  return builder->Build();
}

/// \brief Check one page's slice of a flattened per-page level histogram.
///
/// `all_page_levels` holds (max_level + 1) consecutive entries per page. The
/// slice belonging to `page_id` is compared entry by entry against
/// `expected_page_levels`.
///
/// \param max_level maximum repetition or definition level of the column
/// \param page_id zero-based index of the page to verify
/// \param expected_page_levels expected values for the page, indexed by level
/// \param all_page_levels flattened histogram covering all pages
///
/// Element access is bounds-checked: if either vector is shorter than the
/// requested slice, std::out_of_range is thrown (and reported as a test
/// failure) instead of reading past the end of the vector.
void VerifyPageLevelHistogram(int16_t max_level, size_t page_id,
                              const std::vector<int16_t>& expected_page_levels,
                              const std::vector<int64_t>& all_page_levels) {
  // First entry of this page's slice inside the flattened histogram.
  const size_t offset = page_id * (max_level + 1);
  for (int16_t level = 0; level <= max_level; ++level) {
    ASSERT_EQ(expected_page_levels.at(level), all_page_levels.at(offset + level));
  }
}

void TestWriteTypedColumnIndex(schema::NodePtr node,
const std::vector<EncodedStatistics>& page_stats,
BoundaryOrder::type boundary_order, bool has_null_counts) {
auto descr = std::make_unique<ColumnDescriptor>(node, /*max_definition_level=*/1, 0);

BoundaryOrder::type boundary_order, bool has_null_counts,
int16_t max_definition_level = 1,
int16_t max_repetition_level = 0,
const std::vector<PageLevelHistogram>& page_levels = {}) {
const bool build_size_stats = !page_levels.empty();
if (build_size_stats) {
ASSERT_EQ(page_levels.size(), page_stats.size());
}
auto descr = std::make_unique<ColumnDescriptor>(node, max_definition_level,
max_repetition_level);
auto builder = ColumnIndexBuilder::Make(descr.get());
for (const auto& stats : page_stats) {
builder->AddPage(stats);
for (size_t i = 0; i < page_stats.size(); ++i) {
auto size_stats = build_size_stats
? ConstructFakeSizeStatistics(descr.get(), page_levels[i])
: nullptr;
builder->AddPage(page_stats[i], size_stats.get());
}
ASSERT_NO_THROW(builder->Finish());

Expand All @@ -482,13 +540,28 @@ void TestWriteTypedColumnIndex(schema::NodePtr node,
ASSERT_EQ(boundary_order, column_index->boundary_order());
ASSERT_EQ(has_null_counts, column_index->has_null_counts());
const size_t num_pages = column_index->null_pages().size();
if (build_size_stats) {
ASSERT_EQ(num_pages * (max_repetition_level + 1),
column_index->repetition_level_histograms().size());
ASSERT_EQ(num_pages * (max_definition_level + 1),
column_index->definition_level_histograms().size());
}

for (size_t i = 0; i < num_pages; ++i) {
ASSERT_EQ(page_stats[i].all_null_value, column_index->null_pages()[i]);
ASSERT_EQ(page_stats[i].min(), column_index->encoded_min_values()[i]);
ASSERT_EQ(page_stats[i].max(), column_index->encoded_max_values()[i]);
if (has_null_counts) {
ASSERT_EQ(page_stats[i].null_count, column_index->null_counts()[i]);
}
if (build_size_stats) {
ASSERT_NO_FATAL_FAILURE(
VerifyPageLevelHistogram(max_repetition_level, i, page_levels[i].rep_levels,
column_index->repetition_level_histograms()));
ASSERT_NO_FATAL_FAILURE(
VerifyPageLevelHistogram(max_definition_level, i, page_levels[i].def_levels,
column_index->definition_level_histograms()));
}
}
}
}
Expand Down Expand Up @@ -651,6 +724,28 @@ TEST(PageIndex, WriteColumnIndexWithCorruptedStats) {
EXPECT_EQ(0, buffer->size());
}

/// Writes an INT64 column index for three pages that each carry rep/def level
/// histograms, and checks the histograms survive the round trip.
TEST(PageIndex, WriteInt64ColumnIndexWithSizeStats) {
  // Copies the raw bytes of an int64 into a string, the form in which min/max
  // are stored in the EncodedStatistics below.
  auto to_raw_bytes = [](int64_t value) {
    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
  };

  // Per-page null count, min and max. Both the min and the max sequence
  // decrease from page to page, which is what BoundaryOrder::Descending expects.
  // NOTE(review): within each page min is greater than max (e.g. -1 vs -2);
  // kept exactly as-is, confirm whether that is intentional.
  struct PageBounds {
    int64_t null_count;
    int64_t min;
    int64_t max;
  };
  const std::vector<PageBounds> bounds = {{4, -1, -2}, {0, -2, -3}, {4, -3, -4}};

  std::vector<EncodedStatistics> page_stats(bounds.size());
  for (size_t page = 0; page < bounds.size(); ++page) {
    page_stats.at(page)
        .set_null_count(bounds[page].null_count)
        .set_min(to_raw_bytes(bounds[page].min))
        .set_max(to_raw_bytes(bounds[page].max));
  }

  // One histogram pair per page: three repetition-level entries (levels 0..2)
  // and four definition-level entries (levels 0..3).
  const std::vector<PageLevelHistogram> page_levels = {
      PageLevelHistogram{{1, 2, 3}, {2, 4, 6, 8}},
      PageLevelHistogram{{2, 3, 4}, {1, 3, 5, 7}},
      PageLevelHistogram{{3, 4, 5}, {0, 2, 4, 6}},
  };

  TestWriteTypedColumnIndex(schema::Int64("c1"), page_stats, BoundaryOrder::Descending,
                            /*has_null_counts=*/true, /*max_definition_level=*/3,
                            /*max_repetition_level=*/2, page_levels);
}

TEST(PageIndex, TestPageIndexBuilderWithZeroRowGroup) {
schema::NodeVector fields = {schema::Int32("c1"), schema::ByteArray("c2")};
schema::NodePtr root = schema::GroupNode::Make("schema", Repetition::REPEATED, fields);
Expand Down Expand Up @@ -852,6 +947,4 @@ TEST_F(PageIndexBuilderTest, TwoRowGroups) {
CheckOffsetIndex(/*row_group=*/1, /*column=*/1, page_locations[1][1], final_position);
}

// TODO: add test for size stats

} // namespace parquet
3 changes: 2 additions & 1 deletion cpp/src/parquet/size_statistics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ std::unique_ptr<SizeStatistics> SizeStatistics::Make(const void* size_statistics

class SizeStatisticsBuilder::SizeStatisticsBuilderImpl {
public:
SizeStatisticsBuilderImpl(const ColumnDescriptor* descr)
explicit SizeStatisticsBuilderImpl(const ColumnDescriptor* descr)
: rep_level_histogram_(descr->max_repetition_level() + 1, 0),
def_level_histogram_(descr->max_definition_level() + 1, 0) {
if (descr->physical_type() == Type::BYTE_ARRAY) {
Expand Down Expand Up @@ -178,6 +178,7 @@ class SizeStatisticsBuilder::SizeStatisticsBuilderImpl {
::arrow::VisitArraySpanInline<::arrow::LargeBinaryType>(
*values.data(), std::move(valid_func), std::move(null_func));
} else {
// TODO: support StringViewType and BinaryViewType
throw ParquetException("Unsupported type: " + values.type()->ToString());
}

Expand Down
Loading

0 comments on commit 90caf32

Please sign in to comment.