From 83ddbe9b1a5a573f0e3e1c86877061ee12564faa Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 28 Nov 2024 11:30:47 +0800 Subject: [PATCH] [fix](runtime filter) Fix wrong synced size --- be/src/exprs/runtime_filter.cpp | 8 +++----- be/src/exprs/runtime_filter_slots.h | 8 ++++++++ be/src/runtime/runtime_filter_mgr.h | 1 + 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 8d4c9d8693237c7..7075392fcc065d6 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1111,9 +1111,6 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt std::lock_guard l(*local_merge_filters->lock); local_merge_filters->merge_size_times--; local_merge_filters->local_merged_size += local_filter_size; - if (_has_local_target) { - set_synced_size(local_filter_size); - } if (local_merge_filters->merge_size_times) { return Status::OK(); } else { @@ -1530,9 +1527,10 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile(uint64_t local_merge_ std::string IRuntimeFilter::debug_string() const { return fmt::format( "RuntimeFilter: (id = {}, type = {}, is_broadcast: {}, " - "build_bf_cardinality: {}, error_msg: {}", + "build_bf_cardinality: {}, ignored: {}, error_msg: {}", _filter_id, to_string(_runtime_filter_type), _is_broadcast_join, - _wrapper->get_build_bf_cardinality(), _wrapper->_context->err_msg); + _wrapper->get_build_bf_cardinality(), _wrapper->is_ignored(), + _wrapper->_context->err_msg); } Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) { diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index ca012dc55b3bf2c..8f47d0a5cd17922 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -71,6 +71,9 @@ class VRuntimeFilterSlots { // process ignore duplicate IN_FILTER std::unordered_set has_in_filter; for (auto filter : _runtime_filters) { + if (filter->filter_id() == 20) { + LOG(WARNING) << "=======1 " << filter->debug_string(); + } if (filter->get_ignored()) { continue; } @@ -98,6 +101,9 @@ class VRuntimeFilterSlots { continue; } filter->set_ignored(); + if (filter->filter_id() == 20) { + LOG(WARNING) << "=======2 " << filter->debug_string(); + } } return Status::OK(); } @@ -112,6 +118,8 @@ class VRuntimeFilterSlots { Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) { // process IN_OR_BLOOM_FILTER's real type for (auto filter : _runtime_filters) { + LOG(WARNING) << "=======3 " << filter->debug_string() << " " << local_hash_table_size + << " " << filter->get_synced_size(); if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER && get_real_size(filter.get(), local_hash_table_size) > state->runtime_filter_max_in_num()) { diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index 8d9aa2e557af857..4a38b08ece50650 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -62,6 +62,7 @@ struct LocalMergeFilters { uint64_t local_merged_size = 0; std::vector> filters; MonotonicStopWatch merge_watcher; + bool contains_ignored_rf = false; }; /// producer: