Skip to content

Commit

Permalink
[fix](mtmv) Fix compensate union all wrongly when query rewrite by ma…
Browse files Browse the repository at this point in the history
…terialized view
  • Loading branch information
seawinde committed Sep 13, 2024
1 parent 6fa6b4f commit 4ccb8ee
Show file tree
Hide file tree
Showing 5 changed files with 620 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.nereids.rules.exploration.mv;

import org.apache.doris.catalog.MTMV;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.jobs.executor.Rewriter;
Expand All @@ -38,6 +39,7 @@
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.VirtualSlotReference;
import org.apache.doris.nereids.trees.expressions.functions.Function;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
Expand Down Expand Up @@ -324,6 +326,46 @@ protected Expression tryRewriteExpression(StructInfo queryStructInfo, Expression
return rewrittenExpression;
}

/**
* Not all query after rewritten successfully can compensate union all
* Such as:
* mv def sql is as following, partition column is a
* select a, b, count(*) from t1 group by a, b
* Query is as following:
* select b, count(*) from t1 group by b, after rewritten by materialized view successfully
* If mv part partition is invalid, can not compensate union all, because result is wrong after
* compensate union all.
*/
@Override
protected boolean canUnionRewrite(Plan queryPlan, MTMV mtmv, CascadesContext cascadesContext) {
// check query plan is contain the partition column
Optional<LogicalAggregate<Plan>> logicalAggregateOptional =
queryPlan.collectFirst(planTreeNode -> planTreeNode instanceof LogicalAggregate);
if (!logicalAggregateOptional.isPresent()) {
return true;
}

List<Expression> groupByExpressions = logicalAggregateOptional.get().getGroupByExpressions();
if (groupByExpressions.isEmpty()) {
// Scalar aggregate can not compensate union all
return false;
}
String relatedCol = mtmv.getMvPartitionInfo().getRelatedCol();
boolean canUnionRewrite = false;
// Check the query plan group by expression contains partition col or not
for (Expression expression : groupByExpressions) {
Expression shuttledExpression =
ExpressionUtils.shuttleExpressionWithLineage(expression, queryPlan, new BitSet());
canUnionRewrite = !shuttledExpression.collectToSet(expr -> expr instanceof SlotReference
&& ((SlotReference) expr).isColumnFromTable()
&& ((SlotReference) expr).getColumn().get().getName().equals(relatedCol)).isEmpty();
if (canUnionRewrite) {
break;
}
}
return canUnionRewrite;
}

/**
* Check query and view aggregate compatibility
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,17 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
return rewriteResults;
}
boolean partitionNeedUnion = needUnionRewrite(invalidPartitions, cascadesContext);
boolean canUnionRewrite = canUnionRewrite(queryPlan,
((AsyncMaterializationContext) materializationContext).getMtmv(),
cascadesContext);
if (partitionNeedUnion && !canUnionRewrite) {
materializationContext.recordFailReason(queryStructInfo,
"need compensate union all, but can not, because the query structInfo",
() -> String.format("mv partition info is %s, and the query plan is %s",
((AsyncMaterializationContext) materializationContext).getMtmv()
.getMvPartitionInfo(), queryPlan.treeString()));
return rewriteResults;
}
final Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, Set<String>>> finalInvalidPartitions =
invalidPartitions;
if (partitionNeedUnion) {
Expand Down Expand Up @@ -377,6 +388,20 @@ protected boolean needUnionRewrite(
&& (!invalidPartitions.key().isEmpty() || !invalidPartitions.value().isEmpty());
}

/**
* Not all query after rewritten successfully can compensate union all
* Such as:
* mv def sql is as following, partition column is a
* select a, b, count(*) from t1 group by a, b
* Query is as following:
* select b, count(*) from t1 group by b, after rewritten by materialized view successfully
* If mv part partition is invalid, can not compensate union all, because result is wrong after
* compensate union all.
*/
protected boolean canUnionRewrite(Plan queryPlan, MTMV mtmv, CascadesContext cascadesContext) {
return true;
}

// Normalize expression such as nullable property and output slot id
protected Plan normalizeExpressions(Plan rewrittenPlan, Plan originPlan) {
if (rewrittenPlan.getOutput().size() != originPlan.getOutput().size()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !query1_0_before --
28

-- !query1_0_after --
28

-- !query1_1_before --
32

-- !query1_1_after --
32

-- !query2_0_before --
a 4
b 28

-- !query2_0_after --
a 2
b 26

-- !query3_0_before --
a 4
b 28

-- !query3_0_after --
a 4
b 28

-- !query4_0_before --
2024-09-12 8
2024-09-13 8
2024-09-14 8
2024-09-15 8

-- !query4_0_after --
2024-09-12 4
2024-09-13 8
2024-09-14 8
2024-09-15 8

-- !query5_0_before --
2024-09-12 8
2024-09-13 8
2024-09-14 8
2024-09-15 8

-- !query5_0_after --
2024-09-12 8
2024-09-13 8
2024-09-14 8
2024-09-15 8

-- !query6_0_before --
a 1
a 1
a 1
a 1
a 1
a 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1

-- !query6_0_after --
a 1
a 1
a 1
a 1
a 1
a 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1

-- !query7_0_before --
a 1
a 1
a 1
a 1
a 1
a 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1

-- !query7_0_after --
a 1
a 1
a 1
a 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1
b 1

Original file line number Diff line number Diff line change
Expand Up @@ -1555,7 +1555,9 @@ class Suite implements GroovyInterceptable {
def mv_rewrite_success_without_check_chosen = { query_sql, mv_name ->
explain {
sql(" memo plan ${query_sql}")
contains("${mv_name} not chose")
check { result ->
result.contains("${mv_name} chose") || result.contains("${mv_name} not chose")
}
}
}

Expand Down Expand Up @@ -1613,7 +1615,9 @@ class Suite implements GroovyInterceptable {

explain {
sql(" memo plan ${query_sql}")
notContains("${mv_name} fail")
check { result ->
result.contains("${mv_name} chose") || result.contains("${mv_name} not chose")
}
}
}

Expand All @@ -1636,8 +1640,7 @@ class Suite implements GroovyInterceptable {

explain {
sql(" memo plan ${query_sql}")
notContains("${mv_name} chose")
notContains("${mv_name} not chose")
contains("${mv_name} fail")
}
}

Expand Down
Loading

0 comments on commit 4ccb8ee

Please sign in to comment.