diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index 1114931f627ed7..c5d7987ab8a139 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -543,13 +543,6 @@ Status AggLocalState::close(RuntimeState* state) { _agg_data->method_variant); } - _shared_state->agg_data = nullptr; - _shared_state->aggregate_data_container = nullptr; - _shared_state->agg_arena_pool = nullptr; - _shared_state->agg_profile_arena = nullptr; - - std::vector tmp_values; - _shared_state->values.swap(tmp_values); return Base::close(state); } diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index 401bf6596d9714..5397a0a851460e 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -330,19 +330,6 @@ Status AnalyticLocalState::output_current_block(vectorized::Block* block) { return Status::OK(); } -void AnalyticLocalState::release_mem() { - _agg_arena_pool = nullptr; - - std::vector tmp_input_blocks; - _shared_state->input_blocks.swap(tmp_input_blocks); - - std::vector> tmp_agg_input_columns; - _shared_state->agg_input_columns.swap(tmp_agg_input_columns); - - std::vector tmp_result_window_columns; - _result_window_columns.swap(tmp_result_window_columns); -} - AnalyticSourceOperatorX::AnalyticSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : OperatorX(pool, tnode, descs), @@ -443,7 +430,10 @@ Status AnalyticLocalState::close(RuntimeState* state) { } static_cast(_destroy_agg_status()); - release_mem(); + _agg_arena_pool = nullptr; + + std::vector tmp_result_window_columns; + _result_window_columns.swap(tmp_result_window_columns); return PipelineXLocalState::close(state); } diff --git a/be/src/pipeline/exec/analytic_source_operator.h b/be/src/pipeline/exec/analytic_source_operator.h index 
36497c1ffa506e..873ed570ba8e9d 100644 --- a/be/src/pipeline/exec/analytic_source_operator.h +++ b/be/src/pipeline/exec/analytic_source_operator.h @@ -59,8 +59,6 @@ class AnalyticLocalState final : public PipelineXLocalState Status output_current_block(vectorized::Block* block); - void release_mem(); - bool init_next_partition(vectorized::BlockRowPos found_partition_end); private: diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 3b0342a926b6fb..99aeaf0a1c8462 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -148,6 +148,14 @@ void HashJoinBuildSinkLocalState::init_short_circuit_for_probe() { (_shared_state->build_blocks->empty() && p._join_op == TJoinOp::RIGHT_OUTER_JOIN) || (_shared_state->build_blocks->empty() && p._join_op == TJoinOp::RIGHT_SEMI_JOIN) || (_shared_state->build_blocks->empty() && p._join_op == TJoinOp::RIGHT_ANTI_JOIN); + + //when the build table has 0 rows, there is no other_join_conjunct, it is not a mark join (_is_mark_join) and the join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN + //the result is the probe table + null-column(if need output) + _shared_state->empty_right_table_need_probe_dispose = + (_shared_state->build_blocks->empty() && !p._have_other_join_conjunct && + !p._is_mark_join) && + (p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op == TJoinOp::FULL_OUTER_JOIN || + p._join_op == TJoinOp::LEFT_ANTI_JOIN); } Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state, diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index a4a66507085a0a..1688b17778e269 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -86,8 +86,6 @@ Status HashJoinProbeLocalState::close(RuntimeState* state) { }}, *_process_hashtable_ctx_variants); } - _shared_state->arena = nullptr; - 
_shared_state->hash_table_variants.reset(); _process_hashtable_ctx_variants = nullptr; _null_map_column = nullptr; _tuple_is_null_left_flag_column = nullptr; @@ -190,46 +188,96 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc return Status::OK(); } if (local_state._shared_state->_has_null_in_build_side && - _short_circuit_for_null_in_build_side) { + _short_circuit_for_null_in_build_side && _is_mark_join) { /// `_has_null_in_build_side` means have null value in build side. /// `_short_circuit_for_null_in_build_side` means short circuit if has null in build side(e.g. null aware left anti join). /// We need to create a column as mark with all rows set to NULL. - if (_is_mark_join) { - auto block_rows = local_state._probe_block.rows(); - if (block_rows == 0) { - if (local_state._probe_eos) { - source_state = SourceState::FINISHED; - } - return Status::OK(); + auto block_rows = local_state._probe_block.rows(); + if (block_rows == 0) { + if (local_state._probe_eos) { + source_state = SourceState::FINISHED; } + return Status::OK(); + } - vectorized::Block temp_block; - //get probe side output column - for (int i = 0; i < _left_output_slot_flags.size(); ++i) { - if (_left_output_slot_flags[i]) { - temp_block.insert(local_state._probe_block.get_by_position(i)); - } - } - auto mark_column = vectorized::ColumnNullable::create( - vectorized::ColumnUInt8::create(block_rows, 0), - vectorized::ColumnUInt8::create(block_rows, 1)); - temp_block.insert({std::move(mark_column), - make_nullable(std::make_shared()), ""}); - - { - SCOPED_TIMER(local_state._join_filter_timer); - RETURN_IF_ERROR(vectorized::VExprContext::filter_block( - local_state._conjuncts, &temp_block, temp_block.columns())); + vectorized::Block temp_block; + //get probe side output column + for (int i = 0; i < _left_output_slot_flags.size(); ++i) { + if (_left_output_slot_flags[i]) { + temp_block.insert(local_state._probe_block.get_by_position(i)); } + } + auto mark_column = + 
vectorized::ColumnNullable::create(vectorized::ColumnUInt8::create(block_rows, 0), + vectorized::ColumnUInt8::create(block_rows, 1)); + temp_block.insert({std::move(mark_column), + make_nullable(std::make_shared()), ""}); - RETURN_IF_ERROR(local_state._build_output_block(&temp_block, output_block, false)); - temp_block.clear(); - local_state._probe_block.clear_column_data( - _child_x->row_desc().num_materialized_slots()); - local_state.reached_limit(output_block, source_state); + { + SCOPED_TIMER(local_state._join_filter_timer); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block( + local_state._conjuncts, &temp_block, temp_block.columns())); + } + + RETURN_IF_ERROR(local_state._build_output_block(&temp_block, output_block, false)); + temp_block.clear(); + local_state._probe_block.clear_column_data(_child_x->row_desc().num_materialized_slots()); + local_state.reached_limit(output_block, source_state); + return Status::OK(); + } + + //TODO: this short circuit could probably be refactored so that it does not need to be checked here. + if (local_state._shared_state->empty_right_table_need_probe_dispose) { + // when the build table has 0 rows, there is no other_join_conjunct and the join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN + // the result is the probe table + null-column(if need output) + // If we use a short-circuit strategy, we should return the block directly by adding additional null data. 
+ auto block_rows = local_state._probe_block.rows(); + if (local_state._probe_eos && block_rows == 0) { + if (local_state._probe_eos) { + source_state = SourceState::FINISHED; + } return Status::OK(); } + + vectorized::Block temp_block; + //get probe side output column + for (int i = 0; i < _left_output_slot_flags.size(); ++i) { + temp_block.insert(local_state._probe_block.get_by_position(i)); + } + + //create build side null column, if need output + for (int i = 0; + (_join_op != TJoinOp::LEFT_ANTI_JOIN) && i < _right_output_slot_flags.size(); ++i) { + auto type = remove_nullable(_right_table_data_types[i]); + auto column = type->create_column(); + column->resize(block_rows); + auto null_map_column = + vectorized::ColumnVector::create(block_rows, 1); + auto nullable_column = vectorized::ColumnNullable::create(std::move(column), + std::move(null_map_column)); + temp_block.insert({std::move(nullable_column), make_nullable(type), + _right_table_column_names[i]}); + } + if (_is_outer_join) { + reinterpret_cast( + local_state._tuple_is_null_left_flag_column.get()) + ->get_data() + .resize_fill(block_rows, 0); + reinterpret_cast( + local_state._tuple_is_null_right_flag_column.get()) + ->get_data() + .resize_fill(block_rows, 1); + } + + /// No need to check the block size in `filter_data_and_build_output` because this path does not + /// increase the output rows count(just same as `_probe_block`'s rows count). 
+ RETURN_IF_ERROR(local_state.filter_data_and_build_output(state, output_block, source_state, + &temp_block, false)); + temp_block.clear(); + local_state._probe_block.clear_column_data(_child_x->row_desc().num_materialized_slots()); + return Status::OK(); } + local_state._join_block.clear_column_data(); vectorized::MutableBlock mutable_join_block(&local_state._join_block); @@ -298,24 +346,37 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc if (!st) { return st; } - if (_is_outer_join) { - local_state.add_tuple_is_null_column(&temp_block); - } - auto output_rows = temp_block.rows(); - DCHECK(output_rows <= state->batch_size()); - { - SCOPED_TIMER(local_state._join_filter_timer); - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, &temp_block, - temp_block.columns())); - } + RETURN_IF_ERROR(local_state.filter_data_and_build_output(state, output_block, source_state, + &temp_block)); // Here make _join_block release the columns' ptr local_state._join_block.set_columns(local_state._join_block.clone_empty_columns()); mutable_join_block.clear(); + return Status::OK(); +} + +Status HashJoinProbeLocalState::filter_data_and_build_output(RuntimeState* state, + vectorized::Block* output_block, + SourceState& source_state, + vectorized::Block* temp_block, + bool check_rows_count) { + auto& p = _parent->cast(); + if (p._is_outer_join) { + add_tuple_is_null_column(temp_block); + } + auto output_rows = temp_block->rows(); + if (check_rows_count) { + DCHECK(output_rows <= state->batch_size()); + } + { + SCOPED_TIMER(_join_filter_timer); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, temp_block, + temp_block->columns())); + } - RETURN_IF_ERROR(local_state._build_output_block(&temp_block, output_block, false)); - local_state._reset_tuple_is_null_column(); - local_state.reached_limit(output_block, source_state); + RETURN_IF_ERROR(_build_output_block(temp_block, output_block, false)); + 
_reset_tuple_is_null_column(); + reached_limit(output_block, source_state); return Status::OK(); } @@ -323,8 +384,7 @@ bool HashJoinProbeOperatorX::need_more_input_data(RuntimeState* state) const { auto& local_state = state->get_local_state(id())->cast(); return (local_state._probe_block.rows() == 0 || local_state._probe_index == local_state._probe_block.rows()) && - !local_state._probe_eos && - (!local_state._shared_state->short_circuit_for_probe || _is_mark_join); + !local_state._probe_eos && !local_state._shared_state->short_circuit_for_probe; } Status HashJoinProbeOperatorX::_do_evaluate(vectorized::Block& block, @@ -465,6 +525,8 @@ Status HashJoinProbeOperatorX::prepare(RuntimeState* state) { _right_table_data_types = vectorized::VectorizedUtils::get_data_types(_build_side_child->row_desc()); _left_table_data_types = vectorized::VectorizedUtils::get_data_types(_child_x->row_desc()); + _right_table_column_names = + vectorized::VectorizedUtils::get_column_names(_build_side_child->row_desc()); _build_side_child.reset(); return Status::OK(); } diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index f4be3d9051cded..4d96b68b7d47f7 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -60,6 +60,9 @@ class HashJoinProbeLocalState final void prepare_for_next(); void add_tuple_is_null_column(vectorized::Block* block) override; void init_for_probe(RuntimeState* state); + Status filter_data_and_build_output(RuntimeState* state, vectorized::Block* output_block, + SourceState& source_state, vectorized::Block* temp_block, + bool check_rows_count = true); HashJoinProbeOperatorX* join_probe() { return (HashJoinProbeOperatorX*)_parent; } @@ -135,6 +138,7 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX _hash_output_slot_ids; std::vector _left_output_slot_flags; std::vector _right_output_slot_flags; + std::vector _right_table_column_names; }; 
} // namespace pipeline diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index 41253ac5b12e11..e1454c8c3c0cfd 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -73,12 +73,6 @@ Status NestedLoopJoinProbeLocalState::close(RuntimeState* state) { } _child_block->clear(); - vectorized::Blocks tmp_build_blocks; - _shared_state->build_blocks.swap(tmp_build_blocks); - - vectorized::MutableColumns tmp_build_side_visited_flags; - _shared_state->build_side_visited_flags.swap(tmp_build_side_visited_flags); - _tuple_is_null_left_flag_column = nullptr; _tuple_is_null_right_flag_column = nullptr; return JoinProbeLocalState::close( diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp b/be/src/pipeline/exec/partition_sort_source_operator.cpp index a67728de4f3657..a80d3277006204 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp @@ -29,15 +29,14 @@ OperatorPtr PartitionSortSourceOperatorBuilder::build_operator() { return std::make_shared(this, _node); } -Status PartitionSortSourceLocalState::close(RuntimeState* state) { - if (_closed) { - return Status::OK(); - } +Status PartitionSortSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { + RETURN_IF_ERROR(PipelineXLocalState::init(state, info)); SCOPED_TIMER(profile()->total_time_counter()); - SCOPED_TIMER(_close_timer); - _shared_state->previous_row = nullptr; - _shared_state->partition_sorts.clear(); - return PipelineXLocalState::close(state); + SCOPED_TIMER(_open_timer); + _get_next_timer = ADD_TIMER(profile(), "GetResultTime"); + _get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime"); + _shared_state->previous_row = std::make_unique(); + return Status::OK(); } Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* 
output_block, diff --git a/be/src/pipeline/exec/partition_sort_source_operator.h b/be/src/pipeline/exec/partition_sort_source_operator.h index 94615613439e6c..f7d950838c638a 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.h +++ b/be/src/pipeline/exec/partition_sort_source_operator.h @@ -57,26 +57,17 @@ class PartitionSortSourceLocalState final : public PipelineXLocalState; PartitionSortSourceLocalState(RuntimeState* state, OperatorXBase* parent) : PipelineXLocalState(state, parent), - _get_next_timer(nullptr) {} + _get_sorted_timer(nullptr), + _get_next_timer(nullptr), + _num_rows_returned(0) {} - Status init(RuntimeState* state, LocalStateInfo& info) override { - RETURN_IF_ERROR(PipelineXLocalState::init(state, info)); - SCOPED_TIMER(profile()->total_time_counter()); - SCOPED_TIMER(_open_timer); - _get_next_timer = ADD_TIMER(profile(), "GetResultTime"); - _get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime"); - _shared_state->previous_row = std::make_unique(); - return Status::OK(); - } - - Status close(RuntimeState* state) override; - - int64_t _num_rows_returned = 0; + Status init(RuntimeState* state, LocalStateInfo& info) override; private: friend class PartitionSortSourceOperatorX; - RuntimeProfile::Counter* _get_sorted_timer = nullptr; - RuntimeProfile::Counter* _get_next_timer = nullptr; + RuntimeProfile::Counter* _get_sorted_timer; + RuntimeProfile::Counter* _get_next_timer; + int64_t _num_rows_returned; }; class PartitionSortSourceOperatorX final : public OperatorX { diff --git a/be/src/pipeline/exec/sort_source_operator.cpp b/be/src/pipeline/exec/sort_source_operator.cpp index 7fd70f8da18260..32218879fffc0b 100644 --- a/be/src/pipeline/exec/sort_source_operator.cpp +++ b/be/src/pipeline/exec/sort_source_operator.cpp @@ -60,14 +60,4 @@ Dependency* SortSourceOperatorX::wait_for_dependency(RuntimeState* state) { return local_state._dependency->read_blocked_by(); } -Status SortLocalState::close(RuntimeState* state) { - 
SCOPED_TIMER(profile()->total_time_counter()); - SCOPED_TIMER(_close_timer); - if (_closed) { - return Status::OK(); - } - _shared_state->sorter = nullptr; - return PipelineXLocalState::close(state); -} - } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/sort_source_operator.h b/be/src/pipeline/exec/sort_source_operator.h index b58a123e441bc5..4a50b916484597 100644 --- a/be/src/pipeline/exec/sort_source_operator.h +++ b/be/src/pipeline/exec/sort_source_operator.h @@ -47,13 +47,11 @@ class SortSourceOperator final : public SourceOperator { - ENABLE_FACTORY_CREATOR(SortLocalState); - public: + ENABLE_FACTORY_CREATOR(SortLocalState); SortLocalState(RuntimeState* state, OperatorXBase* parent); Status init(RuntimeState* state, LocalStateInfo& info) override; - Status close(RuntimeState* state) override; private: friend class SortSourceOperatorX; diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index a548bd39666b5c..c5abc5142cc968 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -550,6 +550,8 @@ struct JoinSharedState { // 2. build side rows is empty, Join op is: inner join/right outer join/left semi/right semi/right anti bool _has_null_in_build_side = false; bool short_circuit_for_probe = false; + // for some joins, when the build side has no rows, we can return directly by adding some additional null data to the probe table. + bool empty_right_table_need_probe_dispose = false; vectorized::JoinOpVariants join_op_variants; };