Skip to content

Commit

Permalink
[Bug](pipeline) make sink operator process eos signals after wake_up…
Browse files Browse the repository at this point in the history
…_early #45207  (#45400)

make sink operator process eos signals after wake_up_early #45207  (#45400)
  • Loading branch information
BiteTheDDDDt authored Dec 17, 2024
1 parent 191ef9b commit 7856662
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 82 deletions.
57 changes: 35 additions & 22 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,34 +137,47 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
}
}};

if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) {
if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() || !_eos) {
return Base::close(state, exec_status);
}

if (state->get_task()->wake_up_by_downstream()) {
if (_should_build_hash_table) {
// partitial ignore rf to make global rf work
try {
if (state->get_task()->wake_up_early()) {
// partitial ignore rf to make global rf work or ignore useless rf
RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
} else {
// do not publish filter coz local rf not inited and useless
return Base::close(state, exec_status);
}
} else if (_should_build_hash_table) {
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
{
SCOPED_TIMER(_runtime_filter_init_timer);
RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
}
if (hash_table_size > 1) {
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
} else if (_should_build_hash_table) {
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
{
SCOPED_TIMER(_runtime_filter_init_timer);
RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
}
if (hash_table_size > 1) {
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
}
}

SCOPED_TIMER(_publish_runtime_filter_timer);
RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table));
} catch (Exception& e) {
bool blocked_by_complete_build_stage = p._shared_hashtable_controller &&
!p._shared_hash_table_context->complete_build_stage;
bool blocked_by_shared_hash_table_signal = !_should_build_hash_table &&
p._shared_hashtable_controller &&
!p._shared_hash_table_context->signaled;

return Status::InternalError(
"rf process meet error: {}, wake_up_early: {}, should_build_hash_table: "
"{}, _finish_dependency: {}, blocked_by_complete_build_stage: {}, "
"blocked_by_shared_hash_table_signal: "
"{}",
e.to_string(), state->get_task()->wake_up_early(), _should_build_hash_table,
_finish_dependency->debug_string(), blocked_by_complete_build_stage,
blocked_by_shared_hash_table_signal);
}
SCOPED_TIMER(_publish_runtime_filter_timer);
RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table));
return Base::close(state, exec_status);
}

Expand Down Expand Up @@ -536,7 +549,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());

local_state._eos = eos;
if (local_state._should_build_hash_table) {
// If eos or have already met a null value using short-circuit strategy, we do not need to pull
// data from probe side.
Expand Down Expand Up @@ -654,6 +666,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
}

