Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug](pipeline) make sink operator process eos signals after wake_up_early #45207

Merged
merged 3 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -498,12 +498,12 @@ class RuntimePredicateWrapper {
switch (_filter_type) {
case RuntimeFilterType::IN_FILTER: {
if (!_context->hybrid_set) {
_context->ignored = true;
set_ignored();
return Status::OK();
}
_context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) {
_context->ignored = true;
set_ignored();
// release in filter
_context->hybrid_set.reset();
}
Expand Down Expand Up @@ -1337,7 +1337,7 @@ void IRuntimeFilter::set_synced_size(uint64_t global_size) {
}

void IRuntimeFilter::set_ignored() {
_wrapper->_context->ignored = true;
_wrapper->set_ignored();
}

bool IRuntimeFilter::get_ignored() {
Expand Down
50 changes: 21 additions & 29 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,26 +135,16 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
}
}};

if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) {
if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() || !_eos) {
return Base::close(state, exec_status);
}

try {
if (state->get_task()->wake_up_by_downstream()) {
if (_should_build_hash_table) {
// partially ignore rf to make global rf work
RETURN_IF_ERROR(
_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
} else {
// do not publish filter coz local rf not inited and useless
return Base::close(state, exec_status);
}
if (state->get_task()->wake_up_early()) {
BiteTheDDDDt marked this conversation as resolved.
Show resolved Hide resolved
// partially ignore rf to make global rf work or ignore useless rf
RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
} else if (_should_build_hash_table) {
if (p._shared_hashtable_controller &&
!p._shared_hash_table_context->complete_build_stage) {
return Status::InternalError("close before sink meet eos");
}
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
{
Expand All @@ -166,26 +156,25 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
}
} else if ((p._shared_hashtable_controller && !p._shared_hash_table_context->signaled) ||
(p._shared_hash_table_context &&
!p._shared_hash_table_context->complete_build_stage)) {
throw Exception(ErrorCode::INTERNAL_ERROR, "build_sink::close meet error state");
} else {
RETURN_IF_ERROR(
_runtime_filter_slots->copy_from_shared_context(p._shared_hash_table_context));
}

SCOPED_TIMER(_publish_runtime_filter_timer);
RETURN_IF_ERROR(_runtime_filter_slots->publish(state, !_should_build_hash_table));
} catch (Exception& e) {
bool blocked_by_complete_build_stage = p._shared_hashtable_controller &&
!p._shared_hash_table_context->complete_build_stage;
bool blocked_by_shared_hash_table_signal = !_should_build_hash_table &&
p._shared_hashtable_controller &&
!p._shared_hash_table_context->signaled;

return Status::InternalError(
"rf process meet error: {}, wake_up_by_downstream: {}, should_build_hash_table: "
"{}, _finish_dependency: {}, complete_build_stage: {}, shared_hash_table_signaled: "
"rf process meet error: {}, wake_up_early: {}, should_build_hash_table: "
"{}, _finish_dependency: {}, blocked_by_complete_build_stage: {}, "
"blocked_by_shared_hash_table_signal: "
"{}",
e.to_string(), state->get_task()->wake_up_by_downstream(), _should_build_hash_table,
_finish_dependency->debug_string(),
p._shared_hash_table_context && !p._shared_hash_table_context->complete_build_stage,
p._shared_hashtable_controller && !p._shared_hash_table_context->signaled);
e.to_string(), state->get_task()->wake_up_early(), _should_build_hash_table,
_finish_dependency->debug_string(), blocked_by_complete_build_stage,
blocked_by_shared_hash_table_signal);
}
return Base::close(state, exec_status);
}
Expand Down Expand Up @@ -479,7 +468,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());

local_state._eos = eos;
if (local_state._should_build_hash_table) {
// If eos or have already met a null value using short-circuit strategy, we do not need to pull
// data from probe side.
Expand Down Expand Up @@ -556,6 +544,9 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
return _shared_hash_table_context->status;
}

RETURN_IF_ERROR(local_state._runtime_filter_slots->copy_from_shared_context(
_shared_hash_table_context));

