Skip to content

Commit

Permalink
[pick](mtmv) Pick some pr to branch 3.0 #41674 #43095 #44164 (#44290)
Browse files Browse the repository at this point in the history
pick from master

commit id 6fdc8a5
PR #41674


commit id 7c45912
PR #43095


commit id 7aec6ff
PR #44164
  • Loading branch information
seawinde authored Nov 22, 2024
1 parent 2122f13 commit eeda1ce
Show file tree
Hide file tree
Showing 32 changed files with 8,961 additions and 536 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,6 @@ public String getTypeName() {

@Override
public String toString() {
if (!leftRejectEdges.isEmpty() || !rightRejectEdges.isEmpty()) {
return String.format("<%s --%s-- %s>[%s , %s]", LongBitmap.toString(leftExtendedNodes),
this.getTypeName(), LongBitmap.toString(rightExtendedNodes), leftRejectEdges, rightRejectEdges);
}
return String.format("<%s --%s-- %s>", LongBitmap.toString(leftExtendedNodes),
this.getTypeName(), LongBitmap.toString(rightExtendedNodes));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
continue;
}
Plan rewrittenPlan;
Plan mvScan = materializationContext.getScanPlan(queryStructInfo);
Plan mvScan = materializationContext.getScanPlan(queryStructInfo, cascadesContext);
Plan queryPlan = queryStructInfo.getTopPlan();
if (compensatePredicates.isAlwaysTrue()) {
rewrittenPlan = mvScan;
Expand Down Expand Up @@ -262,12 +262,6 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
// Rewrite query by view
rewrittenPlan = rewriteQueryByView(matchMode, queryStructInfo, viewStructInfo, viewToQuerySlotMapping,
rewrittenPlan, materializationContext, cascadesContext);
// If rewrite successfully, try to get mv read lock to avoid data inconsistent,
// try to get lock which should added before RBO
if (materializationContext instanceof AsyncMaterializationContext && !materializationContext.isSuccess()) {
cascadesContext.getStatementContext()
.addTableReadLock(((AsyncMaterializationContext) materializationContext).getMtmv());
}
rewrittenPlan = MaterializedViewUtils.rewriteByRules(cascadesContext,
childContext -> {
Rewriter.getWholeTreeRewriter(childContext).execute();
Expand Down Expand Up @@ -379,9 +373,9 @@ protected List<Plan> doRewrite(StructInfo queryStructInfo, CascadesContext casca
}
trySetStatistics(materializationContext, cascadesContext);
rewriteResults.add(rewrittenPlan);
// if rewrite successfully, try to regenerate mv scan because it maybe used again
materializationContext.tryReGenerateScanPlan(cascadesContext);
recordIfRewritten(queryStructInfo.getOriginalPlan(), materializationContext, cascadesContext);
// If rewrite successfully, try to clear mv scan currently because it maybe used again
materializationContext.clearScanPlan(cascadesContext);
}
return rewriteResults;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ public class AsyncMaterializationContext extends MaterializationContext {
*/
public AsyncMaterializationContext(MTMV mtmv, Plan mvPlan, Plan mvOriginalPlan, List<Table> baseTables,
List<Table> baseViews, CascadesContext cascadesContext, StructInfo structInfo) {
super(mvPlan, mvOriginalPlan, MaterializedViewUtils.generateMvScanPlan(mtmv, mtmv.getBaseIndexId(),
mtmv.getPartitionIds(), PreAggStatus.on(), cascadesContext),
cascadesContext, structInfo);
super(mvPlan, mvOriginalPlan, cascadesContext, structInfo);
this.mtmv = mtmv;
}

Expand Down Expand Up @@ -110,7 +108,7 @@ public Optional<Pair<Id, Statistics>> getPlanStatistics(CascadesContext cascades
return Optional.empty();
}
RelationId relationId = null;
Optional<LogicalOlapScan> logicalOlapScan = this.getScanPlan(null)
Optional<LogicalOlapScan> logicalOlapScan = this.getScanPlan(null, cascadesContext)
.collectFirst(LogicalOlapScan.class::isInstance);
if (logicalOlapScan.isPresent()) {
relationId = logicalOlapScan.get().getRelationId();
Expand All @@ -132,7 +130,13 @@ boolean isFinalChosen(Relation relation) {
}

@Override
public Plan getScanPlan(StructInfo queryInfo) {
public Plan getScanPlan(StructInfo queryInfo, CascadesContext cascadesContext) {
// If try to get scan plan or rewrite successfully, try to get mv read lock to avoid meta data inconsistent,
// try to get lock which should added before RBO
if (!this.isSuccess()) {
cascadesContext.getStatementContext().addTableReadLock(this.getMtmv());
}
super.getScanPlan(queryInfo, cascadesContext);
return scanPlan;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,9 @@ private boolean compareEdgeWithNode(Edge query, Edge view) {
if (query instanceof FilterEdge && view instanceof FilterEdge) {
return compareFilterEdgeWithNode((FilterEdge) query, viewFilterEdgesAfterInferring.get(view.getIndex()));
} else if (query instanceof JoinEdge && view instanceof JoinEdge) {
return compareJoinEdgeWithNode((JoinEdge) query, viewJoinEdgesAfterInferring.get(view.getIndex()));
// compare original or inferred join edge
return compareJoinEdgeWithNode((JoinEdge) query, viewJoinEdgesAfterInferring.get(view.getIndex()))
|| compareJoinEdgeWithNode((JoinEdge) query, (JoinEdge) view);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,33 +105,20 @@ public abstract class MaterializationContext {
/**
* MaterializationContext, this contains necessary info for query rewriting by materialization
*/
public MaterializationContext(Plan plan, Plan originalPlan, Plan scanPlan,
public MaterializationContext(Plan plan, Plan originalPlan,
CascadesContext cascadesContext, StructInfo structInfo) {
this.plan = plan;
this.originalPlan = originalPlan;
this.scanPlan = scanPlan;

StatementBase parsedStatement = cascadesContext.getStatementContext().getParsedStatement();
this.enableRecordFailureDetail = parsedStatement != null && parsedStatement.isExplain()
&& ExplainLevel.MEMO_PLAN == parsedStatement.getExplainOptions().getExplainLevel();
List<Slot> originalPlanOutput = originalPlan.getOutput();
List<Slot> scanPlanOutput = this.scanPlan.getOutput();
if (originalPlanOutput.size() == scanPlanOutput.size()) {
for (int slotIndex = 0; slotIndex < originalPlanOutput.size(); slotIndex++) {
this.exprToScanExprMapping.put(originalPlanOutput.get(slotIndex), scanPlanOutput.get(slotIndex));
}
}
// Construct materialization struct info, catch exception which may cause planner roll back
this.structInfo = structInfo == null
? constructStructInfo(plan, originalPlan, cascadesContext, new BitSet()).orElseGet(() -> null)
: structInfo;
this.available = this.structInfo != null;
if (available) {
this.planOutputShuttledExpressions = this.structInfo.getPlanOutputShuttledExpressions();
// materialization output expression shuttle, this will be used to expression rewrite
this.shuttledExprToScanExprMapping = ExpressionMapping.generate(
this.planOutputShuttledExpressions,
scanPlanOutput);
}
}

Expand Down Expand Up @@ -176,17 +163,19 @@ public void addMatchedGroup(GroupId groupId, boolean rewriteSuccess) {
* if MaterializationContext is already rewritten successfully, then should generate new scan plan in later
* query rewrite, because one plan may hit the materialized view repeatedly and the materialization scan output
* should be different.
* This method should be called when query rewrite successfully
*/
public void tryReGenerateScanPlan(CascadesContext cascadesContext) {
public void tryGenerateScanPlan(CascadesContext cascadesContext) {
if (!this.isAvailable()) {
return;
}
this.scanPlan = doGenerateScanPlan(cascadesContext);
// materialization output expression shuttle, this will be used to expression rewrite
this.shuttledExprToScanExprMapping = ExpressionMapping.generate(
this.planOutputShuttledExpressions,
this.scanPlan.getOutput());
// Materialization output expression shuttle, this will be used to expression rewrite
List<Slot> scanPlanOutput = this.scanPlan.getOutput();
this.shuttledExprToScanExprMapping = ExpressionMapping.generate(this.planOutputShuttledExpressions,
scanPlanOutput);
// This is used by normalize statistics column expression
Map<Expression, Expression> regeneratedMapping = new HashMap<>();
List<Slot> originalPlanOutput = originalPlan.getOutput();
List<Slot> scanPlanOutput = this.scanPlan.getOutput();
if (originalPlanOutput.size() == scanPlanOutput.size()) {
for (int slotIndex = 0; slotIndex < originalPlanOutput.size(); slotIndex++) {
regeneratedMapping.put(originalPlanOutput.get(slotIndex), scanPlanOutput.get(slotIndex));
Expand All @@ -195,6 +184,17 @@ public void tryReGenerateScanPlan(CascadesContext cascadesContext) {
this.exprToScanExprMapping = regeneratedMapping;
}

/**
* Should clear scan plan after materializationContext is already rewritten successfully,
* Because one plan may hit the materialized view repeatedly and the materialization scan output
* should be different.
*/
public void clearScanPlan(CascadesContext cascadesContext) {
this.scanPlan = null;
this.shuttledExprToScanExprMapping = null;
this.exprToScanExprMapping = null;
}

public void addSlotMappingToCache(RelationMapping relationMapping, SlotMapping slotMapping) {
queryToMaterializationSlotMappingCache.put(relationMapping, slotMapping);
}
Expand Down Expand Up @@ -275,7 +275,11 @@ public Plan getOriginalPlan() {
return originalPlan;
}

public Plan getScanPlan(StructInfo queryStructInfo) {
public Plan getScanPlan(StructInfo queryStructInfo, CascadesContext cascadesContext) {
if (this.scanPlan == null || this.shuttledExprToScanExprMapping == null
|| this.exprToScanExprMapping == null) {
tryGenerateScanPlan(cascadesContext);
}
return scanPlan;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,28 +210,25 @@ private static boolean collectStructInfoFromGraph(HyperGraph hyperGraph,
});
// Collect expression from join condition in hyper graph
for (JoinEdge edge : hyperGraph.getJoinEdges()) {
List<Expression> hashJoinConjuncts = edge.getHashJoinConjuncts();
List<? extends Expression> joinConjunctExpressions = edge.getExpressions();
// shuttle expression in edge for the build of LogicalCompatibilityContext later.
// Record the exprId to expr map in the processing to strut info
// TODO get exprId to expr map when complex project is ready in join dege
hashJoinConjuncts.forEach(conjunctExpr -> {
ExpressionLineageReplacer.ExpressionReplaceContext replaceContext =
new ExpressionLineageReplacer.ExpressionReplaceContext(
Lists.newArrayList(conjunctExpr), ImmutableSet.of(),
ImmutableSet.of(), new BitSet());
topPlan.accept(ExpressionLineageReplacer.INSTANCE, replaceContext);
// Replace expressions by expression map
List<Expression> replacedExpressions = replaceContext.getReplacedExpressions();
ExpressionLineageReplacer.ExpressionReplaceContext replaceContext =
new ExpressionLineageReplacer.ExpressionReplaceContext(
joinConjunctExpressions.stream().map(expr -> (Expression) expr)
.collect(Collectors.toList()),
ImmutableSet.of(), ImmutableSet.of(), new BitSet());
topPlan.accept(ExpressionLineageReplacer.INSTANCE, replaceContext);
// Replace expressions by expression map
List<Expression> replacedExpressions = replaceContext.getReplacedExpressions();
for (int i = 0; i < replacedExpressions.size(); i++) {
putShuttledExpressionsToExpressionsMap(shuttledExpressionsToExpressionsMap,
ExpressionPosition.JOIN_EDGE, replacedExpressions.get(0), conjunctExpr);
// Record this, will be used in top level expression shuttle later, see the method
// ExpressionLineageReplacer#visitGroupPlan
namedExprIdAndExprMapping.putAll(replaceContext.getExprIdExpressionMap());
});
List<Expression> otherJoinConjuncts = edge.getOtherJoinConjuncts();
if (!otherJoinConjuncts.isEmpty()) {
return false;
ExpressionPosition.JOIN_EDGE, replacedExpressions.get(i), joinConjunctExpressions.get(i));
}
// Record this, will be used in top level expression shuttle later, see the method
// ExpressionLineageReplacer#visitGroupPlan
namedExprIdAndExprMapping.putAll(replaceContext.getExprIdExpressionMap());
}
// Collect expression from where in hyper graph
hyperGraph.getFilterEdges().forEach(filterEdge -> {
Expand Down Expand Up @@ -621,9 +618,6 @@ public Boolean visitLogicalJoin(LogicalJoin<? extends Plan, ? extends Plan> join
if (!checkContext.getSupportJoinTypes().contains(join.getJoinType())) {
return false;
}
if (!join.getOtherJoinConjuncts().isEmpty()) {
return false;
}
return visit(join, checkContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.nereids.trees.plans.algebra.Relation;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
Expand Down Expand Up @@ -55,9 +56,7 @@ public class SyncMaterializationContext extends MaterializationContext {
*/
public SyncMaterializationContext(Plan mvPlan, Plan mvOriginalPlan, OlapTable olapTable,
long indexId, String indexName, CascadesContext cascadesContext, Statistics statistics) {
super(mvPlan, mvOriginalPlan,
MaterializedViewUtils.generateMvScanPlan(olapTable, indexId, olapTable.getPartitionIds(),
PreAggStatus.unset(), cascadesContext), cascadesContext, null);
super(mvPlan, mvOriginalPlan, cascadesContext, null);
this.olapTable = olapTable;
this.indexId = indexId;
this.indexName = indexName;
Expand Down Expand Up @@ -100,7 +99,7 @@ String getStringInfo() {
@Override
Optional<Pair<Id, Statistics>> getPlanStatistics(CascadesContext cascadesContext) {
RelationId relationId = null;
Optional<LogicalOlapScan> scanObj = this.getScanPlan(null)
Optional<LogicalOlapScan> scanObj = this.getScanPlan(null, cascadesContext)
.collectFirst(LogicalOlapScan.class::isInstance);
if (scanObj.isPresent()) {
relationId = scanObj.get().getRelationId();
Expand All @@ -109,19 +108,27 @@ Optional<Pair<Id, Statistics>> getPlanStatistics(CascadesContext cascadesContext
}

@Override
public Plan getScanPlan(StructInfo queryStructInfo) {
public Plan getScanPlan(StructInfo queryStructInfo, CascadesContext cascadesContext) {
// Already get lock if sync mv, doesn't need to get lock
super.getScanPlan(queryStructInfo, cascadesContext);
if (queryStructInfo == null) {
return scanPlan;
}
if (queryStructInfo.getRelations().size() == 1
&& queryStructInfo.getRelations().get(0) instanceof LogicalOlapScan
&& !((LogicalOlapScan) queryStructInfo.getRelations().get(0)).getSelectedPartitionIds().isEmpty()) {
List<CatalogRelation> queryStructInfoRelations = queryStructInfo.getRelations();
if (queryStructInfoRelations.size() == 1
&& queryStructInfoRelations.get(0) instanceof LogicalOlapScan
&& !((LogicalOlapScan) queryStructInfoRelations.get(0)).getSelectedPartitionIds().isEmpty()) {
// Partition prune if sync materialized view
return scanPlan.accept(new DefaultPlanRewriter<Void>() {
@Override
public Plan visitLogicalOlapScan(LogicalOlapScan olapScan, Void context) {
if (!queryStructInfoRelations.get(0).getTable().getFullQualifiers().equals(
olapScan.getTable().getFullQualifiers())) {
// Only the same table, we can do partition prue
return olapScan;
}
return olapScan.withSelectedPartitionIds(
((LogicalOlapScan) queryStructInfo.getRelations().get(0)).getSelectedPartitionIds());
((LogicalOlapScan) queryStructInfoRelations.get(0)).getSelectedPartitionIds());
}
}, null);
}
Expand Down
Loading

0 comments on commit eeda1ce

Please sign in to comment.