Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improvement](mtmv) Refactor the plan-getting logic when creating an mtmv, and support collecting views from the plan #40428

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,18 @@ public class MTMVCache {
// The materialized view plan which should be optimized by the same rules to query
// and will remove top sink and unused sort
private final Plan logicalPlan;
// The original plan of mv def sql
// The original rewritten plan of mv def sql
private final Plan originalPlan;
// The analyzed plan of mv def sql, which is used by tableCollector and should not be optimized by RBO
private final Plan analyzedPlan;
private final Statistics statistics;
private final StructInfo structInfo;

public MTMVCache(Plan logicalPlan, Plan originalPlan, Statistics statistics, StructInfo structInfo) {
public MTMVCache(Plan logicalPlan, Plan originalPlan, Plan analyzedPlan,
Statistics statistics, StructInfo structInfo) {
this.logicalPlan = logicalPlan;
this.originalPlan = originalPlan;
this.analyzedPlan = analyzedPlan;
this.statistics = statistics;
this.structInfo = structInfo;
}
Expand All @@ -71,6 +75,10 @@ public Plan getOriginalPlan() {
return originalPlan;
}

/**
 * Returns the analyzed plan of the mv definition sql that was passed to the constructor.
 *
 * @return the analyzed plan held by this cache
 */
public Plan getAnalyzedPlan() {
    return this.analyzedPlan;
}

/**
 * Returns the statistics that were passed to the constructor.
 *
 * @return the statistics held by this cache
 */
public Statistics getStatistics() {
    return this.statistics;
}
Expand Down Expand Up @@ -117,7 +125,7 @@ public Plan visitLogicalResultSink(LogicalResultSink<? extends Plan> logicalResu
Optional<StructInfo> structInfoOptional = MaterializationContext.constructStructInfo(mvPlan, originPlan,
planner.getCascadesContext(),
new BitSet());
return new MTMVCache(mvPlan, originPlan, needCost
return new MTMVCache(mvPlan, originPlan, planner.getAnalyzedPlan(), needCost
? planner.getCascadesContext().getMemo().getRoot().getStatistics() : null,
structInfoOptional.orElseGet(() -> null));
}
Expand Down
45 changes: 20 additions & 25 deletions fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,12 @@
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.commands.info.CreateMTMVInfo;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.visitor.TableCollector;
import org.apache.doris.nereids.trees.plans.visitor.TableCollector.TableCollectorContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;

import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -98,38 +95,36 @@ private static void setCatalogAndDb(ConnectContext ctx, MTMV mtmv) {
public static MTMVRelation generateMTMVRelation(MTMV mtmv, ConnectContext ctx) {
// Should not make table without data to empty relation when analyze the related table,
// so add disable rules
SessionVariable sessionVariable = ctx.getSessionVariable();
Set<String> tempDisableRules = sessionVariable.getDisableNereidsRuleNames();
sessionVariable.setDisableNereidsRules(CreateMTMVInfo.MTMV_PLANER_DISABLE_RULES);
if (ctx.getStatementContext() != null) {
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
Plan plan;
try {
plan = getPlanBySql(mtmv.getQuerySql(), ctx);
} finally {
sessionVariable.setDisableNereidsRules(String.join(",", tempDisableRules));
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
return generateMTMVRelation(plan);
Plan plan = getAnalyzePlanBySql(mtmv.getQuerySql(), ctx);
return generateMTMVRelation(plan, ctx);
}

public static MTMVRelation generateMTMVRelation(Plan plan) {
return new MTMVRelation(getBaseTables(plan, true), getBaseTables(plan, false), getBaseViews(plan));
public static MTMVRelation generateMTMVRelation(Plan plan, ConnectContext connectContext) {
return new MTMVRelation(getBaseTables(plan, true, true, connectContext),
getBaseTables(plan, false, true, connectContext),
getBaseViews(plan, true, true, connectContext));
}

private static Set<BaseTableInfo> getBaseTables(Plan plan, boolean expand) {
private static Set<BaseTableInfo> getBaseTables(Plan plan, boolean expandMaterializedView,
boolean expandView, ConnectContext connectContext) {
TableCollectorContext collectorContext =
new TableCollector.TableCollectorContext(
com.google.common.collect.Sets
.newHashSet(TableType.values()), expand);
.newHashSet(TableType.values()), connectContext, expandMaterializedView, expandView);
plan.accept(TableCollector.INSTANCE, collectorContext);
Set<TableIf> collectedTables = collectorContext.getCollectedTables();
return transferTableIfToInfo(collectedTables);
}

private static Set<BaseTableInfo> getBaseViews(Plan plan) {
return Sets.newHashSet();
private static Set<BaseTableInfo> getBaseViews(Plan plan, boolean expandMaterializedView,
boolean expandView, ConnectContext connectContext) {
TableCollectorContext collectorContext =
new TableCollector.TableCollectorContext(
com.google.common.collect.Sets
.newHashSet(TableType.VIEW), connectContext, expandMaterializedView, expandView);
plan.accept(TableCollector.INSTANCE, collectorContext);
Set<TableIf> collectedTables = collectorContext.getCollectedTables();
return transferTableIfToInfo(collectedTables);
}

private static Set<BaseTableInfo> transferTableIfToInfo(Set<TableIf> tables) {
Expand All @@ -140,7 +135,7 @@ private static Set<BaseTableInfo> transferTableIfToInfo(Set<TableIf> tables) {
return result;
}

private static Plan getPlanBySql(String querySql, ConnectContext ctx) {
private static Plan getAnalyzePlanBySql(String querySql, ConnectContext ctx) {
List<StatementBase> statements;
try {
statements = new NereidsParser().parseSQL(querySql);
Expand All @@ -153,7 +148,7 @@ private static Plan getPlanBySql(String querySql, ConnectContext ctx) {
ctx.setStatementContext(new StatementContext());
try {
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
return planner.planWithLock(logicalPlan, PhysicalProperties.ANY, ExplainLevel.NONE);
return planner.planWithLock(logicalPlan, PhysicalProperties.ANY, ExplainLevel.ANALYZED_PLAN);
} finally {
ctx.setStatementContext(original);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,13 @@ public void initMaterializationContext(CascadesContext cascadesContext) {
*/
protected void doInitMaterializationContext(CascadesContext cascadesContext) {
// Only collect the table or mv which query use directly, to avoid useless mv partition in rewrite
TableCollectorContext collectorContext = new TableCollectorContext(Sets.newHashSet(), false);
// Keep using the same connect context throughout the query; if a new connect context is used,
// the ConnectionContext.get() will change
TableCollectorContext collectorContext = new TableCollectorContext(Sets.newHashSet(),
cascadesContext.getConnectContext(),
false, true);
try {
Plan rewritePlan = cascadesContext.getRewritePlan();
// Keep use one connection context when in query, if new connect context,
// the ConnectionContext.get() will change
collectorContext.setConnectContext(cascadesContext.getConnectContext());
rewritePlan.accept(TableCollector.INSTANCE, collectorContext);
} catch (Exception e) {
LOG.warn(String.format("MaterializationContext init table collect fail, current queryId is %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ public Plan visitLogicalResultSink(LogicalResultSink<? extends Plan> logicalResu
ImmutableList.of(Rewriter.custom(RuleType.ELIMINATE_SORT, EliminateSort::new))).execute();
return childContext.getRewritePlan();
}, mvPlan, originPlan);
return new MTMVCache(mvPlan, originPlan,
return new MTMVCache(mvPlan, originPlan, planner.getAnalyzedPlan(),
planner.getCascadesContext().getMemo().getRoot().getStatistics(), null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,21 @@ public void analyzeQuery(ConnectContext ctx, Map<String, String> mvProperties) t
NereidsPlanner planner = new NereidsPlanner(statementContext);
// this is for expression column name infer when not use alias
LogicalSink<Plan> logicalSink = new UnboundResultSink<>(logicalQuery);
// must disable constant folding by be, because be constant folding may return wrong type
ctx.getSessionVariable().setVarOnce(SessionVariable.ENABLE_FOLD_CONSTANT_BY_BE, "false");
Plan plan = planner.planWithLock(logicalSink, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
// Should not turn a table without data into an empty relation when analyzing the related table,
// so add the disable rules
Set<String> tempDisableRules = ctx.getSessionVariable().getDisableNereidsRuleNames();
ctx.getSessionVariable().setDisableNereidsRules(CreateMTMVInfo.MTMV_PLANER_DISABLE_RULES);
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
Plan plan;
try {
// must disable constant folding by be, because be constant folding may return wrong type
ctx.getSessionVariable().setVarOnce(SessionVariable.ENABLE_FOLD_CONSTANT_BY_BE, "false");
plan = planner.planWithLock(logicalSink, PhysicalProperties.ANY, ExplainLevel.ALL_PLAN);
} finally {
// after the operation, roll back the disable rules
ctx.getSessionVariable().setDisableNereidsRules(String.join(",", tempDisableRules));
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
// can not contain VIEW or MTMV
analyzeBaseTables(planner.getAnalyzedPlan());
// can not contain Random function
Expand All @@ -265,8 +277,7 @@ public void analyzeQuery(ConnectContext ctx, Map<String, String> mvProperties) t
throw new AnalysisException("can not contain invalid expression");
}
getRelation(planner);
this.mvPartitionInfo = mvPartitionDefinition
.analyzeAndTransferToMTMVPartitionInfo(planner, ctx, logicalQuery);
this.mvPartitionInfo = mvPartitionDefinition.analyzeAndTransferToMTMVPartitionInfo(planner, ctx);
this.partitionDesc = generatePartitionDesc(ctx);
getColumns(plan, ctx, mvPartitionInfo.getPartitionCol(), distribution);
analyzeKeys();
Expand Down Expand Up @@ -311,24 +322,9 @@ private void analyzeKeys() {
}
}

// Should use the analyzed plan to collect views and tables
private void getRelation(NereidsPlanner planner) {
// Should not make table without data to empty relation when analyze the related table,
// so add disable rules
ConnectContext ctx = planner.getCascadesContext().getConnectContext();
SessionVariable sessionVariable = ctx.getSessionVariable();
Set<String> tempDisableRules = sessionVariable.getDisableNereidsRuleNames();
sessionVariable.setDisableNereidsRules(CreateMTMVInfo.MTMV_PLANER_DISABLE_RULES);
if (ctx.getStatementContext() != null) {
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
Plan plan;
try {
plan = planner.planWithLock(logicalQuery, PhysicalProperties.ANY, ExplainLevel.NONE);
} finally {
sessionVariable.setDisableNereidsRules(String.join(",", tempDisableRules));
ctx.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
this.relation = MTMVPlanUtil.generateMTMVRelation(plan);
this.relation = MTMVPlanUtil.generateMTMVRelation(planner.getAnalyzedPlan(), planner.getConnectContext());
}

private PartitionDesc generatePartitionDesc(ConnectContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,14 @@
import org.apache.doris.nereids.analyzer.UnboundFunction;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils.RelatedTableInfo;
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.functions.scalar.DateTrunc;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

import com.google.common.collect.Sets;

Expand All @@ -72,11 +67,9 @@ public class MTMVPartitionDefinition {
*
* @param planner planner
* @param ctx ctx
* @param logicalQuery logicalQuery
* @return MTMVPartitionInfo
*/
public MTMVPartitionInfo analyzeAndTransferToMTMVPartitionInfo(NereidsPlanner planner, ConnectContext ctx,
LogicalPlan logicalQuery) {
public MTMVPartitionInfo analyzeAndTransferToMTMVPartitionInfo(NereidsPlanner planner, ConnectContext ctx) {
MTMVPartitionInfo mtmvPartitionInfo = new MTMVPartitionInfo(partitionType);
if (this.partitionType == MTMVPartitionType.SELF_MANAGE) {
return mtmvPartitionInfo;
Expand All @@ -100,7 +93,7 @@ public MTMVPartitionInfo analyzeAndTransferToMTMVPartitionInfo(NereidsPlanner pl
timeUnit = null;
}
mtmvPartitionInfo.setPartitionCol(partitionColName);
RelatedTableInfo relatedTableInfo = getRelatedTableInfo(planner, ctx, logicalQuery, partitionColName, timeUnit);
RelatedTableInfo relatedTableInfo = getRelatedTableInfo(planner, ctx, partitionColName, timeUnit);
mtmvPartitionInfo.setRelatedCol(relatedTableInfo.getColumn());
mtmvPartitionInfo.setRelatedTable(relatedTableInfo.getTableInfo());
if (relatedTableInfo.getPartitionExpression().isPresent()) {
Expand All @@ -125,47 +118,33 @@ public MTMVPartitionInfo analyzeAndTransferToMTMVPartitionInfo(NereidsPlanner pl
return mtmvPartitionInfo;
}

private RelatedTableInfo getRelatedTableInfo(NereidsPlanner planner, ConnectContext ctx, LogicalPlan
logicalQuery,
String partitionColName,
String timeUnit) {
// Should use rewritten plan without view and subQuery to get related partition table
private RelatedTableInfo getRelatedTableInfo(NereidsPlanner planner, ConnectContext ctx,
String partitionColName, String timeUnit) {
CascadesContext cascadesContext = planner.getCascadesContext();
SessionVariable sessionVariable = cascadesContext.getConnectContext().getSessionVariable();
Set<String> tempDisableRules = sessionVariable.getDisableNereidsRuleNames();
// Should not make table without data to empty relation when analyze the related table,
// so add disable rules
sessionVariable.setDisableNereidsRules(CreateMTMVInfo.MTMV_PLANER_DISABLE_RULES);
cascadesContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);

RelatedTableInfo relatedTableInfo = MaterializedViewUtils
.getRelatedTableInfo(partitionColName, timeUnit, planner.getRewrittenPlan(), cascadesContext);
if (!relatedTableInfo.isPctPossible()) {
throw new AnalysisException(String.format("Unable to find a suitable base table for partitioning,"
+ " the fail reason is %s", relatedTableInfo.getFailReason()));
}
MTMVRelatedTableIf mtmvBaseRealtedTable = MTMVUtil.getRelatedTable(relatedTableInfo.getTableInfo());
Set<String> partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
try {
Plan mvRewrittenPlan =
planner.planWithLock(logicalQuery, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN);
RelatedTableInfo relatedTableInfo = MaterializedViewUtils
.getRelatedTableInfo(partitionColName, timeUnit, mvRewrittenPlan, cascadesContext);
if (!relatedTableInfo.isPctPossible()) {
throw new AnalysisException(String.format("Unable to find a suitable base table for partitioning,"
+ " the fail reason is %s", relatedTableInfo.getFailReason()));
}
MTMVRelatedTableIf mtmvBaseRealtedTable = MTMVUtil.getRelatedTable(relatedTableInfo.getTableInfo());
Set<String> partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
try {
partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames(Optional.empty()));
} catch (DdlException e) {
throw new AnalysisException(e.getMessage(), e);
}
partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames(Optional.empty()));
} catch (DdlException e) {
throw new AnalysisException(e.getMessage(), e);
}

if (!partitionColumnNames.contains(relatedTableInfo.getColumn())) {
throw new AnalysisException("error related column: " + relatedTableInfo.getColumn());
}
if (!(mtmvBaseRealtedTable instanceof HMSExternalTable)
&& partitionColumnNames.size() != 1) {
throw new AnalysisException("only hms table support multi column partition.");
}
return relatedTableInfo;
} finally {
// after operate, roll back the disable rules
sessionVariable.setDisableNereidsRules(String.join(",", tempDisableRules));
cascadesContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
if (!partitionColumnNames.contains(relatedTableInfo.getColumn())) {
throw new AnalysisException("error related column: " + relatedTableInfo.getColumn());
}
if (!(mtmvBaseRealtedTable instanceof HMSExternalTable)
&& partitionColumnNames.size() != 1) {
throw new AnalysisException("only hms table support multi column partition.");
}
return relatedTableInfo;
}

private static List<Expr> convertToLegacyArguments(List<Expression> children) {
Expand Down
Loading