Skip to content

Commit

Permalink
[pipelineX](pick) pick PRs from pipeline (apache#25340)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored Oct 12, 2023
1 parent 80a49ed commit 22684de
Show file tree
Hide file tree
Showing 12 changed files with 142 additions and 113 deletions.
7 changes: 0 additions & 7 deletions be/src/pipeline/exec/aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -543,13 +543,6 @@ Status AggLocalState::close(RuntimeState* state) {
_agg_data->method_variant);
}

_shared_state->agg_data = nullptr;
_shared_state->aggregate_data_container = nullptr;
_shared_state->agg_arena_pool = nullptr;
_shared_state->agg_profile_arena = nullptr;

std::vector<vectorized::AggregateDataPtr> tmp_values;
_shared_state->values.swap(tmp_values);
return Base::close(state);
}

Expand Down
18 changes: 4 additions & 14 deletions be/src/pipeline/exec/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,19 +330,6 @@ Status AnalyticLocalState::output_current_block(vectorized::Block* block) {
return Status::OK();
}

void AnalyticLocalState::release_mem() {
_agg_arena_pool = nullptr;

std::vector<vectorized::Block> tmp_input_blocks;
_shared_state->input_blocks.swap(tmp_input_blocks);

std::vector<std::vector<vectorized::MutableColumnPtr>> tmp_agg_input_columns;
_shared_state->agg_input_columns.swap(tmp_agg_input_columns);

std::vector<vectorized::MutableColumnPtr> tmp_result_window_columns;
_result_window_columns.swap(tmp_result_window_columns);
}

AnalyticSourceOperatorX::AnalyticSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: OperatorX<AnalyticLocalState>(pool, tnode, descs),
Expand Down Expand Up @@ -443,7 +430,10 @@ Status AnalyticLocalState::close(RuntimeState* state) {
}

static_cast<void>(_destroy_agg_status());
release_mem();
_agg_arena_pool = nullptr;

std::vector<vectorized::MutableColumnPtr> tmp_result_window_columns;
_result_window_columns.swap(tmp_result_window_columns);
return PipelineXLocalState<AnalyticDependency>::close(state);
}

Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/analytic_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ class AnalyticLocalState final : public PipelineXLocalState<AnalyticDependency>

Status output_current_block(vectorized::Block* block);

void release_mem();

bool init_next_partition(vectorized::BlockRowPos found_partition_end);

private:
Expand Down
8 changes: 8 additions & 0 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ void HashJoinBuildSinkLocalState::init_short_circuit_for_probe() {
(_shared_state->build_blocks->empty() && p._join_op == TJoinOp::RIGHT_OUTER_JOIN) ||
(_shared_state->build_blocks->empty() && p._join_op == TJoinOp::RIGHT_SEMI_JOIN) ||
(_shared_state->build_blocks->empty() && p._join_op == TJoinOp::RIGHT_ANTI_JOIN);

// When the build table has 0 rows, there is no other_join_conjunct, this is not a mark join,
// and the join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN,
// the result is the probe table plus null columns (if they need to be output).
_shared_state->empty_right_table_need_probe_dispose =
(_shared_state->build_blocks->empty() && !p._have_other_join_conjunct &&
!p._is_mark_join) &&
(p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op == TJoinOp::FULL_OUTER_JOIN ||
p._join_op == TJoinOp::LEFT_ANTI_JOIN);
}

Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
Expand Down
156 changes: 109 additions & 47 deletions be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ Status HashJoinProbeLocalState::close(RuntimeState* state) {
}},
*_process_hashtable_ctx_variants);
}
_shared_state->arena = nullptr;
_shared_state->hash_table_variants.reset();
_process_hashtable_ctx_variants = nullptr;
_null_map_column = nullptr;
_tuple_is_null_left_flag_column = nullptr;
Expand Down Expand Up @@ -190,46 +188,96 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
return Status::OK();
}
if (local_state._shared_state->_has_null_in_build_side &&
_short_circuit_for_null_in_build_side) {
_short_circuit_for_null_in_build_side && _is_mark_join) {
/// `_has_null_in_build_side` means have null value in build side.
/// `_short_circuit_for_null_in_build_side` means short circuit if has null in build side(e.g. null aware left anti join).
/// We need to create a column as mark with all rows set to NULL.
if (_is_mark_join) {
auto block_rows = local_state._probe_block.rows();
if (block_rows == 0) {
if (local_state._probe_eos) {
source_state = SourceState::FINISHED;
}
return Status::OK();
auto block_rows = local_state._probe_block.rows();
if (block_rows == 0) {
if (local_state._probe_eos) {
source_state = SourceState::FINISHED;
}
return Status::OK();
}

vectorized::Block temp_block;
//get probe side output column
for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
if (_left_output_slot_flags[i]) {
temp_block.insert(local_state._probe_block.get_by_position(i));
}
}
auto mark_column = vectorized::ColumnNullable::create(
vectorized::ColumnUInt8::create(block_rows, 0),
vectorized::ColumnUInt8::create(block_rows, 1));
temp_block.insert({std::move(mark_column),
make_nullable(std::make_shared<vectorized::DataTypeUInt8>()), ""});

{
SCOPED_TIMER(local_state._join_filter_timer);
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
local_state._conjuncts, &temp_block, temp_block.columns()));
vectorized::Block temp_block;
//get probe side output column
for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
if (_left_output_slot_flags[i]) {
temp_block.insert(local_state._probe_block.get_by_position(i));
}
}
auto mark_column =
vectorized::ColumnNullable::create(vectorized::ColumnUInt8::create(block_rows, 0),
vectorized::ColumnUInt8::create(block_rows, 1));
temp_block.insert({std::move(mark_column),
make_nullable(std::make_shared<vectorized::DataTypeUInt8>()), ""});

