Skip to content

Commit

Permalink
[fix](runtime filter) Fix wrong synced size
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 committed Nov 28, 2024
1 parent eb506a1 commit 83ddbe9
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 5 deletions.
8 changes: 3 additions & 5 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1111,9 +1111,6 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt
std::lock_guard l(*local_merge_filters->lock);
local_merge_filters->merge_size_times--;
local_merge_filters->local_merged_size += local_filter_size;
if (_has_local_target) {
set_synced_size(local_filter_size);
}
if (local_merge_filters->merge_size_times) {
return Status::OK();
} else {
Expand Down Expand Up @@ -1530,9 +1527,10 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile(uint64_t local_merge_
std::string IRuntimeFilter::debug_string() const {
return fmt::format(
"RuntimeFilter: (id = {}, type = {}, is_broadcast: {}, "
"build_bf_cardinality: {}, error_msg: {}",
"build_bf_cardinality: {}, ignored: {}, error_msg: {}",
_filter_id, to_string(_runtime_filter_type), _is_broadcast_join,
_wrapper->get_build_bf_cardinality(), _wrapper->_context->err_msg);
_wrapper->get_build_bf_cardinality(), _wrapper->is_ignored(),
_wrapper->_context->err_msg);
}

Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) {
Expand Down
8 changes: 8 additions & 0 deletions be/src/exprs/runtime_filter_slots.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ class VRuntimeFilterSlots {
// process ignore duplicate IN_FILTER
std::unordered_set<int> has_in_filter;
for (auto filter : _runtime_filters) {
if (filter->filter_id() == 20) {
LOG(WARNING) << "=======1 " << filter->debug_string();
}
if (filter->get_ignored()) {
continue;
}
Expand Down Expand Up @@ -98,6 +101,9 @@ class VRuntimeFilterSlots {
continue;
}
filter->set_ignored();
if (filter->filter_id() == 20) {
LOG(WARNING) << "=======2 " << filter->debug_string();
}
}
return Status::OK();
}
Expand All @@ -112,6 +118,8 @@ class VRuntimeFilterSlots {
Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
// process IN_OR_BLOOM_FILTER's real type
for (auto filter : _runtime_filters) {
LOG(WARNING) << "=======3 " << filter->debug_string() << " " << local_hash_table_size
<< " " << filter->get_synced_size();
if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
get_real_size(filter.get(), local_hash_table_size) >
state->runtime_filter_max_in_num()) {
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/runtime_filter_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ struct LocalMergeFilters {
uint64_t local_merged_size = 0;
std::vector<std::shared_ptr<IRuntimeFilter>> filters;
MonotonicStopWatch merge_watcher;
bool contains_ignored_rf = false;
};

/// producer:
Expand Down

0 comments on commit 83ddbe9

Please sign in to comment.