From 6f8b8840f4009da73c5944e0b33eaebff229162e Mon Sep 17 00:00:00 2001 From: Pxl Date: Fri, 3 Jan 2025 18:37:27 +0800 Subject: [PATCH] =?UTF-8?q?[Bug](pipeline)=20make=20sink=20operator=20proc?= =?UTF-8?q?ess=20eos=20signals=20after=20wake=5Fup=5F=E2=80=A6=20(#45207)?= =?UTF-8?q?=20(#46374)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …early (#45207) 1. make sink operator process eos signals after wake_up_early 2. set wake_up_early when `pipeline task meet wake_up_by_downstream`/`sink reach limit`/`sink get eof status` 3. close non-sink operators after sink meet eos ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: ### Release note None ### Check List (For Author) - Test - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason - Behavior changed: - [ ] No. - [ ] Yes. - Does this need documentation? - [ ] No. - [ ] Yes. ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label --- be/src/exprs/runtime_filter.cpp | 6 +- be/src/pipeline/exec/hashjoin_build_sink.cpp | 57 +++++---- .../pipeline/exec/hashjoin_probe_operator.cpp | 2 +- be/src/pipeline/pipeline.cpp | 7 +- be/src/pipeline/pipeline_task.cpp | 116 +++++++++--------- be/src/pipeline/pipeline_task.h | 11 +- 6 files changed, 108 insertions(+), 91 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index f5e9c96184fff9..2169ec727b2428 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -498,12 +498,12 @@ class RuntimePredicateWrapper { switch (_filter_type) { case RuntimeFilterType::IN_FILTER: { if (!_context->hybrid_set) { - _context->ignored = true; + set_ignored(); return Status::OK(); } _context->hybrid_set->insert(wrapper->_context->hybrid_set.get()); if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) { - _context->ignored = true; + set_ignored(); // release in filter _context->hybrid_set.reset(); } @@ -1332,7 +1332,7 @@ void IRuntimeFilter::set_synced_size(uint64_t global_size) { } void IRuntimeFilter::set_ignored() { - _wrapper->_context->ignored = true; + _wrapper->set_ignored(); } bool IRuntimeFilter::get_ignored() { diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 991127d83fe26f..4fa9f9a95a6fc6 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -123,35 +123,48 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu } }}; - if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) { + if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() || !_eos) { return Base::close(state, exec_status); } - if (state->get_task()->wake_up_by_downstream()) { - if (_should_build_hash_table) { - // partitial ignore rf to make global rf work + try { + if (state->get_task()->wake_up_early()) { + // partitial ignore rf to make global rf work or ignore useless rf RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency)); RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters()); - } else { - // do not publish filter coz local rf not inited and useless - return Base::close(state, exec_status); - } - } else if (_should_build_hash_table) { - auto* block = _shared_state->build_block.get(); - uint64_t hash_table_size = block ? block->rows() : 0; - { - SCOPED_TIMER(_runtime_filter_init_timer); - RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size)); - RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state)); - } - if (hash_table_size > 1) { - SCOPED_TIMER(_runtime_filter_compute_timer); - _runtime_filter_slots->insert(block); + } else if (_should_build_hash_table) { + auto* block = _shared_state->build_block.get(); + uint64_t hash_table_size = block ? block->rows() : 0; + { + SCOPED_TIMER(_runtime_filter_init_timer); + RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size)); + RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state)); + } + if (hash_table_size > 1) { + SCOPED_TIMER(_runtime_filter_compute_timer); + _runtime_filter_slots->insert(block); + } } + + SCOPED_TIMER(_publish_runtime_filter_timer); + RETURN_IF_ERROR(_runtime_filter_slots->publish(state, !_should_build_hash_table)); + } catch (Exception& e) { + bool blocked_by_complete_build_stage = p._shared_hashtable_controller && + !p._shared_hash_table_context->complete_build_stage; + bool blocked_by_shared_hash_table_signal = !_should_build_hash_table && + p._shared_hashtable_controller && + !p._shared_hash_table_context->signaled; + + return Status::InternalError( + "rf process meet error: {}, wake_up_early: {}, should_build_hash_table: " + "{}, _finish_dependency: {}, blocked_by_complete_build_stage: {}, " + "blocked_by_shared_hash_table_signal: " + "{}", + e.to_string(), state->get_task()->wake_up_early(), _should_build_hash_table, + _finish_dependency->debug_string(), blocked_by_complete_build_stage, + blocked_by_shared_hash_table_signal); } - SCOPED_TIMER(_publish_runtime_filter_timer); - RETURN_IF_ERROR(_runtime_filter_slots->publish(state, !_should_build_hash_table)); return Base::close(state, exec_status); } @@ -516,7 +529,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); - local_state._eos = eos; if (local_state._should_build_hash_table) { // If eos or have already met a null value using short-circuit strategy, we do not need to pull // data from probe side. @@ -622,6 +634,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* } if (eos) { + local_state._eos = true; local_state.init_short_circuit_for_probe(); // Since the comparison of null values is meaningless, null aware left anti/semi join should not output null // when the build side is not empty. diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index 756a151394b41e..4f9184db8a5061 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -241,7 +241,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc // If we use a short-circuit strategy, should return block directly by add additional null data. auto block_rows = local_state._probe_block.rows(); if (local_state._probe_eos && block_rows == 0) { - *eos = local_state._probe_eos; + *eos = true; return Status::OK(); } diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp index 96da754daa5d98..6c39d361e59c77 100644 --- a/be/src/pipeline/pipeline.cpp +++ b/be/src/pipeline/pipeline.cpp @@ -112,7 +112,12 @@ void Pipeline::make_all_runnable() { if (_sink->count_down_destination()) { for (auto* task : _tasks) { if (task) { - task->clear_blocking_state(true); + task->set_wake_up_early(); + } + } + for (auto* task : _tasks) { + if (task) { + task->clear_blocking_state(); } } } diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 04a48d93be001b..9d83c475778401 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -22,6 +22,7 @@ #include #include +#include #include #include @@ -227,9 +228,6 @@ bool PipelineTask::_wait_to_start() { _blocked_dep = _execution_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { static_cast(_blocked_dep)->start_watcher(); - if (_wake_up_by_downstream) { - _eos = true; - } return true; } @@ -237,9 +235,6 @@ bool PipelineTask::_wait_to_start() { _blocked_dep = op_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { _blocked_dep->start_watcher(); - if (_wake_up_by_downstream) { - _eos = true; - } return true; } } @@ -261,9 +256,6 @@ bool PipelineTask::_is_blocked() { _blocked_dep = dep->is_blocked_by(this); if (_blocked_dep != nullptr) { _blocked_dep->start_watcher(); - if (_wake_up_by_downstream) { - _eos = true; - } return true; } } @@ -283,9 +275,6 @@ bool PipelineTask::_is_blocked() { _blocked_dep = op_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { _blocked_dep->start_watcher(); - if (_wake_up_by_downstream) { - _eos = true; - } return true; } } @@ -293,15 +282,15 @@ bool PipelineTask::_is_blocked() { } Status PipelineTask::execute(bool* eos) { - SCOPED_TIMER(_task_profile->total_time_counter()); - SCOPED_TIMER(_exec_timer); - SCOPED_ATTACH_TASK(_state); - _eos = _sink->is_finished(_state) || _eos || _wake_up_by_downstream; - *eos = _eos; if (_eos) { - // If task is waken up by finish dependency, `_eos` is set to true by last execution, and we should return here. + *eos = true; return Status::OK(); } + + SCOPED_TIMER(_task_profile->total_time_counter()); + SCOPED_TIMER(_exec_timer); + SCOPED_ATTACH_TASK(_state); + int64_t time_spent = 0; DBUG_EXECUTE_IF("fault_inject::PipelineXTask::execute", { Status status = Status::Error("fault_inject pipeline_task execute failed"); @@ -324,27 +313,31 @@ Status PipelineTask::execute(bool* eos) { if (_wait_to_start()) { return Status::OK(); } - if (_wake_up_by_downstream) { - _eos = true; - *eos = true; - return Status::OK(); - } + // The status must be runnable if (!_opened && !_fragment_context->is_canceled()) { + if (_wake_up_early) { + *eos = true; + _eos = true; + return Status::OK(); + } RETURN_IF_ERROR(_open()); } + auto set_wake_up_and_dep_ready = [&]() { + if (wake_up_early()) { + return; + } + set_wake_up_early(); + clear_blocking_state(); + }; + _task_profile->add_info_string("TaskState", "Runnable"); _task_profile->add_info_string("BlockedByDependency", ""); while (!_fragment_context->is_canceled()) { if (_is_blocked()) { return Status::OK(); } - if (_wake_up_by_downstream) { - _eos = true; - *eos = true; - return Status::OK(); - } /// When a task is cancelled, /// its blocking state will be cleared and it will transition to a ready state (though it is not truly ready). @@ -365,47 +358,47 @@ Status PipelineTask::execute(bool* eos) { RETURN_IF_ERROR(_sink->revoke_memory(_state)); continue; } - *eos = _eos; DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", { Status status = Status::Error("fault_inject pipeline_task executing failed"); return status; }); - // `_dry_run` means sink operator need no more data // `_sink->is_finished(_state)` means sink operator should be finished - if (_dry_run || _sink->is_finished(_state)) { - *eos = true; - _eos = true; - } else { + if (_sink->is_finished(_state)) { + set_wake_up_and_dep_ready(); + } + + // `_dry_run` means sink operator need no more data + *eos = wake_up_early() || _dry_run; + if (!*eos) { SCOPED_TIMER(_get_block_timer); _get_block_counter->update(1); RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos)); } + if (*eos) { + RETURN_IF_ERROR(close(Status::OK(), false)); + } + if (_block->rows() != 0 || *eos) { SCOPED_TIMER(_sink_timer); - Status status = Status::OK(); - // Define a lambda function to catch sink exception, because sink will check - // return error status with EOF, it is special, could not return directly. - auto sink_function = [&]() -> Status { - Status internal_st; - internal_st = _sink->sink(_state, block, *eos); - return internal_st; - }; - status = sink_function(); - if (!status.is()) { - RETURN_IF_ERROR(status); + Status status = _sink->sink(_state, block, *eos); + + if (status.is()) { + set_wake_up_and_dep_ready(); + } else if (!status) { + return status; } - *eos = status.is() ? true : *eos; + if (*eos) { // just return, the scheduler will do finish work - _eos = true; _task_profile->add_info_string("TaskState", "Finished"); + _eos = true; return Status::OK(); } } } - static_cast(get_task_queue()->push_back(this)); + RETURN_IF_ERROR(get_task_queue()->push_back(this)); return Status::OK(); } @@ -474,17 +467,14 @@ void PipelineTask::finalize() { _le_state_map.clear(); } -Status PipelineTask::close(Status exec_status) { +Status PipelineTask::close(Status exec_status, bool close_sink) { int64_t close_ns = 0; - Defer defer {[&]() { - if (_task_queue) { - _task_queue->update_statistics(this, close_ns); - } - }}; Status s; { SCOPED_RAW_TIMER(&close_ns); - s = _sink->close(_state, exec_status); + if (close_sink) { + s = _sink->close(_state, exec_status); + } for (auto& op : _operators) { auto tem = op->close(_state); if (!tem.ok() && s.ok()) { @@ -493,10 +483,18 @@ Status PipelineTask::close(Status exec_status) { } } if (_opened) { - _fresh_profile_counter(); - COUNTER_SET(_close_timer, close_ns); + COUNTER_UPDATE(_close_timer, close_ns); COUNTER_UPDATE(_task_profile->total_time_counter(), close_ns); } + + if (close_sink && _opened) { + _task_profile->add_info_string("WakeUpEarly", wake_up_early() ? "true" : "false"); + _fresh_profile_counter(); + } + + if (_task_queue) { + _task_queue->update_statistics(this, close_ns); + } return s; } @@ -512,10 +510,10 @@ std::string PipelineTask::debug_string() { auto elapsed = _fragment_context->elapsed_time() / 1000000000.0; fmt::format_to(debug_string_buffer, "PipelineTask[this = {}, id = {}, open = {}, eos = {}, finish = {}, dry run = " - "{}, elapse time = {}s, _wake_up_by_downstream = {}], block dependency = {}, is " + "{}, elapse time = {}s, _wake_up_early = {}], block dependency = {}, is " "running = {}\noperators: ", (void*)this, _index, _opened, _eos, _finalized, _dry_run, elapsed, - _wake_up_by_downstream.load(), + _wake_up_early.load(), cur_blocked_dep && !_finalized ? cur_blocked_dep->debug_string() : "NULL", is_running()); for (size_t i = 0; i < _operators.size(); i++) { diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index febc9634c49f23..94a553e2fa14ab 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -61,7 +61,7 @@ class PipelineTask { // if the pipeline create a bunch of pipeline task // must be call after all pipeline task is finish to release resource - Status close(Status exec_status); + Status close(Status exec_status, bool close_sink = true); PipelineFragmentContext* fragment_context() { return _fragment_context; } @@ -135,11 +135,12 @@ class PipelineTask { int task_id() const { return _index; }; bool is_finalized() const { return _finalized; } - void clear_blocking_state(bool wake_up_by_downstream = false) { + void set_wake_up_early() { _wake_up_early = true; } + + void clear_blocking_state() { _state->get_query_ctx()->get_execution_dependency()->set_always_ready(); // We use a lock to assure all dependencies are not deconstructed here. std::unique_lock lc(_dependency_lock); - _wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream; if (!_finalized) { _execution_dep->set_always_ready(); for (auto* dep : _filter_dependencies) { @@ -236,7 +237,7 @@ class PipelineTask { PipelineId pipeline_id() const { return _pipeline->id(); } - bool wake_up_by_downstream() const { return _wake_up_by_downstream; } + bool wake_up_early() const { return _wake_up_early; } private: friend class RuntimeFilterDependency; @@ -318,7 +319,7 @@ class PipelineTask { std::atomic _running = false; std::atomic _eos = false; - std::atomic _wake_up_by_downstream = false; + std::atomic _wake_up_early = false; }; } // namespace doris::pipeline