Skip to content

Commit

Permalink
[Bug](pipeline) make sink operator process eos signals after wake_up_…
Browse files Browse the repository at this point in the history
…early (#45207)

1. make sink operator process eos signals after wake_up_early 
2. set wake_up_early when `pipeline task meet
wake_up_by_downstream`/`sink reach limit`/`sink get eof status`
3. close non-sink operators after sink meet eos
  • Loading branch information
BiteTheDDDDt authored Dec 17, 2024
1 parent d670ec2 commit a5bf8a1
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 97 deletions.
6 changes: 3 additions & 3 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -498,12 +498,12 @@ class RuntimePredicateWrapper {
switch (_filter_type) {
case RuntimeFilterType::IN_FILTER: {
if (!_context->hybrid_set) {
_context->ignored = true;
set_ignored();
return Status::OK();
}
_context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) {
_context->ignored = true;
set_ignored();
// release in filter
_context->hybrid_set.reset();
}
Expand Down Expand Up @@ -1337,7 +1337,7 @@ void IRuntimeFilter::set_synced_size(uint64_t global_size) {
}

void IRuntimeFilter::set_ignored() {
_wrapper->_context->ignored = true;
_wrapper->set_ignored();
}

bool IRuntimeFilter::get_ignored() {
Expand Down
50 changes: 21 additions & 29 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,26 +135,16 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
}
}};

if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) {
if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() || !_eos) {
return Base::close(state, exec_status);
}

try {
if (state->get_task()->wake_up_by_downstream()) {
if (_should_build_hash_table) {
// partitial ignore rf to make global rf work
RETURN_IF_ERROR(
_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
} else {
// do not publish filter coz local rf not inited and useless
return Base::close(state, exec_status);
}
if (state->get_task()->wake_up_early()) {
// partitial ignore rf to make global rf work or ignore useless rf
RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
} else if (_should_build_hash_table) {
if (p._shared_hashtable_controller &&
!p._shared_hash_table_context->complete_build_stage) {
return Status::InternalError("close before sink meet eos");
}
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
{
Expand All @@ -166,26 +156,25 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
}
} else if ((p._shared_hashtable_controller && !p._shared_hash_table_context->signaled) ||
(p._shared_hash_table_context &&
!p._shared_hash_table_context->complete_build_stage)) {
throw Exception(ErrorCode::INTERNAL_ERROR, "build_sink::close meet error state");
} else {
RETURN_IF_ERROR(
_runtime_filter_slots->copy_from_shared_context(p._shared_hash_table_context));
}

SCOPED_TIMER(_publish_runtime_filter_timer);
RETURN_IF_ERROR(_runtime_filter_slots->publish(state, !_should_build_hash_table));
} catch (Exception& e) {
bool blocked_by_complete_build_stage = p._shared_hashtable_controller &&
!p._shared_hash_table_context->complete_build_stage;
bool blocked_by_shared_hash_table_signal = !_should_build_hash_table &&
p._shared_hashtable_controller &&
!p._shared_hash_table_context->signaled;

return Status::InternalError(
"rf process meet error: {}, wake_up_by_downstream: {}, should_build_hash_table: "
"{}, _finish_dependency: {}, complete_build_stage: {}, shared_hash_table_signaled: "
"rf process meet error: {}, wake_up_early: {}, should_build_hash_table: "
"{}, _finish_dependency: {}, blocked_by_complete_build_stage: {}, "
"blocked_by_shared_hash_table_signal: "
"{}",
e.to_string(), state->get_task()->wake_up_by_downstream(), _should_build_hash_table,
_finish_dependency->debug_string(),
p._shared_hash_table_context && !p._shared_hash_table_context->complete_build_stage,
p._shared_hashtable_controller && !p._shared_hash_table_context->signaled);
e.to_string(), state->get_task()->wake_up_early(), _should_build_hash_table,
_finish_dependency->debug_string(), blocked_by_complete_build_stage,
blocked_by_shared_hash_table_signal);
}
return Base::close(state, exec_status);
}
Expand Down Expand Up @@ -479,7 +468,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());

