Skip to content

Commit

Permalink
Revert modified dictionary filter processing logic to optimize late materialization.
Browse files Browse the repository at this point in the history
  • Loading branch information
kaka11chen committed Dec 8, 2024
1 parent 3509b9a commit 03df851
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 7 deletions.
32 changes: 25 additions & 7 deletions be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ Status RowGroupReader::init(
_tuple_descriptor = tuple_descriptor;
_row_descriptor = row_descriptor;
_col_name_to_slot_id = colname_to_slot_id;
if (not_single_slot_filter_conjuncts != nullptr && !not_single_slot_filter_conjuncts->empty()) {
_not_single_slot_filter_conjuncts.insert(_not_single_slot_filter_conjuncts.end(),
not_single_slot_filter_conjuncts->begin(),
not_single_slot_filter_conjuncts->end());
}
_slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
_merge_read_ranges(row_ranges);
if (_read_columns.empty()) {
Expand Down Expand Up @@ -136,11 +141,6 @@ Status RowGroupReader::init(
_column_readers[read_col] = std::move(reader);
}
// Check if single slot can be filtered by dict.
if (not_single_slot_filter_conjuncts != nullptr && !not_single_slot_filter_conjuncts->empty()) {
_filter_conjuncts.insert(_filter_conjuncts.end(), not_single_slot_filter_conjuncts->begin(),
not_single_slot_filter_conjuncts->end());
return Status::OK();
}
if (!_slot_id_to_filter_conjuncts) {
return Status::OK();
}
Expand Down Expand Up @@ -363,8 +363,17 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_

RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, result_filter));
Block::erase_useless_column(block, column_to_keep);
_convert_dict_cols_to_string_cols(block);
if (!_not_single_slot_filter_conjuncts.empty()) {
_convert_dict_cols_to_string_cols(block);
SCOPED_RAW_TIMER(&_predicate_filter_time);
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
_not_single_slot_filter_conjuncts, block, columns_to_filter,
column_to_keep)));
} else {
Block::erase_useless_column(block, column_to_keep);
_convert_dict_cols_to_string_cols(block);
}
} else {
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(_filter_block(block, column_to_keep, columns_to_filter)));
Expand Down Expand Up @@ -595,6 +604,15 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
*batch_eof = pre_eof;
RETURN_IF_ERROR(_fill_partition_columns(block, column_size, _lazy_read_ctx.partition_columns));
RETURN_IF_ERROR(_fill_missing_columns(block, column_size, _lazy_read_ctx.missing_columns));
if (!_not_single_slot_filter_conjuncts.empty()) {
{
SCOPED_RAW_TIMER(&_predicate_filter_time);
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
_not_single_slot_filter_conjuncts, block, columns_to_filter,
origin_column_num)));
}
}
return Status::OK();
}

Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/format/parquet/vparquet_group_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ class RowGroupReader : public ProfileCollector {
const TupleDescriptor* _tuple_descriptor = nullptr;
const RowDescriptor* _row_descriptor = nullptr;
const std::unordered_map<std::string, int>* _col_name_to_slot_id = nullptr;
VExprContextSPtrs _not_single_slot_filter_conjuncts;
const std::unordered_map<int, VExprContextSPtrs>* _slot_id_to_filter_conjuncts = nullptr;
VExprContextSPtrs _dict_filter_conjuncts;
VExprContextSPtrs _filter_conjuncts;
Expand Down

0 comments on commit 03df851

Please sign in to comment.