Skip to content

Commit

Permalink
[opt](parquet-reader)Implement late materialization of parquet comple…
Browse files Browse the repository at this point in the history
…x types.
  • Loading branch information
kaka11chen committed Nov 18, 2024
1 parent 74238ae commit 5ef9e17
Show file tree
Hide file tree
Showing 7 changed files with 317 additions and 210 deletions.
152 changes: 53 additions & 99 deletions be/src/vec/exec/format/parquet/parquet_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,19 @@ const int32_t ParquetInt96::JULIAN_EPOCH_OFFSET_DAYS = 2440588;
const int64_t ParquetInt96::MICROS_IN_DAY = 86400000000;
const int64_t ParquetInt96::NANOS_PER_MICROSECOND = 1000;

ColumnSelectVector::ColumnSelectVector(const uint8_t* filter_map, size_t filter_map_size,
bool filter_all) {
build(filter_map, filter_map_size, filter_all);
}

void ColumnSelectVector::build(const uint8_t* filter_map, size_t filter_map_size, bool filter_all) {
Status FilterMap::init(const uint8_t* filter_map_data, size_t filter_map_size, bool filter_all) {
_filter_all = filter_all;
_filter_map = filter_map;
_filter_map_data = filter_map_data;
_filter_map_size = filter_map_size;
if (filter_all) {
_has_filter = true;
_filter_ratio = 1;
} else if (filter_map == nullptr) {
} else if (filter_map_data == nullptr) {
_has_filter = false;
_filter_ratio = 0;
} else {
size_t filter_count =
simd::count_zero_num(reinterpret_cast<const int8_t*>(filter_map), filter_map_size);
size_t filter_count = simd::count_zero_num(reinterpret_cast<const int8_t*>(filter_map_data),
filter_map_size);
if (filter_count == filter_map_size) {
_has_filter = true;
_filter_all = true;
Expand All @@ -58,109 +53,68 @@ void ColumnSelectVector::build(const uint8_t* filter_map, size_t filter_map_size
_filter_ratio = 0;
}
}
return Status::OK();
}

void ColumnSelectVector::set_run_length_null_map(const std::vector<uint16_t>& run_length_null_map,
size_t num_values, NullMap* null_map) {
_num_values = num_values;
_num_nulls = 0;
_read_index = 0;
size_t map_index = 0;
bool is_null = false;
if (_has_filter) {
// No run length null map is generated when _filter_all = true
DCHECK(!_filter_all);
_data_map.resize(num_values);
for (auto& run_length : run_length_null_map) {
if (is_null) {
_num_nulls += run_length;
for (int i = 0; i < run_length; ++i) {
_data_map[map_index++] = FILTERED_NULL;
}
} else {
for (int i = 0; i < run_length; ++i) {
_data_map[map_index++] = FILTERED_CONTENT;
}
}
is_null = !is_null;
}
size_t num_read = 0;
DCHECK_LE(_filter_map_index + num_values, _filter_map_size);
for (size_t i = 0; i < num_values; ++i) {
if (_filter_map[_filter_map_index++]) {
_data_map[i] = _data_map[i] == FILTERED_NULL ? NULL_DATA : CONTENT;
num_read++;
}
}
_num_filtered = num_values - num_read;
if (null_map != nullptr && num_read > 0) {
NullMap& map_data_column = *null_map;
auto null_map_index = map_data_column.size();
map_data_column.resize(null_map_index + num_read);
if (_num_nulls == 0) {
memset(map_data_column.data() + null_map_index, 0, num_read);
} else if (_num_nulls == num_values) {
memset(map_data_column.data() + null_map_index, 1, num_read);
} else {
for (size_t i = 0; i < num_values; ++i) {
if (_data_map[i] == CONTENT) {
map_data_column[null_map_index++] = (UInt8) false;
} else if (_data_map[i] == NULL_DATA) {
map_data_column[null_map_index++] = (UInt8) true;
}
}
}
}
} else {
_num_filtered = 0;
_run_length_null_map = &run_length_null_map;
if (null_map != nullptr) {
NullMap& map_data_column = *null_map;
auto null_map_index = map_data_column.size();
map_data_column.resize(null_map_index + num_values);

for (auto& run_length : run_length_null_map) {
if (is_null) {
memset(map_data_column.data() + null_map_index, 1, run_length);
null_map_index += run_length;
_num_nulls += run_length;
} else {
memset(map_data_column.data() + null_map_index, 0, run_length);
null_map_index += run_length;
}
is_null = !is_null;
}
} else {
for (auto& run_length : run_length_null_map) {
if (is_null) {
_num_nulls += run_length;
}
is_null = !is_null;
}
}
}
}

bool ColumnSelectVector::can_filter_all(size_t remaining_num_values) {
bool FilterMap::can_filter_all(size_t remaining_num_values, size_t filter_map_index) {
if (!_has_filter) {
return false;
}
if (_filter_all) {
// all data in normal columns can be skipped when _filter_all = true,
// so the remaining_num_values should be less than the remaining filter map size.
DCHECK_LE(remaining_num_values + _filter_map_index, _filter_map_size);
DCHECK_LE(remaining_num_values + filter_map_index, _filter_map_size);
// return true always, to make sure that the data in normal columns can be skipped.
return true;
}
if (remaining_num_values + _filter_map_index > _filter_map_size) {
if (remaining_num_values + filter_map_index > _filter_map_size) {
return false;
}
return simd::count_zero_num(reinterpret_cast<const int8_t*>(_filter_map + _filter_map_index),
remaining_num_values) == remaining_num_values;
}
return simd::count_zero_num(
reinterpret_cast<const int8_t*>(_filter_map_data + filter_map_index),
remaining_num_values) == remaining_num_values;
}

Status FilterMap::generate_nested_filter_map(const std::vector<level_t>& rep_levels,
std::vector<uint8_t>& nested_filter_map_data,
std::unique_ptr<FilterMap>* nested_filter_map,
size_t* current_row_ptr, bool is_cross_page,
size_t start_index) const {
if (!has_filter() || filter_all()) {
*nested_filter_map = std::make_unique<FilterMap>();
return Status::OK();
}

if (rep_levels.empty()) {
return Status::OK();
}

nested_filter_map_data.resize(rep_levels.size());

void ColumnSelectVector::skip(size_t num_values) {
_filter_map_index += num_values;
size_t current_row = current_row_ptr ? *current_row_ptr : 0;

for (size_t i = start_index; i < rep_levels.size(); i++) {
if (!is_cross_page && i > start_index && rep_levels[i] == 0) {
current_row++;
if (current_row >= _filter_map_size) {
return Status::InvalidArgument(
fmt::format("Filter map size {} is not enough for {} rows",
_filter_map_size, current_row + 1));
}
}
nested_filter_map_data[i] = _filter_map_data[current_row];
}

if (current_row_ptr) {
*current_row_ptr = current_row;
}

auto new_filter = std::make_unique<FilterMap>();
RETURN_IF_ERROR(
new_filter->init(nested_filter_map_data.data(), nested_filter_map_data.size(), false));
*nested_filter_map = std::move(new_filter);

return Status::OK();
}

ParsedVersion::ParsedVersion(std::string application, std::optional<std::string> version,
Expand Down
146 changes: 112 additions & 34 deletions be/src/vec/exec/format/parquet/parquet_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,45 +69,134 @@ struct ParquetInt96 {
#pragma pack()
static_assert(sizeof(ParquetInt96) == 12, "The size of ParquetInt96 is not 12.");

class FilterMap {
public:
FilterMap() = default;
Status init(const uint8_t* filter_map_data, size_t filter_map_size, bool filter_all);

Status generate_nested_filter_map(const std::vector<level_t>& rep_levels,
std::vector<uint8_t>& nested_filter_map_data,
std::unique_ptr<FilterMap>* nested_filter_map,
size_t* current_row_ptr, // 当前处理到哪一行
bool is_cross_page, // 是否是跨页的情况
size_t start_index = 0) const; // rep_levels的起始处理位置

const uint8_t* filter_map_data() const { return _filter_map_data; }
size_t filter_map_size() const { return _filter_map_size; }
bool has_filter() const { return _has_filter; }
bool filter_all() const { return _filter_all; }
double filter_ratio() const { return _has_filter ? _filter_ratio : 0; }

bool can_filter_all(size_t remaining_num_values, size_t filter_map_index);

private:
bool _has_filter = false;
bool _filter_all = false;
const uint8_t* _filter_map_data = nullptr;
size_t _filter_map_size = 0;
double _filter_ratio = 0;
};

class ColumnSelectVector {
public:
enum DataReadType : uint8_t { CONTENT = 0, NULL_DATA, FILTERED_CONTENT, FILTERED_NULL };

ColumnSelectVector(const uint8_t* filter_map, size_t filter_map_size, bool filter_all);

ColumnSelectVector() = default;

void build(const uint8_t* filter_map, size_t filter_map_size, bool filter_all);

const uint8_t* filter_map() { return _filter_map; }
Status init(const std::vector<uint16_t>& run_length_null_map, size_t num_values,
NullMap* null_map, FilterMap* filter_map, size_t filter_map_index) {
_num_values = num_values;
_num_nulls = 0;
_read_index = 0;
size_t map_index = 0;
bool is_null = false;
_has_filter = filter_map->has_filter();
if (filter_map->has_filter()) {
// No run length null map is generated when _filter_all = true
// DCHECK(!filter_map->filter_all());
_data_map.resize(num_values);
for (auto& run_length : run_length_null_map) {
if (is_null) {
_num_nulls += run_length;
for (int i = 0; i < run_length; ++i) {
_data_map[map_index++] = FILTERED_NULL;
}
} else {
for (int i = 0; i < run_length; ++i) {
_data_map[map_index++] = FILTERED_CONTENT;
}
}
is_null = !is_null;
}
size_t num_read = 0;
DCHECK_LE(filter_map_index + num_values, filter_map->filter_map_size());
for (size_t i = 0; i < num_values; ++i) {
if (filter_map->filter_map_data()[filter_map_index++]) {
_data_map[i] = _data_map[i] == FILTERED_NULL ? NULL_DATA : CONTENT;
num_read++;
}
}
_num_filtered = num_values - num_read;
if (null_map != nullptr && num_read > 0) {
NullMap& map_data_column = *null_map;
auto null_map_index = map_data_column.size();
map_data_column.resize(null_map_index + num_read);
if (_num_nulls == 0) {
memset(map_data_column.data() + null_map_index, 0, num_read);
} else if (_num_nulls == num_values) {
memset(map_data_column.data() + null_map_index, 1, num_read);
} else {
for (size_t i = 0; i < num_values; ++i) {
if (_data_map[i] == CONTENT) {
map_data_column[null_map_index++] = (UInt8) false;
} else if (_data_map[i] == NULL_DATA) {
map_data_column[null_map_index++] = (UInt8) true;
}
}
}
}
} else {
_num_filtered = 0;
_run_length_null_map = &run_length_null_map;
if (null_map != nullptr) {
NullMap& map_data_column = *null_map;
auto null_map_index = map_data_column.size();
map_data_column.resize(null_map_index + num_values);

for (auto& run_length : run_length_null_map) {
if (is_null) {
memset(map_data_column.data() + null_map_index, 1, run_length);
null_map_index += run_length;
_num_nulls += run_length;
} else {
memset(map_data_column.data() + null_map_index, 0, run_length);
null_map_index += run_length;
}
is_null = !is_null;
}
} else {
for (auto& run_length : run_length_null_map) {
if (is_null) {
_num_nulls += run_length;
}
is_null = !is_null;
}
}
}
return Status::OK();
}

size_t num_values() const { return _num_values; }

size_t num_nulls() const { return _num_nulls; }

size_t num_filtered() const { return _num_filtered; }

double filter_ratio() const { return _has_filter ? _filter_ratio : 0; }

void fallback_filter() { _has_filter = false; }

bool has_filter() const { return _has_filter; }

bool can_filter_all(size_t remaining_num_values);

bool filter_all() const { return _filter_all; }

void skip(size_t num_values);

void reset() {
if (_has_filter) {
_filter_map_index = 0;
}
}

template <bool has_filter>
size_t get_next_run(DataReadType* data_read_type) {
DCHECK_EQ(_has_filter, has_filter);
// DCHECK_EQ(_has_filter, has_filter);
if constexpr (has_filter) {
if (_read_index == _num_values) {
return 0;
Expand Down Expand Up @@ -137,22 +226,11 @@ class ColumnSelectVector {
}
}

void set_run_length_null_map(const std::vector<uint16_t>& run_length_null_map,
size_t num_values, NullMap* null_map = nullptr);

private:
std::vector<DataReadType> _data_map;
// the length of non-null values and null values are arranged in turn.
const std::vector<uint16_t>* _run_length_null_map;
bool _has_filter = false;
// only used when the whole batch is skipped
bool _filter_all = false;
const uint8_t* _filter_map = nullptr;
size_t _filter_map_size = 0;
double _filter_ratio = 0;
size_t _filter_map_index = 0;

// generated in set_run_length_null_map
bool _has_filter;
size_t _num_values;
size_t _num_nulls;
size_t _num_filtered;
Expand Down
Loading

0 comments on commit 5ef9e17

Please sign in to comment.