Skip to content

Commit

Permalink
[Bug](runtime-filter) fix publish not inited rf when broadcast join m…
Browse files Browse the repository at this point in the history
…eet wake up by downsteam #44823 (#44859)

pick from #44823
  • Loading branch information
BiteTheDDDDt authored Dec 2, 2024
1 parent eee4f59 commit 0b681d3
Showing 1 changed file with 11 additions and 7 deletions.
18 changes: 11 additions & 7 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,19 +142,23 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
}

if (state->get_task()->wake_up_by_downstream()) {
RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
} else {
if (_should_build_hash_table) {
// partitial ignore rf to make global rf work
RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
} else {
// do not publish filter coz local rf not inited and useless
return Base::close(state, exec_status);
}
} else if (_should_build_hash_table) {
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
{
SCOPED_TIMER(_runtime_filter_init_timer);
if (_should_build_hash_table) {
RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size));
}
RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
}
if (_should_build_hash_table && hash_table_size > 1) {
if (hash_table_size > 1) {
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
}
Expand Down

0 comments on commit 0b681d3

Please sign in to comment.