Skip to content

Commit

Permalink
branch-3.0: [Improvement](shuffle) Use a knob to decide whether a ser…
Browse files Browse the repository at this point in the history
…ial exchange… #44676 (#44731)

Cherry-picked from #44676

Co-authored-by: Gabriel <[email protected]>
  • Loading branch information
github-actions[bot] and Gabriel39 authored Dec 2, 2024
1 parent 7337930 commit cb5a0b7
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.StatsRecursiveDerive;
import org.apache.doris.thrift.TExchangeNode;
Expand Down Expand Up @@ -165,6 +166,10 @@ public void setMergeInfo(SortInfo info) {

@Override
protected void toThrift(TPlanNode msg) {
// If this fragment has another scan node, this exchange node is serial or not should be decided by the scan
// node.
msg.setIsSerialOperator((isSerialOperator() || fragment.hasSerialScanNode())
&& fragment.useSerialSource(ConnectContext.get()));
msg.node_type = TPlanNodeType.EXCHANGE_NODE;
msg.exchange_node = new TExchangeNode();
for (TupleId tid : tupleIds) {
Expand Down Expand Up @@ -224,11 +229,17 @@ public void setRightChildOfBroadcastHashJoin(boolean value) {
*/
@Override
public boolean isSerialOperator() {
return partitionType == TPartitionType.UNPARTITIONED && mergeInfo != null;
return (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isUseSerialExchange()
|| partitionType == TPartitionType.UNPARTITIONED) && mergeInfo != null;
}

@Override
public boolean hasSerialChildren() {
return isSerialOperator();
}

@Override
public boolean hasSerialScanChildren() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -511,15 +511,13 @@ public boolean useSerialSource(ConnectContext context) {
&& !hasNullAwareLeftAntiJoin()
// If planRoot is not a serial operator and has serial children, we can use serial source and improve
// parallelism of non-serial operators.
&& sink instanceof DataStreamSink && !planRoot.isSerialOperator()
&& planRoot.hasSerialChildren();
// For bucket shuffle / colocate join fragment, always use serial source if the bucket scan nodes are
// serial.
&& (hasSerialScanNode() || (sink instanceof DataStreamSink && !planRoot.isSerialOperator()
&& planRoot.hasSerialChildren()));
}

public int getNumBackends() {
return numBackends;
}

public void setNumBackends(int numBackends) {
this.numBackends = numBackends;
public boolean hasSerialScanNode() {
return planRoot.hasSerialScanChildren();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1388,4 +1388,11 @@ public boolean hasSerialChildren() {
}
return children.stream().allMatch(PlanNode::hasSerialChildren);
}

public boolean hasSerialScanChildren() {
if (children.isEmpty()) {
return false;
}
return children.stream().anyMatch(PlanNode::hasSerialScanChildren);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -861,4 +861,9 @@ public boolean isSerialOperator() {
< ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() * numScanBackends()
|| (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().isForceToLocalShuffle());
}

@Override
public boolean hasSerialScanChildren() {
return isSerialOperator();
}
}
10 changes: 2 additions & 8 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1890,17 +1890,11 @@ protected void computeFragmentHosts() throws Exception {
return scanNode.getId().asInt() == planNodeId;
}).findFirst();

/**
* Ignore storage data distribution iff:
* 1. `parallelExecInstanceNum * numBackends` is larger than scan ranges.
* 2. Use Nereids planner.
*/
boolean sharedScan = true;
int expectedInstanceNum = Math.min(parallelExecInstanceNum,
leftMostNode.getNumInstances());
boolean ignoreStorageDataDistribution = node.isPresent()
&& fragment.useSerialSource(context);
if (node.isPresent() && ignoreStorageDataDistribution) {
boolean ignoreStorageDataDistribution = fragment.useSerialSource(context);
if (ignoreStorageDataDistribution) {
expectedInstanceNum = Math.max(expectedInstanceNum, 1);
// if have limit and no conjuncts, only need 1 instance to save cpu and
// mem resource
Expand Down
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String IGNORE_STORAGE_DATA_DISTRIBUTION = "ignore_storage_data_distribution";

public static final String USE_SERIAL_EXCHANGE = "use_serial_exchange";

public static final String ENABLE_PARALLEL_SCAN = "enable_parallel_scan";

// Limit the max count of scanners to prevent generate too many scanners.
Expand Down Expand Up @@ -1107,6 +1109,10 @@ public enum IgnoreSplitType {
varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
private boolean ignoreStorageDataDistribution = true;

@VariableMgr.VarAttr(name = USE_SERIAL_EXCHANGE, fuzzy = true,
varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
private boolean useSerialExchange = false;

@VariableMgr.VarAttr(
name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
description = {"是否在pipelineX引擎上开启local shuffle优化",
Expand Down Expand Up @@ -2327,6 +2333,7 @@ public void initFuzzyModeVariables() {
this.parallelPrepareThreshold = random.nextInt(32) + 1;
this.enableCommonExprPushdown = random.nextBoolean();
this.enableLocalExchange = random.nextBoolean();
this.useSerialExchange = random.nextBoolean();
// This will cause be dead loop, disable it first
// this.disableJoinReorder = random.nextBoolean();
this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean();
Expand Down Expand Up @@ -4535,6 +4542,10 @@ public boolean isEnableCooldownReplicaAffinity() {
return enableCooldownReplicaAffinity;
}

public boolean isUseSerialExchange() {
return useSerialExchange && getEnableLocalExchange();
}

public void setDisableInvertedIndexV1ForVaraint(boolean disableInvertedIndexV1ForVaraint) {
this.disableInvertedIndexV1ForVaraint = disableInvertedIndexV1ForVaraint;
}
Expand Down

0 comments on commit cb5a0b7

Please sign in to comment.