From e9554e36a8d2e946708d2132688722fc9491adb0 Mon Sep 17 00:00:00 2001 From: starocean999 <40539150+starocean999@users.noreply.github.com> Date: Wed, 11 Oct 2023 16:32:09 +0800 Subject: [PATCH] [fix](nereids)disable parallel scan in some case (#25089) --- .../translator/PhysicalPlanTranslator.java | 31 ++++++++++ .../apache/doris/planner/PlanFragment.java | 2 + .../performance_p0/redundant_conjuncts.out | 4 ++ .../test_disable_parallel_scan.groovy | 56 +++++++++++++++++++ 4 files changed, 93 insertions(+) create mode 100644 regression-test/suites/nereids_p0/test_disable_parallel_scan.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 8dd0ad742ebfa7..7473415407b72c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -63,6 +63,7 @@ import org.apache.doris.nereids.properties.DistributionSpecStorageAny; import org.apache.doris.nereids.properties.DistributionSpecStorageGather; import org.apache.doris.nereids.properties.OrderKey; +import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.rules.implementation.LogicalWindowToPhysicalWindow.WindowFrameGroup; import org.apache.doris.nereids.stats.StatsErrorEstimator; import org.apache.doris.nereids.trees.UnaryNode; @@ -1537,6 +1538,12 @@ public PlanFragment visitPhysicalPartitionTopN(PhysicalPartitionTopN physicalW bufferedTupleDesc ); inputPlanFragment.addPlanRoot(analyticEvalNode); + + // in pipeline engine, we use parallel scan by default, but it breaks the rule of data distribution + // we need to turn off parallel scan to ensure we get the correct result. 
+ // TODO: nereids forbids all parallel scan under PhysicalSetOperation temporarily + if (findOlapScanNodesByPassExchangeAndJoinNode(inputPlanFragment.getPlanRoot())) { + inputPlanFragment.setHasColocatePlanNode(true); + } return inputPlanFragment; } @@ -2359,4 +2381,13 @@ private PhysicalCTEConsumer getCTEConsumerChild(PhysicalPlan root) { return getCTEConsumerChild((PhysicalPlan) root.child(0)); } } + + private boolean findOlapScanNodesByPassExchangeAndJoinNode(PlanNode root) { + if (root instanceof OlapScanNode) { + return true; + } else if (!(root instanceof JoinNodeBase || root instanceof ExchangeNode)) { + return root.getChildren().stream().anyMatch(child -> findOlapScanNodesByPassExchangeAndJoinNode(child)); + } + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java index 6336799b460b74..26c09e439e66d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java @@ -326,6 +326,8 @@ public String getExplainString(TExplainLevel explainLevel) { } str.append("\n"); str.append(" PARTITION: " + dataPartition.getExplainString(explainLevel) + "\n"); + str.append(" HAS_COLO_PLAN_NODE: " + hasColocatePlanNode + "\n"); + str.append("\n"); if (sink != null) { str.append(sink.getExplainString(" ", explainLevel) + "\n"); } diff --git a/regression-test/data/performance_p0/redundant_conjuncts.out b/regression-test/data/performance_p0/redundant_conjuncts.out index 6a7fd4fd9318ac..e0ce1f3f75cd38 100644 --- a/regression-test/data/performance_p0/redundant_conjuncts.out +++ b/regression-test/data/performance_p0/redundant_conjuncts.out @@ -5,6 +5,8 @@ PLAN FRAGMENT 0 `v1` PARTITION: HASH_PARTITIONED: `default_cluster:regression_test_performance_p0`.`redundant_conjuncts`.`k1` + HAS_COLO_PLAN_NODE: false + VRESULT SINK MYSQL_PROTOCAL @@ -21,6 +23,8 @@ PLAN FRAGMENT 0 `v1` 
PARTITION: HASH_PARTITIONED: `default_cluster:regression_test_performance_p0`.`redundant_conjuncts`.`k1` + HAS_COLO_PLAN_NODE: false + VRESULT SINK MYSQL_PROTOCAL diff --git a/regression-test/suites/nereids_p0/test_disable_parallel_scan.groovy b/regression-test/suites/nereids_p0/test_disable_parallel_scan.groovy new file mode 100644 index 00000000000000..24c1b256c477b5 --- /dev/null +++ b/regression-test/suites/nereids_p0/test_disable_parallel_scan.groovy @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one + // or more contributor license agreements. See the NOTICE file + // distributed with this work for additional information + // regarding copyright ownership. The ASF licenses this file + // to you under the Apache License, Version 2.0 (the + // "License"); you may not use this file except in compliance + // with the License. You may obtain a copy of the License at + // + // http://www.apache.org/licenses/LICENSE-2.0 + // + // Unless required by applicable law or agreed to in writing, + // software distributed under the License is distributed on an + // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + // KIND, either express or implied. See the License for the + // specific language governing permissions and limitations + // under the License. 
+ + suite("test_disable_parallel_scan") { + sql "SET enable_nereids_planner=true" + sql "SET enable_fallback_to_original_planner=false" + sql """drop table if exists sequence_count_test3;""" + sql """CREATE TABLE sequence_count_test3( + `uid` int COMMENT 'user id', + `date` datetime COMMENT 'date time', + `number` int NULL COMMENT 'number' + ) + DUPLICATE KEY(uid) + DISTRIBUTED BY HASH(uid) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + );""" + explain { + sql(""" + SELECT + uid, + DATE, + e.NUMBER AS EVENT_GROUP, + CASE + + WHEN ( + ( + UNIX_TIMESTAMP( DATE ) - ( + lag ( UNIX_TIMESTAMP( DATE ), 1, 0 ) over ( PARTITION BY uid ORDER BY DATE ) + ) + ) > 600 + ) THEN + 1 ELSE 0 + END AS SESSION_FLAG + FROM + sequence_count_test3 e + """) + contains "HAS_COLO_PLAN_NODE: true" + } + + sql """drop table if exists sequence_count_test3;""" + } \ No newline at end of file