Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](orc) check all the cases before build_search_argument #44615

Merged: 7 commits were merged on Nov 29, 2024.
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 88 additions & 88 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <memory>
#include <ostream>
#include <tuple>
#include <utility>

#include "cctz/civil_time.h"
#include "cctz/time_zone.h"
Expand Down Expand Up @@ -567,12 +568,14 @@ std::tuple<bool, orc::Literal> convert_to_orc_literal(const orc::Type* type,

std::tuple<bool, orc::Literal, orc::PredicateDataType> OrcReader::_make_orc_literal(
const VSlotRef* slot_ref, const VLiteral* literal) {
DCHECK(_col_name_to_file_col_name_low_case.contains(slot_ref->expr_name()));
auto file_col_name_low_case = _col_name_to_file_col_name_low_case[slot_ref->expr_name()];
if (!_type_map.contains(file_col_name_low_case)) {
// TODO: this is for acid table
LOG(WARNING) << "Column " << slot_ref->expr_name() << " not found in _type_map";
return std::make_tuple(false, orc::Literal(false), orc::PredicateDataType::LONG);
}
DCHECK(_type_map.contains(file_col_name_low_case));
const auto* orc_type = _type_map[file_col_name_low_case];
if (!TYPEKIND_TO_PREDICATE_TYPE.contains(orc_type->getKind())) {
LOG(WARNING) << "Unsupported Push Down Orc Type [TypeKind=" << orc_type->getKind() << "]";
Expand Down Expand Up @@ -624,15 +627,37 @@ std::tuple<bool, orc::Literal, orc::PredicateDataType> OrcReader::_make_orc_lite
}
}

// check if the slot of expr can be pushed down to orc reader
// check if the slot of expr can be pushed down to orc reader and make orc predicate type
bool OrcReader::_check_slot_can_push_down(const VExprSPtr& expr) {
if (!expr->children()[0]->is_slot_ref()) {
return false;
}
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
// check if the slot exists in orc file and not partition column
return _col_name_to_file_col_name.contains(slot_ref->expr_name()) &&
!_lazy_read_ctx.predicate_partition_columns.contains(slot_ref->expr_name());
if (!_col_name_to_file_col_name.contains(slot_ref->expr_name()) ||
_lazy_read_ctx.predicate_partition_columns.contains(slot_ref->expr_name())) {
return false;
}
auto [valid, _, predicate_type] = _make_orc_literal(slot_ref, nullptr);
if (valid) {
_vslot_ref_to_orc_predicate_data_type[slot_ref] = predicate_type;
}
return valid;
}

// check if the literal of expr can be pushed down to orc reader and make orc literal
bool OrcReader::_check_literal_can_push_down(const VExprSPtr& expr, uint16_t child_id) {
if (!expr->children()[child_id]->is_literal()) {
return false;
}
// the slot has been checked in _check_slot_can_push_down before calling this function
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
const auto* literal = static_cast<const VLiteral*>(expr->children()[child_id].get());
auto [valid, orc_literal, _] = _make_orc_literal(slot_ref, literal);
if (valid) {
_vliteral_to_orc_literal.insert(std::make_pair(literal, orc_literal));
}
return valid;
}

// check whether the rest of the children of expr (index 1 onward) can be pushed down to orc reader
Expand All @@ -642,7 +667,7 @@ bool OrcReader::_check_rest_children_can_push_down(const VExprSPtr& expr) {
}

for (size_t i = 1; i < expr->children().size(); ++i) {
if (!expr->children()[i]->is_literal()) {
if (!_check_literal_can_push_down(expr, i)) {
return false;
}
}
Expand All @@ -651,7 +676,10 @@ bool OrcReader::_check_rest_children_can_push_down(const VExprSPtr& expr) {

// check if the expr can be pushed down to orc reader
bool OrcReader::_check_expr_can_push_down(const VExprSPtr& expr) {
DCHECK(expr != nullptr);
if (expr == nullptr) {
return false;
}

switch (expr->op()) {
case TExprOpcode::COMPOUND_AND:
// at least one child can be pushed down
Expand Down Expand Up @@ -693,198 +721,167 @@ bool OrcReader::_check_expr_can_push_down(const VExprSPtr& expr) {
}
}

bool OrcReader::_build_less_than(const VExprSPtr& expr,
void OrcReader::_build_less_than(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() == 2);
DCHECK(expr->children()[0]->is_slot_ref());
DCHECK(expr->children()[1]->is_literal());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
const auto* literal = static_cast<const VLiteral*>(expr->children()[1].get());
auto [valid, orc_literal, predicate_type] = _make_orc_literal(slot_ref, literal);
if (!valid) {
return false;
}
DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
auto predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
DCHECK(_vliteral_to_orc_literal.contains(literal));
auto orc_literal = _vliteral_to_orc_literal.find(literal)->second;
builder->lessThan(slot_ref->expr_name(), predicate_type, orc_literal);
return true;
}

bool OrcReader::_build_less_than_equals(const VExprSPtr& expr,
void OrcReader::_build_less_than_equals(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() == 2);
DCHECK(expr->children()[0]->is_slot_ref());
DCHECK(expr->children()[1]->is_literal());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
const auto* literal = static_cast<const VLiteral*>(expr->children()[1].get());
auto [valid, orc_literal, predicate_type] = _make_orc_literal(slot_ref, literal);
if (!valid) {
return false;
}
DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
auto predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
DCHECK(_vliteral_to_orc_literal.contains(literal));
auto orc_literal = _vliteral_to_orc_literal.find(literal)->second;
builder->lessThanEquals(slot_ref->expr_name(), predicate_type, orc_literal);
return true;
}

bool OrcReader::_build_equals(const VExprSPtr& expr,
void OrcReader::_build_equals(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() == 2);
DCHECK(expr->children()[0]->is_slot_ref());
DCHECK(expr->children()[1]->is_literal());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
const auto* literal = static_cast<const VLiteral*>(expr->children()[1].get());
auto [valid, orc_literal, predicate_type] = _make_orc_literal(slot_ref, literal);
if (!valid) {
return false;
}
DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
auto predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
DCHECK(_vliteral_to_orc_literal.contains(literal));
auto orc_literal = _vliteral_to_orc_literal.find(literal)->second;
builder->equals(slot_ref->expr_name(), predicate_type, orc_literal);
return true;
}

bool OrcReader::_build_filter_in(const VExprSPtr& expr,
void OrcReader::_build_filter_in(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() >= 2);
DCHECK(expr->children()[0]->is_slot_ref());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
std::vector<orc::Literal> literals;
orc::PredicateDataType predicate_type = orc::PredicateDataType::LONG;
DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
orc::PredicateDataType predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
for (size_t i = 1; i < expr->children().size(); ++i) {
DCHECK(expr->children()[i]->is_literal());
const auto* literal = static_cast<const VLiteral*>(expr->children()[i].get());
auto [valid, orc_literal, type] = _make_orc_literal(slot_ref, literal);
if (!valid) {
return false;
}
DCHECK(_vliteral_to_orc_literal.contains(literal));
auto orc_literal = _vliteral_to_orc_literal.find(literal)->second;
literals.emplace_back(orc_literal);
predicate_type = type;
}
DCHECK(!literals.empty());
builder->in(slot_ref->expr_name(), predicate_type, literals);
return true;
}

bool OrcReader::_build_is_null(const VExprSPtr& expr,
void OrcReader::_build_is_null(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
DCHECK(expr->children().size() == 1);
DCHECK(expr->children()[0]->is_slot_ref());
const auto* slot_ref = static_cast<const VSlotRef*>(expr->children()[0].get());
auto [valid, _, predicate_type] = _make_orc_literal(slot_ref, nullptr);
DCHECK(_vslot_ref_to_orc_predicate_data_type.contains(slot_ref));
auto predicate_type = _vslot_ref_to_orc_predicate_data_type[slot_ref];
builder->isNull(slot_ref->expr_name(), predicate_type);
return true;
}

bool OrcReader::_build_search_argument(const VExprSPtr& expr,
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
if (expr == nullptr) {
return false;
}

// if expr can not be pushed down, skip it and continue to next expr
// OPTIMIZE: check expr only once
if (!_check_expr_can_push_down(expr)) {
return false;
}

switch (expr->op()) {
case TExprOpcode::COMPOUND_AND: {
bool at_least_one_can_push_down = false;
builder->startAnd();
bool at_least_one_can_push_down = false;
for (const auto& child : expr->children()) {
if (_build_search_argument(child, builder)) {
at_least_one_can_push_down = true;
}
}
if (!at_least_one_can_push_down) {
// if all exprs can not be pushed down, builder->end() will throw exception
return false;
}
DCHECK(at_least_one_can_push_down);
builder->end();
break;
}
case TExprOpcode::COMPOUND_OR:
case TExprOpcode::COMPOUND_OR: {
builder->startOr();
bool all_can_push_down = true;
for (const auto& child : expr->children()) {
if (!_build_search_argument(child, builder)) {
return false;
all_can_push_down = false;
}
}
DCHECK(all_can_push_down);
builder->end();
break;
case TExprOpcode::COMPOUND_NOT:
builder->startNot();
}
case TExprOpcode::COMPOUND_NOT: {
DCHECK_EQ(expr->children().size(), 1);
if (!_build_search_argument(expr->children()[0], builder)) {
return false;
}
builder->startNot();
auto res = _build_search_argument(expr->children()[0], builder);
DCHECK(res);
builder->end();
break;
}
case TExprOpcode::GE:
builder->startNot();
if (!_build_less_than(expr, builder)) {
return false;
}
_build_less_than(expr, builder);
builder->end();
break;
case TExprOpcode::GT:
builder->startNot();
if (!_build_less_than_equals(expr, builder)) {
return false;
}
_build_less_than_equals(expr, builder);
builder->end();
break;
case TExprOpcode::LE:
if (!_build_less_than_equals(expr, builder)) {
return false;
}
_build_less_than_equals(expr, builder);
break;
case TExprOpcode::LT:
if (!_build_less_than(expr, builder)) {
return false;
}
_build_less_than(expr, builder);
break;
case TExprOpcode::EQ:
if (!_build_equals(expr, builder)) {
return false;
}
_build_equals(expr, builder);
break;
case TExprOpcode::NE:
builder->startNot();
if (!_build_equals(expr, builder)) {
return false;
}
_build_equals(expr, builder);
builder->end();
break;
case TExprOpcode::FILTER_IN:
if (!_build_filter_in(expr, builder)) {
return false;
}
_build_filter_in(expr, builder);
break;
case TExprOpcode::FILTER_NOT_IN:
builder->startNot();
if (!_build_filter_in(expr, builder)) {
return false;
}
_build_filter_in(expr, builder);
builder->end();
break;
// is null and is not null is represented as function call
case TExprOpcode::INVALID_OPCODE: {
case TExprOpcode::INVALID_OPCODE:
DCHECK(expr->node_type() == TExprNodeType::FUNCTION_CALL);
if (expr->fn().name.function_name == "is_null_pred") {
if (!_build_is_null(expr, builder)) {
return false;
}
_build_is_null(expr, builder);
} else if (expr->fn().name.function_name == "is_not_null_pred") {
builder->startNot();
if (!_build_is_null(expr, builder)) {
return false;
}
_build_is_null(expr, builder);
builder->end();
} else {
// should not reach here, because _check_expr_can_push_down has already checked
__builtin_unreachable();
}
break;
}
default: {

default:
// should not reach here, because _check_expr_can_push_down has already checked
__builtin_unreachable();
}
}
return true;
}

Expand All @@ -898,6 +895,8 @@ bool OrcReader::_init_search_argument(const VExprContextSPtrs& conjuncts) {
bool at_least_one_can_push_down = false;
builder->startAnd();
for (const auto& expr_ctx : conjuncts) {
_vslot_ref_to_orc_predicate_data_type.clear();
_vliteral_to_orc_literal.clear();
if (_build_search_argument(expr_ctx->root(), builder)) {
at_least_one_can_push_down = true;
}
Expand Down Expand Up @@ -943,7 +942,7 @@ Status OrcReader::set_fill_columns(
visit_slot(child.get());
}
} else if (VInPredicate* in_predicate = typeid_cast<VInPredicate*>(filter_impl)) {
if (in_predicate->get_num_children() > 0) {
if (!in_predicate->children().empty()) {
visit_slot(in_predicate->children()[0].get());
}
} else {
Expand Down Expand Up @@ -1179,7 +1178,8 @@ Status OrcReader::_fill_partition_columns(
if (num_deserialized != rows) {
return Status::InternalError(
"Failed to fill partition column: {}={} ."
"Number of rows expected to be written : {}, number of rows actually written : "
"Number of rows expected to be written : {}, number of rows actually "
"written : "
"{}",
slot_desc->col_name(), value, num_deserialized, rows);
}
Expand Down
Loading
Loading