Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](orc) check all the cases before build_search_argument #44615

Merged
merged 7 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 86 additions & 88 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <memory>
#include <ostream>
#include <tuple>
#include <utility>

#include "cctz/civil_time.h"
#include "cctz/time_zone.h"
Expand Down Expand Up @@ -624,15 +625,37 @@ std::tuple<bool, orc::Literal, orc::PredicateDataType> OrcReader::_make_orc_lite
}
}

// Check whether the slot (first child) of expr can be pushed down to the orc reader and,
// when it can, remember its orc predicate type in _vslot_ref_to_orc_predicate_data_type
// so the later build step can look it up.
//
// Returns false when:
//   - expr has no children or its first child is not a slot reference,
//   - the slot has no entry in _col_name_to_file_col_name (not present in the orc file),
//   - the slot is one of the predicate partition columns,
//   - _make_orc_literal reports the slot as not valid for push down.
bool OrcReader::_check_slot_can_push_down(const VExprSPtr& expr) {
    if (expr->children().empty() || !expr->children()[0]->is_slot_ref()) {
        return false;
    }
    const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
    // The slot must exist in the orc file AND must not be a partition column.
    // Either failure alone disqualifies it, so the rejection test joins the two
    // conditions with `||` (the negation of `exists && !is_partition`).
    if (!_col_name_to_file_col_name.contains(slot_ref->expr_name()) ||
        _lazy_read_ctx.predicate_partition_columns.contains(slot_ref->expr_name())) {
        return false;
    }
    // A null literal asks _make_orc_literal only for the slot's predicate type.
    auto [valid, _, predicate_type] = _make_orc_literal(slot_ref, nullptr);
    if (valid) {
        _vslot_ref_to_orc_predicate_data_type[slot_ref] = predicate_type;
    }
    return valid;
}

// check if the literal of expr can be pushed down to orc reader and make orc literal
bool OrcReader::_check_literal_can_push_down(const VExprSPtr& expr, uint16_t child_id) {
if (!expr->children()[child_id]->is_literal()) {
return false;
}
// the slot has been checked in _check_slot_can_push_down before calling this function
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
const auto* literal = static_cast<const VLiteral*>(expr->children()[child_id].get());
auto [valid, orc_literal, _] = _make_orc_literal(slot_ref, literal);
if (valid) {
_vliteral_to_orc_literal.insert(std::make_pair(literal, orc_literal));
}
return valid;
}

// check if there are rest children of expr can be pushed down to orc reader
Expand All @@ -642,7 +665,7 @@ bool OrcReader::_check_rest_children_can_push_down(const VExprSPtr& expr) {
}

