Skip to content

Commit

Permalink
[improvement](mtmv) Refactor the plan getting logic when create mtmv,…
Browse files Browse the repository at this point in the history
… and support to collect view from plan
  • Loading branch information
seawinde authored and morrySnow committed Sep 11, 2024
1 parent c8b5386 commit b67dd37
Show file tree
Hide file tree
Showing 8 changed files with 529 additions and 102 deletions.
50 changes: 28 additions & 22 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

import com.google.common.collect.Sets;

import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand All @@ -66,38 +64,37 @@ public static ConnectContext createMTMVContext(MTMV mtmv) {
public static MTMVRelation generateMTMVRelation(MTMV mtmv, ConnectContext ctx) {
// Should not make table without data to empty relation when analyze the related table,
// so add disable rules
SessionVariable sessionVariable = ctx.getSessionVariable();
Set<String> tempDisableRules = sessionVariable.getDisableNereidsRuleNames();
sessionVariable.setDisableNereidsRules(CreateMTMVInfo.MTMV_PLANER_DISABLE_RULES);
if (ctx.getStatementContext() != null) {
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
Plan plan;
try {
plan = getPlanBySql(mtmv.getQuerySql(), ctx);
} finally {
sessionVariable.setDisableNereidsRules(String.join(",", tempDisableRules));
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
Plan plan = getPlanBySql(mtmv.getQuerySql(), ctx, ExplainLevel.ANALYZED_PLAN,
CreateMTMVInfo.MTMV_PLANER_DISABLE_RULES);
return generateMTMVRelation(plan);
}

public static MTMVRelation generateMTMVRelation(Plan plan) {
return new MTMVRelation(getBaseTables(plan, true), getBaseTables(plan, false), getBaseViews(plan));
return new MTMVRelation(getBaseTables(plan, true, true),
getBaseTables(plan, false, true),
getBaseViews(plan, true, true));
}

private static Set<BaseTableInfo> getBaseTables(Plan plan, boolean expand) {
private static Set<BaseTableInfo> getBaseTables(Plan plan, boolean expandMaterializedView,
boolean expandView) {
TableCollectorContext collectorContext =
new TableCollector.TableCollectorContext(
com.google.common.collect.Sets
.newHashSet(TableType.values()), expand);
.newHashSet(TableType.values()), expandMaterializedView, expandView);
plan.accept(TableCollector.INSTANCE, collectorContext);
Set<TableIf> collectedTables = collectorContext.getCollectedTables();
return transferTableIfToInfo(collectedTables);
}

private static Set<BaseTableInfo> getBaseViews(Plan plan) {
return Sets.newHashSet();
private static Set<BaseTableInfo> getBaseViews(Plan plan, boolean expandMaterializedView,
boolean expandView) {
TableCollectorContext collectorContext =
new TableCollector.TableCollectorContext(
com.google.common.collect.Sets
.newHashSet(TableType.VIEW), expandMaterializedView, expandView);
plan.accept(TableCollector.INSTANCE, collectorContext);
Set<TableIf> collectedTables = collectorContext.getCollectedTables();
return transferTableIfToInfo(collectedTables);
}

private static Set<BaseTableInfo> transferTableIfToInfo(Set<TableIf> tables) {
Expand All @@ -108,7 +105,8 @@ private static Set<BaseTableInfo> transferTableIfToInfo(Set<TableIf> tables) {
return result;
}

private static Plan getPlanBySql(String querySql, ConnectContext ctx) {
private static Plan getPlanBySql(String querySql, ConnectContext ctx,
ExplainLevel explainLevel, String disableRules) {
List<StatementBase> statements;
try {
statements = new NereidsParser().parseSQL(querySql);
Expand All @@ -119,11 +117,19 @@ private static Plan getPlanBySql(String querySql, ConnectContext ctx) {
LogicalPlan logicalPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
StatementContext original = ctx.getStatementContext();
ctx.setStatementContext(new StatementContext());

Set<String> tempDisableRules = ctx.getSessionVariable().getDisableNereidsRuleNames();
ctx.getSessionVariable().setDisableNereidsRules(disableRules);
if (ctx.getStatementContext() != null) {
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
try {
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
return planner.planWithLock(logicalPlan, PhysicalProperties.ANY, ExplainLevel.NONE);
return planner.planWithLock(logicalPlan, PhysicalProperties.ANY, explainLevel);
} finally {
ctx.setStatementContext(original);
ctx.getSessionVariable().setDisableNereidsRules(String.join(",", tempDisableRules));
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public void initMaterializationContext(CascadesContext cascadesContext) {
*/
protected void doInitMaterializationContext(CascadesContext cascadesContext) {
// Only collect the table or mv which query use directly, to avoid useless mv partition in rewrite
TableCollectorContext collectorContext = new TableCollectorContext(Sets.newHashSet(), false);
TableCollectorContext collectorContext = new TableCollectorContext(Sets.newHashSet(),
false, true);
try {
Plan rewritePlan = cascadesContext.getRewritePlan();
// Keep use one connection context when in query, if new connect context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,19 @@ public void analyzeQuery(ConnectContext ctx, Map<String, String> mvProperties) t
LogicalSink<Plan> logicalSink = new UnboundResultSink<>(logicalQuery);
// must disable constant folding by be, because be constant folding may return wrong type
ctx.getSessionVariable().disableConstantFoldingByBEOnce();
Plan plan = planner.planWithLock(logicalSink, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
// Should not make table without data to empty relation when analyze the related table,
// so add disable rules
Set<String> tempDisableRules = ctx.getSessionVariable().getDisableNereidsRuleNames();
ctx.getSessionVariable().setDisableNereidsRules(CreateMTMVInfo.MTMV_PLANER_DISABLE_RULES);
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
Plan plan;
try {
plan = planner.planWithLock(logicalSink, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
} finally {
// after operate, roll back the disable rules
ctx.getSessionVariable().setDisableNereidsRules(String.join(",", tempDisableRules));
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
// can not contain VIEW or MTMV
analyzeBaseTables(planner.getAnalyzedPlan());
// can not contain Random function
Expand All @@ -252,8 +264,7 @@ public void analyzeQuery(ConnectContext ctx, Map<String, String> mvProperties) t
throw new AnalysisException("can not contain invalid expression");
}
getRelation(planner);
this.mvPartitionInfo = mvPartitionDefinition
.analyzeAndTransferToMTMVPartitionInfo(planner, ctx, logicalQuery);
this.mvPartitionInfo = mvPartitionDefinition.analyzeAndTransferToMTMVPartitionInfo(planner, ctx);
this.partitionDesc = generatePartitionDesc(ctx);
getColumns(plan, ctx, mvPartitionInfo.getPartitionCol(), distribution);
analyzeKeys();
Expand Down Expand Up @@ -298,24 +309,9 @@ private void analyzeKeys() {
}
}

// Should use analyzed plan for collect views and tables
private void getRelation(NereidsPlanner planner) {
// Should not make table without data to empty relation when analyze the related table,
// so add disable rules
ConnectContext ctx = planner.getCascadesContext().getConnectContext();
SessionVariable sessionVariable = ctx.getSessionVariable();
Set<String> tempDisableRules = sessionVariable.getDisableNereidsRuleNames();
sessionVariable.setDisableNereidsRules(CreateMTMVInfo.MTMV_PLANER_DISABLE_RULES);
if (ctx.getStatementContext() != null) {
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
Plan plan;
try {
plan = planner.planWithLock(logicalQuery, PhysicalProperties.ANY, ExplainLevel.NONE);
} finally {
sessionVariable.setDisableNereidsRules(String.join(",", tempDisableRules));
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
this.relation = MTMVPlanUtil.generateMTMVRelation(plan);
this.relation = MTMVPlanUtil.generateMTMVRelation(planner.getAnalyzedPlan());
}

private PartitionDesc generatePartitionDesc(ConnectContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,14 @@
import org.apache.doris.nereids.analyzer.UnboundFunction;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils.RelatedTableInfo;
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.functions.scalar.DateTrunc;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

import com.google.common.collect.Sets;

Expand All @@ -71,11 +66,9 @@ public class MTMVPartitionDefinition {
*
* @param planner planner
* @param ctx ctx
* @param logicalQuery logicalQuery
* @return MTMVPartitionInfo
*/
public MTMVPartitionInfo analyzeAndTransferToMTMVPartitionInfo(NereidsPlanner planner, ConnectContext ctx,
LogicalPlan logicalQuery) {
public MTMVPartitionInfo analyzeAndTransferToMTMVPartitionInfo(NereidsPlanner planner, ConnectContext ctx) {
MTMVPartitionInfo mtmvPartitionInfo = new MTMVPartitionInfo(partitionType);
if (this.partitionType == MTMVPartitionType.SELF_MANAGE) {
return mtmvPartitionInfo;
Expand All @@ -99,7 +92,7 @@ public MTMVPartitionInfo analyzeAndTransferToMTMVPartitionInfo(NereidsPlanner pl
timeUnit = null;
}
mtmvPartitionInfo.setPartitionCol(partitionColName);
RelatedTableInfo relatedTableInfo = getRelatedTableInfo(planner, ctx, logicalQuery, partitionColName, timeUnit);
RelatedTableInfo relatedTableInfo = getRelatedTableInfo(planner, ctx, partitionColName, timeUnit);
mtmvPartitionInfo.setRelatedCol(relatedTableInfo.getColumn());
mtmvPartitionInfo.setRelatedTable(relatedTableInfo.getTableInfo());
if (relatedTableInfo.getPartitionExpression().isPresent()) {
Expand All @@ -124,47 +117,33 @@ public MTMVPartitionInfo analyzeAndTransferToMTMVPartitionInfo(NereidsPlanner pl
return mtmvPartitionInfo;
}

private RelatedTableInfo getRelatedTableInfo(NereidsPlanner planner, ConnectContext ctx, LogicalPlan
logicalQuery,
String partitionColName,
String timeUnit) {
// Should use rewritten plan without view and subQuery to get related partition table
private RelatedTableInfo getRelatedTableInfo(NereidsPlanner planner, ConnectContext ctx,
String partitionColName, String timeUnit) {
CascadesContext cascadesContext = planner.getCascadesContext();
SessionVariable sessionVariable = cascadesContext.getConnectContext().getSessionVariable();
Set<String> tempDisableRules = sessionVariable.getDisableNereidsRuleNames();
// Should not make table without data to empty relation when analyze the related table,
// so add disable rules
sessionVariable.setDisableNereidsRules(CreateMTMVInfo.MTMV_PLANER_DISABLE_RULES);
cascadesContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);

RelatedTableInfo relatedTableInfo = MaterializedViewUtils
.getRelatedTableInfo(partitionColName, timeUnit, planner.getRewrittenPlan(), cascadesContext);
if (!relatedTableInfo.isPctPossible()) {
throw new AnalysisException(String.format("Unable to find a suitable base table for partitioning,"
+ " the fail reason is %s", relatedTableInfo.getFailReason()));
}
MTMVRelatedTableIf mtmvBaseRealtedTable = MTMVUtil.getRelatedTable(relatedTableInfo.getTableInfo());
Set<String> partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
try {
Plan mvRewrittenPlan =
planner.planWithLock(logicalQuery, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN);
RelatedTableInfo relatedTableInfo = MaterializedViewUtils
.getRelatedTableInfo(partitionColName, timeUnit, mvRewrittenPlan, cascadesContext);
if (!relatedTableInfo.isPctPossible()) {
throw new AnalysisException(String.format("Unable to find a suitable base table for partitioning,"
+ " the fail reason is %s", relatedTableInfo.getFailReason()));
}
MTMVRelatedTableIf mtmvBaseRealtedTable = MTMVUtil.getRelatedTable(relatedTableInfo.getTableInfo());
Set<String> partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
try {
partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames());
} catch (DdlException e) {
throw new AnalysisException(e.getMessage(), e);
}
partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames());
} catch (DdlException e) {
throw new AnalysisException(e.getMessage(), e);
}

if (!partitionColumnNames.contains(relatedTableInfo.getColumn())) {
throw new AnalysisException("error related column: " + relatedTableInfo.getColumn());
}
if (!(mtmvBaseRealtedTable instanceof HMSExternalTable)
&& partitionColumnNames.size() != 1) {
throw new AnalysisException("only hms table support multi column partition.");
}
return relatedTableInfo;
} finally {
// after operate, roll back the disable rules
sessionVariable.setDisableNereidsRules(String.join(",", tempDisableRules));
cascadesContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
if (!partitionColumnNames.contains(relatedTableInfo.getColumn())) {
throw new AnalysisException("error related column: " + relatedTableInfo.getColumn());
}
if (!(mtmvBaseRealtedTable instanceof HMSExternalTable)
&& partitionColumnNames.size() != 1) {
throw new AnalysisException("only hms table support multi column partition.");
}
return relatedTableInfo;
}

private static List<Expr> convertToLegacyArguments(List<Expression> children) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.View;
import org.apache.doris.mtmv.MTMVCache;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalView;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.visitor.TableCollector.TableCollectorContext;
import org.apache.doris.qe.ConnectContext;
Expand Down Expand Up @@ -69,8 +71,20 @@ public Plan visitPhysicalCatalogRelation(PhysicalCatalogRelation catalogRelation
return catalogRelation;
}

@Override
public Plan visitLogicalView(LogicalView<? extends Plan> logicalView, TableCollectorContext context) {
View view = logicalView.getView();
if (context.getTargetTableTypes().isEmpty() || context.getTargetTableTypes().contains(view.getType())) {
context.getCollectedTables().add(view);
}
if (context.isExpandView()) {
return super.visit(logicalView, context);
}
return logicalView;
}

private void expandMvAndCollect(MTMV mtmv, TableCollectorContext context) {
if (!context.isExpand()) {
if (!context.isExpandMaterializedView()) {
return;
}
// Make sure use only one connection context when in query to avoid ConnectionContext.get() wrong
Expand All @@ -87,12 +101,15 @@ public static final class TableCollectorContext {
private final Set<TableIf> collectedTables = new HashSet<>();
private final Set<TableType> targetTableTypes;
// if expand the mv or not
private final boolean expand;
private final boolean expandMaterializedView;
private final boolean expandView;
private ConnectContext connectContext;

public TableCollectorContext(Set<TableType> targetTableTypes, boolean expand) {
public TableCollectorContext(Set<TableType> targetTableTypes, boolean expandMaterializedView,
boolean expandView) {
this.targetTableTypes = targetTableTypes;
this.expand = expand;
this.expandMaterializedView = expandMaterializedView;
this.expandView = expandView;
}

public Set<TableIf> getCollectedTables() {
Expand All @@ -103,8 +120,8 @@ public Set<TableType> getTargetTableTypes() {
return targetTableTypes;
}

public boolean isExpand() {
return expand;
public boolean isExpandMaterializedView() {
return expandMaterializedView;
}

public ConnectContext getConnectContext() {
Expand All @@ -114,5 +131,9 @@ public ConnectContext getConnectContext() {
public void setConnectContext(ConnectContext connectContext) {
this.connectContext = connectContext;
}

public boolean isExpandView() {
return expandView;
}
}
}
Loading

0 comments on commit b67dd37

Please sign in to comment.