Skip to content

Commit

Permalink
[BugFix] Cached fragment misuses exogenous runtime filter (backport #…
Browse files Browse the repository at this point in the history
…51150) (#51199)

Co-authored-by: satanson <[email protected]>
  • Loading branch information
mergify[bot] and satanson authored Sep 20, 2024
1 parent 5fc9bab commit f3070ef
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ public boolean normalize() {
// Get leftmost path
List<PlanNode> leftNodesTopDown = Lists.newArrayList();
for (PlanNode currNode = root; currNode != null && currNode.getFragment() == fragment;
currNode = currNode.getChild(0)) {
currNode = currNode.getChild(0)) {
leftNodesTopDown.add(currNode);
}

Expand Down Expand Up @@ -774,15 +774,17 @@ public boolean normalize() {
// Not cacheable unless alien GRF(s) take effects on this PlanFragment.
// The alien GRF(s) mean the GRF(S) that not created by PlanNodes of the subtree rooted at
// the PlanFragment.planRoot.

Set<Integer> grfBuilders =
fragment.getProbeRuntimeFilters().values().stream().filter(RuntimeFilterDescription::isHasRemoteTargets)
.map(RuntimeFilterDescription::getBuildPlanNodeId).collect(Collectors.toSet());
if (!grfBuilders.isEmpty()) {
List<PlanFragment> rightSiblings = Lists.newArrayList();
collectRightSiblingFragments(root, rightSiblings, Sets.newHashSet());
Set<Integer> acceptableGrfBuilders = rightSiblings.stream().flatMap(
frag -> frag.getBuildRuntimeFilters().values().stream().map(
RuntimeFilterDescription::getBuildPlanNodeId)).collect(Collectors.toSet());
Set<Integer> acceptableGrfBuilders = rightSiblings.stream()
.flatMap(frag -> frag.getBuildRuntimeFilters().values().stream())
.map(RuntimeFilterDescription::getBuildPlanNodeId)
.collect(Collectors.toSet());
boolean hasAlienGrf = !Sets.difference(grfBuilders, acceptableGrfBuilders).isEmpty();
if (hasAlienGrf) {
return false;
Expand Down
53 changes: 27 additions & 26 deletions fe/fe-core/src/main/java/com/starrocks/planner/PlanFragment.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.LinkedList;
Expand All @@ -67,6 +68,7 @@
import java.util.Queue;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -152,8 +154,8 @@ public class PlanFragment extends TreeNode<PlanFragment> {

protected double fragmentCost;

protected final Map<Integer, RuntimeFilterDescription> buildRuntimeFilters = Maps.newTreeMap();
protected final Map<Integer, RuntimeFilterDescription> probeRuntimeFilters = Maps.newTreeMap();
protected Map<Integer, RuntimeFilterDescription> buildRuntimeFilters = Maps.newHashMap();
protected Map<Integer, RuntimeFilterDescription> probeRuntimeFilters = Maps.newHashMap();

protected List<Pair<Integer, ColumnDict>> queryGlobalDicts = Lists.newArrayList();
protected List<Pair<Integer, ColumnDict>> loadGlobalDicts = Lists.newArrayList();
Expand Down Expand Up @@ -635,34 +637,33 @@ public boolean isTransferQueryStatisticsWithEveryBatch() {
return transferQueryStatisticsWithEveryBatch;
}

public void collectBuildRuntimeFilters(PlanNode root) {
if (root instanceof ExchangeNode) {
return;
}

if (root instanceof RuntimeFilterBuildNode) {
RuntimeFilterBuildNode rfBuildNode = (RuntimeFilterBuildNode) root;
for (RuntimeFilterDescription description : rfBuildNode.getBuildRuntimeFilters()) {
buildRuntimeFilters.put(description.getFilterId(), description);
}
}

for (PlanNode node : root.getChildren()) {
collectBuildRuntimeFilters(node);
}
public void collectBuildRuntimeFilters() {
Map<Integer, RuntimeFilterDescription> filters = Maps.newHashMap();
collectNodes().stream()
.filter(node -> node instanceof RuntimeFilterBuildNode)
.flatMap(node -> ((RuntimeFilterBuildNode) node).getBuildRuntimeFilters().stream())
.forEach(desc -> filters.put(desc.getFilterId(), desc));
buildRuntimeFilters = filters;
}

public void collectProbeRuntimeFilters(PlanNode root) {
for (RuntimeFilterDescription description : root.getProbeRuntimeFilters()) {
probeRuntimeFilters.put(description.getFilterId(), description);
}
public void collectProbeRuntimeFilters() {
Map<Integer, RuntimeFilterDescription> filters = Maps.newHashMap();
collectNodes().stream()
.flatMap(node -> node.getProbeRuntimeFilters().stream())
.forEach(desc -> filters.put(desc.getFilterId(), desc));
probeRuntimeFilters = filters;
}

if (root instanceof ExchangeNode) {
return;
}
public List<PlanNode> collectNodes() {
List<PlanNode> nodes = Lists.newArrayList();
collectNodesImpl(getPlanRoot(), nodes);
return nodes;
}

for (PlanNode node : root.getChildren()) {
collectProbeRuntimeFilters(node);
private void collectNodesImpl(PlanNode root, List<PlanNode> nodes) {
nodes.add(root);
if (!(root instanceof ExchangeNode)) {
root.getChildren().forEach(child -> collectNodesImpl(child, nodes));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,8 +653,6 @@ private void setGlobalRuntimeFilterParams(ExecutionFragment topParams,

for (ExecutionFragment execFragment : executionDAG.getFragmentsInPreorder()) {
PlanFragment fragment = execFragment.getPlanFragment();
fragment.collectBuildRuntimeFilters(fragment.getPlanRoot());
fragment.collectProbeRuntimeFilters(fragment.getPlanRoot());
for (Map.Entry<Integer, RuntimeFilterDescription> kv : fragment.getProbeRuntimeFilters().entrySet()) {
List<TRuntimeFilterProberParams> probeParamList = Lists.newArrayList();
for (final FragmentInstance instance : execFragment.getInstances()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,8 @@ private static ExecPlan finalizeFragments(ExecPlan execPlan, TResultSinkType res
}

fragments.forEach(PlanFragment::removeDictMappingProbeRuntimeFilters);
fragments.forEach(PlanFragment::collectBuildRuntimeFilters);
fragments.forEach(PlanFragment::collectProbeRuntimeFilters);

if (useQueryCache(execPlan)) {
for (PlanFragment fragment : execPlan.getFragments()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -939,4 +939,20 @@ public void testQueryTimeout() {
Assert.assertThrows(StarRocksPlannerException.class,
() -> getPlanFragment(getDumpInfoFromFile("query_dump/query_timeout"), null, TExplainLevel.NORMAL));
}

@Test
public void testQueryCacheMisuseExogenousRuntimeFilter() throws Exception {
String savedSv = connectContext.getSessionVariable().getJsonString();
try {
connectContext.getSessionVariable().setEnableQueryCache(true);
QueryDumpInfo dumpInfo =
getDumpInfoFromJson(getDumpInfoFromFile("query_dump/query_cache_misuse_exogenous_runtime_filter"));
ExecPlan execPlan = UtFrameUtils.getPlanFragmentFromQueryDump(connectContext, dumpInfo);
Assert.assertTrue(execPlan.getFragments().stream().noneMatch(frag -> frag.getCacheParam() != null));
Assert.assertTrue(
execPlan.getFragments().stream().anyMatch(frag -> !frag.getProbeRuntimeFilters().isEmpty()));
} finally {
connectContext.getSessionVariable().replayFromJson(savedSv);
}
}
}

Large diffs are not rendered by default.

0 comments on commit f3070ef

Please sign in to comment.