Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug](runtime-filter) fix wrong build_bf_exactly when sync filter size disabled #44716

Merged
merged 5 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 4 additions & 16 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -975,11 +975,10 @@ class RuntimePredicateWrapper {

Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc,
const TQueryOptions* query_options, const RuntimeFilterRole role,
int node_id, std::shared_ptr<IRuntimeFilter>* res,
bool build_bf_exactly) {
int node_id, std::shared_ptr<IRuntimeFilter>* res) {
*res = std::make_shared<IRuntimeFilter>(state, desc);
(*res)->set_role(role);
return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly);
return (*res)->init_with_desc(desc, query_options, node_id);
}

RuntimeFilterContextSPtr& IRuntimeFilter::get_shared_context_ref() {
Expand Down Expand Up @@ -1344,7 +1343,7 @@ std::string IRuntimeFilter::formatted_state() const {
}

Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
int node_id, bool build_bf_exactly) {
int node_id) {
// if node_id == -1 , it shouldn't be a consumer
DCHECK(node_id >= 0 || (node_id == -1 && !is_consumer()));

Expand All @@ -1366,21 +1365,10 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
params.runtime_bloom_filter_max_size = options->__isset.runtime_bloom_filter_max_size
? options->runtime_bloom_filter_max_size
: 0;
auto sync_filter_size = desc->__isset.sync_filter_size && desc->sync_filter_size;
// We build runtime filter by exact distinct count if all of 3 conditions are met:
// 1. Only 1 join key
// 2. Bloom filter
// 3. Size of all bloom filters will be same (size will be sync or this is a broadcast join).
params.build_bf_exactly =
build_bf_exactly && (_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER ||
_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER);

params.build_bf_exactly = desc->build_bf_exactly;
BiteTheDDDDt marked this conversation as resolved.
Show resolved Hide resolved
params.bloom_filter_size_calculated_by_ndv = desc->bloom_filter_size_calculated_by_ndv;

if (!sync_filter_size) {
params.build_bf_exactly &= !_is_broadcast_join;
}

if (desc->__isset.bloom_filter_size_bytes) {
params.bloom_filter_size = desc->bloom_filter_size_bytes;
}
Expand Down
5 changes: 2 additions & 3 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,7 @@ class IRuntimeFilter {

static Status create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc,
const TQueryOptions* query_options, const RuntimeFilterRole role,
int node_id, std::shared_ptr<IRuntimeFilter>* res,
bool build_bf_exactly = false);
int node_id, std::shared_ptr<IRuntimeFilter>* res);

RuntimeFilterContextSPtr& get_shared_context_ref();

Expand Down Expand Up @@ -260,7 +259,7 @@ class IRuntimeFilter {

// init filter with desc
Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
int node_id = -1, bool build_bf_exactly = false);
int node_id = -1);

// serialize _wrapper to protobuf
Status serialize(PMergeFilterRequest* request, void** data, int* len);
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
RETURN_IF_ERROR(_hash_table_init(state));
_runtime_filters.resize(p._runtime_filter_descs.size());
for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
RETURN_IF_ERROR(state->register_producer_runtime_filter(
p._runtime_filter_descs[i], &_runtime_filters[i], _build_expr_ctxs.size() == 1));
RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i],
&_runtime_filters[i]));
}