for (size_t i = 1; i < expr->children().size(); ++i) {
if (!expr->children()[i]->is_literal()) {
if (!_check_literal_can_push_down(expr, i)) {
return false;
}
}
Expand All @@ -651,7 +674,10 @@ bool OrcReader::_check_rest_children_can_push_down(const VExprSPtr& expr) {

// check if the expr can be pushed down to orc reader
bool OrcReader::_check_expr_can_push_down(const VExprSPtr& expr) {
DCHECK(expr != nullptr);
if (expr == nullptr) {
return false;
}

switch (expr->op()) {
case TExprOpcode::COMPOUND_AND:
// at least one child can be pushed down
Expand Down Expand Up @@ -693,198 +719,167 @@ bool OrcReader::_check_expr_can_push_down(const VExprSPtr& expr) {
}
}

bool OrcReader::_build_less_than(const VExprSPtr& expr,
void OrcReader::_build_less_than(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() == 2);
DCHECK(expr->children()[0]->is_slot_ref());
DCHECK(expr->children()[1]->is_literal());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
const auto* literal = static_cast<const VLiteral*>(expr->children()[1].get());
auto [valid, orc_literal, predicate_type] = _make_orc_literal(slot_ref, literal);
if (!valid) {
return false;
}
DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
auto predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
DCHECK(_vliteral_to_orc_literal.contains(literal));
auto orc_literal = _vliteral_to_orc_literal.find(literal)->second;
builder->lessThan(slot_ref->expr_name(), predicate_type, orc_literal);
return true;
}

bool OrcReader::_build_less_than_equals(const VExprSPtr& expr,
void OrcReader::_build_less_than_equals(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() == 2);
DCHECK(expr->children()[0]->is_slot_ref());
DCHECK(expr->children()[1]->is_literal());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
const auto* literal = static_cast<const VLiteral*>(expr->children()[1].get());
auto [valid, orc_literal, predicate_type] = _make_orc_literal(slot_ref, literal);
if (!valid) {
return false;
}
DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
auto predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
DCHECK(_vliteral_to_orc_literal.contains(literal));
auto orc_literal = _vliteral_to_orc_literal.find(literal)->second;
builder->lessThanEquals(slot_ref->expr_name(), predicate_type, orc_literal);
return true;
}

bool OrcReader::_build_equals(const VExprSPtr& expr,
void OrcReader::_build_equals(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() == 2);
DCHECK(expr->children()[0]->is_slot_ref());
DCHECK(expr->children()[1]->is_literal());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
const auto* literal = static_cast<const VLiteral*>(expr->children()[1].get());
auto [valid, orc_literal, predicate_type] = _make_orc_literal(slot_ref, literal);
if (!valid) {
return false;
}
DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
auto predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
DCHECK(_vliteral_to_orc_literal.contains(literal));
auto orc_literal = _vliteral_to_orc_literal.find(literal)->second;
builder->equals(slot_ref->expr_name(), predicate_type, orc_literal);
return true;
}

bool OrcReader::_build_filter_in(const VExprSPtr& expr,
void OrcReader::_build_filter_in(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() >= 2);
DCHECK(expr->children()[0]->is_slot_ref());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
std::vector<orc::Literal> literals;
orc::PredicateDataType predicate_type = orc::PredicateDataType::LONG;
DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
orc::PredicateDataType predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
for (size_t i = 1; i < expr->children().size(); ++i) {
DCHECK(expr->children()[i]->is_literal());
const auto* literal = static_cast<const VLiteral*>(expr->children()[i].get());
auto [valid, orc_literal, type] = _make_orc_literal(slot_ref, literal);
if (!valid) {
return false;
}
DCHECK(_vliteral_to_orc_literal.contains(literal));
auto orc_literal = _vliteral_to_orc_literal.find(literal)->second;
literals.emplace_back(orc_literal);
predicate_type = type;
}
DCHECK(!literals.empty());
builder->in(slot_ref->expr_name(), predicate_type, literals);
return true;
}

bool OrcReader::_build_is_null(const VExprSPtr& expr,
void OrcReader::_build_is_null(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() == 1);
DCHECK(expr->children()[0]->is_slot_ref());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
auto [valid, _, predicate_type] = _make_orc_literal(slot_ref, nullptr);
DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
auto predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
builder->isNull(slot_ref->expr_name(), predicate_type);
return true;
}

bool OrcReader::_build_search_argument(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
if (expr == nullptr) {
return false;
}

// if expr can not be pushed down, skip it and continue to next expr
// OPTIMIZE: check expr only once
if (!_check_expr_can_push_down(expr)) {
return false;
}

switch (expr->op()) {
case TExprOpcode::COMPOUND_AND: {
bool at_least_one_can_push_down = false;
builder->startAnd();
bool at_least_one_can_push_down = false;
for (const auto& child : expr->children()) {
if (_build_search_argument(child, builder)) {
at_least_one_can_push_down = true;
}
}
if (!at_least_one_can_push_down) {
// if all exprs can not be pushed down, builder->end() will throw exception
return false;
}
DCHECK(at_least_one_can_push_down);
builder->end();
break;
}
case TExprOpcode::COMPOUND_OR:
case TExprOpcode::COMPOUND_OR: {
builder->startOr();
bool all_can_push_down = true;
for (const auto& child : expr->children()) {
if (!_build_search_argument(child, builder)) {
return false;
all_can_push_down = false;
}
}
DCHECK(all_can_push_down);
builder->end();
break;
case TExprOpcode::COMPOUND_NOT:
builder->startNot();
}
case TExprOpcode::COMPOUND_NOT: {
DCHECK_EQ(expr->children().size(), 1);
if (!_build_search_argument(expr->children()[0], builder)) {
return false;
}
builder->startNot();
auto res = _build_search_argument(expr->children()[0], builder);
DCHECK(res);
builder->end();
break;
}
case TExprOpcode::GE:
builder->startNot();
if (!_build_less_than(expr, builder)) {
return false;
}
_build_less_than(expr, builder);
builder->end();
break;
case TExprOpcode::GT:
builder->startNot();
if (!_build_less_than_equals(expr, builder)) {
return false;
}
_build_less_than_equals(expr, builder);
builder->end();
break;
case TExprOpcode::LE:
if (!_build_less_than_equals(expr, builder)) {
return false;
}
_build_less_than_equals(expr, builder);
break;
case TExprOpcode::LT:
if (!_build_less_than(expr, builder)) {
return false;
}
_build_less_than(expr, builder);
break;
case TExprOpcode::EQ:
if (!_build_equals(expr, builder)) {
return false;
}
_build_equals(expr, builder);
break;
case TExprOpcode::NE:
builder->startNot();
if (!_build_equals(expr, builder)) {
return false;
}
_build_equals(expr, builder);
builder->end();
break;
case TExprOpcode::FILTER_IN:
if (!_build_filter_in(expr, builder)) {
return false;
}
_build_filter_in(expr, builder);
break;
case TExprOpcode::FILTER_NOT_IN:
builder->startNot();
if (!_build_filter_in(expr, builder)) {
return false;
}
_build_filter_in(expr, builder);
builder->end();
break;
// is null and is not null is represented as function call
case TExprOpcode::INVALID_OPCODE: {
case TExprOpcode::INVALID_OPCODE:
DCHECK(expr->node_type() == TExprNodeType::FUNCTION_CALL);
if (expr->fn().name.function_name == "is_null_pred") {
if (!_build_is_null(expr, builder)) {
return false;
}
_build_is_null(expr, builder);
} else if (expr->fn().name.function_name == "is_not_null_pred") {
builder->startNot();
if (!_build_is_null(expr, builder)) {
return false;
}
_build_is_null(expr, builder);
builder->end();
} else {
// should not reach here, because _check_expr_can_push_down has already checked
__builtin_unreachable();
}
break;
}
default: {

default:
// should not reach here, because _check_expr_can_push_down has already checked
__builtin_unreachable();
}
}
return true;
}

Expand All @@ -898,6 +893,8 @@ bool OrcReader::_init_search_argument(const VExprContextSPtrs& conjuncts) {
bool at_least_one_can_push_down = false;
builder->startAnd();
for (const auto& expr_ctx : conjuncts) {
_vslot_ref_to_orc_predicate_data_type.clear();
_vliteral_to_orc_literal.clear();
if (_build_search_argument(expr_ctx->root(), builder)) {
at_least_one_can_push_down = true;
}
Expand Down Expand Up @@ -943,7 +940,7 @@ Status OrcReader::set_fill_columns(
visit_slot(child.get());
}
} else if (VInPredicate* in_predicate = typeid_cast<VInPredicate*>(filter_impl)) {
if (in_predicate->get_num_children() > 0) {
if (!in_predicate->children().empty()) {
visit_slot(in_predicate->children()[0].get());
}
} else {
Expand Down Expand Up @@ -1179,7 +1176,8 @@ Status OrcReader::_fill_partition_columns(
if (num_deserialized != rows) {
return Status::InternalError(
"Failed to fill partition column: {}={} ."
"Number of rows expected to be written : {}, number of rows actually written : "
"Number of rows expected to be written : {}, number of rows actually "
"written : "
"{}",
slot_desc->col_name(), value, num_deserialized, rows);
}
Expand Down
18 changes: 13 additions & 5 deletions be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "orc/Reader.hh"
#include "orc/Type.hh"
#include "orc/Vector.hh"
#include "orc/sargs/Literal.hh"
#include "runtime/types.h"
#include "util/runtime_profile.h"
#include "vec/aggregate_functions/aggregate_function.h"
Expand Down Expand Up @@ -288,23 +289,27 @@ class OrcReader : public GenericReader {
bool* is_hive1_orc);
static bool _check_acid_schema(const orc::Type& type);
static const orc::Type& _remove_acid(const orc::Type& type);

// functions for building search argument until _init_search_argument
std::tuple<bool, orc::Literal, orc::PredicateDataType> _make_orc_literal(
const VSlotRef* slot_ref, const VLiteral* literal);
bool _check_slot_can_push_down(const VExprSPtr& expr);
bool _check_literal_can_push_down(const VExprSPtr& expr, uint16_t child_id);
bool _check_rest_children_can_push_down(const VExprSPtr& expr);
bool _check_expr_can_push_down(const VExprSPtr& expr);
bool _build_less_than(const VExprSPtr& expr,
void _build_less_than(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder);
bool _build_less_than_equals(const VExprSPtr& expr,
void _build_less_than_equals(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder);
bool _build_equals(const VExprSPtr& expr, std::unique_ptr<orc::SearchArgumentBuilder>& builder);
bool _build_filter_in(const VExprSPtr& expr,
void _build_equals(const VExprSPtr& expr, std::unique_ptr<orc::SearchArgumentBuilder>& builder);
void _build_filter_in(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder);
bool _build_is_null(const VExprSPtr& expr,
void _build_is_null(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder);
bool _build_search_argument(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder);
bool _init_search_argument(const VExprContextSPtrs& conjuncts);

void _init_bloom_filter(
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
void _init_system_properties();
Expand Down Expand Up @@ -644,6 +649,9 @@ class OrcReader : public GenericReader {
std::unordered_map<std::string, std::string> _table_col_to_file_col;
//support iceberg position delete .
std::vector<int64_t>* _position_delete_ordered_rowids = nullptr;
std::unordered_map<const VSlotRef*, orc::PredicateDataType>
_vslot_ref_to_orc_predicate_data_type;
std::unordered_map<const VLiteral*, orc::Literal> _vliteral_to_orc_literal;
};

class ORCFileInputStream : public orc::InputStream, public ProfileCollector {
Expand Down
Loading
Loading