Skip to content

Commit

Permalink
[Bug](runtime-filter) fix wrong build_bf_exactly when sync filter siz…
Browse files Browse the repository at this point in the history
…#44716 (#46965)

pick from #44716
  • Loading branch information
BiteTheDDDDt authored Jan 14, 2025
1 parent 8a6ade1 commit 9b3fb4a
Show file tree
Hide file tree
Showing 11 changed files with 66 additions and 51 deletions.
20 changes: 4 additions & 16 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -992,11 +992,10 @@ class RuntimePredicateWrapper {

Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc,
const TQueryOptions* query_options, const RuntimeFilterRole role,
int node_id, std::shared_ptr<IRuntimeFilter>* res,
bool build_bf_exactly) {
int node_id, std::shared_ptr<IRuntimeFilter>* res) {
*res = std::make_shared<IRuntimeFilter>(state, desc);
(*res)->set_role(role);
return (*res)->init_with_desc(desc, query_options, node_id, build_bf_exactly);
return (*res)->init_with_desc(desc, query_options, node_id);
}

RuntimeFilterContextSPtr& IRuntimeFilter::get_shared_context_ref() {
Expand Down Expand Up @@ -1368,7 +1367,7 @@ std::string IRuntimeFilter::formatted_state() const {
}

Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
int node_id, bool build_bf_exactly) {
int node_id) {
// if node_id == -1 , it shouldn't be a consumer
DCHECK(node_id >= 0 || (node_id == -1 && !is_consumer()));

Expand All @@ -1390,21 +1389,10 @@ Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const TQue
params.runtime_bloom_filter_max_size = options->__isset.runtime_bloom_filter_max_size
? options->runtime_bloom_filter_max_size
: 0;
auto sync_filter_size = desc->__isset.sync_filter_size && desc->sync_filter_size;
// We build runtime filter by exact distinct count iff three conditions are met:
// 1. Only 1 join key
// 2. Bloom filter
// 3. Size of all bloom filters will be same (size will be sync or this is a broadcast join).
params.build_bf_exactly =
build_bf_exactly && (_runtime_filter_type == RuntimeFilterType::BLOOM_FILTER ||
_runtime_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER);

params.build_bf_exactly = desc->__isset.build_bf_exactly && desc->build_bf_exactly;
params.bloom_filter_size_calculated_by_ndv = desc->bloom_filter_size_calculated_by_ndv;

if (!sync_filter_size) {
params.build_bf_exactly &= !_is_broadcast_join;
}

if (desc->__isset.bloom_filter_size_bytes) {
params.bloom_filter_size = desc->bloom_filter_size_bytes;
}
Expand Down
5 changes: 2 additions & 3 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,7 @@ class IRuntimeFilter {

static Status create(RuntimeFilterParamsContext* state, const TRuntimeFilterDesc* desc,
const TQueryOptions* query_options, const RuntimeFilterRole role,
int node_id, std::shared_ptr<IRuntimeFilter>* res,
bool build_bf_exactly = false);
int node_id, std::shared_ptr<IRuntimeFilter>* res);

RuntimeFilterContextSPtr& get_shared_context_ref();

Expand Down Expand Up @@ -259,7 +258,7 @@ class IRuntimeFilter {

// init filter with desc
Status init_with_desc(const TRuntimeFilterDesc* desc, const TQueryOptions* options,
int node_id = -1, bool build_bf_exactly = false);
int node_id = -1);

// serialize _wrapper to protobuf
Status serialize(PMergeFilterRequest* request, void** data, int* len);
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
_hash_table_init(state);
_runtime_filters.resize(p._runtime_filter_descs.size());
for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
RETURN_IF_ERROR(state->register_producer_runtime_filter(
p._runtime_filter_descs[i], &_runtime_filters[i], _build_expr_ctxs.size() == 1));
RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i],
&_runtime_filters[i]));
}

