Skip to content

Commit

Permalink
[pick](mtmv) Pick three PR: #44779 #44786 #44857 (#45130)
Browse files Browse the repository at this point in the history
  • Loading branch information
seawinde authored Dec 9, 2024
1 parent cd40d3d commit e70f7f8
Show file tree
Hide file tree
Showing 20 changed files with 1,172 additions and 214 deletions.
14 changes: 11 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,18 @@ public class MTMVCache {
// The materialized view plan which should be optimized by the same rules to query
// and will remove top sink and unused sort
private final Plan logicalPlan;
// The original plan of mv def sql
// The original rewritten plan of mv def sql
private final Plan originalPlan;
// The analyzed plan of mv def sql, which is used by tableCollector,should not be optimized by rbo
private final Plan analyzedPlan;
private final Statistics statistics;
private final StructInfo structInfo;

public MTMVCache(Plan logicalPlan, Plan originalPlan, Statistics statistics, StructInfo structInfo) {
public MTMVCache(Plan logicalPlan, Plan originalPlan, Plan analyzedPlan,
Statistics statistics, StructInfo structInfo) {
this.logicalPlan = logicalPlan;
this.originalPlan = originalPlan;
this.analyzedPlan = analyzedPlan;
this.statistics = statistics;
this.structInfo = structInfo;
}
Expand All @@ -71,6 +75,10 @@ public Plan getOriginalPlan() {
return originalPlan;
}

public Plan getAnalyzedPlan() {
return analyzedPlan;
}

public Statistics getStatistics() {
return statistics;
}
Expand Down Expand Up @@ -117,7 +125,7 @@ public Plan visitLogicalResultSink(LogicalResultSink<? extends Plan> logicalResu
Optional<StructInfo> structInfoOptional = MaterializationContext.constructStructInfo(mvPlan, originPlan,
planner.getCascadesContext(),
new BitSet());
return new MTMVCache(mvPlan, originPlan, needCost
return new MTMVCache(mvPlan, originPlan, planner.getAnalyzedPlan(), needCost
? planner.getCascadesContext().getMemo().getRoot().getStatistics() : null,
structInfoOptional.orElseGet(() -> null));
}
Expand Down
31 changes: 9 additions & 22 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,10 @@
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.commands.info.CreateMTMVInfo;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.visitor.TableCollector;
import org.apache.doris.nereids.trees.plans.visitor.TableCollector.TableCollectorContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -104,31 +102,20 @@ private static void setCatalogAndDb(ConnectContext ctx, MTMV mtmv) {
public static MTMVRelation generateMTMVRelation(MTMV mtmv, ConnectContext ctx) {
// Should not make table without data to empty relation when analyze the related table,
// so add disable rules
SessionVariable sessionVariable = ctx.getSessionVariable();
Set<String> tempDisableRules = sessionVariable.getDisableNereidsRuleNames();
sessionVariable.setDisableNereidsRules(CreateMTMVInfo.MTMV_PLANER_DISABLE_RULES);
if (ctx.getStatementContext() != null) {
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
Plan plan;
try {
plan = getPlanBySql(mtmv.getQuerySql(), ctx);
} finally {
sessionVariable.setDisableNereidsRules(String.join(",", tempDisableRules));
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
return generateMTMVRelation(plan);
Plan plan = getAnalyzePlanBySql(mtmv.getQuerySql(), ctx);
return generateMTMVRelation(plan, ctx);
}

public static MTMVRelation generateMTMVRelation(Plan plan) {
return new MTMVRelation(getBaseTables(plan, true), getBaseTables(plan, false), getBaseViews(plan));
public static MTMVRelation generateMTMVRelation(Plan plan, ConnectContext connectContext) {
return new MTMVRelation(getBaseTables(plan, true, connectContext),
getBaseTables(plan, false, connectContext), getBaseViews(plan));
}

private static Set<BaseTableInfo> getBaseTables(Plan plan, boolean expand) {
private static Set<BaseTableInfo> getBaseTables(Plan plan, boolean expand, ConnectContext connectContext) {
TableCollectorContext collectorContext =
new TableCollector.TableCollectorContext(
com.google.common.collect.Sets
.newHashSet(TableType.values()), expand);
.newHashSet(TableType.values()), expand, connectContext);
plan.accept(TableCollector.INSTANCE, collectorContext);
Set<TableIf> collectedTables = collectorContext.getCollectedTables();
return transferTableIfToInfo(collectedTables);
Expand All @@ -146,7 +133,7 @@ private static Set<BaseTableInfo> transferTableIfToInfo(Set<TableIf> tables) {
return result;
}

private static Plan getPlanBySql(String querySql, ConnectContext ctx) {
private static Plan getAnalyzePlanBySql(String querySql, ConnectContext ctx) {
List<StatementBase> statements;
try {
statements = new NereidsParser().parseSQL(querySql);
Expand All @@ -159,7 +146,7 @@ private static Plan getPlanBySql(String querySql, ConnectContext ctx) {
ctx.setStatementContext(new StatementContext());
try {
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
return planner.planWithLock(logicalPlan, PhysicalProperties.ANY, ExplainLevel.NONE);
return planner.planWithLock(logicalPlan, PhysicalProperties.ANY, ExplainLevel.ANALYZED_PLAN);
} finally {
ctx.setStatementContext(original);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.jobs.joinorder.hypergraph;

/**
* This is the common base class for all
* */
public interface HyperElement {

// Get the references nodes
long getReferenceNodes();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.nereids.jobs.joinorder.hypergraph.edge;

import org.apache.doris.common.Pair;
import org.apache.doris.nereids.jobs.joinorder.hypergraph.HyperElement;
import org.apache.doris.nereids.jobs.joinorder.hypergraph.bitmap.LongBitmap;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
Expand All @@ -32,7 +33,7 @@
/**
* Edge in HyperGraph
*/
public abstract class Edge {
public abstract class Edge implements HyperElement {
private final int index;
private final double selectivity;

Expand Down Expand Up @@ -192,6 +193,7 @@ public boolean isSub(Edge edge) {
return LongBitmap.isSubset(getReferenceNodes(), otherBitmap);
}

@Override
public long getReferenceNodes() {
return LongBitmap.newBitmapUnion(leftExtendedNodes, rightExtendedNodes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.nereids.jobs.joinorder.hypergraph.node;

import org.apache.doris.nereids.jobs.joinorder.hypergraph.HyperElement;
import org.apache.doris.nereids.jobs.joinorder.hypergraph.bitmap.LongBitmap;
import org.apache.doris.nereids.jobs.joinorder.hypergraph.edge.Edge;
import org.apache.doris.nereids.jobs.joinorder.hypergraph.edge.FilterEdge;
Expand All @@ -33,7 +34,7 @@
/**
* HyperGraph Node.
*/
public class AbstractNode {
public class AbstractNode implements HyperElement {
protected final int index;
protected final List<JoinEdge> joinEdges;
protected final List<FilterEdge> filterEdges;
Expand Down Expand Up @@ -65,6 +66,11 @@ public List<Edge> getEdges() {
.build();
}

@Override
public long getReferenceNodes() {
return getNodeMap();
}

public int getIndex() {
return index;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

import org.apache.doris.common.Pair;
import org.apache.doris.nereids.jobs.joinorder.hypergraph.ConflictRulesMaker;
import org.apache.doris.nereids.jobs.joinorder.hypergraph.HyperElement;
import org.apache.doris.nereids.jobs.joinorder.hypergraph.HyperGraph;
import org.apache.doris.nereids.jobs.joinorder.hypergraph.bitmap.LongBitmap;
import org.apache.doris.nereids.jobs.joinorder.hypergraph.edge.Edge;
import org.apache.doris.nereids.jobs.joinorder.hypergraph.edge.FilterEdge;
import org.apache.doris.nereids.jobs.joinorder.hypergraph.edge.JoinEdge;
import org.apache.doris.nereids.jobs.joinorder.hypergraph.node.StructInfoNode;
import org.apache.doris.nereids.rules.exploration.mv.StructInfo.ExpressionPosition;
import org.apache.doris.nereids.rules.rewrite.PushDownFilterThroughJoin;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
Expand All @@ -51,6 +53,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -79,9 +82,9 @@ public class HyperGraphComparator {
private final Map<Edge, List<? extends Expression>> pullUpViewExprWithEdge = new HashMap<>();
private final LogicalCompatibilityContext logicalCompatibilityContext;
// this records the slots which needs to reject null
// the key is the target join which should reject null, the value is a pair, the first value of the pair is the
// join type, the second value is also a pair which left represents the slots in the left of join that should
// reject null, right represents the slots in the right of join that should reject null.
// the key is the view join edge which should reject null, the value is a pair, the first value of the pair is the
// query join type, the second value is also a pair which left represents the slots in the left of view join that
// should reject null, right represents the slots in the right of view join that should reject null.
private final Map<JoinEdge, Pair<JoinType, Pair<Set<Slot>, Set<Slot>>>> inferredViewEdgeWithCond = new HashMap<>();
private List<JoinEdge> viewJoinEdgesAfterInferring;
private List<FilterEdge> viewFilterEdgesAfterInferring;
Expand Down Expand Up @@ -249,9 +252,17 @@ private boolean compareNodeWithExpr(StructInfoNode query, StructInfoNode view) {
}
int size = queryExprSetList.size();
for (int i = 0; i < size; i++) {
Set<Expression> mappingQueryExprSet = queryExprSetList.get(i).stream()
.map(logicalCompatibilityContext::getViewNodeExprFromQuery)
.collect(Collectors.toSet());
Set<Expression> queryExpressions = queryExprSetList.get(i);
Set<Expression> mappingQueryExprSet = new HashSet<>();
for (Expression queryExpression : queryExpressions) {
Optional<Expression> mappingViewExprByQueryExpr = getMappingViewExprByQueryExpr(queryExpression, query,
this.logicalCompatibilityContext,
ExpressionPosition.NODE);
if (!mappingViewExprByQueryExpr.isPresent()) {
return false;
}
mappingQueryExprSet.add(mappingViewExprByQueryExpr.get());
}
if (!mappingQueryExprSet.equals(viewExprSetList.get(i))) {
return false;
}
Expand Down Expand Up @@ -407,7 +418,10 @@ private Map<Edge, Edge> constructQueryToViewJoinMapWithExpr() {
if (edgeMap.containsKey(entry.getValue())) {
continue;
}
Expression viewExpr = logicalCompatibilityContext.getViewJoinExprFromQuery(entry.getKey());
Expression viewExpr = getMappingViewExprByQueryExpr(entry.getKey(),
entry.getValue(),
logicalCompatibilityContext,
ExpressionPosition.JOIN_EDGE).orElse(null);
if (viewExprToEdge.containsKey(viewExpr)) {
edgeMap.put(entry.getValue(), Objects.requireNonNull(viewExprToEdge.get(viewExpr)));
}
Expand Down Expand Up @@ -441,15 +455,19 @@ private Map<Edge, Edge> constructQueryToViewFilterMapWithExpr() {

HashMap<Edge, Edge> queryToViewEdgeMap = new HashMap<>();
for (Entry<Expression, Collection<Edge>> entry : queryExprToEdge.asMap().entrySet()) {
Expression queryExprViewBased = logicalCompatibilityContext.getViewFilterExprFromQuery(entry.getKey());
if (queryExprViewBased == null) {
continue;
}
Collection<Edge> viewEdges = viewExprToEdge.get(queryExprViewBased);
if (viewEdges.isEmpty()) {
continue;
}
Expression queryExprViewBased = null;
for (Edge queryEdge : entry.getValue()) {
queryExprViewBased = getMappingViewExprByQueryExpr(entry.getKey(),
queryEdge,
logicalCompatibilityContext,
ExpressionPosition.FILTER_EDGE).orElse(null);
if (queryExprViewBased == null) {
continue;
}
Collection<Edge> viewEdges = viewExprToEdge.get(queryExprViewBased);
if (viewEdges.isEmpty()) {
continue;
}
for (Edge viewEdge : viewEdges) {
if (!isSubTreeNodesEquals(queryEdge, viewEdge, logicalCompatibilityContext)) {
// Such as query filter edge is <{1} --FILTER-- {}> but view filter edge is
Expand Down Expand Up @@ -514,17 +532,17 @@ private boolean compareEdgeWithNode(Edge query, Edge view) {
}

private boolean compareFilterEdgeWithNode(FilterEdge query, FilterEdge view) {
return rewriteQueryNodeMap(query.getReferenceNodes()) == view.getReferenceNodes();
return getViewNodesByQuery(query.getReferenceNodes()) == view.getReferenceNodes();
}

private boolean compareJoinEdgeWithNode(JoinEdge query, JoinEdge view) {
boolean res = false;
if (query.getJoinType().swap() == view.getJoinType()) {
res |= rewriteQueryNodeMap(query.getLeftExtendedNodes()) == view.getRightExtendedNodes()
&& rewriteQueryNodeMap(query.getRightExtendedNodes()) == view.getLeftExtendedNodes();
res |= getViewNodesByQuery(query.getLeftExtendedNodes()) == view.getRightExtendedNodes()
&& getViewNodesByQuery(query.getRightExtendedNodes()) == view.getLeftExtendedNodes();
}
res |= rewriteQueryNodeMap(query.getLeftExtendedNodes()) == view.getLeftExtendedNodes()
&& rewriteQueryNodeMap(query.getRightExtendedNodes()) == view.getRightExtendedNodes();
res |= getViewNodesByQuery(query.getLeftExtendedNodes()) == view.getLeftExtendedNodes()
&& getViewNodesByQuery(query.getRightExtendedNodes()) == view.getRightExtendedNodes();
return res;
}

Expand All @@ -547,8 +565,8 @@ private boolean compareJoinEdgeOrInfer(JoinEdge query, JoinEdge view) {
}

private boolean tryInferEdge(JoinEdge query, JoinEdge view) {
if (rewriteQueryNodeMap(query.getLeftRequiredNodes()) != view.getLeftRequiredNodes()
|| rewriteQueryNodeMap(query.getRightRequiredNodes()) != view.getRightRequiredNodes()) {
if (getViewNodesByQuery(query.getLeftRequiredNodes()) != view.getLeftRequiredNodes()
|| getViewNodesByQuery(query.getRightRequiredNodes()) != view.getRightRequiredNodes()) {
return false;
}
if (!query.getJoinType().equals(view.getJoinType())) {
Expand All @@ -569,7 +587,7 @@ private boolean tryInferEdge(JoinEdge query, JoinEdge view) {
return true;
}

private long rewriteQueryNodeMap(long bitmap) {
private long getViewNodesByQuery(long bitmap) {
long newBitmap = LongBitmap.newBitmap();
for (int i : LongBitmap.getIterator(bitmap)) {
int newIdx = getQueryToViewNodeIdMap().getOrDefault(i, 0);
Expand All @@ -578,14 +596,46 @@ private long rewriteQueryNodeMap(long bitmap) {
return newBitmap;
}

private Optional<Expression> getMappingViewExprByQueryExpr(Expression queryExpression,
HyperElement queryExpressionBelongedHyperElement,
LogicalCompatibilityContext context,
ExpressionPosition expressionPosition) {
Expression queryShuttledExpr;
Collection<Pair<Expression, HyperElement>> viewExpressions;
if (ExpressionPosition.JOIN_EDGE.equals(expressionPosition)) {
queryShuttledExpr = context.getQueryJoinShuttledExpr(queryExpression);
viewExpressions = context.getViewJoinExprFromQuery(queryShuttledExpr);
} else if (ExpressionPosition.FILTER_EDGE.equals(expressionPosition)) {
queryShuttledExpr = context.getQueryFilterShuttledExpr(queryExpression);
viewExpressions = context.getViewFilterExprFromQuery(queryShuttledExpr);
} else {
queryShuttledExpr = context.getQueryNodeShuttledExpr(queryExpression);
viewExpressions = context.getViewNodeExprFromQuery(queryShuttledExpr);
}
if (viewExpressions.size() == 1) {
return Optional.of(viewExpressions.iterator().next().key());
}
long queryReferenceNodes = queryExpressionBelongedHyperElement.getReferenceNodes();
long viewReferenceNodes = getViewNodesByQuery(queryReferenceNodes);
for (Pair<Expression, HyperElement> viewExpressionPair : viewExpressions) {
if (viewExpressionPair.value().getReferenceNodes() == viewReferenceNodes) {
return Optional.of(viewExpressionPair.key());
}
}
return Optional.empty();
}

private void compareJoinEdgeWithExpr(Edge query, Edge view) {
Set<? extends Expression> queryExprSet = query.getExpressionSet();
Set<? extends Expression> viewExprSet = view.getExpressionSet();

Set<Expression> exprMappedOfView = new HashSet<>();
List<Expression> residualQueryExpr = new ArrayList<>();
for (Expression queryExpr : queryExprSet) {
Expression viewExpr = logicalCompatibilityContext.getViewJoinExprFromQuery(queryExpr);
Expression viewExpr = getMappingViewExprByQueryExpr(queryExpr,
query,
logicalCompatibilityContext,
ExpressionPosition.JOIN_EDGE).orElse(null);
if (viewExprSet.contains(viewExpr)) {
exprMappedOfView.add(viewExpr);
} else {
Expand All @@ -604,7 +654,10 @@ private void compareFilterEdgeWithExpr(Edge query, Edge view) {
Set<Expression> exprMappedOfView = new HashSet<>();
List<Expression> residualQueryExpr = new ArrayList<>();
for (Expression queryExpr : queryExprSet) {
Expression viewExpr = logicalCompatibilityContext.getViewFilterExprFromQuery(queryExpr);
Expression viewExpr = getMappingViewExprByQueryExpr(queryExpr,
query,
logicalCompatibilityContext,
ExpressionPosition.FILTER_EDGE).orElse(null);
if (viewExprSet.contains(viewExpr)) {
exprMappedOfView.add(viewExpr);
} else {
Expand Down
Loading

0 comments on commit e70f7f8

Please sign in to comment.