if (eos) {
local_state._eos = eos;
local_state.init_short_circuit_for_probe();
// Since the comparison of null values is meaningless, null aware left anti/semi join should not output null
// when the build side is not empty.
Expand Down
7 changes: 6 additions & 1 deletion be/src/pipeline/pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,12 @@ void Pipeline::make_all_runnable() {
if (_sink_x->count_down_destination()) {
for (auto* task : _tasks) {
if (task) {
task->clear_blocking_state(true);
task->set_wake_up_early();
}
}
for (auto* task : _tasks) {
if (task) {
task->clear_blocking_state();
}
}
}
Expand Down
6 changes: 4 additions & 2 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ Status PipelineTask::execute(bool* eos) {
return status;
}

Status PipelineTask::close(Status exec_status) {
Status PipelineTask::close(Status exec_status, bool close_sink) {
int64_t close_ns = 0;
Defer defer {[&]() {
if (_task_queue) {
Expand All @@ -335,7 +335,9 @@ Status PipelineTask::close(Status exec_status) {
Status s;
{
SCOPED_RAW_TIMER(&close_ns);
s = _sink->close(_state);
if (close_sink) {
s = _sink->close(_state);
}
for (auto& op : _operators) {
auto tem = op->close(_state);
if (!tem.ok() && s.ok()) {
Expand Down
5 changes: 3 additions & 2 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class PipelineTask {

// if the pipeline create a bunch of pipeline task
// must be call after all pipeline task is finish to release resource
virtual Status close(Status exec_status);
virtual Status close(Status exec_status, bool close_sink = true);

void put_in_runnable_queue() {
_schedule_time++;
Expand Down Expand Up @@ -293,7 +293,8 @@ class PipelineTask {

PipelineId pipeline_id() const { return _pipeline->id(); }

virtual void clear_blocking_state(bool wake_up_by_downstream = false) {}
virtual void clear_blocking_state() {}
virtual void set_wake_up_early() {}

protected:
void _finish_p_dependency() {
Expand Down
107 changes: 57 additions & 50 deletions be/src/pipeline/pipeline_x/pipeline_x_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ Status PipelineXTask::_open() {
}

Status PipelineXTask::execute(bool* eos) {
if (_eos) {
*eos = true;
return Status::OK();
}
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_TIMER(_exec_timer);
SCOPED_ATTACH_TASK(_state);
Expand All @@ -247,28 +251,23 @@ Status PipelineXTask::execute(bool* eos) {
cpu_qs->add_cpu_nanos(delta_cpu_time);
}
}};
*eos = _sink->is_finished(_state) || _wake_up_by_downstream || is_final_state(_cur_state);
if (*eos) {
return Status::OK();
}

if (has_dependency()) {
set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY);
return Status::OK();
}
if (_wake_up_by_downstream) {
*eos = true;
return Status::OK();
}
if (_runtime_filter_blocked_dependency() != nullptr) {
set_state(PipelineTaskState::BLOCKED_FOR_RF);
return Status::OK();
}
if (_wake_up_by_downstream) {
*eos = true;
return Status::OK();
}

// The status must be runnable
if (!_opened) {
if (_wake_up_early) {
*eos = true;
_eos = true;
return Status::OK();
}
{
SCOPED_RAW_TIMER(&time_spent);
RETURN_IF_ERROR(_open());
Expand All @@ -277,39 +276,31 @@ Status PipelineXTask::execute(bool* eos) {
set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
return Status::OK();
}
if (_wake_up_by_downstream) {
*eos = true;
return Status::OK();
}
if (!sink_can_write()) {
set_state(PipelineTaskState::BLOCKED_FOR_SINK);
return Status::OK();
}
if (_wake_up_by_downstream) {
*eos = true;
return Status::OK();
}
}

auto set_wake_up_and_dep_ready = [&]() {
if (wake_up_early()) {
return;
}
set_wake_up_early();
clear_blocking_state();
};

Status status = Status::OK();
set_begin_execute_time();
while (!_fragment_context->is_canceled()) {
if (_root->need_data_from_children(_state) && !source_can_read()) {
set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
break;
}
if (_wake_up_by_downstream) {
*eos = true;
return Status::OK();
}
if (!sink_can_write()) {
set_state(PipelineTaskState::BLOCKED_FOR_SINK);
break;
}
if (_wake_up_by_downstream) {
*eos = true;
return Status::OK();
}

/// When a task is cancelled,
/// its blocking state will be cleared and it will transition to a ready state (though it is not truly ready).
Expand All @@ -336,10 +327,15 @@ Status PipelineXTask::execute(bool* eos) {
Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task executing failed");
return status;
});

if (_sink->is_finished(_state)) {
set_wake_up_and_dep_ready();
}
// `_dry_run` means sink operator need no more data
*eos = wake_up_early() || _dry_run;

// Pull block from operator chain
if (_dry_run || _sink->is_finished(_state)) {
*eos = true;
} else {
if (!*eos) {
SCOPED_TIMER(_get_block_timer);
_get_block_counter->update(1);
try {
Expand All @@ -350,14 +346,21 @@ Status PipelineXTask::execute(bool* eos) {
}
}

if (*eos) {
RETURN_IF_ERROR(close(Status::OK(), false));
}

if (_block->rows() != 0 || *eos) {
SCOPED_TIMER(_sink_timer);
status = _sink->sink(_state, block, *eos);
if (!status.is<ErrorCode::END_OF_FILE>()) {
RETURN_IF_ERROR(status);
if (status.is<ErrorCode::END_OF_FILE>()) {
set_wake_up_and_dep_ready();
} else if (!status) {
return status;
}
*eos = status.is<ErrorCode::END_OF_FILE>() ? true : *eos;

if (*eos) { // just return, the scheduler will do finish work
_eos = true;
break;
}
}
Expand Down Expand Up @@ -425,17 +428,14 @@ void PipelineXTask::finalize() {
_le_state_map.clear();
}

Status PipelineXTask::close(Status exec_status) {
Status PipelineXTask::close(Status exec_status, bool close_sink) {
int64_t close_ns = 0;
Defer defer {[&]() {
if (_task_queue) {
_task_queue->update_statistics(this, close_ns);
}
}};
Status s;
{
SCOPED_RAW_TIMER(&close_ns);
s = _sink->close(_state, exec_status);
if (close_sink) {
s = _sink->close(_state, exec_status);
}
for (auto& op : _operators) {
auto tem = op->close(_state);
if (!tem.ok() && s.ok()) {
Expand All @@ -444,10 +444,17 @@ Status PipelineXTask::close(Status exec_status) {
}
}
if (_opened) {
_fresh_profile_counter();
COUNTER_SET(_close_timer, close_ns);
COUNTER_UPDATE(_close_timer, close_ns);
COUNTER_UPDATE(_task_profile->total_time_counter(), close_ns);
}
if (close_sink && _opened) {
_task_profile->add_info_string("WakeUpEarly", wake_up_early() ? "true" : "false");
_fresh_profile_counter();
}

if (_task_queue) {
_task_queue->update_statistics(this, close_ns);
}
return s;
}

Expand All @@ -468,13 +475,13 @@ std::string PipelineXTask::debug_string() {
// If at the same time FE cancel this pipeline task and logging debug_string before _blocked_dep is cleared,
// it will think _blocked_dep is not nullptr and call _blocked_dep->debug_string().
auto* cur_blocked_dep = _blocked_dep;
fmt::format_to(
debug_string_buffer,
"PipelineTask[this = {}, state = {}, dry run = {}, elapse time "
"= {}s], _wake_up_by_downstream = {}, block dependency = {}, is running = "
"{}\noperators: ",
(void*)this, get_state_name(_cur_state), _dry_run, elapsed, _wake_up_by_downstream,
cur_blocked_dep && !_finished ? cur_blocked_dep->debug_string() : "NULL", is_running());
fmt::format_to(debug_string_buffer,
"PipelineTask[this = {}, state = {}, dry run = {}, elapse time "
"= {}s], _wake_up_early = {}, block dependency = {}, is running = "
"{}\noperators: ",
(void*)this, get_state_name(_cur_state), _dry_run, elapsed, _wake_up_early,
cur_blocked_dep && !_finished ? cur_blocked_dep->debug_string() : "NULL",
is_running());
for (size_t i = 0; i < _operators.size(); i++) {
fmt::format_to(debug_string_buffer, "\n{}",
_opened && !_finished ? _operators[i]->debug_string(_state, i)
Expand Down
12 changes: 7 additions & 5 deletions be/src/pipeline/pipeline_x/pipeline_x_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class PipelineXTask : public PipelineTask {

// if the pipeline create a bunch of pipeline task
// must be call after all pipeline task is finish to release resource
Status close(Status exec_status) override;
Status close(Status exec_status, bool close_sink = true) override;

Status close_sink(Status exec_status);
bool source_can_read() override {
Expand Down Expand Up @@ -138,8 +138,7 @@ class PipelineXTask : public PipelineTask {

int task_id() const { return _index; };

void clear_blocking_state(bool wake_up_by_downstream = false) override {
_wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream;
void clear_blocking_state() override {
_state->get_query_ctx()->get_execution_dependency()->set_always_ready();
// We use a lock to assure all dependencies are not deconstructed here.
std::unique_lock<std::mutex> lc(_dependency_lock);
Expand Down Expand Up @@ -177,7 +176,9 @@ class PipelineXTask : public PipelineTask {

static bool should_revoke_memory(RuntimeState* state, int64_t revocable_mem_bytes);

bool wake_up_by_downstream() const { return _wake_up_by_downstream; }
bool wake_up_early() const { return _wake_up_early; }

void set_wake_up_early() override { _wake_up_early = true; }

private:
friend class RuntimeFilterDependency;
Expand Down Expand Up @@ -255,7 +256,8 @@ class PipelineXTask : public PipelineTask {

std::atomic<bool> _finished {false};
std::mutex _dependency_lock;
std::atomic<bool> _wake_up_by_downstream = false;
std::atomic<bool> _wake_up_early = false;
std::atomic<bool> _eos = false;
};

} // namespace doris::pipeline

0 comments on commit 7856662

Please sign in to comment.