RETURN_IF_ERROR(local_state._build_output_block(&temp_block, output_block, false));
temp_block.clear();
local_state._probe_block.clear_column_data(
_child_x->row_desc().num_materialized_slots());
local_state.reached_limit(output_block, source_state);
{
SCOPED_TIMER(local_state._join_filter_timer);
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(
local_state._conjuncts, &temp_block, temp_block.columns()));
}

RETURN_IF_ERROR(local_state._build_output_block(&temp_block, output_block, false));
temp_block.clear();
local_state._probe_block.clear_column_data(_child_x->row_desc().num_materialized_slots());
local_state.reached_limit(output_block, source_state);
return Status::OK();
}

// TODO: this short circuit could probably be refactored so that it does not need to be checked here.
if (local_state._shared_state->empty_right_table_need_probe_dispose) {
// When the build table has 0 rows, there is no other_join_conjunct, this is not a mark join,
// and the join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN,
// the result is the probe table plus null columns (if they need to be output).
// With this short-circuit strategy, return the block directly after adding the null build-side data.
auto block_rows = local_state._probe_block.rows();
if (local_state._probe_eos && block_rows == 0) {
if (local_state._probe_eos) {
source_state = SourceState::FINISHED;
}
return Status::OK();
}

vectorized::Block temp_block;
//get probe side output column
for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
temp_block.insert(local_state._probe_block.get_by_position(i));
}

//create build side null column, if need output
for (int i = 0;
(_join_op != TJoinOp::LEFT_ANTI_JOIN) && i < _right_output_slot_flags.size(); ++i) {
auto type = remove_nullable(_right_table_data_types[i]);
auto column = type->create_column();
column->resize(block_rows);
auto null_map_column =
vectorized::ColumnVector<vectorized::UInt8>::create(block_rows, 1);
auto nullable_column = vectorized::ColumnNullable::create(std::move(column),
std::move(null_map_column));
temp_block.insert({std::move(nullable_column), make_nullable(type),
_right_table_column_names[i]});
}
if (_is_outer_join) {
reinterpret_cast<vectorized::ColumnUInt8*>(
local_state._tuple_is_null_left_flag_column.get())
->get_data()
.resize_fill(block_rows, 0);
reinterpret_cast<vectorized::ColumnUInt8*>(
local_state._tuple_is_null_right_flag_column.get())
->get_data()
.resize_fill(block_rows, 1);
}

/// No need to check the block size in `filter_data_and_build_output` because this path does not
/// increase the output row count (it is the same as `_probe_block`'s row count).
RETURN_IF_ERROR(local_state.filter_data_and_build_output(state, output_block, source_state,
&temp_block, false));
temp_block.clear();
local_state._probe_block.clear_column_data(_child_x->row_desc().num_materialized_slots());
return Status::OK();
}

local_state._join_block.clear_column_data();

vectorized::MutableBlock mutable_join_block(&local_state._join_block);
Expand Down Expand Up @@ -298,33 +346,45 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
if (!st) {
return st;
}
if (_is_outer_join) {
local_state.add_tuple_is_null_column(&temp_block);
}
auto output_rows = temp_block.rows();
DCHECK(output_rows <= state->batch_size());
{
SCOPED_TIMER(local_state._join_filter_timer);
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, &temp_block,
temp_block.columns()));
}

