Skip to content

Commit

Permalink
[BugFix] Cached fragment misuses exogenous runtime filter(backport St…
Browse files Browse the repository at this point in the history
…arRocks#51150)

Signed-off-by: satanson <[email protected]>
  • Loading branch information
satanson committed Sep 20, 2024
1 parent c715428 commit c99cf13
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ public boolean normalize() {
// Get leftmost path
List<PlanNode> leftNodesTopDown = Lists.newArrayList();
for (PlanNode currNode = root; currNode != null && currNode.getFragment() == fragment;
currNode = currNode.getChild(0)) {
currNode = currNode.getChild(0)) {
leftNodesTopDown.add(currNode);
}

Expand Down Expand Up @@ -774,15 +774,17 @@ public boolean normalize() {
// Not cacheable unless alien GRF(s) take effects on this PlanFragment.
// The alien GRF(s) mean the GRF(S) that not created by PlanNodes of the subtree rooted at
// the PlanFragment.planRoot.

Set<Integer> grfBuilders =
fragment.getProbeRuntimeFilters().values().stream().filter(RuntimeFilterDescription::isHasRemoteTargets)
.map(RuntimeFilterDescription::getBuildPlanNodeId).collect(Collectors.toSet());
if (!grfBuilders.isEmpty()) {
List<PlanFragment> rightSiblings = Lists.newArrayList();
collectRightSiblingFragments(root, rightSiblings, Sets.newHashSet());
Set<Integer> acceptableGrfBuilders = rightSiblings.stream().flatMap(
frag -> frag.getBuildRuntimeFilters().values().stream().map(
RuntimeFilterDescription::getBuildPlanNodeId)).collect(Collectors.toSet());
Set<Integer> acceptableGrfBuilders = rightSiblings.stream()
.flatMap(frag -> frag.getBuildRuntimeFilters().values().stream())
.map(RuntimeFilterDescription::getBuildPlanNodeId)
.collect(Collectors.toSet());
boolean hasAlienGrf = !Sets.difference(grfBuilders, acceptableGrfBuilders).isEmpty();
if (hasAlienGrf) {
return false;
Expand Down
55 changes: 27 additions & 28 deletions fe/fe-core/src/main/java/com/starrocks/planner/PlanFragment.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,15 @@ public class PlanFragment extends TreeNode<PlanFragment> {
protected int parallelExecNum = 1;
protected int pipelineDop = 1;

private double fragmentCost;

// Whether to assign scan ranges to each driver sequence of pipeline,
// for the normal backend assignment (not colocate, bucket, and replicated join).
protected boolean assignScanRangesPerDriverSeq = false;
protected boolean withLocalShuffle = false;

protected final Map<Integer, RuntimeFilterDescription> buildRuntimeFilters = Maps.newTreeMap();
protected final Map<Integer, RuntimeFilterDescription> probeRuntimeFilters = Maps.newTreeMap();
protected double fragmentCost;

protected Map<Integer, RuntimeFilterDescription> buildRuntimeFilters = Maps.newHashMap();
protected Map<Integer, RuntimeFilterDescription> probeRuntimeFilters = Maps.newHashMap();

protected List<Pair<Integer, ColumnDict>> queryGlobalDicts = Lists.newArrayList();
protected List<Pair<Integer, ColumnDict>> loadGlobalDicts = Lists.newArrayList();
Expand Down Expand Up @@ -601,34 +601,33 @@ public boolean isTransferQueryStatisticsWithEveryBatch() {
return transferQueryStatisticsWithEveryBatch;
}

public void collectBuildRuntimeFilters(PlanNode root) {
if (root instanceof ExchangeNode) {
return;
}

if (root instanceof RuntimeFilterBuildNode) {
RuntimeFilterBuildNode rfBuildNode = (RuntimeFilterBuildNode) root;
for (RuntimeFilterDescription description : rfBuildNode.getBuildRuntimeFilters()) {
buildRuntimeFilters.put(description.getFilterId(), description);
}
}

for (PlanNode node : root.getChildren()) {
collectBuildRuntimeFilters(node);
}
public void collectBuildRuntimeFilters() {
Map<Integer, RuntimeFilterDescription> filters = Maps.newHashMap();
collectNodes().stream()
.filter(node -> node instanceof RuntimeFilterBuildNode)
.flatMap(node -> ((RuntimeFilterBuildNode) node).getBuildRuntimeFilters().stream())
.forEach(desc -> filters.put(desc.getFilterId(), desc));
buildRuntimeFilters = filters;
}

public void collectProbeRuntimeFilters(PlanNode root) {
for (RuntimeFilterDescription description : root.getProbeRuntimeFilters()) {
probeRuntimeFilters.put(description.getFilterId(), description);
}
public void collectProbeRuntimeFilters() {
Map<Integer, RuntimeFilterDescription> filters = Maps.newHashMap();
collectNodes().stream()
.flatMap(node -> node.getProbeRuntimeFilters().stream())
.forEach(desc -> filters.put(desc.getFilterId(), desc));
probeRuntimeFilters = filters;
}

if (root instanceof ExchangeNode) {
return;
}
public List<PlanNode> collectNodes() {
List<PlanNode> nodes = Lists.newArrayList();
collectNodesImpl(getPlanRoot(), nodes);
return nodes;
}

for (PlanNode node : root.getChildren()) {
collectProbeRuntimeFilters(node);
private void collectNodesImpl(PlanNode root, List<PlanNode> nodes) {
nodes.add(root);
if (!(root instanceof ExchangeNode)) {
root.getChildren().forEach(child -> collectNodesImpl(child, nodes));
}
}

Expand Down
2 changes: 0 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1317,8 +1317,6 @@ private void setGlobalRuntimeFilterParams(CoordinatorPreprocessor.FInstanceExecP
Map<Integer, List<TRuntimeFilterProberParams>> idToProbePrams = new HashMap<>();

for (PlanFragment fragment : fragments) {
fragment.collectBuildRuntimeFilters(fragment.getPlanRoot());
fragment.collectProbeRuntimeFilters(fragment.getPlanRoot());
CoordinatorPreprocessor.FragmentExecParams params =
coordinatorPreprocessor.getFragmentExecParamsMap().get(fragment.getFragmentId());
for (Map.Entry<Integer, RuntimeFilterDescription> kv : fragment.getProbeRuntimeFilters().entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,8 @@ private static ExecPlan finalizeFragments(ExecPlan execPlan, TResultSinkType res
}

fragments.forEach(PlanFragment::removeDictMappingProbeRuntimeFilters);
fragments.forEach(PlanFragment::collectBuildRuntimeFilters);
fragments.forEach(PlanFragment::collectProbeRuntimeFilters);

if (useQueryCache(execPlan)) {
for (PlanFragment fragment : execPlan.getFragments()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -856,4 +856,19 @@ public void testNoCTEOperatorPropertyDerived() throws Exception {
" 24:EXCHANGE"));
}

@Test
public void testQueryCacheMisuseExogenousRuntimeFilter() throws Exception {
String savedSv = connectContext.getSessionVariable().getJsonString();
try {
connectContext.getSessionVariable().setEnableQueryCache(true);
QueryDumpInfo dumpInfo =
getDumpInfoFromJson(getDumpInfoFromFile("query_dump/query_cache_misuse_exogenous_runtime_filter"));
ExecPlan execPlan = UtFrameUtils.getPlanFragmentFromQueryDump(connectContext, dumpInfo);
Assert.assertTrue(execPlan.getFragments().stream().noneMatch(frag -> frag.getCacheParam() != null));
Assert.assertTrue(
execPlan.getFragments().stream().anyMatch(frag -> !frag.getProbeRuntimeFilters().isEmpty()));
} finally {
connectContext.getSessionVariable().replayFromJson(savedSv);
}
}
}

Large diffs are not rendered by default.

0 comments on commit c99cf13

Please sign in to comment.