_runtime_filter_slots =
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/nested_loop_join_build_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta
_runtime_filters.resize(p._runtime_filter_descs.size());
for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i],
&_runtime_filters[i], false));
&_runtime_filters[i]));
}
return Status::OK();
}
Expand Down
22 changes: 10 additions & 12 deletions be/src/runtime/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ std::vector<std::shared_ptr<IRuntimeFilter>> RuntimeFilterMgr::get_consume_filte
Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc,
const TQueryOptions& options, int node_id,
std::shared_ptr<IRuntimeFilter>* consumer_filter,
bool build_bf_exactly, bool need_local_merge) {
bool need_local_merge) {
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;
bool has_exist = false;
Expand All @@ -110,7 +110,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc
if (!has_exist) {
std::shared_ptr<IRuntimeFilter> filter;
RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options, RuntimeFilterRole::CONSUMER,
node_id, &filter, build_bf_exactly));
node_id, &filter));
_consumer_map[key].emplace_back(node_id, filter);
*consumer_filter = filter;
} else if (!need_local_merge) {
Expand All @@ -122,7 +122,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc

Status RuntimeFilterMgr::register_local_merge_producer_filter(
const doris::TRuntimeFilterDesc& desc, const doris::TQueryOptions& options,
std::shared_ptr<IRuntimeFilter> producer_filter, bool build_bf_exactly) {
std::shared_ptr<IRuntimeFilter> producer_filter) {
DCHECK(_is_global);
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;
Expand All @@ -143,8 +143,7 @@ Status RuntimeFilterMgr::register_local_merge_producer_filter(
if (iter->second.filters.empty()) {
std::shared_ptr<IRuntimeFilter> merge_filter;
RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options,
RuntimeFilterRole::PRODUCER, -1, &merge_filter,
build_bf_exactly));
RuntimeFilterRole::PRODUCER, -1, &merge_filter));
merge_filter->set_ignored();
iter->second.filters.emplace_back(merge_filter);
}
Expand Down Expand Up @@ -181,10 +180,9 @@ doris::LocalMergeFilters* RuntimeFilterMgr::get_local_merge_producer_filters(int
return &iter->second;
}

Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc,
const TQueryOptions& options,
std::shared_ptr<IRuntimeFilter>* producer_filter,
bool build_bf_exactly) {
Status RuntimeFilterMgr::register_producer_filter(
const TRuntimeFilterDesc& desc, const TQueryOptions& options,
std::shared_ptr<IRuntimeFilter>* producer_filter) {
DCHECK(!_is_global);
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;
Expand All @@ -196,7 +194,7 @@ Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc
return Status::InvalidArgument("filter has registed");
}
RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options, RuntimeFilterRole::PRODUCER, -1,
producer_filter, build_bf_exactly));
producer_filter));
_producer_map.emplace(key, *producer_filter);
return Status::OK();
}
Expand Down Expand Up @@ -233,8 +231,8 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
cnt_val->filter = cnt_val->pool->add(new IRuntimeFilter(_state, runtime_filter_desc));

auto filter_id = runtime_filter_desc->filter_id;
RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options,
-1, false));
RETURN_IF_ERROR(
cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options, -1));
cnt_val->filter->set_ignored();
_filter_map.emplace(filter_id, cnt_val);
return Status::OK();
Expand Down
8 changes: 3 additions & 5 deletions be/src/runtime/runtime_filter_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,17 @@ class RuntimeFilterMgr {
// register filter
Status register_consumer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options,
int node_id, std::shared_ptr<IRuntimeFilter>* consumer_filter,
bool build_bf_exactly = false, bool need_local_merge = false);
bool need_local_merge = false);

Status register_local_merge_producer_filter(const TRuntimeFilterDesc& desc,
const TQueryOptions& options,
std::shared_ptr<IRuntimeFilter> producer_filter,
bool build_bf_exactly = false);
std::shared_ptr<IRuntimeFilter> producer_filter);

Status get_local_merge_producer_filters(int filter_id, LocalMergeFilters** local_merge_filters);
LocalMergeFilters* get_local_merge_producer_filters(int filter_id);

Status register_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options,
std::shared_ptr<IRuntimeFilter>* producer_filter,
bool build_bf_exactly = false);
std::shared_ptr<IRuntimeFilter>* producer_filter);

// update filter by remote
void set_runtime_filter_params(const TRuntimeFilterParams& runtime_filter_params);
Expand Down
13 changes: 6 additions & 7 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -516,14 +516,13 @@ RuntimeFilterMgr* RuntimeState::global_runtime_filter_mgr() {
}

Status RuntimeState::register_producer_runtime_filter(
const TRuntimeFilterDesc& desc, std::shared_ptr<IRuntimeFilter>* producer_filter,
bool build_bf_exactly) {
const TRuntimeFilterDesc& desc, std::shared_ptr<IRuntimeFilter>* producer_filter) {
// Producers are created by local runtime filter mgr and shared by global runtime filter manager.
// When RF is published, consumers in both global and local RF mgr will be found.
RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(
desc, query_options(), producer_filter, build_bf_exactly));
RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(desc, query_options(),
producer_filter));
RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merge_producer_filter(
desc, query_options(), *producer_filter, build_bf_exactly));
desc, query_options(), *producer_filter));
return Status::OK();
}

Expand All @@ -532,10 +531,10 @@ Status RuntimeState::register_consumer_runtime_filter(
std::shared_ptr<IRuntimeFilter>* consumer_filter) {
if (desc.has_remote_targets || need_local_merge) {
return global_runtime_filter_mgr()->register_consumer_filter(desc, query_options(), node_id,
consumer_filter, false, true);
consumer_filter, true);
} else {
return local_runtime_filter_mgr()->register_consumer_filter(desc, query_options(), node_id,
consumer_filter, false, false);
consumer_filter, false);
}
}

