-
Notifications
You must be signed in to change notification settings - Fork 3.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Bug](pipeline) make sink operator process eos signals after wake_up_early #45207
Conversation
run buildall |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clang-tidy made some suggestions
run buildall |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clang-tidy made some suggestions
run buildall |
clang-tidy review says "All clean, LGTM! 👍" |
run buildall |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clang-tidy made some suggestions
run buildall |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clang-tidy made some suggestions
run buildall |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clang-tidy made some suggestions
run buildall |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clang-tidy made some suggestions
d51d4ca
to
fe5e99e
Compare
run buildall |
update fix update update fix fix fix fix update
fe5e99e
to
b1f5355
Compare
run buildall |
@@ -186,9 +186,6 @@ void ExchangeSinkLocalState::on_channel_finished(InstanceLoId channel_id) { | |||
_finished_channels.emplace(channel_id); | |||
if (_working_channels_count.fetch_sub(1) == 1) { | |||
set_reach_limit(); | |||
if (_finish_dependency) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why remove this code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
set_reach_limit 实际上会使pipeline task触发is_finished的逻辑,这个pr中触发is_finished会直接set_wake_up_early,以统一的方式来set dependency
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
如果当前task处于被finish dependency阻塞的状态,on_channel_finished是在rpc的callback中执行。之前可以在这里set ready来唤醒task,这么改完以后是如何唤醒task执行呢
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
不是,是当前task已经处在阻塞状态了。能走到这里的前提是这个task被唤醒起来执行了
@@ -186,9 +186,6 @@ void ExchangeSinkLocalState::on_channel_finished(InstanceLoId channel_id) { | |||
_finished_channels.emplace(channel_id); | |||
if (_working_channels_count.fetch_sub(1) == 1) { | |||
set_reach_limit(); | |||
if (_finish_dependency) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
如果当前task处于被finish dependency阻塞的状态,on_channel_finished是在rpc的callback中执行。之前可以在这里set ready来唤醒task,这么改完以后是如何唤醒task执行呢
run buildall |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clang-tidy made some suggestions
@@ -279,25 +271,22 @@ bool PipelineTask::_is_blocked() { | |||
_blocked_dep = op_dep->is_blocked_by(this); | |||
if (_blocked_dep != nullptr) { | |||
_blocked_dep->start_watcher(); | |||
if (_wake_up_by_downstream) { | |||
_eos = true; | |||
} | |||
return true; | |||
} | |||
} | |||
return false; | |||
} | |||
|
|||
Status PipelineTask::execute(bool* eos) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: function 'execute' has cognitive complexity of 70 (threshold 50) [readability-function-cognitive-complexity]
Status PipelineTask::execute(bool* eos) {
^
Additional context
be/src/pipeline/pipeline_task.cpp:280: +1, including nesting penalty of 0, nesting level increased to 1
if (_eos) {
^
be/src/pipeline/pipeline_task.cpp:290: +1, including nesting penalty of 0, nesting level increased to 1
DBUG_EXECUTE_IF("fault_inject::PipelineXTask::execute", {
^
be/src/util/debug_points.h:36: expanded from macro 'DBUG_EXECUTE_IF'
if (UNLIKELY(config::enable_debug_points)) { \
^
be/src/pipeline/pipeline_task.cpp:290: +2, including nesting penalty of 1, nesting level increased to 2
DBUG_EXECUTE_IF("fault_inject::PipelineXTask::execute", {
^
be/src/util/debug_points.h:38: expanded from macro 'DBUG_EXECUTE_IF'
if (dp) { \
^
be/src/pipeline/pipeline_task.cpp:308: +1, including nesting penalty of 0, nesting level increased to 1
if (_wait_to_start()) {
^
be/src/pipeline/pipeline_task.cpp:313: +1, including nesting penalty of 0, nesting level increased to 1
if (!_opened && !_fragment_context->is_canceled()) {
^
be/src/pipeline/pipeline_task.cpp:313: +1
if (!_opened && !_fragment_context->is_canceled()) {
^
be/src/pipeline/pipeline_task.cpp:314: +2, including nesting penalty of 1, nesting level increased to 2
if (_wake_up_early) {
^
be/src/pipeline/pipeline_task.cpp:319: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(_open());
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/pipeline/pipeline_task.cpp:319: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(_open());
^
be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/pipeline/pipeline_task.cpp:322: nesting level increased to 1
auto set_wake_up_and_dep_ready = [&]() {
^
be/src/pipeline/pipeline_task.cpp:323: +2, including nesting penalty of 1, nesting level increased to 2
if (wake_up_early()) {
^
be/src/pipeline/pipeline_task.cpp:332: +1, including nesting penalty of 0, nesting level increased to 1
while (!_fragment_context->is_canceled()) {
^
be/src/pipeline/pipeline_task.cpp:333: +2, including nesting penalty of 1, nesting level increased to 2
if (_is_blocked()) {
^
be/src/pipeline/pipeline_task.cpp:340: +2, including nesting penalty of 1, nesting level increased to 2
if (_fragment_context->is_canceled()) {
^
be/src/pipeline/pipeline_task.cpp:344: +2, including nesting penalty of 1, nesting level increased to 2
if (time_spent > THREAD_TIME_SLICE) {
^
be/src/pipeline/pipeline_task.cpp:352: +2, including nesting penalty of 1, nesting level increased to 2
if (should_revoke_memory(_state, sink_revocable_mem_size)) {
^
be/src/pipeline/pipeline_task.cpp:353: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(_sink->revoke_memory(_state));
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/pipeline/pipeline_task.cpp:353: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(_sink->revoke_memory(_state));
^
be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/pipeline/pipeline_task.cpp:356: +2, including nesting penalty of 1, nesting level increased to 2
DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", {
^
be/src/util/debug_points.h:36: expanded from macro 'DBUG_EXECUTE_IF'
if (UNLIKELY(config::enable_debug_points)) { \
^
be/src/pipeline/pipeline_task.cpp:356: +3, including nesting penalty of 2, nesting level increased to 3
DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", {
^
be/src/util/debug_points.h:38: expanded from macro 'DBUG_EXECUTE_IF'
if (dp) { \
^
be/src/pipeline/pipeline_task.cpp:362: +2, including nesting penalty of 1, nesting level increased to 2
if (_sink->is_finished(_state)) {
^
be/src/pipeline/pipeline_task.cpp:367: +1
*eos = wake_up_early() || _dry_run;
^
be/src/pipeline/pipeline_task.cpp:368: +2, including nesting penalty of 1, nesting level increased to 2
if (!*eos) {
^
be/src/pipeline/pipeline_task.cpp:371: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos));
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/pipeline/pipeline_task.cpp:371: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos));
^
be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/pipeline/pipeline_task.cpp:374: +2, including nesting penalty of 1, nesting level increased to 2
if (*eos) {
^
be/src/pipeline/pipeline_task.cpp:375: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(close(Status::OK(), false));
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/pipeline/pipeline_task.cpp:375: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(close(Status::OK(), false));
^
be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/pipeline/pipeline_task.cpp:378: +2, including nesting penalty of 1, nesting level increased to 2
if (_block->rows() != 0 || *eos) {
^
be/src/pipeline/pipeline_task.cpp:382: +3, including nesting penalty of 2, nesting level increased to 3
if (status.is<ErrorCode::END_OF_FILE>()) {
^
be/src/pipeline/pipeline_task.cpp:384: +1, nesting level increased to 3
} else if (!status) {
^
be/src/pipeline/pipeline_task.cpp:388: +3, including nesting penalty of 2, nesting level increased to 3
if (*eos) { // just return, the scheduler will do finish work
^
be/src/pipeline/pipeline_task.cpp:396: +1, including nesting penalty of 0, nesting level increased to 1
RETURN_IF_ERROR(get_task_queue()->push_back(this));
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/pipeline/pipeline_task.cpp:396: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(get_task_queue()->push_back(this));
^
be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
PR approved by at least one committer and no changes requested. |
PR approved by anyone and no changes requested. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…early (#45207) 1. make sink operator process eos signals after wake_up_early 2. set wake_up_early when `pipeline task meet wake_up_by_downstream`/`sink reach limit`/`sink get eof status` 3. close non-sink operators after sink meet eos
#45207) (#46374) …early (#45207) 1. make sink operator process eos signals after wake_up_early 2. set wake_up_early when `pipeline task meet wake_up_by_downstream`/`sink reach limit`/`sink get eof status` 3. close non-sink operators after sink meet eos ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: ### Release note None ### Check List (For Author) - Test <!-- At least one of them must be included. --> - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason <!-- Add your reason? --> - Behavior changed: - [ ] No. - [ ] Yes. <!-- Explain the behavior change --> - Does this need documentation? - [ ] No. - [ ] Yes. <!-- Add document PR link here. eg: apache/doris-website#1214 --> ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
What problem does this PR solve?
pipeline task meet wake_up_by_downstream
/sink reach limit
/sink get eof status
Problem Summary:
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)