_runtime_filter_slots =
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/nested_loop_join_build_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkSta
_runtime_filters.resize(p._runtime_filter_descs.size());
for (size_t i = 0; i < p._runtime_filter_descs.size(); i++) {
RETURN_IF_ERROR(state->register_producer_runtime_filter(p._runtime_filter_descs[i],
&_runtime_filters[i], false));
&_runtime_filters[i]));
}
return Status::OK();
}
Expand Down
22 changes: 10 additions & 12 deletions be/src/runtime/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ std::vector<std::shared_ptr<IRuntimeFilter>> RuntimeFilterMgr::get_consume_filte
Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc,
const TQueryOptions& options, int node_id,
std::shared_ptr<IRuntimeFilter>* consumer_filter,
bool build_bf_exactly, bool need_local_merge) {
bool need_local_merge) {
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;
bool has_exist = false;
Expand All @@ -110,7 +110,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc
if (!has_exist) {
std::shared_ptr<IRuntimeFilter> filter;
RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options, RuntimeFilterRole::CONSUMER,
node_id, &filter, build_bf_exactly));
node_id, &filter));
_consumer_map[key].emplace_back(node_id, filter);
*consumer_filter = filter;
} else if (!need_local_merge) {
Expand All @@ -122,7 +122,7 @@ Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc& desc

Status RuntimeFilterMgr::register_local_merge_producer_filter(
const doris::TRuntimeFilterDesc& desc, const doris::TQueryOptions& options,
std::shared_ptr<IRuntimeFilter> producer_filter, bool build_bf_exactly) {
std::shared_ptr<IRuntimeFilter> producer_filter) {
DCHECK(_is_global);
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;
Expand All @@ -143,8 +143,7 @@ Status RuntimeFilterMgr::register_local_merge_producer_filter(
if (iter->second.filters.empty()) {
std::shared_ptr<IRuntimeFilter> merge_filter;
RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options,
RuntimeFilterRole::PRODUCER, -1, &merge_filter,
build_bf_exactly));
RuntimeFilterRole::PRODUCER, -1, &merge_filter));
merge_filter->set_ignored();
iter->second.filters.emplace_back(merge_filter);
}
Expand Down Expand Up @@ -181,10 +180,9 @@ doris::LocalMergeFilters* RuntimeFilterMgr::get_local_merge_producer_filters(int
return &iter->second;
}

Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc,
const TQueryOptions& options,
std::shared_ptr<IRuntimeFilter>* producer_filter,
bool build_bf_exactly) {
Status RuntimeFilterMgr::register_producer_filter(
const TRuntimeFilterDesc& desc, const TQueryOptions& options,
std::shared_ptr<IRuntimeFilter>* producer_filter) {
DCHECK(!_is_global);
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;
Expand All @@ -196,7 +194,7 @@ Status RuntimeFilterMgr::register_producer_filter(const TRuntimeFilterDesc& desc
return Status::InvalidArgument("filter has registed");
}
RETURN_IF_ERROR(IRuntimeFilter::create(_state, &desc, &options, RuntimeFilterRole::PRODUCER, -1,
producer_filter, build_bf_exactly));
producer_filter));
_producer_map.emplace(key, *producer_filter);
return Status::OK();
}
Expand Down Expand Up @@ -233,8 +231,8 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
cnt_val->filter = cnt_val->pool->add(new IRuntimeFilter(_state, runtime_filter_desc));

auto filter_id = runtime_filter_desc->filter_id;
RETURN_IF_ERROR(cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options,
-1, false));
RETURN_IF_ERROR(
cnt_val->filter->init_with_desc(&cnt_val->runtime_filter_desc, query_options, -1));
cnt_val->filter->set_ignored();
_filter_map.emplace(filter_id, cnt_val);
return Status::OK();
Expand Down
8 changes: 3 additions & 5 deletions be/src/runtime/runtime_filter_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,17 @@ class RuntimeFilterMgr {
// register filter
Status register_consumer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options,
int node_id, std::shared_ptr<IRuntimeFilter>* consumer_filter,
bool build_bf_exactly = false, bool need_local_merge = false);
bool need_local_merge = false);

Status register_local_merge_producer_filter(const TRuntimeFilterDesc& desc,
const TQueryOptions& options,
std::shared_ptr<IRuntimeFilter> producer_filter,
bool build_bf_exactly = false);
std::shared_ptr<IRuntimeFilter> producer_filter);

Status get_local_merge_producer_filters(int filter_id, LocalMergeFilters** local_merge_filters);
LocalMergeFilters* get_local_merge_producer_filters(int filter_id);

Status register_producer_filter(const TRuntimeFilterDesc& desc, const TQueryOptions& options,
std::shared_ptr<IRuntimeFilter>* producer_filter,
bool build_bf_exactly = false);
std::shared_ptr<IRuntimeFilter>* producer_filter);

// update filter by remote
void set_runtime_filter_params(const TRuntimeFilterParams& runtime_filter_params);
Expand Down
13 changes: 6 additions & 7 deletions be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -522,14 +522,13 @@ RuntimeFilterMgr* RuntimeState::global_runtime_filter_mgr() {
}

Status RuntimeState::register_producer_runtime_filter(
const TRuntimeFilterDesc& desc, std::shared_ptr<IRuntimeFilter>* producer_filter,
bool build_bf_exactly) {
const TRuntimeFilterDesc& desc, std::shared_ptr<IRuntimeFilter>* producer_filter) {
// Producers are created by local runtime filter mgr and shared by global runtime filter manager.
// When RF is published, consumers in both global and local RF mgr will be found.
RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(
desc, query_options(), producer_filter, build_bf_exactly));
RETURN_IF_ERROR(local_runtime_filter_mgr()->register_producer_filter(desc, query_options(),
producer_filter));
RETURN_IF_ERROR(global_runtime_filter_mgr()->register_local_merge_producer_filter(
desc, query_options(), *producer_filter, build_bf_exactly));
desc, query_options(), *producer_filter));
return Status::OK();
}

