Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug](pipeline) make sink operator process eos signals after wake_up_early #45207

Merged
merged 3 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -498,12 +498,12 @@ class RuntimePredicateWrapper {
switch (_filter_type) {
case RuntimeFilterType::IN_FILTER: {
if (!_context->hybrid_set) {
_context->ignored = true;
set_ignored();
return Status::OK();
}
_context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) {
_context->ignored = true;
set_ignored();
// release in filter
_context->hybrid_set.reset();
}
Expand Down Expand Up @@ -1337,7 +1337,7 @@ void IRuntimeFilter::set_synced_size(uint64_t global_size) {
}

void IRuntimeFilter::set_ignored() {
_wrapper->_context->ignored = true;
_wrapper->set_ignored();
}

bool IRuntimeFilter::get_ignored() {
Expand Down
3 changes: 0 additions & 3 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,6 @@ void ExchangeSinkLocalState::on_channel_finished(InstanceLoId channel_id) {
_finished_channels.emplace(channel_id);
if (_working_channels_count.fetch_sub(1) == 1) {
set_reach_limit();
if (_finish_dependency) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove this code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set_reach_limit 实际上会使pipeline task触发is_finished的逻辑,这个pr中触发is_finished会直接set_wake_up_early,以统一的方式来set dependency

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果当前task处于被finish dependency阻塞的状态,on_channel_finished是在rpc的callback中执行。之前可以在这里set ready来唤醒task,这么改完以后是如何唤醒task执行呢

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

图片

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不是,是当前task已经处在阻塞状态了。能走到这里的前提是这个task被唤醒起来执行了

_finish_dependency->set_ready();
}
}
}
}
Expand Down
51 changes: 22 additions & 29 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,26 +135,17 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
}
}};

if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) {
if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() ||
!p.get_local_state(state)._eos) {
BiteTheDDDDt marked this conversation as resolved.
Show resolved Hide resolved
return Base::close(state, exec_status);
}

try {
if (state->get_task()->wake_up_by_downstream()) {
if (_should_build_hash_table) {
// partitial ignore rf to make global rf work
RETURN_IF_ERROR(
_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
} else {
// do not publish filter coz local rf not inited and useless
return Base::close(state, exec_status);
}
if (state->get_task()->wake_up_early()) {
BiteTheDDDDt marked this conversation as resolved.
Show resolved Hide resolved
// partitial ignore rf to make global rf work or ignore useless rf
RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
} else if (_should_build_hash_table) {
if (p._shared_hashtable_controller &&
!p._shared_hash_table_context->complete_build_stage) {
return Status::InternalError("close before sink meet eos");
}
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
{
Expand All @@ -166,26 +157,25 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
}
} else if ((p._shared_hashtable_controller && !p._shared_hash_table_context->signaled) ||
(p._shared_hash_table_context &&
!p._shared_hash_table_context->complete_build_stage)) {
throw Exception(ErrorCode::INTERNAL_ERROR, "build_sink::close meet error state");
} else {
RETURN_IF_ERROR(
_runtime_filter_slots->copy_from_shared_context(p._shared_hash_table_context));
}

SCOPED_TIMER(_publish_runtime_filter_timer);
RETURN_IF_ERROR(_runtime_filter_slots->publish(state, !_should_build_hash_table));
} catch (Exception& e) {
bool blocked_by_complete_build_stage = p._shared_hashtable_controller &&
!p._shared_hash_table_context->complete_build_stage;
bool blocked_by_shared_hash_table_signal = !_should_build_hash_table &&
p._shared_hashtable_controller &&
!p._shared_hash_table_context->signaled;

return Status::InternalError(
"rf process meet error: {}, wake_up_by_downstream: {}, should_build_hash_table: "
"{}, _finish_dependency: {}, complete_build_stage: {}, shared_hash_table_signaled: "
"rf process meet error: {}, wake_up_early: {}, should_build_hash_table: "
"{}, _finish_dependency: {}, blocked_by_complete_build_stage: {}, "
"blocked_by_shared_hash_table_signal: "
"{}",
e.to_string(), state->get_task()->wake_up_by_downstream(), _should_build_hash_table,
_finish_dependency->debug_string(),
p._shared_hash_table_context && !p._shared_hash_table_context->complete_build_stage,
p._shared_hashtable_controller && !p._shared_hash_table_context->signaled);
e.to_string(), state->get_task()->wake_up_early(), _should_build_hash_table,
_finish_dependency->debug_string(), blocked_by_complete_build_stage,
blocked_by_shared_hash_table_signal);
}
return Base::close(state, exec_status);
}
Expand Down Expand Up @@ -479,7 +469,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());

