From 791a2425eeccf2a7529c2d74adc49f9c24adf222 Mon Sep 17 00:00:00 2001 From: feiniaofeiafei Date: Tue, 24 Dec 2024 22:21:53 +0800 Subject: [PATCH] fix comment --- .../doris/nereids/jobs/executor/Rewriter.java | 7 +- .../apache/doris/nereids/rules/RuleType.java | 2 +- .../implementation/AggregateStrategies.java | 11 +- .../rules/rewrite/CheckMultiDistinct.java | 31 +++ ...inctSplit.java => SplitMultiDistinct.java} | 55 ++--- .../functions/agg/SupportMultiDistinct.java | 4 +- .../rules/rewrite/DistinctSplitTest.java | 97 --------- .../rules/rewrite/SplitMultiDistinctTest.java | 191 ++++++++++++++++++ 8 files changed, 264 insertions(+), 134 deletions(-) rename fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/{DistinctSplit.java => SplitMultiDistinct.java} (86%) delete mode 100644 fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/DistinctSplitTest.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/SplitMultiDistinctTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java index 38aebc44154996a..d4ed9c21776224e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java @@ -56,7 +56,7 @@ import org.apache.doris.nereids.rules.rewrite.CountLiteralRewrite; import org.apache.doris.nereids.rules.rewrite.CreatePartitionTopNFromWindow; import org.apache.doris.nereids.rules.rewrite.DeferMaterializeTopNResult; -import org.apache.doris.nereids.rules.rewrite.DistinctSplit; +import org.apache.doris.nereids.rules.rewrite.SplitMultiDistinct; import org.apache.doris.nereids.rules.rewrite.EliminateAggCaseWhen; import org.apache.doris.nereids.rules.rewrite.EliminateAggregate; import org.apache.doris.nereids.rules.rewrite.EliminateAssertNumRows; @@ -445,7 +445,6 @@ public class Rewriter extends AbstractBatchJobExecutor { new CollectCteConsumerOutput() ) ), - // topic("distinct split", topDown(new DistinctSplit())), topic("Collect used column", custom(RuleType.COLLECT_COLUMNS, QueryColumnCollector::new) ) ) @@ -552,8 +551,8 @@ private static List getWholeTreeRewriteJobs( rewriteJobs.addAll(jobs(topic("or expansion", custom(RuleType.OR_EXPANSION, () -> OrExpansion.INSTANCE)))); } - rewriteJobs.addAll(jobs(topic("distinct split", - custom(RuleType.DISTINCT_SPLIT, () -> DistinctSplit.INSTANCE)))); + rewriteJobs.addAll(jobs(topic("split multi distinct", + custom(RuleType.SPLIT_MULTI_DISTINCT, () -> SplitMultiDistinct.INSTANCE)))); if (needSubPathPushDown) { rewriteJobs.addAll(jobs( diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index 95f3ff53a573af1..238242c2e1b82ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -321,7 +321,7 @@ public enum RuleType { MERGE_TOP_N(RuleTypeClass.REWRITE), BUILD_AGG_FOR_UNION(RuleTypeClass.REWRITE), COUNT_DISTINCT_REWRITE(RuleTypeClass.REWRITE), - DISTINCT_SPLIT(RuleTypeClass.REWRITE), + SPLIT_MULTI_DISTINCT(RuleTypeClass.REWRITE), INNER_TO_CROSS_JOIN(RuleTypeClass.REWRITE), CROSS_TO_INNER_JOIN(RuleTypeClass.REWRITE), PRUNE_EMPTY_PARTITION(RuleTypeClass.REWRITE), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java index e98a9c6767daeab..0a1b1c4e9b2d30f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/AggregateStrategies.java @@ -54,6 +54,7 @@ import org.apache.doris.nereids.trees.expressions.functions.agg.Min; import org.apache.doris.nereids.trees.expressions.functions.agg.Sum; import org.apache.doris.nereids.trees.expressions.functions.agg.Sum0; +import org.apache.doris.nereids.trees.expressions.functions.agg.SupportMultiDistinct; import org.apache.doris.nereids.trees.expressions.functions.scalar.If; import org.apache.doris.nereids.trees.expressions.literal.Literal; import org.apache.doris.nereids.trees.expressions.literal.NullLiteral; @@ -1808,14 +1809,8 @@ private List getHashAggregatePartitionExpressions( } private AggregateFunction tryConvertToMultiDistinct(AggregateFunction function) { - if (function instanceof Count && function.isDistinct()) { - return ((Count) function).convertToMultiDistinct(); - } else if (function instanceof Sum && function.isDistinct()) { - return ((Sum) function).convertToMultiDistinct(); - } else if (function instanceof Sum0 && function.isDistinct()) { - return ((Sum0) function).convertToMultiDistinct(); - } else if (function instanceof GroupConcat && function.isDistinct()) { - return ((GroupConcat) function).convertToMultiDistinct(); + if (function instanceof SupportMultiDistinct && function.isDistinct()) { + return ((SupportMultiDistinct) function).convertToMultiDistinct(); } return function; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CheckMultiDistinct.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CheckMultiDistinct.java index 4488a94b8d14c02..bc6fd8437239aff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CheckMultiDistinct.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CheckMultiDistinct.java @@ -20,6 +20,7 @@ import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.expressions.OrderExpression; import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction; import org.apache.doris.nereids.trees.expressions.functions.agg.Avg; import org.apache.doris.nereids.trees.expressions.functions.agg.Count; @@ -57,6 +58,36 @@ private LogicalAggregate checkDistinct(LogicalAggregate aggregat } } } + + boolean distinctMultiColumns = false; + for (AggregateFunction func : aggregate.getAggregateFunctions()) { + if (!func.isDistinct()) { + continue; + } + if (func.arity() <= 1) { + continue; + } + for (int i = 1; i < func.arity(); i++) { + if (!func.child(i).getInputSlots().isEmpty() && !(func.child(i) instanceof OrderExpression)) { + // think about group_concat(distinct col_1, ',') + distinctMultiColumns = true; + break; + } + } + if (distinctMultiColumns) { + break; + } + } + + long distinctFunctionNum = 0; + for (AggregateFunction aggregateFunction : aggregate.getAggregateFunctions()) { + distinctFunctionNum += aggregateFunction.isDistinct() ? 1 : 0; + } + + if (distinctMultiColumns && distinctFunctionNum > 1) { + // throw new AnalysisException( + // "The query contains multi count distinct or sum distinct, each can't have multi columns"); + } return aggregate; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctSplit.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SplitMultiDistinct.java similarity index 86% rename from fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctSplit.java rename to fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SplitMultiDistinct.java index b273c5b6ba71f76..56df1485a83829c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DistinctSplit.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SplitMultiDistinct.java @@ -20,7 +20,7 @@ import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.jobs.JobContext; -import org.apache.doris.nereids.rules.rewrite.DistinctSplit.DistinctSplitContext; +import org.apache.doris.nereids.rules.rewrite.SplitMultiDistinct.DistinctSplitContext; import org.apache.doris.nereids.trees.copier.DeepCopierContext; import org.apache.doris.nereids.trees.copier.LogicalPlanDeepCopier; import org.apache.doris.nereids.trees.expressions.Alias; @@ -67,8 +67,8 @@ * +--LogicalAggregate(output:count(distinct b)) * +--LogicalCTEConsumer * */ -public class DistinctSplit extends DefaultPlanRewriter implements CustomRewriter { - public static DistinctSplit INSTANCE = new DistinctSplit(); +public class SplitMultiDistinct extends DefaultPlanRewriter implements CustomRewriter { + public static SplitMultiDistinct INSTANCE = new SplitMultiDistinct(); /**DistinctSplitContext*/ public static class DistinctSplitContext { @@ -111,6 +111,8 @@ public Plan visitLogicalCTEAnchor( @Override public Plan visitLogicalAggregate(LogicalAggregate agg, DistinctSplitContext ctx) { + Plan newChild = agg.child().accept(this, ctx); + agg = agg.withChildren(ImmutableList.of(newChild)); List distinctFuncWithAlias = new ArrayList<>(); List otherAggFuncs = new ArrayList<>(); if (!needTransform((LogicalAggregate) agg, distinctFuncWithAlias, otherAggFuncs)) { @@ -137,18 +139,12 @@ public Plan visitLogicalAggregate(LogicalAggregate agg, Distinct List outputJoinGroupBys = new ArrayList<>(); for (int i = 0; i < distinctFuncWithAlias.size(); ++i) { Expression distinctAggFunc = distinctFuncWithAlias.get(i).child(0); - LogicalCTEConsumer consumer = new LogicalCTEConsumer(ctx.statementContext.getNextRelationId(), - producer.getCteId(), "", producer); - ctx.cascadesContext.putCTEIdToConsumer(consumer); Map producerToConsumerSlotMap = new HashMap<>(); - for (Map.Entry entry : consumer.getConsumerToProducerOutputMap().entrySet()) { - producerToConsumerSlotMap.put(entry.getValue(), entry.getKey()); - } - List replacedGroupBy = ExpressionUtils.replace(cloneAgg.getGroupByExpressions(), - producerToConsumerSlotMap); + List outputExpressions = new ArrayList<>(); + List replacedGroupBy = new ArrayList<>(); + LogicalCTEConsumer consumer = constructConsumerAndReplaceGroupBy(ctx, producer, cloneAgg, outputExpressions, + producerToConsumerSlotMap, replacedGroupBy); Expression newDistinctAggFunc = ExpressionUtils.replace(distinctAggFunc, producerToConsumerSlotMap); - List outputExpressions = replacedGroupBy.stream() - .map(Slot.class::cast).collect(Collectors.toList()); Alias alias = new Alias(newDistinctAggFunc); outputExpressions.add(alias); if (i == 0) { @@ -171,17 +167,11 @@ private static void buildOtherAggFuncAggregate(List otherAggFuncs, Logica if (otherAggFuncs.isEmpty()) { return; } - LogicalCTEConsumer consumer = new LogicalCTEConsumer(ctx.statementContext.getNextRelationId(), - producer.getCteId(), "", producer); - ctx.cascadesContext.putCTEIdToConsumer(consumer); Map producerToConsumerSlotMap = new HashMap<>(); - for (Map.Entry entry : consumer.getConsumerToProducerOutputMap().entrySet()) { - producerToConsumerSlotMap.put(entry.getValue(), entry.getKey()); - } - List replacedGroupBy = ExpressionUtils.replace(cloneAgg.getGroupByExpressions(), - producerToConsumerSlotMap); - List outputExpressions = replacedGroupBy.stream() - .map(Slot.class::cast).collect(Collectors.toList()); + List outputExpressions = new ArrayList<>(); + List replacedGroupBy = new ArrayList<>(); + LogicalCTEConsumer consumer = constructConsumerAndReplaceGroupBy(ctx, producer, cloneAgg, outputExpressions, + producerToConsumerSlotMap, replacedGroupBy); List otherAggFuncAliases = otherAggFuncs.stream() .map(e -> ExpressionUtils.replace(e, producerToConsumerSlotMap)).collect(Collectors.toList()); for (Expression otherAggFuncAlias : otherAggFuncAliases) { @@ -194,6 +184,20 @@ private static void buildOtherAggFuncAggregate(List otherAggFuncs, Logica newAggs.add(newAgg); } + private static LogicalCTEConsumer constructConsumerAndReplaceGroupBy(DistinctSplitContext ctx, + LogicalCTEProducer producer, LogicalAggregate cloneAgg, List outputExpressions, + Map producerToConsumerSlotMap, List replacedGroupBy) { + LogicalCTEConsumer consumer = new LogicalCTEConsumer(ctx.statementContext.getNextRelationId(), + producer.getCteId(), "", producer); + ctx.cascadesContext.putCTEIdToConsumer(consumer); + for (Map.Entry entry : consumer.getConsumerToProducerOutputMap().entrySet()) { + producerToConsumerSlotMap.put(entry.getValue(), entry.getKey()); + } + replacedGroupBy.addAll(ExpressionUtils.replace(cloneAgg.getGroupByExpressions(), producerToConsumerSlotMap)); + outputExpressions.addAll(replacedGroupBy.stream().map(Slot.class::cast).collect(Collectors.toList())); + return consumer; + } + private static boolean isDistinctMultiColumns(AggregateFunction func) { if (func.arity() <= 1) { return false; @@ -230,6 +234,11 @@ private static boolean needTransform(LogicalAggregate agg, List ali if (distinctFunc.size() <= 1) { return false; } + // when this aggregate is not distinctMultiColumns, and group by expressions is not empty + // e.g. sql1: select count(distinct a), count(distinct b) from t1 group by c; + // sql2: select count(distinct a) from t1 group by c; + // the physical plan of sql1 and sql2 is similar, both are 2-phase aggregate, + // so there is no need to do this rewrite if (!distinctMultiColumns && !agg.getGroupByExpressions().isEmpty()) { return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/SupportMultiDistinct.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/SupportMultiDistinct.java index 848c529e5c32b24..9feaf2025c4f639 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/SupportMultiDistinct.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/SupportMultiDistinct.java @@ -17,7 +17,9 @@ package org.apache.doris.nereids.trees.expressions.functions.agg; -/** MultiDistinctTrait*/ +/** aggregate functions which have corresponding MultiDistinctXXX class, + * e.g. SUM,SUM0,COUNT,GROUP_CONCAT + * */ public interface SupportMultiDistinct { AggregateFunction convertToMultiDistinct(); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/DistinctSplitTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/DistinctSplitTest.java deleted file mode 100644 index 41801abd352568b..000000000000000 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/DistinctSplitTest.java +++ /dev/null @@ -1,97 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.rules.rewrite; - -import org.apache.doris.nereids.trees.plans.Plan; -import org.apache.doris.nereids.util.MatchingUtils; -import org.apache.doris.nereids.util.MemoPatternMatchSupported; -import org.apache.doris.nereids.util.PlanChecker; -import org.apache.doris.utframe.TestWithFeService; - -import org.junit.jupiter.api.Test; - -public class DistinctSplitTest extends TestWithFeService implements MemoPatternMatchSupported { - @Override - protected void runBeforeAll() throws Exception { - createDatabase("test"); - createTable("create table test.test_distinct_multi(a int, b int, c int, d varchar(10), e date)" - + "distributed by hash(a) properties('replication_num'='1');"); - connectContext.setDatabase("test"); - connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION"); - } - - @Test - void multiCountWithoutGby() { - String sql = "select count(distinct b), count(distinct a) from test_distinct_multi"; - PlanChecker.from(connectContext).checkExplain(sql, planner -> { - Plan plan = planner.getOptimizedPlan(); - MatchingUtils.assertMatches(plan, physicalCTEAnchor(physicalCTEProducer(any()), physicalResultSink(physicalProject(physicalNestedLoopJoin( - physicalHashAggregate(physicalDistribute(physicalHashAggregate(physicalHashAggregate(physicalDistribute(physicalHashAggregate(any())))))), - physicalDistribute(physicalHashAggregate(physicalDistribute(physicalHashAggregate(physicalHashAggregate(physicalDistribute(physicalHashAggregate(any()))))))) - ))))); - }); - } - - @Test - void multiSumWithoutGby() { - String sql = "select sum(distinct b), sum(distinct a) from test_distinct_multi"; - PlanChecker.from(connectContext).checkExplain(sql, planner -> { - Plan plan = planner.getOptimizedPlan(); - MatchingUtils.assertMatches(plan, physicalCTEAnchor(physicalCTEProducer(any()), physicalResultSink(physicalProject(physicalNestedLoopJoin( - physicalHashAggregate(physicalDistribute(physicalHashAggregate(physicalHashAggregate(physicalDistribute(physicalHashAggregate(any())))))), - physicalDistribute(physicalHashAggregate(physicalDistribute(physicalHashAggregate(physicalHashAggregate(physicalDistribute(physicalHashAggregate(any()))))))) - ))))); - }); - } - - @Test - void sumCountWithoutGby() { - String sql = "select sum(distinct b), count(distinct a) from test_distinct_multi"; - PlanChecker.from(connectContext).checkExplain(sql, planner -> { - Plan plan = planner.getOptimizedPlan(); - MatchingUtils.assertMatches(plan, physicalCTEAnchor(physicalCTEProducer(any()), physicalResultSink(physicalProject(physicalNestedLoopJoin( - physicalHashAggregate(physicalDistribute(physicalHashAggregate(physicalHashAggregate(physicalDistribute(physicalHashAggregate(any())))))), - physicalDistribute(physicalHashAggregate(physicalDistribute(physicalHashAggregate(physicalHashAggregate(physicalDistribute(physicalHashAggregate(any()))))))) - ))))); - }); - } - - @Test - void countMultiColumnsWithoutGby() { - String sql = "select count(distinct b,c), count(distinct a,b) from test_distinct_multi"; - PlanChecker.from(connectContext).checkExplain(sql, planner -> { - Plan plan = planner.getOptimizedPlan(); - MatchingUtils.assertMatches(plan, physicalCTEAnchor(physicalCTEProducer(any()), physicalResultSink(physicalProject(physicalNestedLoopJoin( - physicalHashAggregate(physicalHashAggregate(physicalDistribute(physicalHashAggregate(any())))), - physicalDistribute(physicalHashAggregate(physicalHashAggregate(physicalDistribute(physicalHashAggregate(any()))))) - ))))); - }); - } - - @Test - void countMultiColumnsWithGby() { - String sql = "select count(distinct b,c), count(distinct a,b) from test_distinct_multi group by d"; - PlanChecker.from(connectContext).checkExplain(sql, planner -> { - Plan plan = planner.getOptimizedPlan(); - MatchingUtils.assertMatches(plan, physicalCTEAnchor(physicalCTEProducer(any()), physicalResultSink(physicalDistribute(physicalProject(physicalHashJoin( - physicalHashAggregate(physicalHashAggregate(physicalDistribute(physicalHashAggregate(any())))), - physicalHashAggregate(physicalHashAggregate(physicalDistribute(physicalHashAggregate(any())))) - )))))); - }); - } -} diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/SplitMultiDistinctTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/SplitMultiDistinctTest.java new file mode 100644 index 000000000000000..074135695a1a316 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/SplitMultiDistinctTest.java @@ -0,0 +1,191 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.rewrite; + +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.util.MatchingUtils; +import org.apache.doris.nereids.util.MemoPatternMatchSupported; +import org.apache.doris.nereids.util.PlanChecker; +import org.apache.doris.utframe.TestWithFeService; + +import org.junit.jupiter.api.Test; + +public class SplitMultiDistinctTest extends TestWithFeService implements MemoPatternMatchSupported { + @Override + protected void runBeforeAll() throws Exception { + createDatabase("test"); + createTable("create table test.test_distinct_multi(a int, b int, c int, d varchar(10), e date)" + + "distributed by hash(a) properties('replication_num'='1');"); + connectContext.setDatabase("test"); + connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION"); + } + + @Test + void multiCountWithoutGby() { + String sql = "select count(distinct b), count(distinct a) from test_distinct_multi"; + PlanChecker.from(connectContext).checkExplain(sql, planner -> { + Plan plan = planner.getOptimizedPlan(); + MatchingUtils.assertMatches(plan, + physicalCTEAnchor( + physicalCTEProducer(any()), + physicalResultSink( + physicalProject( + physicalNestedLoopJoin( + physicalHashAggregate( + physicalDistribute( + physicalHashAggregate( + physicalHashAggregate( + physicalDistribute( + physicalHashAggregate(any())))))), + physicalDistribute( + physicalHashAggregate( + physicalDistribute( + physicalHashAggregate( + physicalHashAggregate( + physicalDistribute( + physicalHashAggregate(any()))))))) + ) + ) + ) + ) + ); + }); + } + + @Test + void multiSumWithoutGby() { + String sql = "select sum(distinct b), sum(distinct a) from test_distinct_multi"; + PlanChecker.from(connectContext).checkExplain(sql, planner -> { + Plan plan = planner.getOptimizedPlan(); + MatchingUtils.assertMatches(plan, + physicalCTEAnchor( + physicalCTEProducer(any()), + physicalResultSink( + physicalProject( + physicalNestedLoopJoin( + physicalHashAggregate( + physicalDistribute( + physicalHashAggregate( + physicalHashAggregate( + physicalDistribute( + physicalHashAggregate(any())))))), + physicalDistribute( + physicalHashAggregate( + physicalDistribute( + physicalHashAggregate( + physicalHashAggregate( + physicalDistribute( + physicalHashAggregate(any()))))))) + ) + ) + ) + ) + ); + }); + } + + @Test + void sumCountWithoutGby() { + String sql = "select sum(distinct b), count(distinct a) from test_distinct_multi"; + PlanChecker.from(connectContext).checkExplain(sql, planner -> { + Plan plan = planner.getOptimizedPlan(); + MatchingUtils.assertMatches(plan, + physicalCTEAnchor( + physicalCTEProducer(any()), + physicalResultSink( + physicalProject( + physicalNestedLoopJoin( + physicalHashAggregate( + physicalDistribute( + physicalHashAggregate( + physicalHashAggregate( + physicalDistribute( + physicalHashAggregate(any())))))), + physicalDistribute( + physicalHashAggregate( + physicalDistribute( + physicalHashAggregate( + physicalHashAggregate( + physicalDistribute( + physicalHashAggregate(any()))))))) + ) + ) + ) + ) + ); + }); + } + + @Test + void countMultiColumnsWithoutGby() { + String sql = "select count(distinct b,c), count(distinct a,b) from test_distinct_multi"; + PlanChecker.from(connectContext).checkExplain(sql, planner -> { + Plan plan = planner.getOptimizedPlan(); + MatchingUtils.assertMatches(plan, + physicalCTEAnchor( + physicalCTEProducer(any()), + physicalResultSink( + physicalProject( + physicalNestedLoopJoin( + physicalHashAggregate( + physicalHashAggregate( + physicalDistribute( + physicalHashAggregate(any())))), + physicalDistribute( + physicalHashAggregate( + physicalHashAggregate( + physicalDistribute( + physicalHashAggregate(any()))))) + ) + ) + ) + ) + ); + }); + } + + @Test + void countMultiColumnsWithGby() { + String sql = "select count(distinct b,c), count(distinct a,b) from test_distinct_multi group by d"; + PlanChecker.from(connectContext).checkExplain(sql, planner -> { + Plan plan = planner.getOptimizedPlan(); + MatchingUtils.assertMatches(plan, + physicalCTEAnchor( + physicalCTEProducer( + any()), + physicalResultSink( + physicalDistribute( + physicalProject( + physicalHashJoin( + physicalHashAggregate( + physicalHashAggregate( + physicalDistribute( + physicalHashAggregate(any())))), + physicalHashAggregate( + physicalHashAggregate( + physicalDistribute( + physicalHashAggregate(any())))) + ) + ) + ) + ) + ) + ); + }); + } +}