[Opt](multi-catalog)Disable dict filter in parquet/orc reader if have non-single conjuncts.
kaka11chen committed Nov 29, 2024
1 parent cb04281 commit 3c6b546
Showing 4 changed files with 63 additions and 92 deletions.
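
Background for this change: dictionary filtering lets the Parquet/ORC readers evaluate a predicate against a column's dictionary, but it only applies to conjuncts bound to a single slot. When a conjunct references more than one slot (for example col_a = col_b, or a predicate that mixes a missing column with an existing one), the readers now merge those conjuncts into the common _filter_conjuncts list and disable dictionary filtering outright, instead of running a separate execute_conjuncts_and_filter_block pass after decoding. The snippet below is a minimal, self-contained sketch of that pattern; Conjunct and ReaderFilterState are illustrative stand-ins, not the actual Doris classes.

// Minimal sketch of the pattern adopted by this commit; the types and names
// here are hypothetical stand-ins for the reader's real filter state.
#include <cstdio>
#include <vector>

struct Conjunct {
    std::vector<int> slot_ids; // slots referenced by this predicate
};

struct ReaderFilterState {
    std::vector<Conjunct> filter_conjuncts; // evaluated row-by-row after decode
    bool disable_dict_filter = false;       // mirrors the new _disable_dict_filter flag

    // Any non-single-slot conjuncts are folded into the common filter list,
    // and dictionary filtering is switched off for the whole reader.
    void absorb_not_single_slot(const std::vector<Conjunct>& not_single_slot) {
        if (!not_single_slot.empty()) {
            filter_conjuncts.insert(filter_conjuncts.end(),
                                    not_single_slot.begin(), not_single_slot.end());
            disable_dict_filter = true;
        }
    }

    // Dictionary filtering is considered only while the flag is unset.
    bool may_dict_filter(bool column_is_dict_encodable) const {
        return !disable_dict_filter && column_is_dict_encodable;
    }
};

int main() {
    ReaderFilterState state;
    state.absorb_not_single_slot({Conjunct{{1, 2}}}); // e.g. col_a = col_b
    std::printf("dict filter allowed: %d\n", state.may_dict_filter(true)); // prints 0
    return 0;
}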
55 changes: 21 additions & 34 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -1120,18 +1120,22 @@ Status OrcReader::set_fill_columns(
}
}

if (!_slot_id_to_filter_conjuncts) {
return Status::OK();
}

// Add predicate_partition_columns in _slot_id_to_filter_conjuncts(single slot conjuncts)
// to _filter_conjuncts, others should be added from not_single_slot_filter_conjuncts.
for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
auto& [value, slot_desc] = kv.second;
auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id());
if (iter != _slot_id_to_filter_conjuncts->end()) {
for (auto& ctx : iter->second) {
_filter_conjuncts.push_back(ctx);
if (!_not_single_slot_filter_conjuncts.empty()) {
_filter_conjuncts.insert(_filter_conjuncts.end(), _not_single_slot_filter_conjuncts.begin(),
_not_single_slot_filter_conjuncts.end());
_disable_dict_filter = true;
}

if (_slot_id_to_filter_conjuncts && !_slot_id_to_filter_conjuncts->empty()) {
// Add predicate_partition_columns in _slot_id_to_filter_conjuncts(single slot conjuncts)
// to _filter_conjuncts, others should be added from not_single_slot_filter_conjuncts.
for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
auto& [value, slot_desc] = kv.second;
auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id());
if (iter != _slot_id_to_filter_conjuncts->end()) {
for (auto& ctx : iter->second) {
_filter_conjuncts.push_back(ctx);
}
}
}
}
@@ -1891,16 +1895,8 @@ Status OrcReader::get_next_block_impl(Block* block, size_t* read_rows, bool* eof
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, *_filter));
}
if (!_not_single_slot_filter_conjuncts.empty()) {
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
_not_single_slot_filter_conjuncts, block, columns_to_filter,
column_to_keep)));
} else {
Block::erase_useless_column(block, column_to_keep);
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
}
Block::erase_useless_column(block, column_to_keep);
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
*read_rows = block->rows();
} else {
uint64_t rr;
@@ -2017,17 +2013,8 @@ Status OrcReader::get_next_block_impl(Block* block, size_t* read_rows, bool* eof
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, result_filter));
}
//_not_single_slot_filter_conjuncts check : missing column1 == missing column2 , missing column == exists column ...
if (!_not_single_slot_filter_conjuncts.empty()) {
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
_not_single_slot_filter_conjuncts, block, columns_to_filter,
column_to_keep)));
} else {
Block::erase_useless_column(block, column_to_keep);
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
}
Block::erase_useless_column(block, column_to_keep);
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
} else {
if (_delete_rows_filter_ptr) {
_execute_filter_position_delete_rowids(*_delete_rows_filter_ptr);
@@ -2208,7 +2195,7 @@ Status OrcReader::fill_dict_filter_column_names(
int i = 0;
for (auto& predicate_col_name : predicate_col_names) {
int slot_id = predicate_col_slot_ids[i];
if (_can_filter_by_dict(slot_id)) {
if (!_disable_dict_filter && _can_filter_by_dict(slot_id)) {
_dict_filter_cols.emplace_back(std::make_pair(predicate_col_name, slot_id));
column_names.emplace_back(_col_name_to_file_col_name[predicate_col_name]);
} else {
1 change: 1 addition & 0 deletions be/src/vec/exec/format/orc/vorc_reader.h
@@ -628,6 +628,7 @@ class OrcReader : public GenericReader {
VExprContextSPtrs _dict_filter_conjuncts;
VExprContextSPtrs _non_dict_filter_conjuncts;
VExprContextSPtrs _filter_conjuncts;
bool _disable_dict_filter = false;
// std::pair<col_name, slot_id>
std::vector<std::pair<std::string, int>> _dict_filter_cols;
std::shared_ptr<ObjectPool> _obj_pool;
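
On the ORC side, set_fill_columns now performs the merge up front and sets _disable_dict_filter (the new member declared above in vorc_reader.h), and fill_dict_filter_column_names consults the flag before treating a predicate column as a dictionary-filter column; the dedicated post-filter pass in get_next_block_impl is removed. A rough sketch of that column split, using hypothetical helper names rather than the real OrcReader interface:

// Hypothetical sketch of splitting predicate columns once the flag exists;
// split_predicate_columns and can_filter_by_dict are invented names.
#include <string>
#include <utility>
#include <vector>

struct DictSplit {
    std::vector<std::pair<std::string, int>> dict_filter_cols; // (col_name, slot_id)
    std::vector<std::string> non_dict_cols;                    // filtered via conjuncts instead
};

DictSplit split_predicate_columns(const std::vector<std::string>& names,
                                  const std::vector<int>& slot_ids,
                                  bool disable_dict_filter,
                                  bool (*can_filter_by_dict)(int slot_id)) {
    DictSplit out;
    for (size_t i = 0; i < names.size(); ++i) {
        // Key change: the flag short-circuits dictionary filtering whenever
        // non-single-slot conjuncts were merged into _filter_conjuncts.
        if (!disable_dict_filter && can_filter_by_dict(slot_ids[i])) {
            out.dict_filter_cols.emplace_back(names[i], slot_ids[i]);
        } else {
            out.non_dict_cols.push_back(names[i]);
        }
    }
    return out;
}

static bool always_dict_encoded(int) { return true; }

int main() {
    // With the flag set, every predicate column falls back to normal filtering.
    DictSplit r = split_predicate_columns({"c1", "c2"}, {1, 2},
                                          /*disable_dict_filter=*/true, always_dict_encoded);
    return r.dict_filter_cols.empty() ? 0 : 1;
}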
98 changes: 41 additions & 57 deletions be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -109,11 +109,6 @@ Status RowGroupReader::init(
_tuple_descriptor = tuple_descriptor;
_row_descriptor = row_descriptor;
_col_name_to_slot_id = colname_to_slot_id;
if (not_single_slot_filter_conjuncts != nullptr && !not_single_slot_filter_conjuncts->empty()) {
_not_single_slot_filter_conjuncts.insert(_not_single_slot_filter_conjuncts.end(),
not_single_slot_filter_conjuncts->begin(),
not_single_slot_filter_conjuncts->end());
}
_slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
_merge_read_ranges(row_ranges);
if (_read_columns.empty()) {
@@ -140,45 +135,52 @@
}
_column_readers[read_col] = std::move(reader);
}
// Check if single slot can be filtered by dict.
if (!_slot_id_to_filter_conjuncts) {
return Status::OK();

bool disable_dict_filter = false;
if (not_single_slot_filter_conjuncts != nullptr && !not_single_slot_filter_conjuncts->empty()) {
disable_dict_filter = true;
_filter_conjuncts.insert(_filter_conjuncts.end(), not_single_slot_filter_conjuncts->begin(),
not_single_slot_filter_conjuncts->end());
}
const std::vector<string>& predicate_col_names = _lazy_read_ctx.predicate_columns.first;
const std::vector<int>& predicate_col_slot_ids = _lazy_read_ctx.predicate_columns.second;
for (size_t i = 0; i < predicate_col_names.size(); ++i) {
const string& predicate_col_name = predicate_col_names[i];
int slot_id = predicate_col_slot_ids[i];
auto field = const_cast<FieldSchema*>(schema.get_column(predicate_col_name));
if (!_lazy_read_ctx.has_complex_type &&
_can_filter_by_dict(slot_id,
_row_group_meta.columns[field->physical_column_index].meta_data)) {
_dict_filter_cols.emplace_back(std::make_pair(predicate_col_name, slot_id));
} else {
if (_slot_id_to_filter_conjuncts->find(slot_id) !=
_slot_id_to_filter_conjuncts->end()) {
for (auto& ctx : _slot_id_to_filter_conjuncts->at(slot_id)) {
_filter_conjuncts.push_back(ctx);

// Check if single slot can be filtered by dict.
if (_slot_id_to_filter_conjuncts && !_slot_id_to_filter_conjuncts->empty()) {
const std::vector<string>& predicate_col_names = _lazy_read_ctx.predicate_columns.first;
const std::vector<int>& predicate_col_slot_ids = _lazy_read_ctx.predicate_columns.second;
for (size_t i = 0; i < predicate_col_names.size(); ++i) {
const string& predicate_col_name = predicate_col_names[i];
int slot_id = predicate_col_slot_ids[i];
auto field = const_cast<FieldSchema*>(schema.get_column(predicate_col_name));
if (!disable_dict_filter && !_lazy_read_ctx.has_complex_type &&
_can_filter_by_dict(
slot_id, _row_group_meta.columns[field->physical_column_index].meta_data)) {
_dict_filter_cols.emplace_back(std::make_pair(predicate_col_name, slot_id));
} else {
if (_slot_id_to_filter_conjuncts->find(slot_id) !=
_slot_id_to_filter_conjuncts->end()) {
for (auto& ctx : _slot_id_to_filter_conjuncts->at(slot_id)) {
_filter_conjuncts.push_back(ctx);
}
}
}
}
}
// Add predicate_partition_columns in _slot_id_to_filter_conjuncts(single slot conjuncts)
// to _filter_conjuncts, others should be added from not_single_slot_filter_conjuncts.
for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
auto& [value, slot_desc] = kv.second;
auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id());
if (iter != _slot_id_to_filter_conjuncts->end()) {
for (auto& ctx : iter->second) {
_filter_conjuncts.push_back(ctx);
// Add predicate_partition_columns in _slot_id_to_filter_conjuncts(single slot conjuncts)
// to _filter_conjuncts, others should be added from not_single_slot_filter_conjuncts.
for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
auto& [value, slot_desc] = kv.second;
auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id());
if (iter != _slot_id_to_filter_conjuncts->end()) {
for (auto& ctx : iter->second) {
_filter_conjuncts.push_back(ctx);
}
}
}
//For check missing column : missing column == xx, missing column is null,missing column is not null.
_filter_conjuncts.insert(_filter_conjuncts.end(),
_lazy_read_ctx.missing_columns_conjuncts.begin(),
_lazy_read_ctx.missing_columns_conjuncts.end());
RETURN_IF_ERROR(_rewrite_dict_predicates());
}
//For check missing column : missing column == xx, missing column is null,missing column is not null.
_filter_conjuncts.insert(_filter_conjuncts.end(),
_lazy_read_ctx.missing_columns_conjuncts.begin(),
_lazy_read_ctx.missing_columns_conjuncts.end());
RETURN_IF_ERROR(_rewrite_dict_predicates());
return Status::OK();
}

@@ -363,17 +365,8 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_

RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, result_filter));
if (!_not_single_slot_filter_conjuncts.empty()) {
_convert_dict_cols_to_string_cols(block);
SCOPED_RAW_TIMER(&_predicate_filter_time);
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
_not_single_slot_filter_conjuncts, block, columns_to_filter,
column_to_keep)));
} else {
Block::erase_useless_column(block, column_to_keep);
_convert_dict_cols_to_string_cols(block);
}
Block::erase_useless_column(block, column_to_keep);
_convert_dict_cols_to_string_cols(block);
} else {
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(_filter_block(block, column_to_keep, columns_to_filter)));
@@ -601,15 +594,6 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
*batch_eof = pre_eof;
RETURN_IF_ERROR(_fill_partition_columns(block, column_size, _lazy_read_ctx.partition_columns));
RETURN_IF_ERROR(_fill_missing_columns(block, column_size, _lazy_read_ctx.missing_columns));
if (!_not_single_slot_filter_conjuncts.empty()) {
{
SCOPED_RAW_TIMER(&_predicate_filter_time);
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
_not_single_slot_filter_conjuncts, block, columns_to_filter,
origin_column_num)));
}
}
return Status::OK();
}

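
On the Parquet side, RowGroupReader::init takes the same approach: if not_single_slot_filter_conjuncts is non-empty it merges them into _filter_conjuncts and sets a local disable_dict_filter guard before picking dictionary-filter columns, so next_batch and _do_lazy_read above no longer need their own execute_conjuncts_and_filter_block pass, and the _not_single_slot_filter_conjuncts member is dropped from the header in the next file. As a rough, hypothetical illustration of why one merged pass is enough (invented Row/Predicate types, not the Doris Block API):

// Illustrative only: single-slot and multi-slot conjuncts, once merged into
// one list, can be applied to the decoded block in a single pass.
#include <functional>
#include <vector>

struct Row {
    std::vector<long> cols;
};
using Predicate = std::function<bool(const Row&)>;

std::vector<Row> filter_block(const std::vector<Row>& block,
                              const std::vector<Predicate>& merged_conjuncts) {
    std::vector<Row> kept;
    for (const Row& row : block) {
        bool keep = true;
        for (const Predicate& pred : merged_conjuncts) {
            if (!pred(row)) { keep = false; break; } // AND of all conjuncts
        }
        if (keep) kept.push_back(row);
    }
    return kept;
}

int main() {
    // One single-slot conjunct (c0 > 0) and one multi-slot conjunct (c0 == c1),
    // both evaluated in the same filtering pass.
    std::vector<Predicate> conjuncts = {
            [](const Row& r) { return r.cols[0] > 0; },
            [](const Row& r) { return r.cols[0] == r.cols[1]; },
    };
    std::vector<Row> block = {{{1, 1}}, {{2, 3}}, {{-1, -1}}};
    return filter_block(block, conjuncts).size() == 1 ? 0 : 1; // only {1, 1} survives
}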
1 change: 0 additions & 1 deletion be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -220,7 +220,6 @@ class RowGroupReader : public ProfileCollector {
const TupleDescriptor* _tuple_descriptor = nullptr;
const RowDescriptor* _row_descriptor = nullptr;
const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr;
VExprContextSPtrs _not_single_slot_filter_conjuncts;
const std::unordered_map<int, VExprContextSPtrs>* _slot_id_to_filter_conjuncts = nullptr;
VExprContextSPtrs _dict_filter_conjuncts;
VExprContextSPtrs _filter_conjuncts;
