Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Chore](runtime-filter) improve rf::debug_string and add some check about rf #44697

Merged
merged 2 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1535,11 +1535,13 @@ void IRuntimeFilter::update_runtime_filter_type_to_profile(uint64_t local_merge_

// Builds a one-line, human-readable summary of this runtime filter for logs
// and error messages. The summary lists the filter id, type, broadcast flag,
// ignored flag, build_bf_cardinality, dependency state, synced size, the
// local/remote target flags and the wrapper context's error message.
std::string IRuntimeFilter::debug_string() const {
    // Every field is separated by ", " and the opening "(" is closed, so the
    // text stays readable when it is embedded in a longer message.
    return fmt::format(
            "RuntimeFilter: (id = {}, type = {}, is_broadcast: {}, ignored: {}, "
            "build_bf_cardinality: {}, dependency: {}, synced_size: {}, has_local_target: {}, "
            "has_remote_target: {}, error_msg: [{}])",
            _filter_id, to_string(_runtime_filter_type), _is_broadcast_join,
            _wrapper->_context->ignored, _wrapper->get_build_bf_cardinality(),
            // _dependency may be unset; print a placeholder instead of dereferencing it.
            _dependency ? _dependency->debug_string() : "none", _synced_size, _has_local_target,
            _has_remote_target, _wrapper->_context->err_msg);
}

Status IRuntimeFilter::merge_from(const RuntimePredicateWrapper* wrapper) {
Expand Down Expand Up @@ -1592,9 +1594,7 @@ RuntimeFilterType IRuntimeFilter::get_real_type() {
}

// Returns true when the wrapper reports a build_bf_cardinality and the join
// is not a broadcast join. The filter type is intentionally not consulted
// here; the decision rests on those two conditions only.
bool IRuntimeFilter::need_sync_filter_size() {
    return _wrapper->get_build_bf_cardinality() && !_is_broadcast_join;
}

void IRuntimeFilter::update_filter(std::shared_ptr<RuntimePredicateWrapper> wrapper,
Expand Down
9 changes: 6 additions & 3 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,9 +354,12 @@ class IRuntimeFilter {
void set_finish_dependency(
const std::shared_ptr<pipeline::CountedFinishDependency>& dependency);

int64_t get_synced_size() const { return _synced_size; }

bool isset_synced_size() const { return _synced_size != -1; }
int64_t get_synced_size() const {
if (_synced_size == -1) {
throw Status::InternalError("sync filter size meet error, filter: {}", debug_string());
}
return _synced_size;
}

protected:
// serialize _wrapper to protobuf
Expand Down
9 changes: 2 additions & 7 deletions be/src/exprs/runtime_filter_slots.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,8 @@ class VRuntimeFilterSlots {
}

// use synced size when this rf has global merged
static uint64_t get_real_size(IRuntimeFilter* runtime_filter, uint64_t hash_table_size) {
return runtime_filter->isset_synced_size() ? runtime_filter->get_synced_size()
: hash_table_size;
static uint64_t get_real_size(IRuntimeFilter* filter, uint64_t hash_table_size) {
return filter->need_sync_filter_size() ? filter->get_synced_size() : hash_table_size;
}

Status ignore_filters(RuntimeState* state) {
Expand Down Expand Up @@ -119,10 +118,6 @@ class VRuntimeFilterSlots {
}

if (filter->get_real_type() == RuntimeFilterType::BLOOM_FILTER) {
if (filter->need_sync_filter_size() != filter->isset_synced_size()) {
return Status::InternalError("sync filter size meet error, filter: {}",
filter->debug_string());
}
RETURN_IF_ERROR(filter->init_bloom_filter(
get_real_size(filter.get(), local_hash_table_size)));
}
Expand Down
7 changes: 6 additions & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,14 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
if (_closed) {
return Status::OK();
}
auto p = _parent->cast<HashJoinBuildSinkOperatorX>();
Defer defer {[&]() {
if (!_should_build_hash_table) {
return;
}
// The build side hash key column maybe no need output, but we need to keep the column in block
// because it is used to compare with probe side hash key column
auto p = _parent->cast<HashJoinBuildSinkOperatorX>();

if (p._should_keep_hash_key_column && _build_col_ids.size() == 1) {
p._should_keep_column_flags[_build_col_ids[0]] = true;
}
Expand All @@ -143,6 +144,10 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency));
RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
} else {
if (p._shared_hashtable_controller &&
!p._shared_hash_table_context->complete_build_stage) {
return Status::InternalError("close before sink meet eos");
}
auto* block = _shared_state->build_block.get();
uint64_t hash_table_size = block ? block->rows() : 0;
{
Expand Down
Loading