Skip to content

Commit

Permalink
[Enchancement](join) remove tuple is null maintain on join operators (#…
Browse files Browse the repository at this point in the history
…46618)

### What problem does this PR solve?
tuple_is_null is useless now
  • Loading branch information
BiteTheDDDDt authored Jan 9, 2025
1 parent 56a61a9 commit 322de2b
Show file tree
Hide file tree
Showing 8 changed files with 8 additions and 167 deletions.
44 changes: 0 additions & 44 deletions be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,6 @@ Status HashJoinProbeLocalState::close(RuntimeState* state) {
}
_process_hashtable_ctx_variants = nullptr;
_null_map_column = nullptr;
_tuple_is_null_left_flag_column = nullptr;
_tuple_is_null_right_flag_column = nullptr;
_probe_block.clear();
return JoinProbeLocalState<HashJoinSharedState, HashJoinProbeLocalState>::close(state);
}
Expand All @@ -161,33 +159,6 @@ bool HashJoinProbeLocalState::_need_probe_null_map(vectorized::Block& block,
return false;
}

void HashJoinProbeLocalState::add_tuple_is_null_column(vectorized::Block* block) {
DCHECK(_parent->cast<HashJoinProbeOperatorX>()._is_outer_join);
if (!_parent->cast<HashJoinProbeOperatorX>()._use_specific_projections) {
return;
}
auto p0 = _tuple_is_null_left_flag_column->assume_mutable();
auto p1 = _tuple_is_null_right_flag_column->assume_mutable();
auto& left_null_map = reinterpret_cast<vectorized::ColumnUInt8&>(*p0);
auto& right_null_map = reinterpret_cast<vectorized::ColumnUInt8&>(*p1);
auto left_size = left_null_map.size();
auto right_size = right_null_map.size();

if (left_size == 0) {
DCHECK_EQ(right_size, block->rows());
left_null_map.get_data().resize_fill(right_size, 0);
}
if (right_size == 0) {
DCHECK_EQ(left_size, block->rows());
right_null_map.get_data().resize_fill(left_size, 0);
}

block->insert(
{std::move(p0), std::make_shared<vectorized::DataTypeUInt8>(), "left_tuples_is_null"});
block->insert(
{std::move(p1), std::make_shared<vectorized::DataTypeUInt8>(), "right_tuples_is_null"});
}