Expand Down
3 changes: 1 addition & 2 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -561,8 +561,7 @@ class RuntimeState {
}

Status register_producer_runtime_filter(const doris::TRuntimeFilterDesc& desc,
std::shared_ptr<IRuntimeFilter>* producer_filter,
bool build_bf_exactly);
std::shared_ptr<IRuntimeFilter>* producer_filter);

Status register_consumer_runtime_filter(const doris::TRuntimeFilterDesc& desc,
bool need_local_merge, int node_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public void createLegacyRuntimeFilter(RuntimeFilter filter, JoinNodeBase node, P
targetTupleIdMapList, context.getLimits());
if (node instanceof HashJoinNode) {
origFilter.setIsBroadcast(((HashJoinNode) node).getDistributionMode() == DistributionMode.BROADCAST);
origFilter.setSingleEq(((HashJoinNode) node).getEqJoinConjuncts().size());
} else {
// nest loop join
origFilter.setIsBroadcast(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ public final class RuntimeFilter {

private boolean bloomFilterSizeCalculatedByNdv = false;

private boolean singleEq = false;

/**
* Internal representation of a runtime filter target.
*/
Expand Down Expand Up @@ -216,9 +218,36 @@ public TRuntimeFilterDesc toThrift() {
tFilter.setIsBroadcastJoin(isBroadcastJoin);
tFilter.setHasLocalTargets(hasLocalTargets);
tFilter.setHasRemoteTargets(hasRemoteTargets);

boolean hasSerialTargets = false;
for (RuntimeFilterTarget target : targets) {
tFilter.putToPlanIdToTargetExpr(target.node.getId().asInt(), target.expr.treeToThrift());
hasSerialTargets = hasSerialTargets
|| (target.node.isSerialOperator() && target.node.fragment.useSerialSource(ConnectContext.get()));
}

boolean enableSyncFilterSize = ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable().enableSyncRuntimeFilterSize();
// there are two cases need merge rf
// 1. hasRemoteTargets is true means join type is hash shuffle join then rf will
BiteTheDDDDt marked this conversation as resolved.
Show resolved Hide resolved
// merged into one
// 2. hasSerialTargets is true means scan is pooled then rf need merged into one
boolean needMerge = hasRemoteTargets || hasSerialTargets;

// There are two cases where all instances of rf have the same size.
// 1. enableSyncFilterSize is true means backends will collect global size and send to every instance
// 2. isBroadcastJoin is true means each join node instance have the same full amount of data
boolean hasGlobalSize = enableSyncFilterSize || isBroadcastJoin;

// build runtime filter by exact distinct count if all of 3 conditions are met:
// 1. only single eq conjunct
// 2. rf type may be bf
// 3. each filter only acts on self instance(do not need any merge), or size of
// all filters will be same
boolean buildBfExactly = singleEq && (runtimeFilterType == TRuntimeFilterType.IN_OR_BLOOM
|| runtimeFilterType == TRuntimeFilterType.BLOOM) && (!needMerge || hasGlobalSize);
tFilter.setBuildBfExactly(buildBfExactly);

tFilter.setType(runtimeFilterType);
tFilter.setBloomFilterSizeBytes(filterSizeBytes);
if (runtimeFilterType.equals(TRuntimeFilterType.BITMAP)) {
Expand All @@ -239,8 +268,6 @@ public TRuntimeFilterDesc toThrift() {
tFilter.setNullAware(false);
}
}
tFilter.setSyncFilterSize(ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable().enableSyncRuntimeFilterSize());
return tFilter;
}

Expand Down Expand Up @@ -597,6 +624,14 @@ public void addTarget(RuntimeFilterTarget target) {
targets.add(target);
}

public void setSingleEq(int eqJoinConjunctsNumbers) {
singleEq = (eqJoinConjunctsNumbers == 1);
}

public boolean getSingleEq() {
BiteTheDDDDt marked this conversation as resolved.
Show resolved Hide resolved
return singleEq;
}

public void setIsBroadcast(boolean isBroadcast) {
isBroadcastJoin = isBroadcast;
}
Expand Down
4 changes: 3 additions & 1 deletion gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1293,7 +1293,9 @@ struct TRuntimeFilterDesc {
// true, if join type is null aware like <=>. rf should dispose the case
15: optional bool null_aware;

16: optional bool sync_filter_size;
16: optional bool sync_filter_size; // Deprecated

17: optional bool build_bf_exactly;
}


Expand Down
Loading