From 7856662ecf46ce956286d06dea86e90356d889b6 Mon Sep 17 00:00:00 2001 From: Pxl Date: Tue, 17 Dec 2024 17:21:42 +0800 Subject: [PATCH] [Bug](pipeline) make sink operator process eos signals after wake_up_early #45207 (#45400) make sink operator process eos signals after wake_up_early #45207 (#45400) --- be/src/pipeline/exec/hashjoin_build_sink.cpp | 57 ++++++---- be/src/pipeline/pipeline.cpp | 7 +- be/src/pipeline/pipeline_task.cpp | 6 +- be/src/pipeline/pipeline_task.h | 5 +- .../pipeline/pipeline_x/pipeline_x_task.cpp | 107 ++++++++++-------- be/src/pipeline/pipeline_x/pipeline_x_task.h | 12 +- 6 files changed, 112 insertions(+), 82 deletions(-) diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index dff377d62d5c4e..87c8d7caea30ad 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -137,34 +137,47 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu } }}; - if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) { + if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() || !_eos) { return Base::close(state, exec_status); } - if (state->get_task()->wake_up_by_downstream()) { - if (_should_build_hash_table) { - // partitial ignore rf to make global rf work + try { + if (state->get_task()->wake_up_early()) { + // partitial ignore rf to make global rf work or ignore useless rf RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency)); RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters()); - } else { - // do not publish filter coz local rf not inited and useless - return Base::close(state, exec_status); - } - } else if (_should_build_hash_table) { - auto* block = _shared_state->build_block.get(); - uint64_t hash_table_size = block ? block->rows() : 0; - { - SCOPED_TIMER(_runtime_filter_init_timer); - RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size)); - RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state)); - } - if (hash_table_size > 1) { - SCOPED_TIMER(_runtime_filter_compute_timer); - _runtime_filter_slots->insert(block); + } else if (_should_build_hash_table) { + auto* block = _shared_state->build_block.get(); + uint64_t hash_table_size = block ? block->rows() : 0; + { + SCOPED_TIMER(_runtime_filter_init_timer); + RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size)); + RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state)); + } + if (hash_table_size > 1) { + SCOPED_TIMER(_runtime_filter_compute_timer); + _runtime_filter_slots->insert(block); + } } + + SCOPED_TIMER(_publish_runtime_filter_timer); + RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table)); + } catch (Exception& e) { + bool blocked_by_complete_build_stage = p._shared_hashtable_controller && + !p._shared_hash_table_context->complete_build_stage; + bool blocked_by_shared_hash_table_signal = !_should_build_hash_table && + p._shared_hashtable_controller && + !p._shared_hash_table_context->signaled; + + return Status::InternalError( + "rf process meet error: {}, wake_up_early: {}, should_build_hash_table: " + "{}, _finish_dependency: {}, blocked_by_complete_build_stage: {}, " + "blocked_by_shared_hash_table_signal: " + "{}", + e.to_string(), state->get_task()->wake_up_early(), _should_build_hash_table, + _finish_dependency->debug_string(), blocked_by_complete_build_stage, + blocked_by_shared_hash_table_signal); } - SCOPED_TIMER(_publish_runtime_filter_timer); - RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table)); return Base::close(state, exec_status); } @@ -536,7 +549,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); - local_state._eos = eos; if (local_state._should_build_hash_table) { // If eos or have already met a null value using short-circuit strategy, we do not need to pull // data from probe side. @@ -654,6 +666,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* } if (eos) { + local_state._eos = eos; local_state.init_short_circuit_for_probe(); // Since the comparison of null values is meaningless, null aware left anti/semi join should not output null // when the build side is not empty. diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp index 2431e64d158c2a..450cb0c123d8f8 100644 --- a/be/src/pipeline/pipeline.cpp +++ b/be/src/pipeline/pipeline.cpp @@ -104,7 +104,12 @@ void Pipeline::make_all_runnable() { if (_sink_x->count_down_destination()) { for (auto* task : _tasks) { if (task) { - task->clear_blocking_state(true); + task->set_wake_up_early(); + } + } + for (auto* task : _tasks) { + if (task) { + task->clear_blocking_state(); } } } diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 13b672868b360b..eb2073e8f8cb81 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -325,7 +325,7 @@ Status PipelineTask::execute(bool* eos) { return status; } -Status PipelineTask::close(Status exec_status) { +Status PipelineTask::close(Status exec_status, bool close_sink) { int64_t close_ns = 0; Defer defer {[&]() { if (_task_queue) { @@ -335,7 +335,9 @@ Status PipelineTask::close(Status exec_status) { Status s; { SCOPED_RAW_TIMER(&close_ns); - s = _sink->close(_state); + if (close_sink) { + s = _sink->close(_state); + } for (auto& op : _operators) { auto tem = op->close(_state); if (!tem.ok() && s.ok()) { diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 661b16c99ed88e..08fdb32d7fe34e 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -133,7 +133,7 @@ class PipelineTask { // if the pipeline create a bunch of pipeline task // must be call after all pipeline task is finish to release resource - virtual Status close(Status exec_status); + virtual Status close(Status exec_status, bool close_sink = true); void put_in_runnable_queue() { _schedule_time++; @@ -293,7 +293,8 @@ class PipelineTask { PipelineId pipeline_id() const { return _pipeline->id(); } - virtual void clear_blocking_state(bool wake_up_by_downstream = false) {} + virtual void clear_blocking_state() {} + virtual void set_wake_up_early() {} protected: void _finish_p_dependency() { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 6be1c6a1492c57..bea8d52200d49b 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -226,6 +226,10 @@ Status PipelineXTask::_open() { } Status PipelineXTask::execute(bool* eos) { + if (_eos) { + *eos = true; + return Status::OK(); + } SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_TIMER(_exec_timer); SCOPED_ATTACH_TASK(_state); @@ -247,28 +251,23 @@ Status PipelineXTask::execute(bool* eos) { cpu_qs->add_cpu_nanos(delta_cpu_time); } }}; - *eos = _sink->is_finished(_state) || _wake_up_by_downstream || is_final_state(_cur_state); - if (*eos) { - return Status::OK(); - } + if (has_dependency()) { set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY); return Status::OK(); } - if (_wake_up_by_downstream) { - *eos = true; - return Status::OK(); - } if (_runtime_filter_blocked_dependency() != nullptr) { set_state(PipelineTaskState::BLOCKED_FOR_RF); return Status::OK(); } - if (_wake_up_by_downstream) { - *eos = true; - return Status::OK(); - } + // The status must be runnable if (!_opened) { + if (_wake_up_early) { + *eos = true; + _eos = true; + return Status::OK(); + } { SCOPED_RAW_TIMER(&time_spent); RETURN_IF_ERROR(_open()); @@ -277,20 +276,20 @@ Status PipelineXTask::execute(bool* eos) { set_state(PipelineTaskState::BLOCKED_FOR_SOURCE); return Status::OK(); } - if (_wake_up_by_downstream) { - *eos = true; - return Status::OK(); - } if (!sink_can_write()) { set_state(PipelineTaskState::BLOCKED_FOR_SINK); return Status::OK(); } - if (_wake_up_by_downstream) { - *eos = true; - return Status::OK(); - } } + auto set_wake_up_and_dep_ready = [&]() { + if (wake_up_early()) { + return; + } + set_wake_up_early(); + clear_blocking_state(); + }; + Status status = Status::OK(); set_begin_execute_time(); while (!_fragment_context->is_canceled()) { @@ -298,18 +297,10 @@ Status PipelineXTask::execute(bool* eos) { set_state(PipelineTaskState::BLOCKED_FOR_SOURCE); break; } - if (_wake_up_by_downstream) { - *eos = true; - return Status::OK(); - } if (!sink_can_write()) { set_state(PipelineTaskState::BLOCKED_FOR_SINK); break; } - if (_wake_up_by_downstream) { - *eos = true; - return Status::OK(); - } /// When a task is cancelled, /// its blocking state will be cleared and it will transition to a ready state (though it is not truly ready). @@ -336,10 +327,15 @@ Status PipelineXTask::execute(bool* eos) { Status::Error("fault_inject pipeline_task executing failed"); return status; }); + + if (_sink->is_finished(_state)) { + set_wake_up_and_dep_ready(); + } + // `_dry_run` means sink operator need no more data + *eos = wake_up_early() || _dry_run; + // Pull block from operator chain - if (_dry_run || _sink->is_finished(_state)) { - *eos = true; - } else { + if (!*eos) { SCOPED_TIMER(_get_block_timer); _get_block_counter->update(1); try { @@ -350,14 +346,21 @@ Status PipelineXTask::execute(bool* eos) { } } + if (*eos) { + RETURN_IF_ERROR(close(Status::OK(), false)); + } + if (_block->rows() != 0 || *eos) { SCOPED_TIMER(_sink_timer); status = _sink->sink(_state, block, *eos); - if (!status.is()) { - RETURN_IF_ERROR(status); + if (status.is()) { + set_wake_up_and_dep_ready(); + } else if (!status) { + return status; } - *eos = status.is() ? true : *eos; + if (*eos) { // just return, the scheduler will do finish work + _eos = true; break; } } @@ -425,17 +428,14 @@ void PipelineXTask::finalize() { _le_state_map.clear(); } -Status PipelineXTask::close(Status exec_status) { +Status PipelineXTask::close(Status exec_status, bool close_sink) { int64_t close_ns = 0; - Defer defer {[&]() { - if (_task_queue) { - _task_queue->update_statistics(this, close_ns); - } - }}; Status s; { SCOPED_RAW_TIMER(&close_ns); - s = _sink->close(_state, exec_status); + if (close_sink) { + s = _sink->close(_state, exec_status); + } for (auto& op : _operators) { auto tem = op->close(_state); if (!tem.ok() && s.ok()) { @@ -444,10 +444,17 @@ Status PipelineXTask::close(Status exec_status) { } } if (_opened) { - _fresh_profile_counter(); - COUNTER_SET(_close_timer, close_ns); + COUNTER_UPDATE(_close_timer, close_ns); COUNTER_UPDATE(_task_profile->total_time_counter(), close_ns); } + if (close_sink && _opened) { + _task_profile->add_info_string("WakeUpEarly", wake_up_early() ? "true" : "false"); + _fresh_profile_counter(); + } + + if (_task_queue) { + _task_queue->update_statistics(this, close_ns); + } return s; } @@ -468,13 +475,13 @@ std::string PipelineXTask::debug_string() { // If at the same time FE cancel this pipeline task and logging debug_string before _blocked_dep is cleared, // it will think _blocked_dep is not nullptr and call _blocked_dep->debug_string(). auto* cur_blocked_dep = _blocked_dep; - fmt::format_to( - debug_string_buffer, - "PipelineTask[this = {}, state = {}, dry run = {}, elapse time " - "= {}s], _wake_up_by_downstream = {}, block dependency = {}, is running = " - "{}\noperators: ", - (void*)this, get_state_name(_cur_state), _dry_run, elapsed, _wake_up_by_downstream, - cur_blocked_dep && !_finished ? cur_blocked_dep->debug_string() : "NULL", is_running()); + fmt::format_to(debug_string_buffer, + "PipelineTask[this = {}, state = {}, dry run = {}, elapse time " + "= {}s], _wake_up_early = {}, block dependency = {}, is running = " + "{}\noperators: ", + (void*)this, get_state_name(_cur_state), _dry_run, elapsed, _wake_up_early, + cur_blocked_dep && !_finished ? cur_blocked_dep->debug_string() : "NULL", + is_running()); for (size_t i = 0; i < _operators.size(); i++) { fmt::format_to(debug_string_buffer, "\n{}", _opened && !_finished ? _operators[i]->debug_string(_state, i) diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index 42dca15076a170..0127e01b323083 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -69,7 +69,7 @@ class PipelineXTask : public PipelineTask { // if the pipeline create a bunch of pipeline task // must be call after all pipeline task is finish to release resource - Status close(Status exec_status) override; + Status close(Status exec_status, bool close_sink = true) override; Status close_sink(Status exec_status); bool source_can_read() override { @@ -138,8 +138,7 @@ class PipelineXTask : public PipelineTask { int task_id() const { return _index; }; - void clear_blocking_state(bool wake_up_by_downstream = false) override { - _wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream; + void clear_blocking_state() override { _state->get_query_ctx()->get_execution_dependency()->set_always_ready(); // We use a lock to assure all dependencies are not deconstructed here. std::unique_lock lc(_dependency_lock); @@ -177,7 +176,9 @@ class PipelineXTask : public PipelineTask { static bool should_revoke_memory(RuntimeState* state, int64_t revocable_mem_bytes); - bool wake_up_by_downstream() const { return _wake_up_by_downstream; } + bool wake_up_early() const { return _wake_up_early; } + + void set_wake_up_early() override { _wake_up_early = true; } private: friend class RuntimeFilterDependency; @@ -255,7 +256,8 @@ class PipelineXTask : public PipelineTask { std::atomic _finished {false}; std::mutex _dependency_lock; - std::atomic _wake_up_by_downstream = false; + std::atomic _wake_up_early = false; + std::atomic _eos = false; }; } // namespace doris::pipeline