-
Notifications
You must be signed in to change notification settings - Fork 3.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix](runtime filter) Fix runtime filter not found #43885
Conversation
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
run buildall |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clang-tidy made some suggestions
@@ -534,6 +548,10 @@ RuntimeFilterParamsContext* RuntimeFilterParamsContext::create(RuntimeState* sta | |||
return params; | |||
} | |||
|
|||
RuntimeFilterMgr* RuntimeFilterParamsContext::global_runtime_filter_mgr() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: method 'global_runtime_filter_mgr' can be made const [readability-make-member-function-const]
be/src/runtime/runtime_filter_mgr.h:289:
- RuntimeFilterMgr* global_runtime_filter_mgr();
+ RuntimeFilterMgr* global_runtime_filter_mgr() const;
RuntimeFilterMgr* RuntimeFilterParamsContext::global_runtime_filter_mgr() { | |
RuntimeFilterMgr* RuntimeFilterParamsContext::global_runtime_filter_mgr() const { |
run buildall |
run buildall |
PR approved by at least one committer and no changes requested. |
PR approved by anyone and no changes requested. |
DCHECK(!filters.empty()); | ||
// push down | ||
auto send_to_local = [&](std::shared_ptr<RuntimePredicateWrapper> wrapper, bool global) { | ||
std::vector<std::shared_ptr<IRuntimeFilter>> filters = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not add new api get_consume_filters
? why not find not return error
be/src/exprs/runtime_filter.cpp
Outdated
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, filters)); | ||
DCHECK(!filters.empty()); | ||
// push down | ||
auto send_to_local = [&](std::shared_ptr<RuntimePredicateWrapper> wrapper, bool global) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so here need use global ? why still name send_to_local
?
_filter_id, &local_merge_filters)); | ||
std::lock_guard l(*local_merge_filters->lock); | ||
local_merge_filters->merge_size_times--; | ||
local_merge_filters->local_merged_size += local_filter_size; | ||
if (_has_local_target) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the code is wired, why here use global rf mgr get filter? has_local_target?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clang-tidy made some suggestions
be/src/exprs/runtime_filter.cpp
Outdated
@@ -993,50 +993,54 @@ void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t sta | |||
Status IRuntimeFilter::publish(bool publish_local) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: function 'publish' has cognitive complexity of 82 (threshold 50) [readability-function-cognitive-complexity]
Status IRuntimeFilter::publish(bool publish_local) {
^
Additional context
be/src/exprs/runtime_filter.cpp:995: nesting level increased to 1
auto send_to_remote_targets = [&](IRuntimeFilter* filter) {
^
be/src/exprs/runtime_filter.cpp:998: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_merge_addr(&addr));
^
be/src/common/status.h:630: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:998: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_merge_addr(&addr));
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1001: nesting level increased to 1
auto send_to_local_targets = [&](std::shared_ptr<RuntimePredicateWrapper> wrapper,
^
be/src/exprs/runtime_filter.cpp:1013: nesting level increased to 1
auto do_merge = [&]() {
^
be/src/exprs/runtime_filter.cpp:1014: +2, including nesting penalty of 1, nesting level increased to 2
if (!_state->global_runtime_filter_mgr()->get_consume_filters(_filter_id).empty()) {
^
be/src/exprs/runtime_filter.cpp:1016: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_local_merge_producer_filters(
^
be/src/common/status.h:630: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1016: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_local_merge_producer_filters(
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1019: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper.get()));
^
be/src/common/status.h:630: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1019: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper.get()));
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1021: +3, including nesting penalty of 2, nesting level increased to 3
if (local_merge_filters->merge_time == 0) {
^
be/src/exprs/runtime_filter.cpp:1022: +4, including nesting penalty of 3, nesting level increased to 4
if (_has_local_target) {
^
be/src/exprs/runtime_filter.cpp:1023: +5, including nesting penalty of 4, nesting level increased to 5
RETURN_IF_ERROR(
^
be/src/common/status.h:630: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1023: +6, including nesting penalty of 5, nesting level increased to 6
RETURN_IF_ERROR(
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1025: +1, nesting level increased to 4
} else {
^
be/src/exprs/runtime_filter.cpp:1026: +5, including nesting penalty of 4, nesting level increased to 5
RETURN_IF_ERROR(send_to_remote_targets(local_merge_filters->filters[0].get()));
^
be/src/common/status.h:630: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1026: +6, including nesting penalty of 5, nesting level increased to 6
RETURN_IF_ERROR(send_to_remote_targets(local_merge_filters->filters[0].get()));
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1033: +1, including nesting penalty of 0, nesting level increased to 1
if (_has_local_target) {
^
be/src/exprs/runtime_filter.cpp:1036: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(do_merge());
^
be/src/common/status.h:630: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1036: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(do_merge());
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1037: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(send_to_local_targets(_wrapper, false));
^
be/src/common/status.h:630: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1037: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(send_to_local_targets(_wrapper, false));
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1038: +1, nesting level increased to 1
} else if (!publish_local) {
^
be/src/exprs/runtime_filter.cpp:1039: +2, including nesting penalty of 1, nesting level increased to 2
if (_is_broadcast_join || _state->get_query_ctx()->be_exec_version() < USE_NEW_SERDE) {
^
be/src/exprs/runtime_filter.cpp:1039: +1
if (_is_broadcast_join || _state->get_query_ctx()->be_exec_version() < USE_NEW_SERDE) {
^
be/src/exprs/runtime_filter.cpp:1040: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(send_to_remote_targets(this));
^
be/src/common/status.h:630: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1040: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(send_to_remote_targets(this));
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1041: +1, nesting level increased to 2
} else {
^
be/src/exprs/runtime_filter.cpp:1042: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(do_merge());
^
be/src/common/status.h:630: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1042: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(do_merge());
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1044: +1, nesting level increased to 1
} else {
^
run buildall |
TeamCity be ut coverage result: |
run buildall |
1 similar comment
run buildall |
TeamCity be ut coverage result: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
PR approved by at least one committer and no changes requested. |
1d5aa5c
run buildall |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clang-tidy made some suggestions
@@ -993,50 +993,54 @@ void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t sta | |||
Status IRuntimeFilter::publish(RuntimeState* state, bool publish_local) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: function 'publish' has cognitive complexity of 82 (threshold 50) [readability-function-cognitive-complexity]
Status IRuntimeFilter::publish(RuntimeState* state, bool publish_local) {
^
Additional context
be/src/exprs/runtime_filter.cpp:995: nesting level increased to 1
auto send_to_remote_targets = [&](IRuntimeFilter* filter) {
^
be/src/exprs/runtime_filter.cpp:998: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_merge_addr(&addr));
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:998: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_merge_addr(&addr));
^
be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1001: nesting level increased to 1
auto send_to_local_targets = [&](std::shared_ptr<RuntimePredicateWrapper> wrapper,
^
be/src/exprs/runtime_filter.cpp:1013: nesting level increased to 1
auto do_merge = [&]() {
^
be/src/exprs/runtime_filter.cpp:1014: +2, including nesting penalty of 1, nesting level increased to 2
if (!_state->global_runtime_filter_mgr()->get_consume_filters(_filter_id).empty()) {
^
be/src/exprs/runtime_filter.cpp:1016: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_local_merge_producer_filters(
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1016: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(_state->global_runtime_filter_mgr()->get_local_merge_producer_filters(
^
be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1019: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper.get()));
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1019: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper.get()));
^
be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1021: +3, including nesting penalty of 2, nesting level increased to 3
if (local_merge_filters->merge_time == 0) {
^
be/src/exprs/runtime_filter.cpp:1022: +4, including nesting penalty of 3, nesting level increased to 4
if (_has_local_target) {
^
be/src/exprs/runtime_filter.cpp:1023: +5, including nesting penalty of 4, nesting level increased to 5
RETURN_IF_ERROR(
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1023: +6, including nesting penalty of 5, nesting level increased to 6
RETURN_IF_ERROR(
^
be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1025: +1, nesting level increased to 4
} else {
^
be/src/exprs/runtime_filter.cpp:1026: +5, including nesting penalty of 4, nesting level increased to 5
RETURN_IF_ERROR(send_to_remote_targets(local_merge_filters->filters[0].get()));
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1026: +6, including nesting penalty of 5, nesting level increased to 6
RETURN_IF_ERROR(send_to_remote_targets(local_merge_filters->filters[0].get()));
^
be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1033: +1, including nesting penalty of 0, nesting level increased to 1
if (_has_local_target) {
^
be/src/exprs/runtime_filter.cpp:1036: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(do_merge());
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1036: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(do_merge());
^
be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1037: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(send_to_local_targets(_wrapper, false));
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1037: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(send_to_local_targets(_wrapper, false));
^
be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1038: +1, nesting level increased to 1
} else if (!publish_local) {
^
be/src/exprs/runtime_filter.cpp:1039: +2, including nesting penalty of 1, nesting level increased to 2
if (_is_broadcast_join || _state->get_query_ctx()->be_exec_version() < USE_NEW_SERDE) {
^
be/src/exprs/runtime_filter.cpp:1039: +1
if (_is_broadcast_join || _state->get_query_ctx()->be_exec_version() < USE_NEW_SERDE) {
^
be/src/exprs/runtime_filter.cpp:1040: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(send_to_remote_targets(this));
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1040: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(send_to_remote_targets(this));
^
be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1041: +1, nesting level increased to 2
} else {
^
be/src/exprs/runtime_filter.cpp:1042: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(do_merge());
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1042: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(do_merge());
^
be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1044: +1, nesting level increased to 1
} else {
^
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
PR approved by at least one committer and no changes requested. |
TeamCity be ut coverage result: |
### What problem does this PR solve? Filter size should be set by a global size instead of a local size. This behaviour is introduced by #43885 .
Filter size should be set by a global size instead of a local size. This behaviour is introduced by apache#43885 .
What problem does this PR solve?
Problem Summary:
A runtime filter may have multiple targets and some of those are local-merge RF and others are not.
So for all runtime filters' producers,
publish
should notify all consumers in global RF mgr which manages local-merge RF and local RF mgr which manages others.Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)