From eb506a1c19839564cc779417a0fb1c722db7bcd0 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 19 Nov 2024 09:49:28 +0800 Subject: [PATCH] =?UTF-8?q?[Improvement](shuffle)=20Use=20a=20knob=20to=20?= =?UTF-8?q?decide=20whether=20a=20serial=20=E2=80=A6=20(#44076)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/apache/doris/planner/ExchangeNode.java | 13 ++++++++++++- .../org/apache/doris/planner/PlanFragment.java | 14 ++++++-------- .../java/org/apache/doris/planner/PlanNode.java | 7 +++++++ .../java/org/apache/doris/planner/ScanNode.java | 5 +++++ .../main/java/org/apache/doris/qe/Coordinator.java | 10 ++-------- .../java/org/apache/doris/qe/SessionVariable.java | 11 +++++++++++ 6 files changed, 43 insertions(+), 17 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java index 97d46b109b700f..d904397a305da7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java @@ -25,6 +25,7 @@ import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; import org.apache.doris.common.UserException; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.statistics.StatsRecursiveDerive; import org.apache.doris.thrift.TExchangeNode; @@ -165,6 +166,10 @@ public void setMergeInfo(SortInfo info) { @Override protected void toThrift(TPlanNode msg) { + // If this fragment has another scan node, this exchange node is serial or not should be decided by the scan + // node. + msg.setIsSerialOperator((isSerialOperator() || fragment.hasSerialScanNode()) + && fragment.useSerialSource(ConnectContext.get())); msg.node_type = TPlanNodeType.EXCHANGE_NODE; msg.exchange_node = new TExchangeNode(); for (TupleId tid : tupleIds) { @@ -224,11 +229,17 @@ public void setRightChildOfBroadcastHashJoin(boolean value) { */ @Override public boolean isSerialOperator() { - return partitionType == TPartitionType.UNPARTITIONED && mergeInfo != null; + return (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isUseSerialExchange() + || partitionType == TPartitionType.UNPARTITIONED) && mergeInfo != null; } @Override public boolean hasSerialChildren() { return isSerialOperator(); } + + @Override + public boolean hasSerialScanChildren() { + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index 0ebd023ed411ee..fe386acdaf2a10 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -511,15 +511,13 @@ public boolean useSerialSource(ConnectContext context) { && !hasNullAwareLeftAntiJoin() // If planRoot is not a serial operator and has serial children, we can use serial source and improve // parallelism of non-serial operators. - && sink instanceof DataStreamSink && !planRoot.isSerialOperator() - && planRoot.hasSerialChildren(); + // For bucket shuffle / colocate join fragment, always use serial source if the bucket scan nodes are + // serial. + && (hasSerialScanNode() || (sink instanceof DataStreamSink && !planRoot.isSerialOperator() + && planRoot.hasSerialChildren())); } - public int getNumBackends() { - return numBackends; - } - - public void setNumBackends(int numBackends) { - this.numBackends = numBackends; + public boolean hasSerialScanNode() { + return planRoot.hasSerialScanChildren(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index 14bd34e93e1f43..73768435154b76 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -1388,4 +1388,11 @@ public boolean hasSerialChildren() { } return children.stream().allMatch(PlanNode::hasSerialChildren); } + + public boolean hasSerialScanChildren() { + if (children.isEmpty()) { + return false; + } + return children.stream().anyMatch(PlanNode::hasSerialScanChildren); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index a2583868346704..b4033a0535ef3d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -861,4 +861,9 @@ public boolean isSerialOperator() { < ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() * numScanBackends() || (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isForceToLocalShuffle()); } + + @Override + public boolean hasSerialScanChildren() { + return isSerialOperator(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 833fec1b5a0f00..78493a46ad192a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1890,17 +1890,11 @@ protected void computeFragmentHosts() throws Exception { return scanNode.getId().asInt() == planNodeId; }).findFirst(); - /** - * Ignore storage data distribution iff: - * 1. `parallelExecInstanceNum * numBackends` is larger than scan ranges. - * 2. Use Nereids planner. - */ boolean sharedScan = true; int expectedInstanceNum = Math.min(parallelExecInstanceNum, leftMostNode.getNumInstances()); - boolean ignoreStorageDataDistribution = node.isPresent() - && fragment.useSerialSource(context); - if (node.isPresent() && ignoreStorageDataDistribution) { + boolean ignoreStorageDataDistribution = fragment.useSerialSource(context); + if (ignoreStorageDataDistribution) { expectedInstanceNum = Math.max(expectedInstanceNum, 1); // if have limit and no conjuncts, only need 1 instance to save cpu and // mem resource diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 8b1442b74642e0..bc8e0c779b0ab4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -264,6 +264,8 @@ public class SessionVariable implements Serializable, Writable { public static final String IGNORE_STORAGE_DATA_DISTRIBUTION = "ignore_storage_data_distribution"; + public static final String USE_SERIAL_EXCHANGE = "use_serial_exchange"; + public static final String ENABLE_PARALLEL_SCAN = "enable_parallel_scan"; // Limit the max count of scanners to prevent generate too many scanners. @@ -1107,6 +1109,10 @@ public enum IgnoreSplitType { varType = VariableAnnotation.EXPERIMENTAL, needForward = true) private boolean ignoreStorageDataDistribution = true; + @VariableMgr.VarAttr(name = USE_SERIAL_EXCHANGE, fuzzy = true, + varType = VariableAnnotation.EXPERIMENTAL, needForward = true) + private boolean useSerialExchange = false; + @VariableMgr.VarAttr( name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, description = {"是否在pipelineX引擎上开启local shuffle优化", @@ -2323,6 +2329,7 @@ public void initFuzzyModeVariables() { this.parallelPrepareThreshold = random.nextInt(32) + 1; this.enableCommonExprPushdown = random.nextBoolean(); this.enableLocalExchange = random.nextBoolean(); + this.useSerialExchange = random.nextBoolean(); // This will cause be dead loop, disable it first // this.disableJoinReorder = random.nextBoolean(); this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean(); @@ -4534,6 +4541,10 @@ public TSerdeDialect getSerdeDialect() { } } + public boolean isUseSerialExchange() { + return useSerialExchange && getEnableLocalExchange(); + } + public boolean isEnableCooldownReplicaAffinity() { return enableCooldownReplicaAffinity; }