local_state.profile()->add_info_string(
"SharedHashTableFrom",
print_id(
Expand All @@ -581,6 +572,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
}

if (eos) {
local_state._eos = true;
local_state.init_short_circuit_for_probe();
// Since the comparison of null values is meaningless, null aware left anti/semi join should not output null
// when the build side is not empty.
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
// If we use a short-circuit strategy, should return block directly by add additional null data.
auto block_rows = local_state._probe_block.rows();
if (local_state._probe_eos && block_rows == 0) {
*eos = local_state._probe_eos;
*eos = true;
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ void Pipeline::make_all_runnable() {
if (_sink->count_down_destination()) {
for (auto* task : _tasks) {
if (task) {
task->set_wake_up_by_downstream();
task->set_wake_up_early();
}
}
for (auto* task : _tasks) {
Expand Down
116 changes: 57 additions & 59 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <glog/logging.h>
#include <stddef.h>

#include <algorithm>
#include <ostream>
#include <vector>

Expand Down Expand Up @@ -223,19 +224,13 @@ bool PipelineTask::_wait_to_start() {
_blocked_dep = _execution_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
static_cast<Dependency*>(_blocked_dep)->start_watcher();
if (_wake_up_by_downstream) {
_eos = true;
}
return true;
}

for (auto* op_dep : _filter_dependencies) {
_blocked_dep = op_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
if (_wake_up_by_downstream) {
_eos = true;
}
return true;
}
}
Expand All @@ -257,9 +252,6 @@ bool PipelineTask::_is_blocked() {
_blocked_dep = dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
if (_wake_up_by_downstream) {
_eos = true;
}
return true;
}
}
Expand All @@ -279,25 +271,22 @@ bool PipelineTask::_is_blocked() {
_blocked_dep = op_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
if (_wake_up_by_downstream) {
_eos = true;
}
return true;
}
}
return false;
}

Status PipelineTask::execute(bool* eos) {
yiguolei marked this conversation as resolved.
Show resolved Hide resolved
yiguolei marked this conversation as resolved.
Show resolved Hide resolved
yiguolei marked this conversation as resolved.
Show resolved Hide resolved
yiguolei marked this conversation as resolved.
Show resolved Hide resolved
yiguolei marked this conversation as resolved.
Show resolved Hide resolved
yiguolei marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'execute' has cognitive complexity of 70 (threshold 50) [readability-function-cognitive-complexity]

Status PipelineTask::execute(bool* eos) {
                     ^
Additional context

be/src/pipeline/pipeline_task.cpp:280: +1, including nesting penalty of 0, nesting level increased to 1

    if (_eos) {
    ^

be/src/pipeline/pipeline_task.cpp:290: +1, including nesting penalty of 0, nesting level increased to 1

    DBUG_EXECUTE_IF("fault_inject::PipelineXTask::execute", {
    ^

be/src/util/debug_points.h:36: expanded from macro 'DBUG_EXECUTE_IF'

    if (UNLIKELY(config::enable_debug_points)) {                              \
    ^

be/src/pipeline/pipeline_task.cpp:290: +2, including nesting penalty of 1, nesting level increased to 2

    DBUG_EXECUTE_IF("fault_inject::PipelineXTask::execute", {
    ^

be/src/util/debug_points.h:38: expanded from macro 'DBUG_EXECUTE_IF'

        if (dp) {                                                             \
        ^

be/src/pipeline/pipeline_task.cpp:308: +1, including nesting penalty of 0, nesting level increased to 1

    if (_wait_to_start()) {
    ^

be/src/pipeline/pipeline_task.cpp:313: +1, including nesting penalty of 0, nesting level increased to 1

    if (!_opened && !_fragment_context->is_canceled()) {
    ^

be/src/pipeline/pipeline_task.cpp:313: +1

    if (!_opened && !_fragment_context->is_canceled()) {
                 ^

be/src/pipeline/pipeline_task.cpp:314: +2, including nesting penalty of 1, nesting level increased to 2

        if (_wake_up_early) {
        ^

be/src/pipeline/pipeline_task.cpp:319: +2, including nesting penalty of 1, nesting level increased to 2

        RETURN_IF_ERROR(_open());
        ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/pipeline/pipeline_task.cpp:319: +3, including nesting penalty of 2, nesting level increased to 3

        RETURN_IF_ERROR(_open());
        ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/pipeline/pipeline_task.cpp:322: nesting level increased to 1

    auto set_wake_up_and_dep_ready = [&]() {
                                     ^

be/src/pipeline/pipeline_task.cpp:323: +2, including nesting penalty of 1, nesting level increased to 2

        if (wake_up_early()) {
        ^

be/src/pipeline/pipeline_task.cpp:332: +1, including nesting penalty of 0, nesting level increased to 1

    while (!_fragment_context->is_canceled()) {
    ^

be/src/pipeline/pipeline_task.cpp:333: +2, including nesting penalty of 1, nesting level increased to 2

        if (_is_blocked()) {
        ^

be/src/pipeline/pipeline_task.cpp:340: +2, including nesting penalty of 1, nesting level increased to 2

        if (_fragment_context->is_canceled()) {
        ^

be/src/pipeline/pipeline_task.cpp:344: +2, including nesting penalty of 1, nesting level increased to 2

        if (time_spent > THREAD_TIME_SLICE) {
        ^

be/src/pipeline/pipeline_task.cpp:352: +2, including nesting penalty of 1, nesting level increased to 2

        if (should_revoke_memory(_state, sink_revocable_mem_size)) {
        ^

be/src/pipeline/pipeline_task.cpp:353: +3, including nesting penalty of 2, nesting level increased to 3

            RETURN_IF_ERROR(_sink->revoke_memory(_state));
            ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/pipeline/pipeline_task.cpp:353: +4, including nesting penalty of 3, nesting level increased to 4

            RETURN_IF_ERROR(_sink->revoke_memory(_state));
            ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/pipeline/pipeline_task.cpp:356: +2, including nesting penalty of 1, nesting level increased to 2

        DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", {
        ^

be/src/util/debug_points.h:36: expanded from macro 'DBUG_EXECUTE_IF'

    if (UNLIKELY(config::enable_debug_points)) {                              \
    ^

be/src/pipeline/pipeline_task.cpp:356: +3, including nesting penalty of 2, nesting level increased to 3

        DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", {
        ^

be/src/util/debug_points.h:38: expanded from macro 'DBUG_EXECUTE_IF'

        if (dp) {                                                             \
        ^

be/src/pipeline/pipeline_task.cpp:362: +2, including nesting penalty of 1, nesting level increased to 2

        if (_sink->is_finished(_state)) {
        ^

be/src/pipeline/pipeline_task.cpp:367: +1

        *eos = wake_up_early() || _dry_run;
                               ^

be/src/pipeline/pipeline_task.cpp:368: +2, including nesting penalty of 1, nesting level increased to 2

        if (!*eos) {
        ^

be/src/pipeline/pipeline_task.cpp:371: +3, including nesting penalty of 2, nesting level increased to 3

            RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos));
            ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/pipeline/pipeline_task.cpp:371: +4, including nesting penalty of 3, nesting level increased to 4

            RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos));
            ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/pipeline/pipeline_task.cpp:374: +2, including nesting penalty of 1, nesting level increased to 2

        if (*eos) {
        ^

be/src/pipeline/pipeline_task.cpp:375: +3, including nesting penalty of 2, nesting level increased to 3

            RETURN_IF_ERROR(close(Status::OK(), false));
            ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/pipeline/pipeline_task.cpp:375: +4, including nesting penalty of 3, nesting level increased to 4

            RETURN_IF_ERROR(close(Status::OK(), false));
            ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/pipeline/pipeline_task.cpp:378: +2, including nesting penalty of 1, nesting level increased to 2

        if (_block->rows() != 0 || *eos) {
        ^

be/src/pipeline/pipeline_task.cpp:382: +3, including nesting penalty of 2, nesting level increased to 3

            if (status.is<ErrorCode::END_OF_FILE>()) {
            ^

be/src/pipeline/pipeline_task.cpp:384: +1, nesting level increased to 3

            } else if (!status) {
                   ^

be/src/pipeline/pipeline_task.cpp:388: +3, including nesting penalty of 2, nesting level increased to 3

            if (*eos) { // just return, the scheduler will do finish work
            ^

be/src/pipeline/pipeline_task.cpp:396: +1, including nesting penalty of 0, nesting level increased to 1

    RETURN_IF_ERROR(get_task_queue()->push_back(this));
    ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/pipeline/pipeline_task.cpp:396: +2, including nesting penalty of 1, nesting level increased to 2

    RETURN_IF_ERROR(get_task_queue()->push_back(this));
    ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_TIMER(_exec_timer);
SCOPED_ATTACH_TASK(_state);
_eos = _sink->is_finished(_state) || _eos || _wake_up_by_downstream;
*eos = _eos;
if (_eos) {
// If the task was woken up by the finish dependency, `_eos` was set to true by the last execution, so we should return here.
*eos = true;
return Status::OK();
}

SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_TIMER(_exec_timer);
SCOPED_ATTACH_TASK(_state);

int64_t time_spent = 0;
DBUG_EXECUTE_IF("fault_inject::PipelineXTask::execute", {
Status status = Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task execute failed");
Expand All @@ -320,27 +309,31 @@ Status PipelineTask::execute(bool* eos) {
if (_wait_to_start()) {
return Status::OK();
}
if (_wake_up_by_downstream) {
_eos = true;
*eos = true;
return Status::OK();
}

// The status must be runnable
if (!_opened && !_fragment_context->is_canceled()) {
if (_wake_up_early) {
BiteTheDDDDt marked this conversation as resolved.
Show resolved Hide resolved
*eos = true;
_eos = true;
return Status::OK();
}
RETURN_IF_ERROR(_open());
}

auto set_wake_up_and_dep_ready = [&]() {
if (wake_up_early()) {
BiteTheDDDDt marked this conversation as resolved.
Show resolved Hide resolved
return;
}
set_wake_up_early();
clear_blocking_state();
};

_task_profile->add_info_string("TaskState", "Runnable");
_task_profile->add_info_string("BlockedByDependency", "");
while (!_fragment_context->is_canceled()) {
if (_is_blocked()) {
return Status::OK();
}
if (_wake_up_by_downstream) {
_eos = true;
*eos = true;
return Status::OK();
}

/// When a task is cancelled,
/// its blocking state will be cleared and it will transition to a ready state (though it is not truly ready).
Expand All @@ -361,47 +354,47 @@ Status PipelineTask::execute(bool* eos) {
RETURN_IF_ERROR(_sink->revoke_memory(_state));
continue;
}
*eos = _eos;
DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", {
Status status =
Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task executing failed");
return status;
});
// `_dry_run` means sink operator need no more data
// `_sink->is_finished(_state)` means sink operator should be finished
if (_dry_run || _sink->is_finished(_state)) {
*eos = true;
_eos = true;
} else {
if (_sink->is_finished(_state)) {
set_wake_up_and_dep_ready();
}

// `_dry_run` means sink operator need no more data
*eos = wake_up_early() || _dry_run;
if (!*eos) {
SCOPED_TIMER(_get_block_timer);
_get_block_counter->update(1);
RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos));
}

if (*eos) {
RETURN_IF_ERROR(close(Status::OK(), false));
}

if (_block->rows() != 0 || *eos) {
SCOPED_TIMER(_sink_timer);
Status status = Status::OK();
// Define a lambda function to catch sink exception, because sink will check
// return error status with EOF, it is special, could not return directly.
auto sink_function = [&]() -> Status {
Status internal_st;
internal_st = _sink->sink(_state, block, *eos);
return internal_st;
};
status = sink_function();
if (!status.is<ErrorCode::END_OF_FILE>()) {
RETURN_IF_ERROR(status);
Status status = _sink->sink(_state, block, *eos);

if (status.is<ErrorCode::END_OF_FILE>()) {
set_wake_up_and_dep_ready();
} else if (!status) {
return status;
}
*eos = status.is<ErrorCode::END_OF_FILE>() ? true : *eos;

if (*eos) { // just return, the scheduler will do finish work
_eos = true;
_task_profile->add_info_string("TaskState", "Finished");
_eos = true;
return Status::OK();
}
}
}

static_cast<void>(get_task_queue()->push_back(this));
RETURN_IF_ERROR(get_task_queue()->push_back(this));
return Status::OK();
}

Expand Down Expand Up @@ -470,17 +463,14 @@ void PipelineTask::finalize() {
_le_state_map.clear();
}

Status PipelineTask::close(Status exec_status) {
Status PipelineTask::close(Status exec_status, bool close_sink) {
int64_t close_ns = 0;
Defer defer {[&]() {
if (_task_queue) {
_task_queue->update_statistics(this, close_ns);
}
}};
Status s;
{
SCOPED_RAW_TIMER(&close_ns);
s = _sink->close(_state, exec_status);
if (close_sink) {
s = _sink->close(_state, exec_status);
}
for (auto& op : _operators) {
auto tem = op->close(_state);
BiteTheDDDDt marked this conversation as resolved.
Show resolved Hide resolved
if (!tem.ok() && s.ok()) {
Expand All @@ -489,10 +479,18 @@ Status PipelineTask::close(Status exec_status) {
}
}
if (_opened) {
_fresh_profile_counter();
COUNTER_SET(_close_timer, close_ns);
COUNTER_UPDATE(_close_timer, close_ns);
COUNTER_UPDATE(_task_profile->total_time_counter(), close_ns);
}

if (close_sink && _opened) {
_task_profile->add_info_string("WakeUpEarly", wake_up_early() ? "true" : "false");
_fresh_profile_counter();
}

if (_task_queue) {
_task_queue->update_statistics(this, close_ns);
}
return s;
}

Expand All @@ -508,10 +506,10 @@ std::string PipelineTask::debug_string() {
auto elapsed = _fragment_context->elapsed_time() / 1000000000.0;
fmt::format_to(debug_string_buffer,
"PipelineTask[this = {}, id = {}, open = {}, eos = {}, finish = {}, dry run = "
"{}, elapse time = {}s, _wake_up_by_downstream = {}], block dependency = {}, is "
"{}, elapse time = {}s, _wake_up_early = {}], block dependency = {}, is "
"running = {}\noperators: ",
(void*)this, _index, _opened, _eos, _finalized, _dry_run, elapsed,
_wake_up_by_downstream.load(),
_wake_up_early.load(),
cur_blocked_dep && !_finalized ? cur_blocked_dep->debug_string() : "NULL",
is_running());
for (size_t i = 0; i < _operators.size(); i++) {
Expand Down
Loading
Loading