Skip to content

Commit

Permalink
[fix](nereids)disable parallel scan in some case (apache#25089)
Browse files Browse the repository at this point in the history
  • Loading branch information
starocean999 authored Oct 11, 2023
1 parent df7724d commit e9554e3
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.doris.nereids.properties.DistributionSpecStorageAny;
import org.apache.doris.nereids.properties.DistributionSpecStorageGather;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.implementation.LogicalWindowToPhysicalWindow.WindowFrameGroup;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.nereids.trees.UnaryNode;
Expand Down Expand Up @@ -1537,6 +1538,12 @@ public PlanFragment visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends P
PartitionSortNode partitionSortNode = translatePartitionSortNode(
partitionTopN, inputFragment.getPlanRoot(), context);
addPlanRoot(inputFragment, partitionSortNode, partitionTopN);
// in pipeline engine, we use parallel scan by default, but it broke the rule of data distribution
// we need turn of parallel scan to ensure to get correct result.
// TODO: nereids forbid all parallel scan under PhysicalSetOperation temporary
if (findOlapScanNodesByPassExchangeAndJoinNode(inputFragment.getPlanRoot())) {
inputFragment.setHasColocatePlanNode(true);
}
return inputFragment;
}

Expand Down Expand Up @@ -1737,6 +1744,14 @@ public PlanFragment visitPhysicalSetOperation(
setPlanRoot(setOperationFragment, setOperationNode, setOperation);
}

// in pipeline engine, we use parallel scan by default, but it broke the rule of data distribution
// we need turn of parallel scan to ensure to get correct result.
// TODO: nereids forbid all parallel scan under PhysicalSetOperation temporary
if (!setOperation.getPhysicalProperties().equals(PhysicalProperties.ANY)
&& findOlapScanNodesByPassExchangeAndJoinNode(setOperationFragment.getPlanRoot())) {
setOperationFragment.setHasColocatePlanNode(true);
}

return setOperationFragment;
}

Expand Down Expand Up @@ -1994,6 +2009,13 @@ public PlanFragment visitPhysicalWindow(PhysicalWindow<? extends Plan> physicalW
bufferedTupleDesc
);
inputPlanFragment.addPlanRoot(analyticEvalNode);

// in pipeline engine, we use parallel scan by default, but it broke the rule of data distribution
// we need turn of parallel scan to ensure to get correct result.
// TODO: nereids forbid all parallel scan under PhysicalSetOperation temporary
if (findOlapScanNodesByPassExchangeAndJoinNode(inputPlanFragment.getPlanRoot())) {
inputPlanFragment.setHasColocatePlanNode(true);
}
return inputPlanFragment;
}

Expand Down Expand Up @@ -2359,4 +2381,13 @@ private PhysicalCTEConsumer getCTEConsumerChild(PhysicalPlan root) {
return getCTEConsumerChild((PhysicalPlan) root.child(0));
}
}

private boolean findOlapScanNodesByPassExchangeAndJoinNode(PlanNode root) {
if (root instanceof OlapScanNode) {
return true;
} else if (!(root instanceof JoinNodeBase || root instanceof ExchangeNode)) {
return root.getChildren().stream().anyMatch(child -> findOlapScanNodesByPassExchangeAndJoinNode(child));
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ public String getExplainString(TExplainLevel explainLevel) {
}
str.append("\n");
str.append(" PARTITION: " + dataPartition.getExplainString(explainLevel) + "\n");
str.append(" HAS_COLO_PLAN_NODE: " + hasColocatePlanNode + "\n");
str.append("\n");
if (sink != null) {
str.append(sink.getExplainString(" ", explainLevel) + "\n");
}
Expand Down
4 changes: 4 additions & 0 deletions regression-test/data/performance_p0/redundant_conjuncts.out
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ PLAN FRAGMENT 0
`v1`
PARTITION: HASH_PARTITIONED: `default_cluster:regression_test_performance_p0`.`redundant_conjuncts`.`k1`

HAS_COLO_PLAN_NODE: false

VRESULT SINK
MYSQL_PROTOCAL

Expand All @@ -21,6 +23,8 @@ PLAN FRAGMENT 0
`v1`
PARTITION: HASH_PARTITIONED: `default_cluster:regression_test_performance_p0`.`redundant_conjuncts`.`k1`

HAS_COLO_PLAN_NODE: false

VRESULT SINK
MYSQL_PROTOCAL

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_disable_parallel_scan") {
sql "SET enable_nereids_planner=true"
sql "SET enable_fallback_to_original_planner=false"
sql """drop table if exists sequence_count_test3;"""
sql """CREATE TABLE sequence_count_test3(
`uid` int COMMENT 'user id',
`date` datetime COMMENT 'date time',
`number` int NULL COMMENT 'number'
)
DUPLICATE KEY(uid)
DISTRIBUTED BY HASH(uid) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);"""
explain {
sql("""
SELECT
uid,
DATE,
e.NUMBER AS EVENT_GROUP,
CASE
WHEN (
(
UNIX_TIMESTAMP( DATE ) - (
lag ( UNIX_TIMESTAMP( DATE ), 1, 0 ) over ( PARTITION BY uid ORDER BY DATE )
)
) > 600
) THEN
1 ELSE 0
END AS SESSION_FLAG
FROM
sequence_count_test3 e
""")
contains "HAS_COLO_PLAN_NODE: true"
}

sql """drop table if exists sequence_count_test3;"""
}

0 comments on commit e9554e3

Please sign in to comment.