Skip to content

Commit

Permalink
Address a couple of feedback items
Browse files Browse the repository at this point in the history
  • Loading branch information
wgtmac committed Sep 20, 2024
1 parent d4fbf92 commit a1e80eb
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 42 deletions.
1 change: 1 addition & 0 deletions cpp/src/parquet/column_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class DataPage : public Page {
EncodedStatistics statistics_;
/// Row ordinal within the row group of the first row in the data page.
std::optional<int64_t> first_row_index_;
/// Size statistics for the data page. It may be null if unavailable.
std::shared_ptr<SizeStatistics> size_statistics_;
};

Expand Down
28 changes: 18 additions & 10 deletions cpp/src/parquet/column_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,11 @@ class ColumnWriterImpl {
// Serializes Dictionary Page if enabled
virtual void WriteDictionaryPage() = 0;

using StatisticsPair = std::pair<EncodedStatistics, std::shared_ptr<SizeStatistics>>;
// A convenience struct that bundles the two kinds of statistics returned
// together by GetPageStatistics() and GetChunkStatistics().
struct StatisticsPair {
// Result of Encode() on the page/chunk statistics; stays default-constructed
// when the writer holds no statistics object to encode.
EncodedStatistics encoded_stats; // required
// Size statistics for the page/chunk; left null when the writer does not
// produce them (callers must check before dereferencing).
std::shared_ptr<SizeStatistics> size_statistics; // may be null if disabled
};

// Plain-encoded statistics of the current page
virtual StatisticsPair GetPageStatistics() = 0;
Expand Down Expand Up @@ -1091,9 +1095,7 @@ int64_t ColumnWriterImpl::Close() {

FlushBufferedDataPages();

EncodedStatistics chunk_statistics;
std::shared_ptr<SizeStatistics> chunk_size_stats;
std::tie(chunk_statistics, chunk_size_stats) = GetChunkStatistics();
auto [chunk_statistics, chunk_size_stats] = GetChunkStatistics();
chunk_statistics.ApplyStatSizeLimits(
properties_->max_statistics_size(descr_->path()));
chunk_statistics.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
Expand Down Expand Up @@ -1372,17 +1374,23 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<

StatisticsPair GetPageStatistics() override {
StatisticsPair result;
if (page_statistics_) result.first = page_statistics_->Encode();
if (page_statistics_) {
result.encoded_stats = page_statistics_->Encode();
}
if (properties_->size_statistics_level() == SizeStatisticsLevel::Page) {
result.second = page_size_stats_builder_->Build();
result.size_statistics = page_size_stats_builder_->Build();
}
return result;
}

StatisticsPair GetChunkStatistics() override {
StatisticsPair result;
if (chunk_statistics_) result.first = chunk_statistics_->Encode();
if (chunk_size_stats_) result.second = chunk_size_stats_;
if (chunk_statistics_) {
result.encoded_stats = chunk_statistics_->Encode();
}
if (chunk_size_stats_) {
result.size_statistics = chunk_size_stats_;
}
return result;
}

Expand Down Expand Up @@ -1602,14 +1610,14 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
page_size_stats_builder_->AddDefinitionLevels(
{def_levels, static_cast<size_t>(num_levels)});
} else {
page_size_stats_builder_->AddDefinitionLevel(num_levels, /*def_level=*/0);
page_size_stats_builder_->AddRepeatedDefinitionLevels(num_levels, /*def_level=*/0);
}

if (descr_->max_repetition_level() > 0) {
page_size_stats_builder_->AddRepetitionLevels(
{rep_levels, static_cast<size_t>(num_levels)});
} else {
page_size_stats_builder_->AddRepetitionLevel(num_levels, /*rep_level=*/0);
page_size_stats_builder_->AddRepeatedRepetitionLevels(num_levels, /*rep_level=*/0);
}
}

