From 633f531176f305e93e5bea074c00c224a8b2e16d Mon Sep 17 00:00:00 2001 From: lw112 <131352377+felixwluo@users.noreply.github.com> Date: Thu, 17 Oct 2024 18:07:23 +0800 Subject: [PATCH] [fix](Nereids) fixed the limit offset error (#39316) (#41936) cherry-pick from master #39316 --- .../translator/PhysicalPlanTranslator.java | 33 +++++++++++++-- .../post/AddOffsetIntoDistribute.java | 42 ------------------- .../processor/post/PlanPostProcessors.java | 1 - .../apache/doris/planner/ExchangeNode.java | 6 +++ .../sub_query_correlated.out | 9 ---- .../data/nereids_syntax_p0/test_limit.out | 7 ++++ .../nereids_syntax_p0/test_limit.groovy | 39 +++++++++++++++-- 7 files changed, 77 insertions(+), 60 deletions(-) delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/AddOffsetIntoDistribute.java create mode 100644 regression-test/data/nereids_syntax_p0/test_limit.out diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 64bcc550183930..6f9229e06bd554 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -1560,10 +1560,35 @@ public PlanFragment visitPhysicalNestedLoopJoin( public PlanFragment visitPhysicalLimit(PhysicalLimit physicalLimit, PlanTranslatorContext context) { PlanFragment inputFragment = physicalLimit.child(0).accept(this, context); PlanNode child = inputFragment.getPlanRoot(); - child.setLimit(MergeLimits.mergeLimit(physicalLimit.getLimit(), physicalLimit.getOffset(), child.getLimit())); - // TODO: plan node don't support limit - // child.setOffset(MergeLimits.mergeOffset(physicalLimit.getOffset(), child.getOffset())); - updateLegacyPlanIdToPhysicalPlan(child, physicalLimit); + + if (physicalLimit.getPhase().isLocal()) { + child.setLimit(MergeLimits.mergeLimit(physicalLimit.getLimit(), physicalLimit.getOffset(), + child.getLimit())); + } else if (physicalLimit.getPhase().isGlobal()) { + if (!(child instanceof ExchangeNode)) { + ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), child); + exchangeNode.setLimit(physicalLimit.getLimit()); + exchangeNode.setOffset(physicalLimit.getOffset()); + exchangeNode.setPartitionType(TPartitionType.UNPARTITIONED); + exchangeNode.setNumInstances(1); + PlanFragment fragment = new PlanFragment(context.nextFragmentId(), exchangeNode, + DataPartition.UNPARTITIONED); + inputFragment.setDestination(exchangeNode); + inputFragment.setOutputPartition(DataPartition.UNPARTITIONED); + DataStreamSink sink = new DataStreamSink(exchangeNode.getId()); + sink.setOutputPartition(DataPartition.UNPARTITIONED); + inputFragment.setSink(sink); + context.addPlanFragment(fragment); + inputFragment = fragment; + } else { + ExchangeNode exchangeNode = (ExchangeNode) child; + exchangeNode.setLimit(MergeLimits.mergeLimit(physicalLimit.getLimit(), physicalLimit.getOffset(), + exchangeNode.getLimit())); + exchangeNode.setOffset(MergeLimits.mergeOffset(physicalLimit.getOffset(), exchangeNode.getOffset())); + } + } + + updateLegacyPlanIdToPhysicalPlan(inputFragment.getPlanRoot(), physicalLimit); return inputFragment; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/AddOffsetIntoDistribute.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/AddOffsetIntoDistribute.java deleted file mode 100644 index dc8173212982aa..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/AddOffsetIntoDistribute.java +++ /dev/null @@ -1,42 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.processor.post; - -import org.apache.doris.nereids.CascadesContext; -import org.apache.doris.nereids.properties.DistributionSpecGather; -import org.apache.doris.nereids.trees.plans.Plan; -import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute; -import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit; - -/** - * Offset just can be in exchangeNode. - * So, `offset` action is after `limit` action. - * So, `limit` should update with `offset + limit` - */ -public class AddOffsetIntoDistribute extends PlanPostProcessor { - @Override - public Plan visitPhysicalLimit(PhysicalLimit limit, CascadesContext context) { - limit = (PhysicalLimit) super.visit(limit, context); - if (limit.getPhase().isLocal() || limit.getOffset() == 0) { - return limit; - } - - return new PhysicalDistribute<>(DistributionSpecGather.INSTANCE, - limit.withLimit(limit.getLimit() + limit.getOffset())).copyStatsAndGroupIdFrom(limit); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java index c5b2cf8456c286..a74902c42e1ed2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java @@ -62,7 +62,6 @@ public List getProcessors() { builder.add(new ColumnPruningPostProcessor()); builder.add(new MergeProjectPostProcessor()); builder.add(new RecomputeLogicalPropertiesProcessor()); - builder.add(new AddOffsetIntoDistribute()); builder.add(new TopNScanOpt()); // after generate rf, DO NOT replace PLAN NODE builder.add(new FragmentProcessor()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java index 7c412f3ce88b7e..dc2175da85313f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java @@ -29,6 +29,7 @@ import org.apache.doris.statistics.StatsRecursiveDerive; import org.apache.doris.thrift.TExchangeNode; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TPartitionType; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; @@ -64,6 +65,7 @@ public class ExchangeNode extends PlanNode { private SortInfo mergeInfo; private boolean isRightChildOfBroadcastHashJoin = false; + private TPartitionType partitionType; /** * use for Nereids only. @@ -77,6 +79,10 @@ public ExchangeNode(PlanNodeId id, PlanNode inputNode) { computeTupleIds(); } + public void setPartitionType(TPartitionType partitionType) { + this.partitionType = partitionType; + } + /** * Create ExchangeNode that consumes output of inputNode. * An ExchangeNode doesn't have an input node as a child, which is why we diff --git a/regression-test/data/nereids_syntax_p0/sub_query_correlated.out b/regression-test/data/nereids_syntax_p0/sub_query_correlated.out index e7758d02a5f660..d57a673339b517 100644 --- a/regression-test/data/nereids_syntax_p0/sub_query_correlated.out +++ b/regression-test/data/nereids_syntax_p0/sub_query_correlated.out @@ -187,15 +187,6 @@ -- !exist_corr_limit0 -- -- !exist_unCorrelated_limit1_offset1 -- -1 2 -1 3 -2 4 -2 5 -3 3 -3 4 -20 2 -22 3 -24 4 -- !exist_unCorrelated_limit0_offset1 -- diff --git a/regression-test/data/nereids_syntax_p0/test_limit.out b/regression-test/data/nereids_syntax_p0/test_limit.out new file mode 100644 index 00000000000000..5ef4497f2f1f85 --- /dev/null +++ b/regression-test/data/nereids_syntax_p0/test_limit.out @@ -0,0 +1,7 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !limit1 -- +2 7844 TURNER SALESMAN 7698 1981-09-08 1500.0 0.0 30 + +-- !lmit2 -- +3 7934 MILLER CLERK 7782 1982-01-23 1300.0 0.0 10 + diff --git a/regression-test/suites/nereids_syntax_p0/test_limit.groovy b/regression-test/suites/nereids_syntax_p0/test_limit.groovy index 64e48195a178d9..aae261624421ed 100644 --- a/regression-test/suites/nereids_syntax_p0/test_limit.groovy +++ b/regression-test/suites/nereids_syntax_p0/test_limit.groovy @@ -16,10 +16,6 @@ // under the License. suite("test_limit") { - sql 'set enable_nereids_planner=true' - sql 'set enable_fallback_to_original_planner=false' - - sql """ drop table if exists test1 """ @@ -36,4 +32,39 @@ suite("test_limit") { sql "select * from test1 limit 2 offset 1" result([[1]]) } + + sql """ + drop table if exists row_number_limit_tbl; + """ + sql """ + CREATE TABLE row_number_limit_tbl ( + k1 INT NULL, + k2 VARCHAR(255) NULL, + k3 VARCHAR(255) NULL, + k4 INT NULL, + k5 VARCHAR(255) NULL, + k6 FLOAT NULL, + k7 FLOAT NULL, + k8 INT NULL + ) ENGINE=OLAP + DUPLICATE KEY(k1, k2) + DISTRIBUTED BY HASH(k1) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql """ INSERT INTO row_number_limit_tbl VALUES (7788, 'SCOTT', 'ANALYST', 7566, '1987-04-19', 3000, 0, 20); """ + sql """ INSERT INTO row_number_limit_tbl VALUES (7844, 'TURNER', 'SALESMAN', 7698, '1981-09-08', 1500, 0, 30); """ + qt_limit1 """ + select row_number() over(order by k6 desc) k6s, t.* from row_number_limit_tbl t order by k6s limit 1 offset 1; + """ + + sql """ truncate table row_number_limit_tbl; """ + sql """ INSERT INTO row_number_limit_tbl VALUES (7788, 'SCOTT', 'ANALYST', 7566, '1987-04-19', 3000, 0, 20); """ + sql """ INSERT INTO row_number_limit_tbl VALUES (7844, 'TURNER', 'SALESMAN', 7698, '1981-09-08', 1500, 0, 30); """ + sql """ INSERT INTO row_number_limit_tbl VALUES (7934, 'MILLER', 'CLERK', 7782, '1982-01-23', 1300, 0, 10); """ + + qt_lmit2 """ + select row_number() over(order by k6 desc) k6s, t.* from row_number_limit_tbl t limit 1 offset 2; + """ }