Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug](pipeline) make sink operator process eos signals after wake_up_early #45207 #45400

Merged
merged 5 commits into from
Dec 17, 2024

Conversation

BiteTheDDDDt
Copy link
Contributor

@BiteTheDDDDt BiteTheDDDDt commented Dec 13, 2024

pick from #45207

xinyiZzz and others added 2 commits December 13, 2024 14:41
…5210)

fix:
```
*** Aborted at 1733734087 (unix time) try "date -d @1733734087" if you are using GNU date ***
*** Current BE git commitID: 43f06a5 ***
*** SIGSEGV address not mapped to object (@0x0) received by PID 1671420 (TID 1671420 OR 0x7f4f35f74ac0) from PID 0; stack trace: ***
 0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /home/zcp/repo_center/doris_release/doris/be/src/common/signal_handler.h:421
 1# PosixSignals::chained_handler(int, siginfo*, void*) [clone .part.0] in /usr/local/jdk-17.0.2/lib/server/libjvm.so
 2# JVM_handle_linux_signal in /usr/local/jdk-17.0.2/lib/server/libjvm.so
 3# 0x00007F4F34340400 in /lib64/libc.so.6
 4# je_arena_dalloc_promoted at ../src/arena.c:1277
 5# je_free_default at ../src/jemalloc.c:3014
 6# __pthread_create_2_1 in /lib64/libpthread.so.0
 7# os::create_thread(Thread*, os::ThreadType, unsigned long) in /usr/local/jdk-17.0.2/lib/server/libjvm.so
 8# JVM_StartThread in /usr/local/jdk-17.0.2/lib/server/libjvm.so
 9# 0x00007F4F0A96C918
```
@hello-stephen
Copy link
Contributor

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@BiteTheDDDDt
Copy link
Contributor Author

run buildall

Copy link
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clang-tidy made some suggestions

@@ -226,6 +226,10 @@ Status PipelineXTask::_open() {
}

Status PipelineXTask::execute(bool* eos) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'execute' has cognitive complexity of 76 (threshold 50) [readability-function-cognitive-complexity]