local_state._eos = eos;
if (local_state._should_build_hash_table) {
// If eos or have already met a null value using short-circuit strategy, we do not need to pull
// data from probe side.
Expand Down Expand Up @@ -556,6 +545,9 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
return _shared_hash_table_context->status;
}

RETURN_IF_ERROR(local_state._runtime_filter_slots->copy_from_shared_context(
_shared_hash_table_context));

local_state.profile()->add_info_string(
"SharedHashTableFrom",
print_id(
Expand All @@ -581,6 +573,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
}

if (eos) {
local_state._eos = true;
local_state.init_short_circuit_for_probe();
// Since the comparison of null values is meaningless, null aware left anti/semi join should not output null
// when the build side is not empty.
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
// If we use a short-circuit strategy, should return block directly by add additional null data.
auto block_rows = local_state._probe_block.rows();
if (local_state._probe_eos && block_rows == 0) {
*eos = local_state._probe_eos;
*eos = true;
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ void Pipeline::make_all_runnable() {
if (_sink->count_down_destination()) {
for (auto* task : _tasks) {
if (task) {
task->set_wake_up_by_downstream();
task->set_wake_up_early();
}
}
for (auto* task : _tasks) {
Expand Down
97 changes: 45 additions & 52 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <glog/logging.h>
#include <stddef.h>

#include <algorithm>
#include <ostream>
#include <vector>

Expand Down Expand Up @@ -223,19 +224,13 @@ bool PipelineTask::_wait_to_start() {
_blocked_dep = _execution_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
static_cast<Dependency*>(_blocked_dep)->start_watcher();
if (_wake_up_by_downstream) {
_eos = true;
}
return true;
}

for (auto* op_dep : _filter_dependencies) {
_blocked_dep = op_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
if (_wake_up_by_downstream) {
_eos = true;
}
return true;
}
}
Expand All @@ -257,9 +252,6 @@ bool PipelineTask::_is_blocked() {
_blocked_dep = dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
if (_wake_up_by_downstream) {
_eos = true;
}
return true;
}
}
Expand All @@ -279,25 +271,22 @@ bool PipelineTask::_is_blocked() {
_blocked_dep = op_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
if (_wake_up_by_downstream) {
_eos = true;
}
return true;
}
}
return false;
}

Status PipelineTask::execute(bool* eos) {
yiguolei marked this conversation as resolved.
Show resolved Hide resolved
yiguolei marked this conversation as resolved.
Show resolved Hide resolved
yiguolei marked this conversation as resolved.
Show resolved Hide resolved
yiguolei marked this conversation as resolved.
Show resolved Hide resolved
yiguolei marked this conversation as resolved.
Show resolved Hide resolved
yiguolei marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'execute' has cognitive complexity of 70 (threshold 50) [readability-function-cognitive-complexity]

Status PipelineTask::execute(bool* eos) {
                     ^
Additional context

be/src/pipeline/pipeline_task.cpp:280: +1, including nesting penalty of 0, nesting level increased to 1

    if (_eos) {
    ^

be/src/pipeline/pipeline_task.cpp:290: +1, including nesting penalty of 0, nesting level increased to 1

    DBUG_EXECUTE_IF("fault_inject::PipelineXTask::execute", {
    ^

be/src/util/debug_points.h:36: expanded from macro 'DBUG_EXECUTE_IF'

    if (UNLIKELY(config::enable_debug_points)) {                              \
    ^

be/src/pipeline/pipeline_task.cpp:290: +2, including nesting penalty of 1, nesting level increased to 2

    DBUG_EXECUTE_IF("fault_inject::PipelineXTask::execute", {
    ^

be/src/util/debug_points.h:38: expanded from macro 'DBUG_EXECUTE_IF'

        if (dp) {                                                             \
        ^

be/src/pipeline/pipeline_task.cpp:308: +1, including nesting penalty of 0, nesting level increased to 1

    if (_wait_to_start()) {
    ^

be/src/pipeline/pipeline_task.cpp:313: +1, including nesting penalty of 0, nesting level increased to 1

    if (!_opened && !_fragment_context->is_canceled()) {
    ^

be/src/pipeline/pipeline_task.cpp:313: +1

    if (!_opened && !_fragment_context->is_canceled()) {
                 ^

be/src/pipeline/pipeline_task.cpp:314: +2, including nesting penalty of 1, nesting level increased to 2

        if (_wake_up_early) {
        ^

be/src/pipeline/pipeline_task.cpp:319: +2, including nesting penalty of 1, nesting level increased to 2

        RETURN_IF_ERROR(_open());
        ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/pipeline/pipeline_task.cpp:319: +3, including nesting penalty of 2, nesting level increased to 3

        RETURN_IF_ERROR(_open());
        ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/pipeline/pipeline_task.cpp:322: nesting level increased to 1

    auto set_wake_up_and_dep_ready = [&]() {
                                     ^

be/src/pipeline/pipeline_task.cpp:323: +2, including nesting penalty of 1, nesting level increased to 2

        if (wake_up_early()) {
        ^

be/src/pipeline/pipeline_task.cpp:332: +1, including nesting penalty of 0, nesting level increased to 1

    while (!_fragment_context->is_canceled()) {
    ^

be/src/pipeline/pipeline_task.cpp:333: +2, including nesting penalty of 1, nesting level increased to 2

        if (_is_blocked()) {
        ^

be/src/pipeline/pipeline_task.cpp:340: +2, including nesting penalty of 1, nesting level increased to 2

        if (_fragment_context->is_canceled()) {
        ^

be/src/pipeline/pipeline_task.cpp:344: +2, including nesting penalty of 1, nesting level increased to 2

        if (time_spent > THREAD_TIME_SLICE) {
        ^

be/src/pipeline/pipeline_task.cpp:352: +2, including nesting penalty of 1, nesting level increased to 2

        if (should_revoke_memory(_state, sink_revocable_mem_size)) {
        ^

be/src/pipeline/pipeline_task.cpp:353: +3, including nesting penalty of 2, nesting level increased to 3

            RETURN_IF_ERROR(_sink->revoke_memory(_state));
            ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/pipeline/pipeline_task.cpp:353: +4, including nesting penalty of 3, nesting level increased to 4

            RETURN_IF_ERROR(_sink->revoke_memory(_state));
            ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/pipeline/pipeline_task.cpp:356: +2, including nesting penalty of 1, nesting level increased to 2

        DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", {
        ^

be/src/util/debug_points.h:36: expanded from macro 'DBUG_EXECUTE_IF'

    if (UNLIKELY(config::enable_debug_points)) {                              \
    ^

be/src/pipeline/pipeline_task.cpp:356: +3, including nesting penalty of 2, nesting level increased to 3

        DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", {
        ^

be/src/util/debug_points.h:38: expanded from macro 'DBUG_EXECUTE_IF'

        if (dp) {                                                             \
        ^

be/src/pipeline/pipeline_task.cpp:362: +2, including nesting penalty of 1, nesting level increased to 2

        if (_sink->is_finished(_state)) {
        ^

be/src/pipeline/pipeline_task.cpp:367: +1

        *eos = wake_up_early() || _dry_run;
                               ^

be/src/pipeline/pipeline_task.cpp:368: +2, including nesting penalty of 1, nesting level increased to 2

        if (!*eos) {
        ^

be/src/pipeline/pipeline_task.cpp:371: +3, including nesting penalty of 2, nesting level increased to 3

            RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos));
            ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/pipeline/pipeline_task.cpp:371: +4, including nesting penalty of 3, nesting level increased to 4

            RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos));
            ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/pipeline/pipeline_task.cpp:374: +2, including nesting penalty of 1, nesting level increased to 2

        if (*eos) {
        ^

be/src/pipeline/pipeline_task.cpp:375: +3, including nesting penalty of 2, nesting level increased to 3

            RETURN_IF_ERROR(close(Status::OK(), false));
            ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/pipeline/pipeline_task.cpp:375: +4, including nesting penalty of 3, nesting level increased to 4

            RETURN_IF_ERROR(close(Status::OK(), false));
            ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/pipeline/pipeline_task.cpp:378: +2, including nesting penalty of 1, nesting level increased to 2

        if (_block->rows() != 0 || *eos) {
        ^

be/src/pipeline/pipeline_task.cpp:382: +3, including nesting penalty of 2, nesting level increased to 3

            if (status.is<ErrorCode::END_OF_FILE>()) {
            ^

be/src/pipeline/pipeline_task.cpp:384: +1, nesting level increased to 3

            } else if (!status) {
                   ^

be/src/pipeline/pipeline_task.cpp:388: +3, including nesting penalty of 2, nesting level increased to 3

            if (*eos) { // just return, the scheduler will do finish work
            ^

be/src/pipeline/pipeline_task.cpp:396: +1, including nesting penalty of 0, nesting level increased to 1

    RETURN_IF_ERROR(get_task_queue()->push_back(this));
    ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/pipeline/pipeline_task.cpp:396: +2, including nesting penalty of 1, nesting level increased to 2

    RETURN_IF_ERROR(get_task_queue()->push_back(this));
    ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_TIMER(_exec_timer);
SCOPED_ATTACH_TASK(_state);
_eos = _sink->is_finished(_state) || _eos || _wake_up_by_downstream;
*eos = _eos;
if (_eos) {
// If task is waken up by finish dependency, `_eos` is set to true by last execution, and we should return here.
*eos = true;
return Status::OK();
}

SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_TIMER(_exec_timer);
SCOPED_ATTACH_TASK(_state);

int64_t time_spent = 0;
DBUG_EXECUTE_IF("fault_inject::PipelineXTask::execute", {
Status status = Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task execute failed");
Expand All @@ -320,27 +309,31 @@ Status PipelineTask::execute(bool* eos) {
if (_wait_to_start()) {
return Status::OK();
}
if (_wake_up_by_downstream) {
_eos = true;
*eos = true;
return Status::OK();
}

// The status must be runnable
if (!_opened && !_fragment_context->is_canceled()) {
if (_wake_up_early) {
BiteTheDDDDt marked this conversation as resolved.
Show resolved Hide resolved
*eos = true;
_eos = true;
return Status::OK();
}
RETURN_IF_ERROR(_open());
}

auto set_wake_up_and_dep_ready = [&]() {
if (wake_up_early()) {
BiteTheDDDDt marked this conversation as resolved.
Show resolved Hide resolved
return;
}
set_wake_up_early();
clear_blocking_state();
};

_task_profile->add_info_string("TaskState", "Runnable");
_task_profile->add_info_string("BlockedByDependency", "");
while (!_fragment_context->is_canceled()) {
if (_is_blocked()) {
return Status::OK();
}
if (_wake_up_by_downstream) {
_eos = true;
*eos = true;
return Status::OK();
}

/// When a task is cancelled,
/// its blocking state will be cleared and it will transition to a ready state (though it is not truly ready).
Expand All @@ -361,47 +354,44 @@ Status PipelineTask::execute(bool* eos) {
RETURN_IF_ERROR(_sink->revoke_memory(_state));
continue;
}
*eos = _eos;
DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", {
Status status =
Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task executing failed");
return status;
});
// `_dry_run` means sink operator need no more data
// `_sink->is_finished(_state)` means sink operator should be finished
if (_dry_run || _sink->is_finished(_state)) {
*eos = true;
_eos = true;
} else {
if (_sink->is_finished(_state)) {
set_wake_up_and_dep_ready();
}

// `_dry_run` means sink operator need no more data
*eos = wake_up_early() || _dry_run;
if (!*eos) {
SCOPED_TIMER(_get_block_timer);
_get_block_counter->update(1);
RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos));
}

if (_block->rows() != 0 || *eos) {
SCOPED_TIMER(_sink_timer);
Status status = Status::OK();
// Define a lambda function to catch sink exception, because sink will check
// return error status with EOF, it is special, could not return directly.
auto sink_function = [&]() -> Status {
Status internal_st;
internal_st = _sink->sink(_state, block, *eos);
return internal_st;
};
status = sink_function();
if (!status.is<ErrorCode::END_OF_FILE>()) {
RETURN_IF_ERROR(status);
Status status = _sink->sink(_state, block, *eos);

if (status.is<ErrorCode::END_OF_FILE>()) {
set_wake_up_and_dep_ready();
} else if (!status) {
return status;
}
*eos = status.is<ErrorCode::END_OF_FILE>() ? true : *eos;

if (*eos) { // just return, the scheduler will do finish work
_eos = true;
RETURN_IF_ERROR(close(status, false));
BiteTheDDDDt marked this conversation as resolved.
Show resolved Hide resolved
_task_profile->add_info_string("TaskState", "Finished");
_eos = true;
return Status::OK();
}
}
}

static_cast<void>(get_task_queue()->push_back(this));
RETURN_IF_ERROR(get_task_queue()->push_back(this));
return Status::OK();
}

Expand Down Expand Up @@ -470,7 +460,7 @@ void PipelineTask::finalize() {
_le_state_map.clear();
}

Status PipelineTask::close(Status exec_status) {
Status PipelineTask::close(Status exec_status, bool close_sink) {
int64_t close_ns = 0;
Defer defer {[&]() {
if (_task_queue) {
Expand All @@ -480,7 +470,10 @@ Status PipelineTask::close(Status exec_status) {
Status s;
{
SCOPED_RAW_TIMER(&close_ns);
s = _sink->close(_state, exec_status);
if (close_sink) {
_task_profile->add_info_string("WakeUpEarly", wake_up_early() ? "true" : "false");
s = _sink->close(_state, exec_status);
}
for (auto& op : _operators) {
auto tem = op->close(_state);
BiteTheDDDDt marked this conversation as resolved.
Show resolved Hide resolved
if (!tem.ok() && s.ok()) {
Expand Down Expand Up @@ -508,10 +501,10 @@ std::string PipelineTask::debug_string() {
auto elapsed = _fragment_context->elapsed_time() / 1000000000.0;
fmt::format_to(debug_string_buffer,
"PipelineTask[this = {}, id = {}, open = {}, eos = {}, finish = {}, dry run = "
"{}, elapse time = {}s, _wake_up_by_downstream = {}], block dependency = {}, is "
"{}, elapse time = {}s, _wake_up_early = {}], block dependency = {}, is "
"running = {}\noperators: ",
(void*)this, _index, _opened, _eos, _finalized, _dry_run, elapsed,
_wake_up_by_downstream.load(),
_wake_up_early.load(),
cur_blocked_dep && !_finalized ? cur_blocked_dep->debug_string() : "NULL",
is_running());
for (size_t i = 0; i < _operators.size(); i++) {
Expand Down
8 changes: 4 additions & 4 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class PipelineTask {

// if the pipeline create a bunch of pipeline task
// must be call after all pipeline task is finish to release resource
Status close(Status exec_status);
Status close(Status exec_status, bool close_sink = true);

PipelineFragmentContext* fragment_context() { return _fragment_context; }

Expand Down Expand Up @@ -135,7 +135,7 @@ class PipelineTask {
int task_id() const { return _index; };
bool is_finalized() const { return _finalized; }

void set_wake_up_by_downstream() { _wake_up_by_downstream = true; }
void set_wake_up_early() { _wake_up_early = true; }

void clear_blocking_state() {
_state->get_query_ctx()->get_execution_dependency()->set_always_ready();
Expand Down Expand Up @@ -237,7 +237,7 @@ class PipelineTask {

PipelineId pipeline_id() const { return _pipeline->id(); }

bool wake_up_by_downstream() const { return _wake_up_by_downstream; }
bool wake_up_early() const { return _wake_up_early; }

private:
friend class RuntimeFilterDependency;
Expand Down Expand Up @@ -319,7 +319,7 @@ class PipelineTask {

std::atomic<bool> _running = false;
std::atomic<bool> _eos = false;
std::atomic<bool> _wake_up_by_downstream = false;
std::atomic<bool> _wake_up_early = false;
};

} // namespace doris::pipeline
Loading