Skip to content

Commit

Permalink
[Bug](runtime-filter) fix publish not inited rf when broadcast join m…
Browse files Browse the repository at this point in the history
…eet wake up by downsteam (#44823)

### What problem does this PR solve?
fix publish not inited rf when broadcast join meet wake up by downsteam

related with #44408 #41751
  • Loading branch information
BiteTheDDDDt authored Dec 2, 2024
1 parent c0b8478 commit 2d9d3f7
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 18 deletions.
2 changes: 1 addition & 1 deletion be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1542,7 +1542,7 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile(uint64_t local_merge_

std::string IRuntimeFilter::debug_string() const {
return fmt::format(
"RuntimeFilter: (id = {}, type = {}, is_broadcast: {}, ignored: {}"
"RuntimeFilter: (id = {}, type = {}, is_broadcast: {}, ignored: {}, "
"build_bf_cardinality: {}, dependency: {}, synced_size: {}, has_local_target: {}, "
"has_remote_target: {},error_msg: [{}]",
_filter_id, to_string(_runtime_filter_type), _is_broadcast_join,
Expand Down
38 changes: 21 additions & 17 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,28 +139,32 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
return Base::close(state, exec_status);
}

if (_should_build_hash_table) {
if (state->get_task()->wake_up_by_downstream()) {
if (state->get_task()->wake_up_by_downstream()) {
if (_should_build_hash_table) {
// partitial ignore rf to make global rf work
RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
} else {
if (p._shared_hashtable_controller &&
!p._shared_hash_table_context->complete_build_stage) {
return Status::InternalError("close before sink meet eos");
}
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
{
SCOPED_TIMER(_runtime_filter_init_timer);
RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
}
if (hash_table_size > 1) {
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
}
// do not publish filter coz local rf not inited and useless
return Base::close(state, exec_status);
}
} else if (_should_build_hash_table) {
if (p._shared_hashtable_controller && !p._shared_hash_table_context->complete_build_stage) {
return Status::InternalError("close before sink meet eos");
}
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
{
SCOPED_TIMER(_runtime_filter_init_timer);
RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
}
if (hash_table_size > 1) {
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
}
}

SCOPED_TIMER(_publish_runtime_filter_timer);
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
_runtime_filter_slots->publish(state, !_should_build_hash_table));
Expand Down

0 comments on commit 2d9d3f7

Please sign in to comment.