Expand Down
20 changes: 9 additions & 11 deletions cpp/src/parquet/page_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -574,16 +574,14 @@ class ColumnIndexBuilderImpl final : public ColumnIndexBuilder {
const int64_t num_pages = column_index_.null_pages.size();
const int64_t rep_level_hist_size = column_index_.repetition_level_histograms.size();
const int64_t def_level_hist_size = column_index_.definition_level_histograms.size();
if (rep_level_hist_size == (descr_->max_repetition_level() + 1) * num_pages) {
column_index_.__isset.repetition_level_histograms = true;
} else {
column_index_.repetition_level_histograms.clear();
}
if (def_level_hist_size == (descr_->max_definition_level() + 1) * num_pages) {
column_index_.__isset.definition_level_histograms = true;
} else {
column_index_.definition_level_histograms.clear();
}
ARROW_CHECK(rep_level_hist_size == 0 ||
rep_level_hist_size == (descr_->max_repetition_level() + 1) * num_pages);
ARROW_CHECK(def_level_hist_size == 0 ||
def_level_hist_size == (descr_->max_definition_level() + 1) * num_pages);
column_index_.__isset.repetition_level_histograms =
!column_index_.repetition_level_histograms.empty();
column_index_.__isset.definition_level_histograms =
!column_index_.definition_level_histograms.empty();
}

void WriteTo(::arrow::io::OutputStream* sink, Encryptor* encryptor) const override {
Expand Down Expand Up @@ -692,7 +690,7 @@ class OffsetIndexBuilderImpl final : public OffsetIndexBuilder {
}
}

/// Finalize unencoded_byte_array_data_bytes and make sure page sizes match.
// Finalize unencoded_byte_array_data_bytes and make sure page sizes match.
if (offset_index_.page_locations.size() ==
offset_index_.unencoded_byte_array_data_bytes.size()) {
offset_index_.__isset.unencoded_byte_array_data_bytes = true;
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/parquet/page_index_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -486,10 +486,10 @@ std::unique_ptr<SizeStatistics> ConstructFakeSizeStatistics(
const ColumnDescriptor* descr, const PageLevelHistogram& page_level_histogram) {
auto builder = SizeStatisticsBuilder::Make(descr);
for (int16_t level = 0; level <= descr->max_repetition_level(); ++level) {
builder->AddRepetitionLevel(page_level_histogram.rep_levels[level], level);
builder->AddRepeatedRepetitionLevels(page_level_histogram.rep_levels[level], level);
}
for (int16_t level = 0; level <= descr->max_definition_level(); ++level) {
builder->AddDefinitionLevel(page_level_histogram.def_levels[level], level);
builder->AddRepeatedDefinitionLevels(page_level_histogram.def_levels[level], level);
}
return builder->Build();
}
Expand Down
26 changes: 14 additions & 12 deletions cpp/src/parquet/size_statistics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,26 +120,26 @@ class SizeStatisticsBuilder::Impl {
}
}

void WriteRepetitionLevels(::arrow::util::span<const int16_t> rep_levels) {
void AddRepetitionLevels(::arrow::util::span<const int16_t> rep_levels) {
for (int16_t rep_level : rep_levels) {
ARROW_DCHECK_LT(rep_level, static_cast<int16_t>(rep_level_histogram_.size()));
rep_level_histogram_[rep_level]++;
++rep_level_histogram_[rep_level];
}
}

void WriteDefinitionLevels(::arrow::util::span<const int16_t> def_levels) {
void AddDefinitionLevels(::arrow::util::span<const int16_t> def_levels) {
for (int16_t def_level : def_levels) {
ARROW_DCHECK_LT(def_level, static_cast<int16_t>(def_level_histogram_.size()));
def_level_histogram_[def_level]++;
++def_level_histogram_[def_level];
}
}

void WriteRepetitionLevel(int64_t num_levels, int16_t rep_level) {
void AddRepeatedRepetitionLevels(int64_t num_levels, int16_t rep_level) {
ARROW_DCHECK_LT(rep_level, static_cast<int16_t>(rep_level_histogram_.size()));
rep_level_histogram_[rep_level] += num_levels;
}

void WriteDefinitionLevel(int64_t num_levels, int16_t def_level) {
void AddRepeatedDefinitionLevels(int64_t num_levels, int16_t def_level) {
ARROW_DCHECK_LT(def_level, static_cast<int16_t>(def_level_histogram_.size()));
def_level_histogram_[def_level] += num_levels;
}
Expand Down Expand Up @@ -218,20 +218,22 @@ class SizeStatisticsBuilder::Impl {

void SizeStatisticsBuilder::AddRepetitionLevels(
::arrow::util::span<const int16_t> rep_levels) {
impl_->WriteRepetitionLevels(rep_levels);
impl_->AddRepetitionLevels(rep_levels);
}

void SizeStatisticsBuilder::AddDefinitionLevels(
::arrow::util::span<const int16_t> def_levels) {
impl_->WriteDefinitionLevels(def_levels);
impl_->AddDefinitionLevels(def_levels);
}

void SizeStatisticsBuilder::AddRepetitionLevel(int64_t num_levels, int16_t rep_level) {
impl_->WriteRepetitionLevel(num_levels, rep_level);
void SizeStatisticsBuilder::AddRepeatedRepetitionLevels(int64_t num_levels,
int16_t rep_level) {
impl_->AddRepeatedRepetitionLevels(num_levels, rep_level);
}

void SizeStatisticsBuilder::AddDefinitionLevel(int64_t num_levels, int16_t def_level) {
impl_->WriteDefinitionLevel(num_levels, def_level);
void SizeStatisticsBuilder::AddRepeatedDefinitionLevels(int64_t num_levels,
int16_t def_level) {
impl_->AddRepeatedDefinitionLevels(num_levels, def_level);
}

void SizeStatisticsBuilder::AddValuesSpaced(const ByteArray* values,
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/parquet/size_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

#include <optional>

#include "arrow/util/span.h"
#include "parquet/platform.h"
#include "parquet/type_fwd.h"
#include "arrow/util/span.h"

namespace parquet {

Expand Down Expand Up @@ -120,12 +120,12 @@ class PARQUET_EXPORT SizeStatisticsBuilder {
/// \brief Add a repetition level repeated num_levels times to the histogram.
/// \param num_levels number of repetition levels to add.
/// \param rep_level repeated repetition level value.
void AddRepetitionLevel(int64_t num_levels, int16_t rep_level);
void AddRepeatedRepetitionLevels(int64_t num_levels, int16_t rep_level);

/// \brief Add a definition level repeated num_levels times to the histogram.
/// \param num_levels number of definition levels to add.
/// \param def_level repeated definition level value.
void AddDefinitionLevel(int64_t num_levels, int16_t def_level);
void AddRepeatedDefinitionLevels(int64_t num_levels, int16_t def_level);

/// \brief Add spaced BYTE_ARRAY values.
/// \param[in] values pointer to values of BYTE_ARRAY type.
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/parquet/size_statistics_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ TEST(SizeStatistics, WriteRepeatedLevels) {
constexpr int64_t kNumRounds = 10;
for (int64_t round = 1; round <= kNumRounds; round++) {
for (int16_t def_level = 0; def_level <= kMaxDefLevel; def_level++) {
builder->AddDefinitionLevel(/*num_levels=*/round + def_level, def_level);
builder->AddRepeatedDefinitionLevels(/*num_levels=*/round + def_level, def_level);
}
for (int16_t rep_level = 0; rep_level <= kMaxRepLevel; rep_level++) {
builder->AddRepetitionLevel(/*num_levels=*/round + rep_level * rep_level,
rep_level);
builder->AddRepeatedRepetitionLevels(/*num_levels=*/round + rep_level * rep_level,
rep_level);
}
}

Expand Down

0 comments on commit a1e80eb

Please sign in to comment.