void HashJoinProbeLocalState::_prepare_probe_block() {
// clear_column_data of _probe_block
if (!_probe_column_disguise_null.empty()) {
Expand Down Expand Up @@ -258,16 +229,6 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
local_state._probe_block.insert({std::move(nullable_column), make_nullable(type),
_right_table_column_names[i]});
}
if (_is_outer_join) {
reinterpret_cast<vectorized::ColumnUInt8*>(
local_state._tuple_is_null_left_flag_column.get())
->get_data()
.resize_fill(block_rows, 0);
reinterpret_cast<vectorized::ColumnUInt8*>(
local_state._tuple_is_null_right_flag_column.get())
->get_data()
.resize_fill(block_rows, 1);
}

/// No need to check the block size in `_filter_data_and_build_output` because here dose not
/// increase the output rows count(just same as `_probe_block`'s rows count).
Expand Down Expand Up @@ -414,10 +375,6 @@ Status HashJoinProbeLocalState::filter_data_and_build_output(RuntimeState* state
bool* eos,
vectorized::Block* temp_block,
bool check_rows_count) {
auto& p = _parent->cast<HashJoinProbeOperatorX>();
if (p._is_outer_join) {
add_tuple_is_null_column(temp_block);
}
auto output_rows = temp_block->rows();
if (check_rows_count) {
DCHECK(output_rows <= state->batch_size());
Expand All @@ -429,7 +386,6 @@ Status HashJoinProbeLocalState::filter_data_and_build_output(RuntimeState* state
}

RETURN_IF_ERROR(_build_output_block(temp_block, output_block, false));
_reset_tuple_is_null_column();
reached_limit(output_block, eos);
return Status::OK();
}
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ class HashJoinProbeLocalState final
Status close(RuntimeState* state) override;

void prepare_for_next();
void add_tuple_is_null_column(vectorized::Block* block) override;
Status filter_data_and_build_output(RuntimeState* state, vectorized::Block* output_block,
bool* eos, vectorized::Block* temp_block,
bool check_rows_count = true);
Expand Down
7 changes: 1 addition & 6 deletions be/src/pipeline/exec/join/process_hash_table_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ struct ProcessHashTableProbe {

void probe_side_output_column(vectorized::MutableColumns& mcol,
const std::vector<bool>& output_slot_flags, int size,
int last_probe_index, bool all_match_one,
bool have_other_join_conjunct);
bool all_match_one, bool have_other_join_conjunct);

template <typename HashTableType>
Status process(HashTableType& hash_table_ctx, ConstNullMapPtr null_map,
Expand Down Expand Up @@ -111,10 +110,6 @@ struct ProcessHashTableProbe {
uint32_t _build_index_for_null_probe_key {0};

std::vector<int> _build_blocks_locs;
// only need set the tuple is null in RIGHT_OUTER_JOIN and FULL_OUTER_JOIN
vectorized::ColumnUInt8::Container* _tuple_is_null_left_flags = nullptr;
// only need set the tuple is null in LEFT_OUTER_JOIN and FULL_OUTER_JOIN
vectorized::ColumnUInt8::Container* _tuple_is_null_right_flags = nullptr;

size_t _serialized_key_buffer_size {0};
uint8_t* _serialized_key_buffer = nullptr;
Expand Down
53 changes: 7 additions & 46 deletions be/src/pipeline/exec/join/process_hash_table_probe_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,6 @@ ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinProbeLocalState
: _parent(parent),
_batch_size(batch_size),
_build_block(parent->build_block()),
_tuple_is_null_left_flags(parent->is_outer_join()
? &(reinterpret_cast<vectorized::ColumnUInt8&>(
*parent->_tuple_is_null_left_flag_column)
.get_data())
: nullptr),
_tuple_is_null_right_flags(parent->is_outer_join()
? &(reinterpret_cast<vectorized::ColumnUInt8&>(
*parent->_tuple_is_null_right_flag_column)
.get_data())
: nullptr),
_have_other_join_conjunct(parent->have_other_join_conjunct()),
_is_right_semi_anti(parent->is_right_semi_anti()),
_left_output_slot_flags(parent->left_output_slot_flags()),
Expand All @@ -70,27 +60,13 @@ void ProcessHashTableProbe<JoinOpType>::build_side_output_column(
bool have_other_join_conjunct, bool is_mark_join) {
SCOPED_TIMER(_build_side_output_timer);

constexpr auto probe_all =
JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType == TJoinOp::FULL_OUTER_JOIN;

// indicates whether build_indexs contain 0
bool build_index_has_zero =
(JoinOpType != TJoinOp::INNER_JOIN && JoinOpType != TJoinOp::RIGHT_OUTER_JOIN) ||
have_other_join_conjunct || is_mark_join;
if (!size) {
return;
}
// Dispose right tuple is null flags columns
if (probe_all && !have_other_join_conjunct) {
_tuple_is_null_right_flags->resize(size);
auto* __restrict null_data = _tuple_is_null_right_flags->data();
for (int i = 0; i < size; ++i) {
null_data[i] = _build_indexs[i] == 0;
}
if (_need_calculate_build_index_has_zero) {
build_index_has_zero = simd::contain_byte(null_data, size, 1);
}
}

if (!build_index_has_zero && _build_column_has_null.empty()) {
_need_calculate_build_index_has_zero = false;
Expand Down Expand Up @@ -141,14 +117,14 @@ void ProcessHashTableProbe<JoinOpType>::build_side_output_column(
template <int JoinOpType>
void ProcessHashTableProbe<JoinOpType>::probe_side_output_column(
vectorized::MutableColumns& mcol, const std::vector<bool>& output_slot_flags, int size,
int last_probe_index, bool all_match_one, bool have_other_join_conjunct) {
bool all_match_one, bool have_other_join_conjunct) {
SCOPED_TIMER(_probe_side_output_timer);
auto& probe_block = _parent->_probe_block;
for (int i = 0; i < output_slot_flags.size(); ++i) {
if (output_slot_flags[i]) {
auto& column = probe_block.get_by_position(i).column;
if (all_match_one) {
mcol[i]->insert_range_from(*column, last_probe_index, size);
mcol[i]->insert_range_from(*column, _probe_indexs[0], size);
} else {
mcol[i]->insert_indices_from(*column, _probe_indexs.data(),
_probe_indexs.data() + size);
Expand All @@ -158,12 +134,6 @@ void ProcessHashTableProbe<JoinOpType>::probe_side_output_column(
mcol[i] = vectorized::ColumnConst::create(std::move(mcol[i]), size);
}
}

if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) {
if (!have_other_join_conjunct) {
_tuple_is_null_left_flags->resize_fill(size, 0);
}
}
}

template <int JoinOpType>
Expand Down Expand Up @@ -212,7 +182,6 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c

auto& probe_index = _parent->_probe_index;
auto& build_index = _parent->_build_index;
auto last_probe_index = probe_index;
{
SCOPED_TIMER(_init_probe_side_timer);
_init_probe_side<HashTableType>(hash_table_ctx, probe_rows, with_other_conjuncts, null_map);
Expand Down Expand Up @@ -277,9 +246,8 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c

if constexpr (with_other_conjuncts || (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN &&
JoinOpType != TJoinOp::RIGHT_ANTI_JOIN)) {
auto check_all_match_one = [](const std::vector<uint32_t>& vecs, uint32_t probe_idx,
int size) {
if (!size || vecs[0] != probe_idx || vecs[size - 1] != probe_idx + size - 1) {
auto check_all_match_one = [](const std::vector<uint32_t>& vecs, int size) {
if (!size || vecs[size - 1] != vecs[0] + size - 1) {
return false;
}
for (int i = 1; i < size; i++) {
Expand All @@ -290,10 +258,9 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
return true;
};

probe_side_output_column(
mcol, *_left_output_slot_flags, current_offset, last_probe_index,
check_all_match_one(_probe_indexs, last_probe_index, current_offset),
with_other_conjuncts);
probe_side_output_column(mcol, *_left_output_slot_flags, current_offset,
check_all_match_one(_probe_indexs, current_offset),
with_other_conjuncts);
}

output_block->swap(mutable_block.to_block());
Expand Down Expand Up @@ -540,8 +507,6 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Bl

for (size_t i = 0; i < row_count; ++i) {
if (filter_map[i]) {
_tuple_is_null_right_flags->emplace_back(!_build_indexs[i] ||
!filter_column_ptr[i]);
if constexpr (JoinOpType == TJoinOp::FULL_OUTER_JOIN) {
visited[_build_indexs[i]] = 1;
}
Expand Down Expand Up @@ -585,12 +550,9 @@ Status ProcessHashTableProbe<JoinOpType>::do_other_join_conjuncts(vectorized::Bl
visited[_build_indexs[i]] |= filter_column_ptr[i];
}
} else if constexpr (JoinOpType == TJoinOp::RIGHT_OUTER_JOIN) {
auto filter_size = 0;
for (int i = 0; i < row_count; ++i) {
visited[_build_indexs[i]] |= filter_column_ptr[i];
filter_size += filter_column_ptr[i];
}
_tuple_is_null_left_flags->resize_fill(filter_size, 0);
}

if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN ||
Expand Down Expand Up @@ -659,7 +621,6 @@ Status ProcessHashTableProbe<JoinOpType>::finish_probing(HashTableType& hash_tab
assert_cast<vectorized::ColumnNullable*>(mcol[i].get())
->insert_many_defaults(block_size);
}
_tuple_is_null_left_flags->resize_fill(block_size, 1);
}
output_block->swap(mutable_block.to_block(0));
DCHECK(block_size <= _batch_size);
Expand Down
13 changes: 0 additions & 13 deletions be/src/pipeline/exec/join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ template <typename SharedStateArg, typename Derived>
Status JoinProbeLocalState<SharedStateArg, Derived>::open(RuntimeState* state) {
RETURN_IF_ERROR(Base::open(state));
auto& p = Base::_parent->template cast<typename Derived::Parent>();
// only use in outer join as the bool column to mark for function of `tuple_is_null`
if (p._is_outer_join) {
_tuple_is_null_left_flag_column = vectorized::ColumnUInt8::create();
_tuple_is_null_right_flag_column = vectorized::ColumnUInt8::create();
}
_output_expr_ctxs.resize(p._output_expr_ctxs.size());
for (size_t i = 0; i < _output_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._output_expr_ctxs[i]->clone(state, _output_expr_ctxs[i]));
Expand Down Expand Up @@ -96,14 +91,6 @@ Status JoinProbeLocalState<SharedStateArg, Derived>::_build_output_block(
return Status::OK();
}

template <typename SharedStateArg, typename Derived>
void JoinProbeLocalState<SharedStateArg, Derived>::_reset_tuple_is_null_column() {
if (Base::_parent->template cast<typename Derived::Parent>()._is_outer_join) {
reinterpret_cast<vectorized::ColumnUInt8&>(*_tuple_is_null_left_flag_column).clear();
reinterpret_cast<vectorized::ColumnUInt8&>(*_tuple_is_null_right_flag_column).clear();
}
}

template <typename LocalStateType>
JoinProbeOperatorX<LocalStateType>::JoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode,
int operator_id, const DescriptorTbl& descs)
Expand Down
4 changes: 0 additions & 4 deletions be/src/pipeline/exec/join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ class JoinProbeLocalState : public PipelineXLocalState<SharedStateArg> {
Status init(RuntimeState* state, LocalStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
virtual void add_tuple_is_null_column(vectorized::Block* block) = 0;

protected:
template <typename LocalStateType>
Expand All @@ -41,12 +40,9 @@ class JoinProbeLocalState : public PipelineXLocalState<SharedStateArg> {
void _construct_mutable_join_block();
Status _build_output_block(vectorized::Block* origin_block, vectorized::Block* output_block,
bool keep_origin = true);
void _reset_tuple_is_null_column();
// output expr
vectorized::VExprContextSPtrs _output_expr_ctxs;
vectorized::Block _join_block;
vectorized::MutableColumnPtr _tuple_is_null_left_flag_column = nullptr;
vectorized::MutableColumnPtr _tuple_is_null_right_flag_column = nullptr;

size_t _mark_column_id = -1;

Expand Down
Loading

0 comments on commit 322de2b

Please sign in to comment.