Status PipelineXTask::execute(bool* eos) {
                      ^
Additional context

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:228: +1, including nesting penalty of 0, nesting level increased to 1

    if (_eos) {
    ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:236: +1, including nesting penalty of 0, nesting level increased to 1

    DBUG_EXECUTE_IF("fault_inject::PipelineXTask::execute", {
    ^

be/src/util/debug_points.h:36: expanded from macro 'DBUG_EXECUTE_IF'

    if (UNLIKELY(config::enable_debug_points)) {                              \
    ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:236: +2, including nesting penalty of 1, nesting level increased to 2

    DBUG_EXECUTE_IF("fault_inject::PipelineXTask::execute", {
    ^

be/src/util/debug_points.h:38: expanded from macro 'DBUG_EXECUTE_IF'

        if (dp) {                                                             \
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:254: +1, including nesting penalty of 0, nesting level increased to 1

    if (has_dependency()) {
    ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:258: +1, including nesting penalty of 0, nesting level increased to 1

    if (_runtime_filter_blocked_dependency() != nullptr) {
    ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:264: +1, including nesting penalty of 0, nesting level increased to 1

    if (!_opened) {
    ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:265: +2, including nesting penalty of 1, nesting level increased to 2

        if (_wake_up_early) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:272: +2, including nesting penalty of 1, nesting level increased to 2

            RETURN_IF_ERROR(_open());
            ^

be/src/common/status.h:622: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:272: +3, including nesting penalty of 2, nesting level increased to 3

            RETURN_IF_ERROR(_open());
            ^

be/src/common/status.h:624: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:274: +2, including nesting penalty of 1, nesting level increased to 2

        if (!source_can_read()) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:278: +2, including nesting penalty of 1, nesting level increased to 2

        if (!sink_can_write()) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:284: nesting level increased to 1

    auto set_wake_up_and_dep_ready = [&]() {
                                     ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:285: +2, including nesting penalty of 1, nesting level increased to 2

        if (wake_up_early()) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:294: +1, including nesting penalty of 0, nesting level increased to 1

    while (!_fragment_context->is_canceled()) {
    ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:295: +2, including nesting penalty of 1, nesting level increased to 2

        if (_root->need_data_from_children(_state) && !source_can_read()) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:299: +2, including nesting penalty of 1, nesting level increased to 2

        if (!sink_can_write()) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:307: +2, including nesting penalty of 1, nesting level increased to 2

        if (_fragment_context->is_canceled()) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:311: +2, including nesting penalty of 1, nesting level increased to 2

        if (time_spent > THREAD_TIME_SLICE) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:320: +2, including nesting penalty of 1, nesting level increased to 2

        if (should_revoke_memory(_state, sink_revocable_mem_size)) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:321: +3, including nesting penalty of 2, nesting level increased to 3

            RETURN_IF_ERROR(_sink->revoke_memory(_state));
            ^

be/src/common/status.h:622: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:321: +4, including nesting penalty of 3, nesting level increased to 4

            RETURN_IF_ERROR(_sink->revoke_memory(_state));
            ^

be/src/common/status.h:624: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:324: +2, including nesting penalty of 1, nesting level increased to 2

        DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", {
        ^

be/src/util/debug_points.h:36: expanded from macro 'DBUG_EXECUTE_IF'

    if (UNLIKELY(config::enable_debug_points)) {                              \
    ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:324: +3, including nesting penalty of 2, nesting level increased to 3

        DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", {
        ^

be/src/util/debug_points.h:38: expanded from macro 'DBUG_EXECUTE_IF'

        if (dp) {                                                             \
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:330: +2, including nesting penalty of 1, nesting level increased to 2

        if (_sink->is_finished(_state)) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:334: +1

        *eos = wake_up_early() || _dry_run;
                               ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:337: +2, including nesting penalty of 1, nesting level increased to 2

        if (!*eos) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:341: +3, including nesting penalty of 2, nesting level increased to 3

                RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos));
                ^

be/src/common/status.h:622: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:341: +4, including nesting penalty of 3, nesting level increased to 4

                RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos));
                ^

be/src/common/status.h:624: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:342: +3, including nesting penalty of 2, nesting level increased to 3

            } catch (const Exception& e) {
              ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:348: +2, including nesting penalty of 1, nesting level increased to 2

        if (_block->rows() != 0 || *eos) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:351: +3, including nesting penalty of 2, nesting level increased to 3

            if (status.is<ErrorCode::END_OF_FILE>()) {
            ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:353: +1, nesting level increased to 3

            } else if (!status) {
                   ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:357: +3, including nesting penalty of 2, nesting level increased to 3

            if (*eos) { // just return, the scheduler will do finish work
            ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:358: +4, including nesting penalty of 3, nesting level increased to 4

                RETURN_IF_ERROR(close(status, false));
                ^

be/src/common/status.h:622: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:358: +5, including nesting penalty of 4, nesting level increased to 5

                RETURN_IF_ERROR(close(status, false));
                ^

be/src/common/status.h:624: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

@BiteTheDDDDt
Copy link
Contributor Author

run buildall

@BiteTheDDDDt BiteTheDDDDt changed the title Cp 1213 2 [Bug](pipeline) make sink operator process eos signals after wake_up_early #45207 Dec 17, 2024
@BiteTheDDDDt
Copy link
Contributor Author

run buildall

@BiteTheDDDDt
Copy link
Contributor Author

run buildall

Copy link
Contributor

@github-actions github-actions bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clang-tidy made some suggestions

@@ -226,6 +226,10 @@ Status PipelineXTask::_open() {
}

Status PipelineXTask::execute(bool* eos) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'execute' has cognitive complexity of 76 (threshold 50) [readability-function-cognitive-complexity]

Status PipelineXTask::execute(bool* eos) {
                      ^
Additional context

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:228: +1, including nesting penalty of 0, nesting level increased to 1

    if (_eos) {
    ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:236: +1, including nesting penalty of 0, nesting level increased to 1

    DBUG_EXECUTE_IF("fault_inject::PipelineXTask::execute", {
    ^

be/src/util/debug_points.h:36: expanded from macro 'DBUG_EXECUTE_IF'

    if (UNLIKELY(config::enable_debug_points)) {                              \
    ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:236: +2, including nesting penalty of 1, nesting level increased to 2

    DBUG_EXECUTE_IF("fault_inject::PipelineXTask::execute", {
    ^

be/src/util/debug_points.h:38: expanded from macro 'DBUG_EXECUTE_IF'

        if (dp) {                                                             \
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:254: +1, including nesting penalty of 0, nesting level increased to 1

    if (has_dependency()) {
    ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:258: +1, including nesting penalty of 0, nesting level increased to 1

    if (_runtime_filter_blocked_dependency() != nullptr) {
    ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:264: +1, including nesting penalty of 0, nesting level increased to 1

    if (!_opened) {
    ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:265: +2, including nesting penalty of 1, nesting level increased to 2

        if (_wake_up_early) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:272: +2, including nesting penalty of 1, nesting level increased to 2

            RETURN_IF_ERROR(_open());
            ^

be/src/common/status.h:622: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:272: +3, including nesting penalty of 2, nesting level increased to 3

            RETURN_IF_ERROR(_open());
            ^

be/src/common/status.h:624: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:274: +2, including nesting penalty of 1, nesting level increased to 2

        if (!source_can_read()) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:278: +2, including nesting penalty of 1, nesting level increased to 2

        if (!sink_can_write()) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:284: nesting level increased to 1

    auto set_wake_up_and_dep_ready = [&]() {
                                     ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:285: +2, including nesting penalty of 1, nesting level increased to 2

        if (wake_up_early()) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:294: +1, including nesting penalty of 0, nesting level increased to 1

    while (!_fragment_context->is_canceled()) {
    ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:295: +2, including nesting penalty of 1, nesting level increased to 2

        if (_root->need_data_from_children(_state) && !source_can_read()) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:299: +2, including nesting penalty of 1, nesting level increased to 2

        if (!sink_can_write()) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:307: +2, including nesting penalty of 1, nesting level increased to 2

        if (_fragment_context->is_canceled()) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:311: +2, including nesting penalty of 1, nesting level increased to 2

        if (time_spent > THREAD_TIME_SLICE) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:320: +2, including nesting penalty of 1, nesting level increased to 2

        if (should_revoke_memory(_state, sink_revocable_mem_size)) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:321: +3, including nesting penalty of 2, nesting level increased to 3

            RETURN_IF_ERROR(_sink->revoke_memory(_state));
            ^

be/src/common/status.h:622: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:321: +4, including nesting penalty of 3, nesting level increased to 4

            RETURN_IF_ERROR(_sink->revoke_memory(_state));
            ^

be/src/common/status.h:624: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:324: +2, including nesting penalty of 1, nesting level increased to 2

        DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", {
        ^

be/src/util/debug_points.h:36: expanded from macro 'DBUG_EXECUTE_IF'

    if (UNLIKELY(config::enable_debug_points)) {                              \
    ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:324: +3, including nesting penalty of 2, nesting level increased to 3

        DBUG_EXECUTE_IF("fault_inject::PipelineXTask::executing", {
        ^

be/src/util/debug_points.h:38: expanded from macro 'DBUG_EXECUTE_IF'

        if (dp) {                                                             \
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:330: +2, including nesting penalty of 1, nesting level increased to 2

        if (_sink->is_finished(_state)) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:334: +1

        *eos = wake_up_early() || _dry_run;
                               ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:337: +2, including nesting penalty of 1, nesting level increased to 2

        if (!*eos) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:341: +3, including nesting penalty of 2, nesting level increased to 3

                RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos));
                ^

be/src/common/status.h:622: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:341: +4, including nesting penalty of 3, nesting level increased to 4

                RETURN_IF_ERROR(_root->get_block_after_projects(_state, block, eos));
                ^

be/src/common/status.h:624: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:342: +3, including nesting penalty of 2, nesting level increased to 3

            } catch (const Exception& e) {
              ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:348: +2, including nesting penalty of 1, nesting level increased to 2

        if (*eos) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:349: +3, including nesting penalty of 2, nesting level increased to 3

            RETURN_IF_ERROR(close(Status::OK(), false));
            ^

be/src/common/status.h:622: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:349: +4, including nesting penalty of 3, nesting level increased to 4

            RETURN_IF_ERROR(close(Status::OK(), false));
            ^

be/src/common/status.h:624: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:352: +2, including nesting penalty of 1, nesting level increased to 2

        if (_block->rows() != 0 || *eos) {
        ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:355: +3, including nesting penalty of 2, nesting level increased to 3

            if (status.is<ErrorCode::END_OF_FILE>()) {
            ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:357: +1, nesting level increased to 3

            } else if (!status) {
                   ^

be/src/pipeline/pipeline_x/pipeline_x_task.cpp:361: +3, including nesting penalty of 2, nesting level increased to 3

            if (*eos) { // just return, the scheduler will do finish work
            ^

@github-actions github-actions bot added the approved Indicates a PR has been approved by one committer. label Dec 17, 2024
Copy link
Contributor

PR approved by at least one committer and no changes requested.

Copy link
Contributor

PR approved by anyone and no changes requested.

@BiteTheDDDDt BiteTheDDDDt merged commit 7856662 into branch-2.1 Dec 17, 2024
26 of 30 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
approved Indicates a PR has been approved by one committer. reviewed
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants