diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index a89bad5022a1779..6550729218eb9d6 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -252,9 +252,6 @@ bool PipelineTask::_is_blocked() { _blocked_dep = dep->is_blocked_by(this); if (_blocked_dep != nullptr) { _blocked_dep->start_watcher(); - if (_wake_up_early) { - _eos = true; - } return true; } } @@ -274,9 +271,6 @@ bool PipelineTask::_is_blocked() { _blocked_dep = op_dep->is_blocked_by(this); if (_blocked_dep != nullptr) { _blocked_dep->start_watcher(); - if (_wake_up_early) { - _eos = true; - } return true; } } @@ -284,6 +278,11 @@ bool PipelineTask::_is_blocked() { } Status PipelineTask::execute(bool* eos) { + if (_eos) { + *eos = true; + return Status::OK(); + } + SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_TIMER(_exec_timer); SCOPED_ATTACH_TASK(_state); @@ -296,7 +295,6 @@ Status PipelineTask::execute(bool* eos) { ThreadCpuStopWatch cpu_time_stop_watch; cpu_time_stop_watch.start(); Defer defer {[&]() { - _eos = _eos | *eos; if (_task_queue) { _task_queue->update_statistics(this, time_spent); } @@ -382,7 +380,7 @@ Status PipelineTask::execute(bool* eos) { if (*eos) { // just return, the scheduler will do finish work RETURN_IF_ERROR(close(status, false)); _task_profile->add_info_string("TaskState", "Finished"); - + _eos = true; return Status::OK(); } } @@ -469,11 +467,12 @@ Status PipelineTask::close(Status exec_status, bool close_sink) { SCOPED_RAW_TIMER(&close_ns); if (close_sink) { s = _sink->close(_state, exec_status); - } - for (auto& op : _operators) { - auto tem = op->close(_state); - if (!tem.ok() && s.ok()) { - s = tem; + } else { + for (auto& op : _operators) { + auto tem = op->close(_state); + if (!tem.ok() && s.ok()) { + s = tem; + } } } }