From 2d9d3f79df01c475a1a0bbc1b68d2084323f04f5 Mon Sep 17 00:00:00 2001 From: Pxl Date: Mon, 2 Dec 2024 10:26:56 +0800 Subject: [PATCH] [Bug](runtime-filter) fix publish not inited rf when broadcast join meet wake up by downsteam (#44823) ### What problem does this PR solve? fix publish not inited rf when broadcast join meet wake up by downsteam related with #44408 #41751 --- be/src/exprs/runtime_filter.cpp | 2 +- be/src/pipeline/exec/hashjoin_build_sink.cpp | 38 +++++++++++--------- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index e26452c9ef69e5..c983af0fb3ea71 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1542,7 +1542,7 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile(uint64_t local_merge_ std::string IRuntimeFilter::debug_string() const { return fmt::format( - "RuntimeFilter: (id = {}, type = {}, is_broadcast: {}, ignored: {}" + "RuntimeFilter: (id = {}, type = {}, is_broadcast: {}, ignored: {}, " "build_bf_cardinality: {}, dependency: {}, synced_size: {}, has_local_target: {}, " "has_remote_target: {},error_msg: [{}]", _filter_id, to_string(_runtime_filter_type), _is_broadcast_join, diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 0a71b86bed0e86..cec0c77da8a61d 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -139,28 +139,32 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu return Base::close(state, exec_status); } - if (_should_build_hash_table) { - if (state->get_task()->wake_up_by_downstream()) { + if (state->get_task()->wake_up_by_downstream()) { + if (_should_build_hash_table) { + // partitial ignore rf to make global rf work RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency)); RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters()); } else { - if (p._shared_hashtable_controller && - !p._shared_hash_table_context->complete_build_stage) { - return Status::InternalError("close before sink meet eos"); - } - auto* block = _shared_state->build_block.get(); - uint64_t hash_table_size = block ? block->rows() : 0; - { - SCOPED_TIMER(_runtime_filter_init_timer); - RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size)); - RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state)); - } - if (hash_table_size > 1) { - SCOPED_TIMER(_runtime_filter_compute_timer); - _runtime_filter_slots->insert(block); - } + // do not publish filter coz local rf not inited and useless + return Base::close(state, exec_status); + } + } else if (_should_build_hash_table) { + if (p._shared_hashtable_controller && !p._shared_hash_table_context->complete_build_stage) { + return Status::InternalError("close before sink meet eos"); + } + auto* block = _shared_state->build_block.get(); + uint64_t hash_table_size = block ? block->rows() : 0; + { + SCOPED_TIMER(_runtime_filter_init_timer); + RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size)); + RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state)); + } + if (hash_table_size > 1) { + SCOPED_TIMER(_runtime_filter_compute_timer); + _runtime_filter_slots->insert(block); } } + SCOPED_TIMER(_publish_runtime_filter_timer); RETURN_IF_ERROR_OR_CATCH_EXCEPTION( _runtime_filter_slots->publish(state, !_should_build_hash_table));