RETURN_IF_ERROR(local_state.filter_data_and_build_output(state, output_block, source_state,
&temp_block));
// Here make _join_block release the columns' ptr
local_state._join_block.set_columns(local_state._join_block.clone_empty_columns());
mutable_join_block.clear();
return Status::OK();
}

/// Post-processes one batch of joined rows held in `temp_block` and writes the
/// result into `output_block`.
///
/// Steps, in order:
///   1. for outer joins, call `add_tuple_is_null_column` on `temp_block`;
///   2. optionally DCHECK that the row count does not exceed the batch size;
///   3. filter `temp_block` with this local state's `_conjuncts`;
///   4. build `output_block` from `temp_block`;
///   5. reset the tuple-is-null columns and update the limit / source state.
///
/// @param state             runtime state, used for the batch-size check.
/// @param output_block      destination block.
/// @param source_state      passed to `reached_limit` together with `output_block`.
/// @param temp_block        intermediate block; filtered in place.
/// @param check_rows_count  when false, the batch-size DCHECK is skipped.
/// @return the first error reported by filtering or output building, otherwise OK.
Status HashJoinProbeLocalState::filter_data_and_build_output(RuntimeState* state,
                                                             vectorized::Block* output_block,
                                                             SourceState& source_state,
                                                             vectorized::Block* temp_block,
                                                             bool check_rows_count) {
    auto& p = _parent->cast<HashJoinProbeOperatorX>();
    if (p._is_outer_join) {
        add_tuple_is_null_column(temp_block);
    }
    auto output_rows = temp_block->rows();
    if (check_rows_count) {
        DCHECK(output_rows <= state->batch_size());
    }
    {
        // Scope the timer to the conjunct evaluation only.
        SCOPED_TIMER(_join_filter_timer);
        RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, temp_block,
                                                               temp_block->columns()));
    }

    // Build the output exactly once, through this object's own members.
    RETURN_IF_ERROR(_build_output_block(temp_block, output_block, false));
    _reset_tuple_is_null_column();
    reached_limit(output_block, source_state);
    return Status::OK();
}

/// Reports whether the probe operator needs another input block.
///
/// As written, the result is true when the current probe block is empty or
/// fully consumed (`_probe_index` equals its row count), probe EOS has not been
/// seen, and either the shared state is not short-circuiting the probe or this
/// is a mark join.
bool HashJoinProbeOperatorX::need_more_input_data(RuntimeState* state) const {
auto& local_state = state->get_local_state(id())->cast<HashJoinProbeLocalState>();
return (local_state._probe_block.rows() == 0 ||
local_state._probe_index == local_state._probe_block.rows()) &&
!local_state._probe_eos &&
(!local_state._shared_state->short_circuit_for_probe || _is_mark_join);
// NOTE(review): the `return` above ends at the preceding `;`, so the expression
// statement below is unreachable and has no effect. It looks like an alternative
// tail for the same condition (without the `_is_mark_join` exception) — confirm
// which form is intended and drop the other.
!local_state._probe_eos && !local_state._shared_state->short_circuit_for_probe;
}

Status HashJoinProbeOperatorX::_do_evaluate(vectorized::Block& block,
Expand Down Expand Up @@ -465,6 +525,8 @@ Status HashJoinProbeOperatorX::prepare(RuntimeState* state) {
_right_table_data_types =
vectorized::VectorizedUtils::get_data_types(_build_side_child->row_desc());
_left_table_data_types = vectorized::VectorizedUtils::get_data_types(_child_x->row_desc());
_right_table_column_names =
vectorized::VectorizedUtils::get_column_names(_build_side_child->row_desc());
_build_side_child.reset();
return Status::OK();
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class HashJoinProbeLocalState final
void prepare_for_next();
void add_tuple_is_null_column(vectorized::Block* block) override;
void init_for_probe(RuntimeState* state);
Status filter_data_and_build_output(RuntimeState* state, vectorized::Block* output_block,
SourceState& source_state, vectorized::Block* temp_block,
bool check_rows_count = true);

HashJoinProbeOperatorX* join_probe() { return (HashJoinProbeOperatorX*)_parent; }

Expand Down Expand Up @@ -135,6 +138,7 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLoca
std::vector<SlotId> _hash_output_slot_ids;
std::vector<bool> _left_output_slot_flags;
std::vector<bool> _right_output_slot_flags;
std::vector<std::string> _right_table_column_names;
};

} // namespace pipeline
Expand Down
6 changes: 0 additions & 6 deletions be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,6 @@ Status NestedLoopJoinProbeLocalState::close(RuntimeState* state) {
}
_child_block->clear();

vectorized::Blocks tmp_build_blocks;
_shared_state->build_blocks.swap(tmp_build_blocks);

vectorized::MutableColumns tmp_build_side_visited_flags;
_shared_state->build_side_visited_flags.swap(tmp_build_side_visited_flags);

_tuple_is_null_left_flag_column = nullptr;
_tuple_is_null_right_flag_column = nullptr;
return JoinProbeLocalState<NestedLoopJoinDependency, NestedLoopJoinProbeLocalState>::close(
Expand Down
15 changes: 7 additions & 8 deletions be/src/pipeline/exec/partition_sort_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,14 @@ OperatorPtr PartitionSortSourceOperatorBuilder::build_operator() {
return std::make_shared<PartitionSortSourceOperator>(this, _node);
}

Status PartitionSortSourceLocalState::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
Status PartitionSortSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXLocalState<PartitionSortDependency>::init(state, info));
SCOPED_TIMER(profile()->total_time_counter());
SCOPED_TIMER(_close_timer);
_shared_state->previous_row = nullptr;
_shared_state->partition_sorts.clear();
return PipelineXLocalState<PartitionSortDependency>::close(state);
SCOPED_TIMER(_open_timer);
_get_next_timer = ADD_TIMER(profile(), "GetResultTime");
_get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime");
_shared_state->previous_row = std::make_unique<vectorized::SortCursorCmp>();
return Status::OK();
}

Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* output_block,
Expand Down
23 changes: 7 additions & 16 deletions be/src/pipeline/exec/partition_sort_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,26 +57,17 @@ class PartitionSortSourceLocalState final : public PipelineXLocalState<Partition
using Base = PipelineXLocalState<PartitionSortDependency>;
PartitionSortSourceLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<PartitionSortDependency>(state, parent),
_get_next_timer(nullptr) {}
_get_sorted_timer(nullptr),
_get_next_timer(nullptr),
_num_rows_returned(0) {}

Status init(RuntimeState* state, LocalStateInfo& info) override {
RETURN_IF_ERROR(PipelineXLocalState<PartitionSortDependency>::init(state, info));
SCOPED_TIMER(profile()->total_time_counter());
SCOPED_TIMER(_open_timer);
_get_next_timer = ADD_TIMER(profile(), "GetResultTime");
_get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime");
_shared_state->previous_row = std::make_unique<vectorized::SortCursorCmp>();
return Status::OK();
}

Status close(RuntimeState* state) override;

int64_t _num_rows_returned = 0;
Status init(RuntimeState* state, LocalStateInfo& info) override;

private:
friend class PartitionSortSourceOperatorX;
RuntimeProfile::Counter* _get_sorted_timer = nullptr;
RuntimeProfile::Counter* _get_next_timer = nullptr;
RuntimeProfile::Counter* _get_sorted_timer;
RuntimeProfile::Counter* _get_next_timer;
int64_t _num_rows_returned;
};

class PartitionSortSourceOperatorX final : public OperatorX<PartitionSortSourceLocalState> {
Expand Down
10 changes: 0 additions & 10 deletions be/src/pipeline/exec/sort_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,4 @@ Dependency* SortSourceOperatorX::wait_for_dependency(RuntimeState* state) {
return local_state._dependency->read_blocked_by();
}

/// Closes the sort source local state. On the first call it clears the shared
/// sorter pointer and delegates to the base-class close; later calls return OK
/// without doing anything else. Both timers are started before the
/// already-closed check.
Status SortLocalState::close(RuntimeState* state) {
    SCOPED_TIMER(profile()->total_time_counter());
    SCOPED_TIMER(_close_timer);
    if (!_closed) {
        _shared_state->sorter = nullptr;
        return PipelineXLocalState<SortDependency>::close(state);
    }
    return Status::OK();
}

} // namespace doris::pipeline
4 changes: 1 addition & 3 deletions be/src/pipeline/exec/sort_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,11 @@ class SortSourceOperator final : public SourceOperator<SortSourceOperatorBuilder

class SortSourceOperatorX;
class SortLocalState final : public PipelineXLocalState<SortDependency> {
ENABLE_FACTORY_CREATOR(SortLocalState);

public:
ENABLE_FACTORY_CREATOR(SortLocalState);
SortLocalState(RuntimeState* state, OperatorXBase* parent);

Status init(RuntimeState* state, LocalStateInfo& info) override;
Status close(RuntimeState* state) override;

private:
friend class SortSourceOperatorX;
Expand Down
Loading

0 comments on commit 22684de

Please sign in to comment.