Expand All @@ -538,10 +537,10 @@ Status RuntimeState::register_consumer_runtime_filter(
std::shared_ptr<IRuntimeFilter>* consumer_filter) {
if (desc.has_remote_targets || need_local_merge) {
return global_runtime_filter_mgr()->register_consumer_filter(desc, query_options(), node_id,
consumer_filter, false, true);
consumer_filter, true);
} else {
return local_runtime_filter_mgr()->register_consumer_filter(desc, query_options(), node_id,
consumer_filter, false, false);
consumer_filter, false);
}
}

Expand Down
3 changes: 1 addition & 2 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -561,8 +561,7 @@ class RuntimeState {
}

Status register_producer_runtime_filter(const doris::TRuntimeFilterDesc& desc,
std::shared_ptr<IRuntimeFilter>* producer_filter,
bool build_bf_exactly);
std::shared_ptr<IRuntimeFilter>* producer_filter);

Status register_consumer_runtime_filter(const doris::TRuntimeFilterDesc& desc,
bool need_local_merge, int node_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public void createLegacyRuntimeFilter(RuntimeFilter filter, JoinNodeBase node, P
targetTupleIdMapList, context.getLimits());
if (node instanceof HashJoinNode) {
origFilter.setIsBroadcast(((HashJoinNode) node).getDistributionMode() == DistributionMode.BROADCAST);
origFilter.setSingleEq(((HashJoinNode) node).getEqJoinConjuncts().size());
} else {
// nest loop join
origFilter.setIsBroadcast(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ public final class RuntimeFilter {

private boolean bloomFilterSizeCalculatedByNdv = false;

private boolean singleEq = false;

/**
* Internal representation of a runtime filter target.
*/
Expand Down Expand Up @@ -216,9 +218,36 @@ public TRuntimeFilterDesc toThrift() {
tFilter.setIsBroadcastJoin(isBroadcastJoin);
tFilter.setHasLocalTargets(hasLocalTargets);
tFilter.setHasRemoteTargets(hasRemoteTargets);

boolean hasSerialTargets = false;
for (RuntimeFilterTarget target : targets) {
tFilter.putToPlanIdToTargetExpr(target.node.getId().asInt(), target.expr.treeToThrift());
hasSerialTargets = hasSerialTargets
|| (target.node.isSerialOperator() && target.node.fragment.useSerialSource(ConnectContext.get()));
}

boolean enableSyncFilterSize = ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable().enableSyncRuntimeFilterSize();

// There are two cases in which there is a local exchange between the join and the scan:
// 1. hasRemoteTargets is true: the join probe side does at least one shuffle (there is a shuffle between join and scan)
// 2. hasSerialTargets is true: the scan is pooled (there is a local shuffle between join and scan)
boolean needShuffle = hasRemoteTargets || hasSerialTargets;

// There are two cases where all instances of the rf have the same size:
// 1. enableSyncFilterSize is true, meaning backends will collect the global size and send it to every instance
// 2. isBroadcastJoin is true, meaning each join node instance has the same full amount of data
boolean hasGlobalSize = enableSyncFilterSize || isBroadcastJoin;

// Build the runtime filter by exact distinct count if all 3 conditions are met:
// 1. there is only a single eq conjunct
// 2. the rf type may be bf (BLOOM or IN_OR_BLOOM)
// 3. each filter acts only on its own instance (no shuffle is needed), or the size of
// all filters will be the same
boolean buildBfExactly = singleEq && (runtimeFilterType == TRuntimeFilterType.IN_OR_BLOOM
|| runtimeFilterType == TRuntimeFilterType.BLOOM) && (!needShuffle || hasGlobalSize);
tFilter.setBuildBfExactly(buildBfExactly);

tFilter.setType(runtimeFilterType);
tFilter.setBloomFilterSizeBytes(filterSizeBytes);
if (runtimeFilterType.equals(TRuntimeFilterType.BITMAP)) {
Expand All @@ -239,8 +268,6 @@ public TRuntimeFilterDesc toThrift() {
tFilter.setNullAware(false);
}
}
tFilter.setSyncFilterSize(ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable().enableSyncRuntimeFilterSize());
return tFilter;
}

Expand Down Expand Up @@ -597,6 +624,10 @@ public void addTarget(RuntimeFilterTarget target) {
targets.add(target);
}

public void setSingleEq(int eqJoinConjunctsNumbers) {
singleEq = (eqJoinConjunctsNumbers == 1);
}

public void setIsBroadcast(boolean isBroadcast) {
isBroadcastJoin = isBroadcast;
}
Expand Down
4 changes: 3 additions & 1 deletion gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -1301,7 +1301,9 @@ struct TRuntimeFilterDesc {
// true if the join type is null-aware, like <=>; the rf should handle this case
15: optional bool null_aware;

16: optional bool sync_filter_size;
16: optional bool sync_filter_size; // Deprecated

17: optional bool build_bf_exactly;
}


Expand Down

0 comments on commit 9b3fb4a

Please sign in to comment.