local_state._eos = eos;
if (local_state._should_build_hash_table) {
// If eos or have already met a null value using short-circuit strategy, we do not need to pull
// data from probe side.
Expand Down Expand Up @@ -556,6 +544,9 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
return _shared_hash_table_context->status;
}

RETURN_IF_ERROR(local_state._runtime_filter_slots->copy_from_shared_context(
_shared_hash_table_context));

local_state.profile()->add_info_string(
"SharedHashTableFrom",
print_id(
Expand All @@ -581,6 +572,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
}

if (eos) {
local_state._eos = true;
local_state.init_short_circuit_for_probe();
// Since the comparison of null values is meaningless, null aware left anti/semi join should not output null
// when the build side is not empty.
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
// If we use a short-circuit strategy, should return block directly by add additional null data.
auto block_rows = local_state._probe_block.rows();
if (local_state._probe_eos && block_rows == 0) {
*eos = local_state._probe_eos;
*eos = true;
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ void Pipeline::make_all_runnable() {
if (_sink->count_down_destination()) {
for (auto* task : _tasks) {
if (task) {
task->set_wake_up_by_downstream();
task->set_wake_up_early();
}
}
for (auto* task : _tasks) {
Expand Down
116 changes: 57 additions & 59 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <glog/logging.h>
#include <stddef.h>

#include <algorithm>
#include <ostream>
#include <vector>

Expand Down Expand Up @@ -223,19 +224,13 @@ bool PipelineTask::_wait_to_start() {
_blocked_dep = _execution_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
static_cast<Dependency*>(_blocked_dep)->start_watcher();
if (_wake_up_by_downstream) {
_eos = true;
}
return true;
}

for (auto* op_dep : _filter_dependencies) {
_blocked_dep = op_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
if (_wake_up_by_downstream) {
_eos = true;
}
return true;
}
}
Expand All @@ -257,9 +252,6 @@ bool PipelineTask::_is_blocked() {
_blocked_dep = dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
if (_wake_up_by_downstream) {
_eos = true;
}
return true;
}
}
Expand All @@ -279,25 +271,22 @@ bool PipelineTask::_is_blocked() {
_blocked_dep = op_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
if (_wake_up_by_downstream) {
_eos = true;
}
return true;
}
}
return false;
}

Status PipelineTask::execute(bool* eos) {
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_TIMER(_exec_timer);
SCOPED_ATTACH_TASK(_state);
_eos = _sink->is_finished(_state) || _eos || _wake_up_by_downstream;
*eos = _eos;
if (_eos) {
// If task is waken up by finish dependency, `_eos` is set to true by last execution, and we should return here.
*eos = true;
return Status::OK();
}

SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_TIMER(_exec_timer);
SCOPED_ATTACH_TASK(_state);

int64_t time_spent = 0;
DBUG_EXECUTE_IF("fault_inject::PipelineXTask::execute", {
Status status = Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task execute failed");
Expand All @@ -320,27 +309,31 @@ Status PipelineTask::execute(bool* eos) {
if (_wait_to_start()) {
return Status::OK();
}
if (_wake_up_by_downstream) {
_eos = true;
*eos = true;
return Status::OK();
}

// The status must be runnable
if (!_opened && !_fragment_context->is_canceled()) {
if (_wake_up_early) {
*eos = true;
_eos = true;
return Status::OK();
}
RETURN_IF_ERROR(_open());
}

auto set_wake_up_and_dep_ready = [&]() {
if (wake_up_early()) {
return;
}
set_wake_up_early();
clear_blocking_state();
};

_task_profile->add_info_string("TaskState", "Runnable");
_task_profile->add_info_string("BlockedByDependency", "");
while (!_fragment_context->is_canceled()) {
if (_is_blocked()) {
return Status::OK();
}
if (_wake_up_by_downstream) {
_eos = true;
*eos = true;
return Status::OK();
}

/// When a task is cancelled,
/// its blocking state will be cleared and it will transition to a ready state (though it is not truly ready).
Expand All @@ -361,47 +354,47 @@ Status PipelineTask::execute(bool* eos) {
RETURN_IF_ERROR(_sink->revoke_memory(_state));
continue;
}
*eos = _eos;
DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", {
Status status =
Status::Error<INTERNAL_ERROR>("fault_inject pipeline_task executing failed");
return status;
});
// `_dry_run` means sink operator need no more data
// `_sink->is_finished(_state)` means sink operator should be finished
if (_dry_run || _sink->is_finished(_state)) {
*eos = true;
_eos = true;
} else {
if (_sink->is_finished(_state)) {
set_wake_up_and_dep_ready();
}

// `_dry_run` means sink operator need no more data
*eos = wake_up_early() || _dry_run;
if (!*eos) {
SCOPED_TIMER(_get_block_timer);
_get_block_counter->update(1);
RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos));
}

if (*eos) {
RETURN_IF_ERROR(close(Status::OK(), false));
}

if (_block->rows() != 0 || *eos) {
SCOPED_TIMER(_sink_timer);
Status status = Status::OK();
// Define a lambda function to catch sink exception, because sink will check
// return error status with EOF, it is special, could not return directly.
auto sink_function = [&]() -> Status {
Status internal_st;
internal_st = _sink->sink(_state, block, *eos);
return internal_st;
};
status = sink_function();
if (!status.is<ErrorCode::END_OF_FILE>()) {
RETURN_IF_ERROR(status);
Status status = _sink->sink(_state, block, *eos);

if (status.is<ErrorCode::END_OF_FILE>()) {
set_wake_up_and_dep_ready();
} else if (!status) {
return status;
}
*eos = status.is<ErrorCode::END_OF_FILE>() ? true : *eos;

if (*eos) { // just return, the scheduler will do finish work
_eos = true;
_task_profile->add_info_string("TaskState", "Finished");
_eos = true;
return Status::OK();
}
}
}

static_cast<void>(get_task_queue()->push_back(this));
RETURN_IF_ERROR(get_task_queue()->push_back(this));
return Status::OK();
}

Expand Down Expand Up @@ -470,17 +463,14 @@ void PipelineTask::finalize() {
_le_state_map.clear();
}

Status PipelineTask::close(Status exec_status) {
Status PipelineTask::close(Status exec_status, bool close_sink) {
int64_t close_ns = 0;
Defer defer {[&]() {
if (_task_queue) {
_task_queue->update_statistics(this, close_ns);
}
}};
Status s;
{
SCOPED_RAW_TIMER(&close_ns);
s = _sink->close(_state, exec_status);
if (close_sink) {
s = _sink->close(_state, exec_status);
}
for (auto& op : _operators) {
auto tem = op->close(_state);
if (!tem.ok() && s.ok()) {
Expand All @@ -489,10 +479,18 @@ Status PipelineTask::close(Status exec_status) {
}
}
if (_opened) {
_fresh_profile_counter();
COUNTER_SET(_close_timer, close_ns);
COUNTER_UPDATE(_close_timer, close_ns);
COUNTER_UPDATE(_task_profile->total_time_counter(), close_ns);
}

if (close_sink && _opened) {
_task_profile->add_info_string("WakeUpEarly", wake_up_early() ? "true" : "false");
_fresh_profile_counter();
}

if (_task_queue) {
_task_queue->update_statistics(this, close_ns);
}
return s;
}

Expand All @@ -508,10 +506,10 @@ std::string PipelineTask::debug_string() {
auto elapsed = _fragment_context->elapsed_time() / 1000000000.0;
fmt::format_to(debug_string_buffer,
"PipelineTask[this = {}, id = {}, open = {}, eos = {}, finish = {}, dry run = "
"{}, elapse time = {}s, _wake_up_by_downstream = {}], block dependency = {}, is "
"{}, elapse time = {}s, _wake_up_early = {}], block dependency = {}, is "
"running = {}\noperators: ",
(void*)this, _index, _opened, _eos, _finalized, _dry_run, elapsed,
_wake_up_by_downstream.load(),
_wake_up_early.load(),
cur_blocked_dep && !_finalized ? cur_blocked_dep->debug_string() : "NULL",
is_running());
for (size_t i = 0; i < _operators.size(); i++) {
Expand Down
Loading

0 comments on commit a5bf8a1

Please sign in to comment.